package de.galabau.dateieingang.pipeline; import de.galabau.dateieingang.config.SftpConfig; import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.SftpException; import de.galabau.dateieingang.exception.ZipException; import de.galabau.dateieingang.model.ProcessingContext; import de.galabau.dateieingang.model.ProcessingStatus; import de.galabau.dateieingang.oci.OciUploadService; import de.galabau.dateieingang.ords.OrdsNotificationService; import de.galabau.dateieingang.sftp.SftpService; import de.galabau.dateieingang.zip.ZipExtractionService; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.context.ManagedExecutor; import org.slf4j.MDC; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.LocalDateTime; import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; /** * Orchestriert den gesamten Dateieingang-Workflow: SFTP → ZIP → OCI → ORDS. * Läuft asynchron im Hintergrund, gibt dem Aufrufer sofort zurück. */ @ApplicationScoped public class FileProcessingPipeline { @Inject SftpService sftpService; @Inject ZipExtractionService zipExtractionService; @Inject OciUploadService ociUploadService; @Inject OrdsNotificationService ordsNotificationService; @Inject SftpConfig sftpConfig; @Inject ManagedExecutor executor; private final AtomicBoolean isRunning = new AtomicBoolean(false); /** * Startet die Pipeline asynchron im Hintergrund (fire & forget). * Gibt {@code false} zurück wenn bereits ein Lauf aktiv ist. * * @return {@code true} wenn die Pipeline gestartet wurde, {@code false} bei aktivem Lauf */ public boolean tryProcessAllAsync() { if (!isRunning.compareAndSet(false, true)) { Log.warn("Pipeline läuft bereits — neuer Trigger abgewiesen"); return false; } executor.submit(() -> { try { processAll(); } finally { isRunning.set(false); } }); return true; } private void processAll() { Log.info("Pipeline-Lauf gestartet"); preProcessingCleanup(); List zipFiles; try { zipFiles = sftpService.listZipFiles(); } catch (SftpException e) { Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen"); return; } if (zipFiles.isEmpty()) { Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); return; } Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size()); for (String zipFilename : zipFiles) { Log.infof("Datei gefunden: %s", zipFilename); processZip(zipFilename); } Log.info("Pipeline-Lauf abgeschlossen"); } private void processZip(String zipFilename) { ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); MDC.put("runId", context.runId.toString()); Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId); try { // --- Download --- MDC.put("step", "sftp-download"); context.localZipPath = sftpService.download(zipFilename); Log.infof("ZIP '%s' heruntergeladen (%d Bytes)", zipFilename, Files.size(context.localZipPath)); // --- Entpacken --- MDC.put("step", "zip-extract"); zipExtractionService.extract(context); Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename, context.extractedFiles.size()); // --- OCI Upload --- MDC.put("step", "oci-upload"); context.status = ProcessingStatus.PARTIALLY_UPLOADED; ociUploadService.upload(context); context.status = ProcessingStatus.MARKER_UPLOADED; // --- SFTP Rename → .processed --- MDC.put("step", "sftp-rename"); sftpService.renameFile(zipFilename, zipFilename + ".processed"); // --- ORDS Notify --- MDC.put("step", "ords-notify"); ordsNotificationService.triggerDbProcessing(context); context.status = ProcessingStatus.ORDS_NOTIFIED; Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename); } catch (SftpException | ZipException | OciException | OrdsException e) { Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage()); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); } catch (IOException e) { Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); } finally { postProcessingCleanup(context); long durationSeconds = Duration.between(context.startTime, LocalDateTime.now()).toSeconds(); Log.infof("Lauf %s abgeschlossen — Status: %s, Dauer: %ds", context.runId, context.status, durationSeconds); MDC.clear(); } } private void tryRenameToError(String zipFilename) { try { MDC.put("step", "sftp-rename"); sftpService.renameFile(zipFilename, zipFilename + ".error"); } catch (SftpException e) { Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung", zipFilename); } } /** * Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen. * *

Wird einmal pro Pipeline-Lauf vor dem SFTP-Listing aufgerufen. * Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft, * aber bei I/O-Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP-Dateien * und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden * sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben. * *

Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in * {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind. * Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert * ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs. */ private void preProcessingCleanup() { MDC.put("step", "pre-cleanup"); Path workDir = Path.of(sftpConfig.localWorkDir()); if (!Files.exists(workDir)) { return; } try (Stream entries = Files.list(workDir)) { entries.forEach(path -> { try { if (Files.isDirectory(path)) { deleteLocalDirectory(path); Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path); } else { Files.delete(path); Log.warnf("Verwaiste Datei gelöscht: %s", path); } } catch (IOException e) { Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path); } }); } catch (IOException e) { Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir); } } /** * Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis). * *

Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach * erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen * fehl, verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten * Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt. * * @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses */ private void postProcessingCleanup(ProcessingContext context) { MDC.put("step", "post-cleanup"); Log.infof("Cleanup gestartet für Lauf %s", context.runId); try { if (context.localZipPath != null) { Files.deleteIfExists(context.localZipPath); Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath); } if (context.localExtractDir != null) { deleteLocalDirectory(context.localExtractDir); Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); } } catch (IOException e) { Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", context.runId, context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt"); } } private void deleteLocalDirectory(Path dir) throws IOException { if (!Files.exists(dir)) { return; } try (Stream walk = Files.walk(dir)) { walk.sorted(Comparator.reverseOrder()).forEach(path -> { try { Files.delete(path); } catch (IOException e) { Log.warnf(e, "Konnte nicht löschen: %s", path); } }); } } }