diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 82f8eef..8b4d58d 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -8,7 +8,8 @@ "Bash(sed -n '523,535p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")", "Bash(sed -n '582,600p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")", "WebFetch(domain:docs.public.oneportal.content.oci.oraclecloud.com)", - "Bash(./mvnw compile *)" + "Bash(./mvnw compile *)", + "WebFetch(domain:medium.com)" ] } } diff --git a/quarkus-automaton/docs/Plan.md b/quarkus-automaton/docs/Plan.md index daca2c4..0862d6c 100644 --- a/quarkus-automaton/docs/Plan.md +++ b/quarkus-automaton/docs/Plan.md @@ -110,7 +110,7 @@ quarkus-automaton/ | `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis | | `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur | | `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage | -| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` oder `.error` | +| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` (bei Erfolg) oder `.error` (nur bei ungültiger ZIP) | | `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf | | `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) | @@ -244,14 +244,14 @@ n8n fire-and-forget-Verhalten. ### Fehlerklassen -| Fehler | Typ | Retry | Verhalten | +| Fehler | Typ | Umbenennung | Verhalten | |---|---|---|---| -| SFTP-Verbindung fehlgeschlagen | transient | nein | Nächster APEX-Lauf (1h) versucht es | -| ZIP beschädigt | persistent | nein | ZIP auf SFTP umbenennen zu `.error`, Log | -| OCI-Verbindung fehlgeschlagen (z.B. 503) | transient | ja (exponential backoff) | @Retry | -| OCI-Upload einer Datei schlägt fehl | persistent | nein | SFTP-Rename zu `.error`, Log — bereits hochgeladene OCI-Dateien bleiben (idempotent) | -| ORDS-Aufruf schlägt fehl | transient | ja (2-3x) | Marker liegt vor → APEX Automation schlägt beim nächsten Lauf ein | -| Allgemein technischer Fehler | fallabhängig | siehe SmallRye Fault Tolerance | Exception-Log | +| SFTP-Verbindung / Download fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster APEX-Lauf (1h) versucht es | +| ZIP beschädigt / ungültig | persistent | → `.error` | Datei ist defekt, manuelle Prüfung nötig | +| OCI-Verbindung fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster Lauf versucht erneut (OCI PUT idempotent) | +| SFTP-Rename zu `.processed` fehlgeschlagen | transient | keine | ORDS wurde noch nicht aufgerufen (kommt danach) — kein Doppelimport; nächster Lauf wiederholt den Schritt | +| ORDS-Aufruf schlägt fehl | transient | keine (`.processed` bereits gesetzt) | Marker liegt in OCI vor — APEX Automation findet ihn beim nächsten Lauf | +| Unerwarteter Laufzeitfehler | fallabhängig | keine | Exception wird geloggt, Datei bleibt auf SFTP | ### Retry-Strategie (SmallRye Fault Tolerance) @@ -309,17 +309,23 @@ Credentials, Fehlerbehandlung). Pipeline.processAll(): 1. SftpService.listZipFiles() → ["export_2026-04-08.zip", ...] 2. für jede ZIP: - a. SftpService.download(zip) → lokale Datei - b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste - c. OciUploadService.upload() → Dateien + Marker in OCI - d. SftpService.renameRemote(.processed oder .error) - e. OrdsNotificationService.notify() + a. SftpService.download(zip) → lokale Datei + b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste + ↳ ZipException → Rename zu .error, Abbruch + c. OciUploadService.uploadFiles() → Dateien in OCI (noch kein Marker) + d. SftpService.renameRemote(.processed) + e. OciUploadService.uploadMarker() → Marker in OCI (erst nach Rename — siehe Invariante) + f. OrdsNotificationService.notify() f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler ``` **Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen bei Fehlern oder großen ZIPs. +**Umbenennung zu `.error`** erfolgt ausschließlich bei `ZipException` (defekte/ungültige Datei). +Bei Infrastrukturfehlern (SFTP, OCI, ORDS) bleibt die Datei unverändert auf dem SFTP und wird +beim nächsten Lauf automatisch erneut verarbeitet. + --- ## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider) @@ -343,11 +349,13 @@ public class OciUploadService { .tenantId(config.tenancyId()) .userId(config.userId()) .fingerprint(config.fingerprint()) - .region(Region.fromRegionId(config.region())) - .privateKeySupplier(new FilePrivateKeySupplier(config.privateKeyPath())) + .privateKeySupplier(() -> Files.newInputStream(Path.of(config.privateKeyPath()))) .build(); - this.client = ObjectStorageClient.builder().build(auth); + // Endpoint explizit setzen — verhindert blockierenden HTTP-Discovery-Call im SDK + client = ObjectStorageClient.builder() + .endpoint("https://objectstorage." + config.region() + ".oraclecloud.com") + .build(auth); } } ``` @@ -452,6 +460,12 @@ public class ProcessIncomingRequest { oci-java-sdk-objectstorage 3.44.0 + + + com.oracle.oci.sdk + oci-java-sdk-common-httpclient-jersey3 + 3.44.0 + diff --git a/quarkus-automaton/pom.xml b/quarkus-automaton/pom.xml index f175e51..0dddb51 100644 --- a/quarkus-automaton/pom.xml +++ b/quarkus-automaton/pom.xml @@ -82,6 +82,12 @@ oci-java-sdk-objectstorage 3.44.0 + + + com.oracle.oci.sdk + oci-java-sdk-common-httpclient-jersey3 + 3.44.0 + diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java index d7ec843..0134b28 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java @@ -38,7 +38,6 @@ public class OciUploadService { .tenantId(config.tenancyId()) .userId(config.userId()) .fingerprint(config.fingerprint()) - .region(com.oracle.bmc.Region.fromRegionId(config.region())) .privateKeySupplier(() -> { try { return Files.newInputStream(Path.of(config.privateKeyPath())); @@ -48,22 +47,23 @@ public class OciUploadService { } }) .build(); - - Log.info("Authentifizierung..."); - this.client = ObjectStorageClient.builder().build(auth); + Log.info("Auhtentifizierung..."); + client = ObjectStorageClient.builder() + .endpoint("https://objectstorage." + config.region() + ".oraclecloud.com") + .build(auth); Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket()); } /** - * Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch. - * Dateien mit {@code isMarker = true} werden übersprungen — der Marker wird separat - * am Ende hochgeladen, um sicherzustellen dass er erst nach allen Dateien erscheint. + * Lädt alle Nutzdateien aus {@code context.extractedFiles} in OCI hoch — ohne Marker. + * Der Marker wird erst nach dem SFTP-Rename zu {@code .processed} gesetzt (siehe + * {@link #uploadMarker}), damit APEX Automation den Batch nie verarbeitet bevor die + * ZIP-Datei auf dem SFTP als verarbeitet markiert ist. * * @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix * @throws OciException bei Verbindungs- oder Upload-Fehlern */ - public void upload(ProcessingContext context) throws OciException { - Log.info("OCI Upload gestartet."); + public void uploadFiles(ProcessingContext context) throws OciException { List files = context.extractedFiles.stream() .filter(e -> !e.isMarker) .toList(); @@ -77,13 +77,24 @@ public class OciUploadService { Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize); } + Log.infof("OCI-Upload Dateien abgeschlossen: %d Datei(en) in '%s'", + files.size(), buildPrefix(context.zipNameWithoutExt)); + } + + /** + * Setzt den Marker in OCI — signalisiert der DB-Verarbeitung, dass der Batch vollständig ist. + * Wird erst nach dem SFTP-Rename zu {@code .processed} aufgerufen, damit Marker und + * SFTP-Zustand immer konsistent sind: Marker vorhanden ↔ ZIP bereits als verarbeitet markiert. + * + * @param context enthält den Ziel-Prefix für den Marker-Key + * @throws OciException bei Verbindungs- oder Upload-Fehlern + */ + public void uploadMarker(ProcessingContext context) throws OciException { String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing()); Log.infof("Lade Marker hoch: '%s'", markerKey); - uploadMarker(markerKey); - + putMarker(markerKey); context.markerUploaded = true; - Log.infof("OCI-Upload abgeschlossen: %d Datei(en) + Marker in '%s'", - files.size(), buildPrefix(context.zipNameWithoutExt)); + Log.infof("Marker hochgeladen: '%s'", markerKey); } private String buildPrefix(String zipNameWithoutExt) { @@ -108,7 +119,7 @@ public class OciUploadService { } } - private void uploadMarker(String key) throws OciException { + private void putMarker(String key) throws OciException { try (InputStream is = InputStream.nullInputStream()) { client.putObject(PutObjectRequest.builder() .namespaceName(config.namespace()) diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java index 164b082..7cd676f 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java @@ -69,6 +69,8 @@ public class FileProcessingPipeline { executor.submit(() -> { try { processAll(); + } catch (Exception e) { + Log.errorf(e, "Unerwarteter Fehler im Pipeline-Lauf"); } finally { isRunning.set(false); } @@ -91,6 +93,7 @@ public class FileProcessingPipeline { if (zipFiles.isEmpty()) { Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); + Log.info("Pipeline-Lauf abgeschlossen"); return; } @@ -122,17 +125,24 @@ public class FileProcessingPipeline { Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename, context.extractedFiles.size()); - // --- OCI Upload --- + // --- OCI Upload (Dateien, noch kein Marker) --- MDC.put("step", "oci-upload"); context.status = ProcessingStatus.PARTIALLY_UPLOADED; Log.info("Starte OCI-Upload"); - ociUploadService.upload(context); - context.status = ProcessingStatus.MARKER_UPLOADED; + ociUploadService.uploadFiles(context); // --- SFTP Rename → .processed --- + // Erst nach erfolgreichem Datei-Upload — Marker kommt danach, + // damit Marker-Präsenz in OCI ↔ ZIP bereits .processed auf SFTP. MDC.put("step", "sftp-rename"); sftpService.renameFile(zipFilename, zipFilename + ".processed"); + // --- OCI Marker --- + // Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist. + MDC.put("step", "oci-marker"); + ociUploadService.uploadMarker(context); + context.status = ProcessingStatus.MARKER_UPLOADED; + // --- ORDS Notify --- MDC.put("step", "ords-notify"); ordsNotificationService.triggerDbProcessing(context); @@ -140,14 +150,19 @@ public class FileProcessingPipeline { context.status = ProcessingStatus.ORDS_NOTIFIED; Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename); - } catch (SftpException | ZipException | OciException | OrdsException e) { - Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage()); + } catch (ZipException e) { + Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); + } catch (SftpException | OciException | OrdsException e) { + Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage()); + context.status = ProcessingStatus.FAILED; } catch (IOException e) { Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename); context.status = ProcessingStatus.FAILED; - tryRenameToError(zipFilename); + } catch (RuntimeException e) { + Log.errorf(e, "Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s'", zipFilename); + context.status = ProcessingStatus.FAILED; } finally { postProcessingCleanup(context); long durationSeconds = Duration.between(context.startTime, LocalDateTime.now()).toSeconds(); diff --git a/workflow_dateieingang.md b/workflow_dateieingang.md index 1aaafe8..67bc67c 100644 --- a/workflow_dateieingang.md +++ b/workflow_dateieingang.md @@ -43,10 +43,11 @@ Details zur DB-Verarbeitung: `database/docs/plan_pck_net_storage.md` │ 3c. Alle Dateien in OCI eingang// hochladen │ │ (Unterordner aus der ZIP werden beibehalten) │ │ → Fehler stoppt Verarbeitung dieser ZIP │ -│ 3d. Marker eingang//_READY_FOR_DB_PROCESSING_ │ -│ hochladen │ -│ 3e. ZIP auf SFTP umbenennen (.processed oder .error) │ -│ → erst NACH erfolgreichem Marker-Upload │ +│ 3d. ZIP auf SFTP umbenennen zu .processed │ +│ → bei ungültiger ZIP: .error (manuelle Prüfung nötig) │ +│ → bei Infrastrukturfehlern: keine Umbenennung, Retry │ +│ 3e. Marker eingang//_READY_FOR_DB_PROCESSING_ │ +│ hochladen — ERST NACH dem SFTP-Rename (siehe unten) │ │ 3f. ORDS-Endpunkt aufrufen | | (pck_auto_import.p_process_incoming_ba_data) │ │ 3g. Lokale Arbeitsdateien löschen │ @@ -97,30 +98,85 @@ Daran können die Sachbearbeiter erkennen, dass der Ordner nicht mehr automatisc ## Fehlerfall-Verhalten -**Service: Upload einer Datei schlägt fehl** -- Verarbeitung dieser ZIP stoppt sofort -- Kein Marker wird geschrieben, ZIP auf SFTP wird zu `.error` umbenannt -- ORDS wird nicht aufgerufen -- Bereits hochgeladene Dateien werden beim nächsten Trigger überschrieben (OCI PUT idempotent) +**Service: ZIP ist beschädigt oder ungültig** +- SFTP: ZIP → `.error` (manuelle Prüfung nötig) +- OCI: kein Upload, kein Marker +- DB: wird nicht aufgerufen -**Service: ORDS-Aufruf schlägt fehl** -- Marker liegt in `eingang//`, Dateien sind vollständig hochgeladen -- Beim nächsten Stundenlauf findet APEX Automation den Marker und verarbeitet +**Service: SFTP-Download fehlgeschlagen** +- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht +- OCI: kein Upload, kein Marker +- DB: wird nicht aufgerufen + +**Service: OCI-Upload (Dateien) fehlgeschlagen** +- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht +- OCI: teilweise hochgeladene Dateien bleiben liegen (kein Marker → DB ignoriert den Ordner); beim Retry werden sie überschrieben (OCI PUT ist idempotent) +- DB: wird nicht aufgerufen + +**Service: SFTP-Rename zu `.processed` fehlgeschlagen** +- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht +- OCI: Dateien hochgeladen, noch kein Marker (Marker kommt erst nach dem Rename) +- DB: wird nicht aufgerufen + - beim nächsten Stundenlauf werden die Dateien aber nicht importiert, da APEX Automation ohne Marker nichts findet + - d.h. erst nachdem die ZIP Datei erneut abgearbeitet und komplett in OCI hochgeladen wurde (diesmal mit .processed-Umbennung auf SFTP & Marker in OCI) werden die Dateien abgearbeitet + + +**Service: OCI-Marker-Upload fehlgeschlagen** +- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf +- OCI: Dateien vollständig hochgeladen, Marker fehlt → DB-Verarbeitung wird nicht ausgelöst + - DB wird die Dateien wegen dem fehlendem Marker nie automatisiert abarbeiten, aber man sieht das recht einfach über den OCI Dateibrowser in Apex + +- DB: wird nicht aufgerufen +- **Manueller Fix:** Marker-Datei `eingang//_READY_FOR_DB_PROCESSING_` in OCI von Hand anlegen (leere Datei) — APEX Automation verarbeitet den Batch dann beim nächsten Stundenlauf + +**Service: ORDS-Aufruf fehlgeschlagen** +- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf +- OCI: Dateien + Marker vollständig hochgeladen +- DB: APEX Automation findet den Marker beim nächsten Stundenlauf und verarbeitet ihn (Schritt 1) — kein Doppelimport, da Quarkus die `.processed`-Datei nicht erneut verarbeitet **DB: Verarbeitung einer einzelnen Datei schlägt fehl** -- Rollback — Datei bleibt in `eingang//` -- ERROR in `lg_app_log` mit `log_object_ref = eingang//datei.csv` -- Nächste Dateien im Batch werden weiterverarbeitet +- OCI `eingang/`: Datei bleibt in `eingang//` (Rollback) +- OCI `zielordner/`: keine Änderung +- DB: Rollback, ERROR in `lg_app_log` mit `log_object_ref = eingang//datei.csv`, nächste Dateien im Batch werden weiterverarbeitet **DB: Batch-Abschluss (nach dem Datei-Loop)** -- DB-Marker (`_READY_FOR_DB_PROCESSING_`) wird **immer gelöscht** — kein automatischer Retry -- Liegen noch Dateien im Unterordner: SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen -- Alle Dateien erfolgreich: INFO in `lg_app_log`, Unterordner ist leer +- Alle Dateien erfolgreich: `eingang//` ist leer, Marker wird gelöscht +- Noch Dateien übrig: Marker wird gelöscht, SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen **DB: p_move_object schlägt nach erfolgreichem Import fehl** -- Rollback des Imports → sauberer Ausgangszustand -- Datei bleibt in `eingang//` -- DB-Marker wird trotzdem am Ende des Loops gelöscht; falls noch Dateien übrig → SB-Marker +- OCI `eingang/`: Datei bleibt in `eingang//` (Rollback des gesamten Imports) +- OCI `zielordner/`: keine Änderung +- DB: Marker wird am Ende des Loops trotzdem gelöscht; falls noch Dateien übrig → SB-Marker + +--- + +## Design-Entscheidung: Marker wird nach dem SFTP-Rename gesetzt + +Der OCI-Marker `_READY_FOR_DB_PROCESSING_` wird bewusst **nach** dem SFTP-Rename zu `.processed` +hochgeladen — nicht davor. Das erzeugt eine harte Invariante: + +> **Marker in OCI vorhanden ↔ ZIP auf SFTP bereits `.processed`** + +### Warum ist das wichtig? + +APEX Automation ruft `p_process_incoming_ba_data` in jedem Stundenlauf einmal direkt auf +(Schritt 1, Fallback), und Quarkus ruft dieselbe Funktion via ORDS auf (Schritt 3f, schneller Pfad). +Ohne die Invariante könnte folgender Race entstehen: + +1. Quarkus lädt Dateien + Marker hoch, schlägt dann beim SFTP-Rename fehl +2. APEX Schritt 1 findet den Marker → importiert Daten +3. Quarkus wiederholt den Lauf, ruft ORDS auf → zweiter Import derselben Daten + +Mit der Invariante ist dieser Fall ausgeschlossen: APEX Schritt 1 findet nur dann einen Marker, +wenn die ZIP auf dem SFTP bereits `.processed` ist. Ist sie das, greift Quarkus sie im Retry +nicht mehr an — `listZipFiles()` gibt nur `.zip`-Dateien zurück. + +### Einzig verbleibender manueller Fehlerfall + +Schlägt der Marker-Upload fehl (nach erfolgreichem SFTP-Rename), ist der Zustand eindeutig +erkennbar: `.processed` auf SFTP, Dateien in OCI ohne Marker. Manueller Fix: Marker-Datei +in OCI von Hand anlegen. Dieser Fall erfordert keine DB-seitige Idempotenz, da Quarkus +die Datei nicht erneut verarbeitet und ORDS nicht aufruft. ---