Files
gala-ki-spielwiese/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java

280 lines
11 KiB
Java
Raw Normal View History

package de.galabau.dateieingang.pipeline;
2026-04-22 09:45:24 +02:00
import de.galabau.dateieingang.config.SftpConfig;
import de.galabau.dateieingang.exception.OciException;
import de.galabau.dateieingang.exception.OrdsException;
import de.galabau.dateieingang.exception.SftpException;
import de.galabau.dateieingang.exception.ZipException;
import de.galabau.dateieingang.model.ProcessingContext;
import de.galabau.dateieingang.model.ProcessingStatus;
import de.galabau.dateieingang.oci.OciUploadService;
import de.galabau.dateieingang.ords.OrdsNotificationService;
import de.galabau.dateieingang.sftp.SftpService;
import de.galabau.dateieingang.zip.ZipExtractionService;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.slf4j.MDC;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
2026-04-22 09:45:24 +02:00
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
/**
* Orchestriert den gesamten Dateieingang-Workflow: SFTP ZIP OCI ORDS.
* Läuft asynchron im Hintergrund, gibt dem Aufrufer sofort zurück.
*/
@ApplicationScoped
public class FileProcessingPipeline {
@Inject
SftpService sftpService;
@Inject
ZipExtractionService zipExtractionService;
@Inject
OciUploadService ociUploadService;
@Inject
OrdsNotificationService ordsNotificationService;
2026-04-22 09:45:24 +02:00
@Inject
SftpConfig sftpConfig;
@Inject
ManagedExecutor executor;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/**
* Startet die Pipeline asynchron im Hintergrund (fire & forget).
* Gibt {@code false} zurück wenn bereits ein Lauf aktiv ist.
*
* @return {@code true} wenn die Pipeline gestartet wurde, {@code false} bei aktivem Lauf
*/
public boolean tryProcessAllAsync() {
if (!isRunning.compareAndSet(false, true)) {
Log.warn("Pipeline läuft bereits — neuer Trigger abgewiesen");
return false;
}
executor.submit(() -> {
try {
processAll();
2026-04-22 15:25:24 +02:00
} catch (Throwable e) { // nicht exception catchen, weil Error in OCI SDK auftreten können, die Throwable aber nicht Excption sind. Die würden sonst nicht geloggt
Log.errorf(e, "Unerwarteter Fehler im Pipeline-Lauf");
} finally {
isRunning.set(false);
}
});
return true;
}
2026-04-22 09:45:24 +02:00
private void processAll() {
UUID pipelineRunId = UUID.randomUUID();
MDC.put("pipelineRunId", pipelineRunId.toString());
Log.infof("Pipeline-Lauf gestartet [pipelineRunId=%s]", pipelineRunId);
try {
preProcessingCleanup();
List<String> zipFiles;
try {
zipFiles = sftpService.listZipFiles();
} catch (SftpException e) {
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen");
return;
}
if (zipFiles.isEmpty()) {
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
return;
}
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
for (String zipFilename : zipFiles) {
Log.infof("ZIP-Datei gefunden: %s", zipFilename);
processZip(zipFilename);
}
MDC.put("step", "ords-notify");
try {
ordsNotificationService.triggerDbProcessing();
} catch (OrdsException e) {
Log.errorf(e, "ORDS-Benachrichtigung fehlgeschlagen — DB-Verarbeitung wird beim nächsten Lauf ausgelöst");
}
} finally {
Log.infof("Pipeline-Lauf abgeschlossen [pipelineRunId=%s]", pipelineRunId);
MDC.clear();
}
}
private void processZip(String zipFilename) {
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
MDC.put("fileId", context.fileId.toString());
Log.infof("Starte Verarbeitung von '%s' [fileId=%s]", zipFilename, context.fileId);
try {
// --- Download ---
MDC.put("step", "sftp-download");
context.localZipPath = sftpService.download(zipFilename);
Log.infof("ZIP '%s' heruntergeladen (%d Bytes)", zipFilename,
Files.size(context.localZipPath));
2026-05-13 16:17:03 +02:00
// --- OCI ZIP-Archiv ---
MDC.put("step", "oci-zip-archive");
Log.info("Starte ZIP-Upload in OCI");
ociUploadService.uploadZipFile(context);
// --- Entpacken ---
MDC.put("step", "zip-extract");
zipExtractionService.extract(context);
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
context.extractedFiles.size());
// --- OCI Upload (Dateien, noch kein Marker) ---
MDC.put("step", "oci-upload");
2026-04-22 09:45:24 +02:00
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
2026-04-22 13:07:01 +02:00
Log.info("Starte OCI-Upload");
ociUploadService.uploadFiles(context);
// --- SFTP Delete ---
// Erst nach erfolgreichem Datei-Upload — Marker kommt danach,
// damit Marker-Präsenz in OCI ↔ ZIP bereits vom SFTP gelöscht ist.
MDC.put("step", "sftp-delete");
sftpService.deleteFile(zipFilename);
// --- OCI Marker ---
// Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist.
MDC.put("step", "oci-marker");
ociUploadService.uploadMarker(context);
context.status = ProcessingStatus.MARKER_UPLOADED;
2026-04-22 09:45:24 +02:00
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
} catch (ZipException e) {
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
context.status = ProcessingStatus.FAILED;
tryRenameToError(zipFilename);
} catch (SftpException | OciException e) {
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
context.status = ProcessingStatus.FAILED;
} catch (IOException e) {
Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename);
context.status = ProcessingStatus.FAILED;
} catch (RuntimeException e) {
Log.errorf(e, "Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s'", zipFilename);
context.status = ProcessingStatus.FAILED;
} finally {
2026-04-22 09:45:24 +02:00
postProcessingCleanup(context);
2026-04-22 16:20:05 +02:00
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
Log.infof("Datei %s abgeschlossen — Status: %s, Dauer: %dms [fileId=%s]",
zipFilename, context.status, duration, context.fileId);
2026-04-22 16:20:05 +02:00
Log.info("-----------------------------------------------------------------------------------------------------");
MDC.remove("fileId");
MDC.remove("step");
}
}
private void tryRenameToError(String zipFilename) {
try {
MDC.put("step", "sftp-rename");
2026-04-22 09:45:24 +02:00
sftpService.renameFile(zipFilename, zipFilename + ".error");
} catch (SftpException e) {
Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung",
zipFilename);
}
}
2026-04-22 09:45:24 +02:00
/**
* Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen.
*
* <p>Wird einmal pro Pipeline-Lauf <em>vor</em> dem SFTP-Listing aufgerufen.
* Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft,
* aber bei I/O-Fehlern selbst fehlschlagen kann in diesem Fall bleiben ZIP-Dateien
* und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden
* sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben.
*
* <p>Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in
* {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind.
* Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert
* ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs.
*/
private void preProcessingCleanup() {
MDC.put("step", "pre-cleanup");
Path workDir = Path.of(sftpConfig.localWorkDir());
if (!Files.exists(workDir)) {
return;
}
try (Stream<Path> entries = Files.list(workDir)) {
entries.forEach(path -> {
try {
if (Files.isDirectory(path)) {
deleteLocalDirectory(path);
Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path);
} else {
Files.delete(path);
Log.warnf("Verwaiste Datei gelöscht: %s", path);
}
} catch (IOException e) {
Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path);
}
});
} catch (IOException e) {
Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir);
}
}
/**
* Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis).
*
* <p>Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach
* erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen
* fehl, verbleiben die Dateien im Arbeitsverzeichnis sie werden dann beim nächsten
* Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt.
*
* @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses
*/
private void postProcessingCleanup(ProcessingContext context) {
MDC.put("step", "post-cleanup");
Log.infof("Cleanup gestartet für Datei '%s'", context.zipFilename);
try {
if (context.localZipPath != null) {
Files.deleteIfExists(context.localZipPath);
2026-04-22 09:45:24 +02:00
Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath);
}
if (context.localExtractDir != null) {
deleteLocalDirectory(context.localExtractDir);
2026-04-22 09:45:24 +02:00
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
}
} catch (IOException e) {
Log.warnf(e, "Cleanup für '%s' fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
context.zipFilename,
context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt");
}
}
private void deleteLocalDirectory(Path dir) throws IOException {
if (!Files.exists(dir)) {
return;
}
try (Stream<Path> walk = Files.walk(dir)) {
walk.sorted(Comparator.reverseOrder()).forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
Log.warnf(e, "Konnte nicht löschen: %s", path);
}
});
}
}
}