Compare commits

...

2 Commits

5 changed files with 72 additions and 59 deletions

View File

@@ -12,7 +12,10 @@
"WebFetch(domain:medium.com)", "WebFetch(domain:medium.com)",
"WebFetch(domain:quarkus.io)", "WebFetch(domain:quarkus.io)",
"WebFetch(domain:github.com)", "WebFetch(domain:github.com)",
"WebFetch(domain:walidhajeri.hashnode.dev)" "WebFetch(domain:walidhajeri.hashnode.dev)",
"Bash(find \"C:\\\\\\\\src\\\\\\\\Galabau\\\\\\\\glb-spielwiese\\\\\\\\automaton\" -name \"FileProcessingPipeline.java\")",
"PowerShell(Get-ChildItem -Path \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\" -Recurse | Where-Object { !$_.PSIsContainer } | Select-Object FullName)",
"PowerShell(cmd /c \"dir /s /b C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\quarkus-automaton\\\\src\\\\main\\\\java\")"
] ]
} }
} }

View File

@@ -55,7 +55,7 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Verarbeitung offener OCI-Batches gestartet' ,i_message => 'Verarbeitung offener BA-Korrespondenz-Dateien aus OCI gestartet'
); );
p_process_incoming_ba_data; p_process_incoming_ba_data;
@@ -63,7 +63,7 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Verarbeitung offener OCI-Batches abgeschlossen' ,i_message => 'Verarbeitung offener BA-Korrespondenz-Dateien aus OCI abgeschlossen'
); );
-- Quarkus anstoßen — Fehler werden geloggt, nicht eskaliert -- Quarkus anstoßen — Fehler werden geloggt, nicht eskaliert
@@ -264,7 +264,7 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Verarbeitung von eingehen BA Korrespondenzen gestartet' ,i_message => 'BA-KORRESPONDENZEN IMPORT-START: Verarbeitung von eingehen BA Korrespondenzen gestartet'
); );
-- Zielordner Name zusammenstellen -- Zielordner Name zusammenstellen
@@ -314,7 +314,7 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Kein DB-Verarbeitungsmarker vorhanden — Batch wird übersprungen (Upload noch nicht abgeschlossen)' ,i_message => 'Kein DB-Verarbeitungsmarker vorhanden — Entpackter ZIP-Ordner wird übersprungen (Upload noch nicht abgeschlossen)'
,i_object_ref => rec_folder.object_key ,i_object_ref => rec_folder.object_key
); );
continue; continue;
@@ -333,7 +333,7 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Batch-Verarbeitung gestartet — Zielordner: "' || l_target_folder || '"' ,i_message => 'ZIP-START: Verarbeitung von entpacktem ZIP-Ordner gestartet — Zielordner: "' || l_target_folder || '"'
,i_object_ref => rec_folder.object_key ,i_object_ref => rec_folder.object_key
); );
@@ -415,7 +415,7 @@ create or replace package body pck_auto_import as
pck_log.p_warn( pck_log.p_warn(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Batch mit Fehlern abgeschlossen — mind. eine Datei konnte nicht importiert werden, SB-Marker wird hochgeladen...' ,i_message => 'ZIP-ENDE: Entpackter ZIP-Ordner mit Fehlern abgeschlossen — mind. eine Datei konnte nicht importiert werden, SB-Marker wird hochgeladen...'
,i_object_ref => rec_folder.object_key ,i_object_ref => rec_folder.object_key
); );
@@ -431,11 +431,19 @@ create or replace package body pck_auto_import as
pck_log.p_info( pck_log.p_info(
i_module => c_log_module i_module => c_log_module
,i_action => l_log_action ,i_action => l_log_action
,i_message => 'Batch abgeschlossen, alle Dateien erfolgreich importiert' ,i_message => 'ZIP-ENDE: Entpackter ZIP-Ordner abgeschlossen, alle Dateien erfolgreich importiert'
,i_object_ref => rec_folder.object_key ,i_object_ref => rec_folder.object_key
); );
end if; end if;
end loop; end loop;
pck_log.p_info(
i_module => c_log_module
,i_action => l_log_action
,i_message => 'BA-KORRESPONDENZEN IMPORT-ENDE: Verarbeitung von eingehen BA Korrespondenzen abgeschlossen'
);
end p_process_incoming_ba_data; end p_process_incoming_ba_data;
end pck_auto_import; end pck_auto_import;

