Error handling verbessert und OCI Verbindungsaufbau Problem behoben
This commit is contained in:
@@ -110,7 +110,7 @@ quarkus-automaton/
|
||||
| `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis |
|
||||
| `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur |
|
||||
| `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage |
|
||||
| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` oder `.error` |
|
||||
| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` (bei Erfolg) oder `.error` (nur bei ungültiger ZIP) |
|
||||
| `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf |
|
||||
| `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) |
|
||||
|
||||
@@ -244,14 +244,14 @@ n8n fire-and-forget-Verhalten.
|
||||
|
||||
### Fehlerklassen
|
||||
|
||||
| Fehler | Typ | Retry | Verhalten |
|
||||
| Fehler | Typ | Umbenennung | Verhalten |
|
||||
|---|---|---|---|
|
||||
| SFTP-Verbindung fehlgeschlagen | transient | nein | Nächster APEX-Lauf (1h) versucht es |
|
||||
| ZIP beschädigt | persistent | nein | ZIP auf SFTP umbenennen zu `.error`, Log |
|
||||
| OCI-Verbindung fehlgeschlagen (z.B. 503) | transient | ja (exponential backoff) | @Retry |
|
||||
| OCI-Upload einer Datei schlägt fehl | persistent | nein | SFTP-Rename zu `.error`, Log — bereits hochgeladene OCI-Dateien bleiben (idempotent) |
|
||||
| ORDS-Aufruf schlägt fehl | transient | ja (2-3x) | Marker liegt vor → APEX Automation schlägt beim nächsten Lauf ein |
|
||||
| Allgemein technischer Fehler | fallabhängig | siehe SmallRye Fault Tolerance | Exception-Log |
|
||||
| SFTP-Verbindung / Download fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster APEX-Lauf (1h) versucht es |
|
||||
| ZIP beschädigt / ungültig | persistent | → `.error` | Datei ist defekt, manuelle Prüfung nötig |
|
||||
| OCI-Verbindung fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster Lauf versucht erneut (OCI PUT idempotent) |
|
||||
| SFTP-Rename zu `.processed` fehlgeschlagen | transient | keine | ORDS wurde noch nicht aufgerufen (kommt danach) — kein Doppelimport; nächster Lauf wiederholt den Schritt |
|
||||
| ORDS-Aufruf schlägt fehl | transient | keine (`.processed` bereits gesetzt) | Marker liegt in OCI vor — APEX Automation findet ihn beim nächsten Lauf |
|
||||
| Unerwarteter Laufzeitfehler | fallabhängig | keine | Exception wird geloggt, Datei bleibt auf SFTP |
|
||||
|
||||
### Retry-Strategie (SmallRye Fault Tolerance)
|
||||
|
||||
@@ -309,17 +309,23 @@ Credentials, Fehlerbehandlung).
|
||||
Pipeline.processAll():
|
||||
1. SftpService.listZipFiles() → ["export_2026-04-08.zip", ...]
|
||||
2. für jede ZIP:
|
||||
a. SftpService.download(zip) → lokale Datei
|
||||
b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste
|
||||
c. OciUploadService.upload() → Dateien + Marker in OCI
|
||||
d. SftpService.renameRemote(.processed oder .error)
|
||||
e. OrdsNotificationService.notify()
|
||||
a. SftpService.download(zip) → lokale Datei
|
||||
b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste
|
||||
↳ ZipException → Rename zu .error, Abbruch
|
||||
c. OciUploadService.uploadFiles() → Dateien in OCI (noch kein Marker)
|
||||
d. SftpService.renameRemote(.processed)
|
||||
e. OciUploadService.uploadMarker() → Marker in OCI (erst nach Rename — siehe Invariante)
|
||||
f. OrdsNotificationService.notify()
|
||||
f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler
|
||||
```
|
||||
|
||||
**Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen
|
||||
bei Fehlern oder großen ZIPs.
|
||||
|
||||
**Umbenennung zu `.error`** erfolgt ausschließlich bei `ZipException` (defekte/ungültige Datei).
|
||||
Bei Infrastrukturfehlern (SFTP, OCI, ORDS) bleibt die Datei unverändert auf dem SFTP und wird
|
||||
beim nächsten Lauf automatisch erneut verarbeitet.
|
||||
|
||||
---
|
||||
|
||||
## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider)
|
||||
@@ -343,11 +349,13 @@ public class OciUploadService {
|
||||
.tenantId(config.tenancyId())
|
||||
.userId(config.userId())
|
||||
.fingerprint(config.fingerprint())
|
||||
.region(Region.fromRegionId(config.region()))
|
||||
.privateKeySupplier(new FilePrivateKeySupplier(config.privateKeyPath()))
|
||||
.privateKeySupplier(() -> Files.newInputStream(Path.of(config.privateKeyPath())))
|
||||
.build();
|
||||
|
||||
this.client = ObjectStorageClient.builder().build(auth);
|
||||
// Endpoint explizit setzen — verhindert blockierenden HTTP-Discovery-Call im SDK
|
||||
client = ObjectStorageClient.builder()
|
||||
.endpoint("https://objectstorage." + config.region() + ".oraclecloud.com")
|
||||
.build(auth);
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -452,6 +460,12 @@ public class ProcessIncomingRequest {
|
||||
<artifactId>oci-java-sdk-objectstorage</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
<!-- HTTP-Provider für OCI SDK (jersey3 = Jakarta EE 9+, kompatibel mit Quarkus) -->
|
||||
<dependency>
|
||||
<groupId>com.oracle.oci.sdk</groupId>
|
||||
<artifactId>oci-java-sdk-common-httpclient-jersey3</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- ZIP -->
|
||||
<dependency>
|
||||
|
||||
@@ -82,6 +82,12 @@
|
||||
<artifactId>oci-java-sdk-objectstorage</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
<!-- HTTP-Provider für OCI SDK (jersey3 = Jakarta EE 9+, kompatibel mit Quarkus) -->
|
||||
<dependency>
|
||||
<groupId>com.oracle.oci.sdk</groupId>
|
||||
<artifactId>oci-java-sdk-common-httpclient-jersey3</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- ZIP-Verarbeitung -->
|
||||
<dependency>
|
||||
|
||||
@@ -38,7 +38,6 @@ public class OciUploadService {
|
||||
.tenantId(config.tenancyId())
|
||||
.userId(config.userId())
|
||||
.fingerprint(config.fingerprint())
|
||||
.region(com.oracle.bmc.Region.fromRegionId(config.region()))
|
||||
.privateKeySupplier(() -> {
|
||||
try {
|
||||
return Files.newInputStream(Path.of(config.privateKeyPath()));
|
||||
@@ -48,22 +47,23 @@ public class OciUploadService {
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
Log.info("Authentifizierung...");
|
||||
this.client = ObjectStorageClient.builder().build(auth);
|
||||
Log.info("Auhtentifizierung...");
|
||||
client = ObjectStorageClient.builder()
|
||||
.endpoint("https://objectstorage." + config.region() + ".oraclecloud.com")
|
||||
.build(auth);
|
||||
Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket());
|
||||
}
|
||||
|
||||
/**
|
||||
* Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch.
|
||||
* Dateien mit {@code isMarker = true} werden übersprungen — der Marker wird separat
|
||||
* am Ende hochgeladen, um sicherzustellen dass er erst nach allen Dateien erscheint.
|
||||
* Lädt alle Nutzdateien aus {@code context.extractedFiles} in OCI hoch — ohne Marker.
|
||||
* Der Marker wird erst nach dem SFTP-Rename zu {@code .processed} gesetzt (siehe
|
||||
* {@link #uploadMarker}), damit APEX Automation den Batch nie verarbeitet bevor die
|
||||
* ZIP-Datei auf dem SFTP als verarbeitet markiert ist.
|
||||
*
|
||||
* @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void upload(ProcessingContext context) throws OciException {
|
||||
Log.info("OCI Upload gestartet.");
|
||||
public void uploadFiles(ProcessingContext context) throws OciException {
|
||||
List<FileEntry> files = context.extractedFiles.stream()
|
||||
.filter(e -> !e.isMarker)
|
||||
.toList();
|
||||
@@ -77,13 +77,24 @@ public class OciUploadService {
|
||||
Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
|
||||
}
|
||||
|
||||
Log.infof("OCI-Upload Dateien abgeschlossen: %d Datei(en) in '%s'",
|
||||
files.size(), buildPrefix(context.zipNameWithoutExt));
|
||||
}
|
||||
|
||||
/**
|
||||
* Setzt den Marker in OCI — signalisiert der DB-Verarbeitung, dass der Batch vollständig ist.
|
||||
* Wird erst nach dem SFTP-Rename zu {@code .processed} aufgerufen, damit Marker und
|
||||
* SFTP-Zustand immer konsistent sind: Marker vorhanden ↔ ZIP bereits als verarbeitet markiert.
|
||||
*
|
||||
* @param context enthält den Ziel-Prefix für den Marker-Key
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void uploadMarker(ProcessingContext context) throws OciException {
|
||||
String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing());
|
||||
Log.infof("Lade Marker hoch: '%s'", markerKey);
|
||||
uploadMarker(markerKey);
|
||||
|
||||
putMarker(markerKey);
|
||||
context.markerUploaded = true;
|
||||
Log.infof("OCI-Upload abgeschlossen: %d Datei(en) + Marker in '%s'",
|
||||
files.size(), buildPrefix(context.zipNameWithoutExt));
|
||||
Log.infof("Marker hochgeladen: '%s'", markerKey);
|
||||
}
|
||||
|
||||
private String buildPrefix(String zipNameWithoutExt) {
|
||||
@@ -108,7 +119,7 @@ public class OciUploadService {
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadMarker(String key) throws OciException {
|
||||
private void putMarker(String key) throws OciException {
|
||||
try (InputStream is = InputStream.nullInputStream()) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
|
||||
@@ -69,6 +69,8 @@ public class FileProcessingPipeline {
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
processAll();
|
||||
} catch (Exception e) {
|
||||
Log.errorf(e, "Unerwarteter Fehler im Pipeline-Lauf");
|
||||
} finally {
|
||||
isRunning.set(false);
|
||||
}
|
||||
@@ -91,6 +93,7 @@ public class FileProcessingPipeline {
|
||||
|
||||
if (zipFiles.isEmpty()) {
|
||||
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
|
||||
Log.info("Pipeline-Lauf abgeschlossen");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -122,17 +125,24 @@ public class FileProcessingPipeline {
|
||||
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
|
||||
context.extractedFiles.size());
|
||||
|
||||
// --- OCI Upload ---
|
||||
// --- OCI Upload (Dateien, noch kein Marker) ---
|
||||
MDC.put("step", "oci-upload");
|
||||
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
|
||||
Log.info("Starte OCI-Upload");
|
||||
ociUploadService.upload(context);
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
ociUploadService.uploadFiles(context);
|
||||
|
||||
// --- SFTP Rename → .processed ---
|
||||
// Erst nach erfolgreichem Datei-Upload — Marker kommt danach,
|
||||
// damit Marker-Präsenz in OCI ↔ ZIP bereits .processed auf SFTP.
|
||||
MDC.put("step", "sftp-rename");
|
||||
sftpService.renameFile(zipFilename, zipFilename + ".processed");
|
||||
|
||||
// --- OCI Marker ---
|
||||
// Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist.
|
||||
MDC.put("step", "oci-marker");
|
||||
ociUploadService.uploadMarker(context);
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
|
||||
// --- ORDS Notify ---
|
||||
MDC.put("step", "ords-notify");
|
||||
ordsNotificationService.triggerDbProcessing(context);
|
||||
@@ -140,14 +150,19 @@ public class FileProcessingPipeline {
|
||||
context.status = ProcessingStatus.ORDS_NOTIFIED;
|
||||
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
|
||||
|
||||
} catch (SftpException | ZipException | OciException | OrdsException e) {
|
||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage());
|
||||
} catch (ZipException e) {
|
||||
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
tryRenameToError(zipFilename);
|
||||
} catch (SftpException | OciException | OrdsException e) {
|
||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
} catch (IOException e) {
|
||||
Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
tryRenameToError(zipFilename);
|
||||
} catch (RuntimeException e) {
|
||||
Log.errorf(e, "Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s'", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
} finally {
|
||||
postProcessingCleanup(context);
|
||||
long durationSeconds = Duration.between(context.startTime, LocalDateTime.now()).toSeconds();
|
||||
|
||||
Reference in New Issue
Block a user