runId in fileId für Logging umbenannt, neue pipelineRunId eingeführt und ORDS aufruf einmalig ans Ende der Pipeline gesetzt, anstatt nach jeder Datei
This commit is contained in:
@@ -12,7 +12,10 @@
|
|||||||
"WebFetch(domain:medium.com)",
|
"WebFetch(domain:medium.com)",
|
||||||
"WebFetch(domain:quarkus.io)",
|
"WebFetch(domain:quarkus.io)",
|
||||||
"WebFetch(domain:github.com)",
|
"WebFetch(domain:github.com)",
|
||||||
"WebFetch(domain:walidhajeri.hashnode.dev)"
|
"WebFetch(domain:walidhajeri.hashnode.dev)",
|
||||||
|
"Bash(find \"C:\\\\\\\\src\\\\\\\\Galabau\\\\\\\\glb-spielwiese\\\\\\\\automaton\" -name \"FileProcessingPipeline.java\")",
|
||||||
|
"PowerShell(Get-ChildItem -Path \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\" -Recurse | Where-Object { !$_.PSIsContainer } | Select-Object FullName)",
|
||||||
|
"PowerShell(cmd /c \"dir /s /b C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\")"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,8 +12,8 @@ import java.util.UUID;
|
|||||||
*/
|
*/
|
||||||
public class ProcessingContext {
|
public class ProcessingContext {
|
||||||
|
|
||||||
/** Eindeutige Lauf-ID — wird als MDC-Feld {@code runId} gesetzt. */
|
/** Eindeutige Datei-ID — wird als MDC-Feld {@code fileId} gesetzt. */
|
||||||
public final UUID runId;
|
public final UUID fileId;
|
||||||
|
|
||||||
/** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */
|
/** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */
|
||||||
public final String zipFilename;
|
public final String zipFilename;
|
||||||
@@ -39,8 +39,8 @@ public class ProcessingContext {
|
|||||||
/** Aktueller Verarbeitungsstatus. */
|
/** Aktueller Verarbeitungsstatus. */
|
||||||
public ProcessingStatus status = ProcessingStatus.PENDING;
|
public ProcessingStatus status = ProcessingStatus.PENDING;
|
||||||
|
|
||||||
public ProcessingContext(UUID runId, String zipFilename) {
|
public ProcessingContext(UUID fileId, String zipFilename) {
|
||||||
this.runId = runId;
|
this.fileId = fileId;
|
||||||
this.zipFilename = zipFilename;
|
this.zipFilename = zipFilename;
|
||||||
this.zipNameWithoutExt = zipFilename.endsWith(".zip")
|
this.zipNameWithoutExt = zipFilename.endsWith(".zip")
|
||||||
? zipFilename.substring(0, zipFilename.length() - 4)
|
? zipFilename.substring(0, zipFilename.length() - 4)
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package de.galabau.dateieingang.ords;
|
|||||||
|
|
||||||
import de.galabau.dateieingang.config.OrdsConfig;
|
import de.galabau.dateieingang.config.OrdsConfig;
|
||||||
import de.galabau.dateieingang.exception.OrdsException;
|
import de.galabau.dateieingang.exception.OrdsException;
|
||||||
import de.galabau.dateieingang.model.ProcessingContext;
|
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
@@ -35,28 +34,25 @@ public class OrdsNotificationService {
|
|||||||
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
|
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
|
||||||
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
|
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
|
||||||
*
|
*
|
||||||
* @param context enthält {@code zipNameWithoutExt} für das Log
|
|
||||||
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
||||||
*/
|
*/
|
||||||
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
|
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
|
||||||
retryOn = OrdsException.class)
|
retryOn = OrdsException.class)
|
||||||
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
|
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
|
||||||
public void triggerDbProcessing(ProcessingContext context) throws OrdsException {
|
public void triggerDbProcessing() throws OrdsException {
|
||||||
Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt);
|
Log.info("Rufe ORDS-Endpunkt auf");
|
||||||
Response response;
|
Response response;
|
||||||
try {
|
try {
|
||||||
response = ordsClient.processIncomingBaData(config.apiKey());
|
response = ordsClient.processIncomingBaData(config.apiKey());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '"
|
throw new OrdsException("ORDS-Verbindung fehlgeschlagen", e);
|
||||||
+ context.zipNameWithoutExt + "'", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int status = response.getStatus();
|
int status = response.getStatus();
|
||||||
if (status >= 400) {
|
if (status >= 400) {
|
||||||
throw new OrdsException("ORDS antwortete mit HTTP " + status
|
throw new OrdsException("ORDS antwortete mit HTTP " + status);
|
||||||
+ " für '" + context.zipNameWithoutExt + "'");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt);
|
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d", status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,38 +79,49 @@ public class FileProcessingPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processAll() {
|
private void processAll() {
|
||||||
Log.info("Pipeline-Lauf gestartet");
|
UUID pipelineRunId = UUID.randomUUID();
|
||||||
|
MDC.put("pipelineRunId", pipelineRunId.toString());
|
||||||
|
Log.infof("Pipeline-Lauf gestartet [pipelineRunId=%s]", pipelineRunId);
|
||||||
|
|
||||||
preProcessingCleanup();
|
|
||||||
|
|
||||||
List<String> zipFiles;
|
|
||||||
try {
|
try {
|
||||||
zipFiles = sftpService.listZipFiles();
|
preProcessingCleanup();
|
||||||
} catch (SftpException e) {
|
|
||||||
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen");
|
List<String> zipFiles;
|
||||||
return;
|
try {
|
||||||
|
zipFiles = sftpService.listZipFiles();
|
||||||
|
} catch (SftpException e) {
|
||||||
|
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zipFiles.isEmpty()) {
|
||||||
|
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
|
||||||
|
|
||||||
|
for (String zipFilename : zipFiles) {
|
||||||
|
Log.infof("ZIP-Datei gefunden: %s", zipFilename);
|
||||||
|
processZip(zipFilename);
|
||||||
|
}
|
||||||
|
|
||||||
|
MDC.put("step", "ords-notify");
|
||||||
|
try {
|
||||||
|
ordsNotificationService.triggerDbProcessing();
|
||||||
|
} catch (OrdsException e) {
|
||||||
|
Log.errorf(e, "ORDS-Benachrichtigung fehlgeschlagen — DB-Verarbeitung wird beim nächsten Lauf ausgelöst");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
Log.infof("Pipeline-Lauf abgeschlossen [pipelineRunId=%s]", pipelineRunId);
|
||||||
|
MDC.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (zipFiles.isEmpty()) {
|
|
||||||
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
|
|
||||||
Log.info("Pipeline-Lauf abgeschlossen");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
|
|
||||||
|
|
||||||
for (String zipFilename : zipFiles) {
|
|
||||||
Log.infof("Datei gefunden: %s", zipFilename);
|
|
||||||
processZip(zipFilename);
|
|
||||||
}
|
|
||||||
|
|
||||||
Log.info("Pipeline-Lauf abgeschlossen");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processZip(String zipFilename) {
|
private void processZip(String zipFilename) {
|
||||||
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
|
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
|
||||||
MDC.put("runId", context.runId.toString());
|
MDC.put("fileId", context.fileId.toString());
|
||||||
Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId);
|
Log.infof("Starte Verarbeitung von '%s' [fileId=%s]", zipFilename, context.fileId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// --- Download ---
|
// --- Download ---
|
||||||
@@ -142,19 +153,13 @@ public class FileProcessingPipeline {
|
|||||||
MDC.put("step", "oci-marker");
|
MDC.put("step", "oci-marker");
|
||||||
ociUploadService.uploadMarker(context);
|
ociUploadService.uploadMarker(context);
|
||||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||||
|
|
||||||
// --- ORDS Notify ---
|
|
||||||
MDC.put("step", "ords-notify");
|
|
||||||
ordsNotificationService.triggerDbProcessing(context);
|
|
||||||
|
|
||||||
context.status = ProcessingStatus.ORDS_NOTIFIED;
|
|
||||||
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
|
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
|
||||||
|
|
||||||
} catch (ZipException e) {
|
} catch (ZipException e) {
|
||||||
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
|
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
|
||||||
context.status = ProcessingStatus.FAILED;
|
context.status = ProcessingStatus.FAILED;
|
||||||
tryRenameToError(zipFilename);
|
tryRenameToError(zipFilename);
|
||||||
} catch (SftpException | OciException | OrdsException e) {
|
} catch (SftpException | OciException e) {
|
||||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
|
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
|
||||||
context.status = ProcessingStatus.FAILED;
|
context.status = ProcessingStatus.FAILED;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@@ -166,10 +171,11 @@ public class FileProcessingPipeline {
|
|||||||
} finally {
|
} finally {
|
||||||
postProcessingCleanup(context);
|
postProcessingCleanup(context);
|
||||||
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
|
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
|
||||||
Log.infof("Lauf %s für Datei %s abgeschlossen — Status: %s, Dauer: %dms",
|
Log.infof("Datei %s abgeschlossen — Status: %s, Dauer: %dms [fileId=%s]",
|
||||||
context.runId, zipFilename, context.status, duration);
|
zipFilename, context.status, duration, context.fileId);
|
||||||
Log.info("-----------------------------------------------------------------------------------------------------");
|
Log.info("-----------------------------------------------------------------------------------------------------");
|
||||||
MDC.clear();
|
MDC.remove("fileId");
|
||||||
|
MDC.remove("step");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,7 +240,7 @@ public class FileProcessingPipeline {
|
|||||||
*/
|
*/
|
||||||
private void postProcessingCleanup(ProcessingContext context) {
|
private void postProcessingCleanup(ProcessingContext context) {
|
||||||
MDC.put("step", "post-cleanup");
|
MDC.put("step", "post-cleanup");
|
||||||
Log.infof("Cleanup gestartet für Lauf %s", context.runId);
|
Log.infof("Cleanup gestartet für Datei '%s'", context.zipFilename);
|
||||||
try {
|
try {
|
||||||
if (context.localZipPath != null) {
|
if (context.localZipPath != null) {
|
||||||
Files.deleteIfExists(context.localZipPath);
|
Files.deleteIfExists(context.localZipPath);
|
||||||
@@ -245,8 +251,8 @@ public class FileProcessingPipeline {
|
|||||||
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
|
Log.warnf(e, "Cleanup für '%s' fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
|
||||||
context.runId,
|
context.zipFilename,
|
||||||
context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt");
|
context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user