diff --git a/.claude/settings.local.json b/.claude/settings.local.json index f5c663d..84e2f73 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -12,7 +12,10 @@ "WebFetch(domain:medium.com)", "WebFetch(domain:quarkus.io)", "WebFetch(domain:github.com)", - "WebFetch(domain:walidhajeri.hashnode.dev)" + "WebFetch(domain:walidhajeri.hashnode.dev)", + "Bash(find \"C:\\\\\\\\src\\\\\\\\Galabau\\\\\\\\glb-spielwiese\\\\\\\\automaton\" -name \"FileProcessingPipeline.java\")", + "PowerShell(Get-ChildItem -Path \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\" -Recurse | Where-Object { !$_.PSIsContainer } | Select-Object FullName)", + "PowerShell(cmd /c \"dir /s /b C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\")" ] } } diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/model/ProcessingContext.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/model/ProcessingContext.java index b082588..d747dbc 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/model/ProcessingContext.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/model/ProcessingContext.java @@ -12,8 +12,8 @@ import java.util.UUID; */ public class ProcessingContext { - /** Eindeutige Lauf-ID — wird als MDC-Feld {@code runId} gesetzt. */ - public final UUID runId; + /** Eindeutige Datei-ID — wird als MDC-Feld {@code fileId} gesetzt. */ + public final UUID fileId; /** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */ public final String zipFilename; @@ -39,8 +39,8 @@ public class ProcessingContext { /** Aktueller Verarbeitungsstatus. */ public ProcessingStatus status = ProcessingStatus.PENDING; - public ProcessingContext(UUID runId, String zipFilename) { - this.runId = runId; + public ProcessingContext(UUID fileId, String zipFilename) { + this.fileId = fileId; this.zipFilename = zipFilename; this.zipNameWithoutExt = zipFilename.endsWith(".zip") ? zipFilename.substring(0, zipFilename.length() - 4) diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java index 9240a8e..004ea93 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java @@ -2,7 +2,6 @@ package de.galabau.dateieingang.ords; import de.galabau.dateieingang.config.OrdsConfig; import de.galabau.dateieingang.exception.OrdsException; -import de.galabau.dateieingang.model.ProcessingContext; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -35,28 +34,25 @@ public class OrdsNotificationService { * Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch). * Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff). * - * @param context enthält {@code zipNameWithoutExt} für das Log * @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt */ @Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS, retryOn = OrdsException.class) @Timeout(value = 10, unit = ChronoUnit.SECONDS) - public void triggerDbProcessing(ProcessingContext context) throws OrdsException { - Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt); + public void triggerDbProcessing() throws OrdsException { + Log.info("Rufe ORDS-Endpunkt auf"); Response response; try { response = ordsClient.processIncomingBaData(config.apiKey()); } catch (Exception e) { - throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '" - + context.zipNameWithoutExt + "'", e); + throw new OrdsException("ORDS-Verbindung fehlgeschlagen", e); } int status = response.getStatus(); if (status >= 400) { - throw new OrdsException("ORDS antwortete mit HTTP " + status - + " für '" + context.zipNameWithoutExt + "'"); + throw new OrdsException("ORDS antwortete mit HTTP " + status); } - Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt); + Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d", status); } } diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java index c733097..8a63180 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java @@ -79,38 +79,49 @@ public class FileProcessingPipeline { } private void processAll() { - Log.info("Pipeline-Lauf gestartet"); + UUID pipelineRunId = UUID.randomUUID(); + MDC.put("pipelineRunId", pipelineRunId.toString()); + Log.infof("Pipeline-Lauf gestartet [pipelineRunId=%s]", pipelineRunId); - preProcessingCleanup(); - - List zipFiles; try { - zipFiles = sftpService.listZipFiles(); - } catch (SftpException e) { - Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen"); - return; + preProcessingCleanup(); + + List zipFiles; + try { + zipFiles = sftpService.listZipFiles(); + } catch (SftpException e) { + Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen"); + return; + } + + if (zipFiles.isEmpty()) { + Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); + return; + } + + Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size()); + + for (String zipFilename : zipFiles) { + Log.infof("ZIP-Datei gefunden: %s", zipFilename); + processZip(zipFilename); + } + + MDC.put("step", "ords-notify"); + try { + ordsNotificationService.triggerDbProcessing(); + } catch (OrdsException e) { + Log.errorf(e, "ORDS-Benachrichtigung fehlgeschlagen — DB-Verarbeitung wird beim nächsten Lauf ausgelöst"); + } + } finally { + Log.infof("Pipeline-Lauf abgeschlossen [pipelineRunId=%s]", pipelineRunId); + MDC.clear(); } - - if (zipFiles.isEmpty()) { - Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); - Log.info("Pipeline-Lauf abgeschlossen"); - return; - } - - Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size()); - - for (String zipFilename : zipFiles) { - Log.infof("Datei gefunden: %s", zipFilename); - processZip(zipFilename); - } - - Log.info("Pipeline-Lauf abgeschlossen"); } private void processZip(String zipFilename) { ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); - MDC.put("runId", context.runId.toString()); - Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId); + MDC.put("fileId", context.fileId.toString()); + Log.infof("Starte Verarbeitung von '%s' [fileId=%s]", zipFilename, context.fileId); try { // --- Download --- @@ -142,19 +153,13 @@ public class FileProcessingPipeline { MDC.put("step", "oci-marker"); ociUploadService.uploadMarker(context); context.status = ProcessingStatus.MARKER_UPLOADED; - - // --- ORDS Notify --- - MDC.put("step", "ords-notify"); - ordsNotificationService.triggerDbProcessing(context); - - context.status = ProcessingStatus.ORDS_NOTIFIED; Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename); } catch (ZipException e) { Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); - } catch (SftpException | OciException | OrdsException e) { + } catch (SftpException | OciException e) { Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage()); context.status = ProcessingStatus.FAILED; } catch (IOException e) { @@ -166,10 +171,11 @@ public class FileProcessingPipeline { } finally { postProcessingCleanup(context); long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis(); - Log.infof("Lauf %s für Datei %s abgeschlossen — Status: %s, Dauer: %dms", - context.runId, zipFilename, context.status, duration); + Log.infof("Datei %s abgeschlossen — Status: %s, Dauer: %dms [fileId=%s]", + zipFilename, context.status, duration, context.fileId); Log.info("-----------------------------------------------------------------------------------------------------"); - MDC.clear(); + MDC.remove("fileId"); + MDC.remove("step"); } } @@ -234,7 +240,7 @@ public class FileProcessingPipeline { */ private void postProcessingCleanup(ProcessingContext context) { MDC.put("step", "post-cleanup"); - Log.infof("Cleanup gestartet für Lauf %s", context.runId); + Log.infof("Cleanup gestartet für Datei '%s'", context.zipFilename); try { if (context.localZipPath != null) { Files.deleteIfExists(context.localZipPath); @@ -245,8 +251,8 @@ public class FileProcessingPipeline { Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); } } catch (IOException e) { - Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", - context.runId, + Log.warnf(e, "Cleanup für '%s' fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", + context.zipFilename, context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt"); } }