View File

@@ -12,8 +12,8 @@ import java.util.UUID;
*/ */
public class ProcessingContext { public class ProcessingContext {
/** Eindeutige Lauf-ID — wird als MDC-Feld {@code runId} gesetzt. */ /** Eindeutige Datei-ID — wird als MDC-Feld {@code fileId} gesetzt. */
public final UUID runId; public final UUID fileId;
/** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */ /** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */
public final String zipFilename; public final String zipFilename;
@@ -39,8 +39,8 @@ public class ProcessingContext {
/** Aktueller Verarbeitungsstatus. */ /** Aktueller Verarbeitungsstatus. */
public ProcessingStatus status = ProcessingStatus.PENDING; public ProcessingStatus status = ProcessingStatus.PENDING;
public ProcessingContext(UUID runId, String zipFilename) { public ProcessingContext(UUID fileId, String zipFilename) {
this.runId = runId; this.fileId = fileId;
this.zipFilename = zipFilename; this.zipFilename = zipFilename;
this.zipNameWithoutExt = zipFilename.endsWith(".zip") this.zipNameWithoutExt = zipFilename.endsWith(".zip")
? zipFilename.substring(0, zipFilename.length() - 4) ? zipFilename.substring(0, zipFilename.length() - 4)

View File

@@ -2,7 +2,6 @@ package de.galabau.dateieingang.ords;
import de.galabau.dateieingang.config.OrdsConfig; import de.galabau.dateieingang.config.OrdsConfig;
import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.OrdsException;
import de.galabau.dateieingang.model.ProcessingContext;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -35,28 +34,25 @@ public class OrdsNotificationService {
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch). * Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff). * Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
* *
* @param context enthält {@code zipNameWithoutExt} für das Log
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt * @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
*/ */
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS, @Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
retryOn = OrdsException.class) retryOn = OrdsException.class)
@Timeout(value = 10, unit = ChronoUnit.SECONDS) @Timeout(value = 10, unit = ChronoUnit.SECONDS)
public void triggerDbProcessing(ProcessingContext context) throws OrdsException { public void triggerDbProcessing() throws OrdsException {
Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt); Log.info("Rufe ORDS-Endpunkt auf");
Response response; Response response;
try { try {
response = ordsClient.processIncomingBaData(config.apiKey()); response = ordsClient.processIncomingBaData(config.apiKey());
} catch (Exception e) { } catch (Exception e) {
throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '" throw new OrdsException("ORDS-Verbindung fehlgeschlagen", e);
+ context.zipNameWithoutExt + "'", e);
} }
int status = response.getStatus(); int status = response.getStatus();
if (status >= 400) { if (status >= 400) {
throw new OrdsException("ORDS antwortete mit HTTP " + status throw new OrdsException("ORDS antwortete mit HTTP " + status);
+ " für '" + context.zipNameWithoutExt + "'");
} }
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt); Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d", status);
} }
} }

View File

@@ -79,38 +79,49 @@ public class FileProcessingPipeline {
} }
private void processAll() { private void processAll() {
Log.info("Pipeline-Lauf gestartet"); UUID pipelineRunId = UUID.randomUUID();
MDC.put("pipelineRunId", pipelineRunId.toString());
Log.infof("Pipeline-Lauf gestartet [pipelineRunId=%s]", pipelineRunId);
preProcessingCleanup();
List<String> zipFiles;
try { try {
zipFiles = sftpService.listZipFiles(); preProcessingCleanup();
} catch (SftpException e) {
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen"); List<String> zipFiles;
return; try {
zipFiles = sftpService.listZipFiles();
} catch (SftpException e) {
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen");
return;
}
if (zipFiles.isEmpty()) {
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
return;
}
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
for (String zipFilename : zipFiles) {
Log.infof("ZIP-Datei gefunden: %s", zipFilename);
processZip(zipFilename);
}
MDC.put("step", "ords-notify");
try {
ordsNotificationService.triggerDbProcessing();
} catch (OrdsException e) {
Log.errorf(e, "ORDS-Benachrichtigung fehlgeschlagen — DB-Verarbeitung wird beim nächsten Lauf ausgelöst");
}
} finally {
Log.infof("Pipeline-Lauf abgeschlossen [pipelineRunId=%s]", pipelineRunId);
MDC.clear();
} }
if (zipFiles.isEmpty()) {
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
Log.info("Pipeline-Lauf abgeschlossen");
return;
}
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
for (String zipFilename : zipFiles) {
Log.infof("Datei gefunden: %s", zipFilename);
processZip(zipFilename);
}
Log.info("Pipeline-Lauf abgeschlossen");
} }
private void processZip(String zipFilename) { private void processZip(String zipFilename) {
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
MDC.put("runId", context.runId.toString()); MDC.put("fileId", context.fileId.toString());
Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId); Log.infof("Starte Verarbeitung von '%s' [fileId=%s]", zipFilename, context.fileId);
try { try {
// --- Download --- // --- Download ---
@@ -142,19 +153,13 @@ public class FileProcessingPipeline {
MDC.put("step", "oci-marker"); MDC.put("step", "oci-marker");
ociUploadService.uploadMarker(context); ociUploadService.uploadMarker(context);
context.status = ProcessingStatus.MARKER_UPLOADED; context.status = ProcessingStatus.MARKER_UPLOADED;
// --- ORDS Notify ---
MDC.put("step", "ords-notify");
ordsNotificationService.triggerDbProcessing(context);
context.status = ProcessingStatus.ORDS_NOTIFIED;
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename); Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
} catch (ZipException e) { } catch (ZipException e) {
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename); Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
context.status = ProcessingStatus.FAILED; context.status = ProcessingStatus.FAILED;
tryRenameToError(zipFilename); tryRenameToError(zipFilename);
} catch (SftpException | OciException | OrdsException e) { } catch (SftpException | OciException e) {
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage()); Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
context.status = ProcessingStatus.FAILED; context.status = ProcessingStatus.FAILED;
} catch (IOException e) { } catch (IOException e) {
@@ -166,10 +171,11 @@ public class FileProcessingPipeline {
} finally { } finally {
postProcessingCleanup(context); postProcessingCleanup(context);
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis(); long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
Log.infof("Lauf %s für Datei %s abgeschlossen — Status: %s, Dauer: %dms", Log.infof("Datei %s abgeschlossen — Status: %s, Dauer: %dms [fileId=%s]",
context.runId, zipFilename, context.status, duration); zipFilename, context.status, duration, context.fileId);
Log.info("-----------------------------------------------------------------------------------------------------"); Log.info("-----------------------------------------------------------------------------------------------------");
MDC.clear(); MDC.remove("fileId");
MDC.remove("step");
} }
} }
@@ -234,7 +240,7 @@ public class FileProcessingPipeline {
*/ */
private void postProcessingCleanup(ProcessingContext context) { private void postProcessingCleanup(ProcessingContext context) {
MDC.put("step", "post-cleanup"); MDC.put("step", "post-cleanup");
Log.infof("Cleanup gestartet für Lauf %s", context.runId); Log.infof("Cleanup gestartet für Datei '%s'", context.zipFilename);
try { try {
if (context.localZipPath != null) { if (context.localZipPath != null) {
Files.deleteIfExists(context.localZipPath); Files.deleteIfExists(context.localZipPath);
@@ -245,8 +251,8 @@ public class FileProcessingPipeline {
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
} }
} catch (IOException e) { } catch (IOException e) {
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", Log.warnf(e, "Cleanup für '%s' fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
context.runId, context.zipFilename,
context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt"); context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt");
} }
} }