package de.galabau.dateieingang.pipeline; import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.SftpException; import de.galabau.dateieingang.exception.ZipException; import de.galabau.dateieingang.model.ProcessingContext; import de.galabau.dateieingang.model.ProcessingStatus; import de.galabau.dateieingang.oci.OciUploadService; import de.galabau.dateieingang.ords.OrdsNotificationService; import de.galabau.dateieingang.sftp.SftpService; import de.galabau.dateieingang.zip.ZipExtractionService; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.context.ManagedExecutor; import org.slf4j.MDC; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; /** * Orchestriert den gesamten Dateieingang-Workflow: SFTP → ZIP → OCI → ORDS. * Läuft asynchron im Hintergrund, gibt dem Aufrufer sofort zurück. */ @ApplicationScoped public class FileProcessingPipeline { @Inject SftpService sftpService; @Inject ZipExtractionService zipExtractionService; @Inject OciUploadService ociUploadService; @Inject OrdsNotificationService ordsNotificationService; @Inject ManagedExecutor executor; private final AtomicBoolean isRunning = new AtomicBoolean(false); /** * Startet die Pipeline asynchron im Hintergrund (fire & forget). * Gibt {@code false} zurück wenn bereits ein Lauf aktiv ist. * * @return {@code true} wenn die Pipeline gestartet wurde, {@code false} bei aktivem Lauf */ public boolean tryProcessAllAsync() { if (!isRunning.compareAndSet(false, true)) { Log.warn("Pipeline läuft bereits — neuer Trigger abgewiesen"); return false; } executor.submit(() -> { try { processAll(); } finally { isRunning.set(false); } }); return true; } void processAll() { Log.info("Pipeline-Lauf gestartet"); List zipFiles; try { zipFiles = sftpService.listZipFiles(); } catch (SftpException e) { Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen"); return; } if (zipFiles.isEmpty()) { Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); return; } Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size()); for (String zipFilename : zipFiles) { Log.infof("Datei gefunden: %s", zipFilename); processZip(zipFilename); } Log.info("Pipeline-Lauf abgeschlossen"); } private void processZip(String zipFilename) { ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); MDC.put("runId", context.runId.toString()); try { // --- Download --- MDC.put("step", "sftp-download"); context.localZipPath = sftpService.download(zipFilename); Log.infof("ZIP '%s' heruntergeladen (%d Bytes)", zipFilename, Files.size(context.localZipPath)); // --- Entpacken --- MDC.put("step", "zip-extract"); zipExtractionService.extract(context); Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename, context.extractedFiles.size()); // --- OCI Upload (Stub) --- MDC.put("step", "oci-upload"); ociUploadService.upload(context); // --- SFTP Rename → .processed --- MDC.put("step", "sftp-rename"); sftpService.renameRemote(zipFilename, zipFilename + ".processed"); Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename); // --- ORDS Notify (Stub) --- MDC.put("step", "ords-notify"); ordsNotificationService.notify(context); context.status = ProcessingStatus.ORDS_NOTIFIED; } catch (SftpException | ZipException | OciException | OrdsException e) { Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage()); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); } catch (IOException e) { Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename); context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); } finally { cleanup(context); MDC.clear(); } } private void tryRenameToError(String zipFilename) { try { MDC.put("step", "sftp-rename"); sftpService.renameRemote(zipFilename, zipFilename + ".error"); Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename); } catch (SftpException e) { Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung", zipFilename); } } private void cleanup(ProcessingContext context) { MDC.put("step", "cleanup"); try { if (context.localZipPath != null) { Files.deleteIfExists(context.localZipPath); Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath); } if (context.localExtractDir != null) { deleteLocalDirectory(context.localExtractDir); Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); } } catch (IOException e) { Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", context.runId, context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt"); } } private void deleteLocalDirectory(Path dir) throws IOException { if (!Files.exists(dir)) { return; } try (Stream walk = Files.walk(dir)) { walk.sorted(Comparator.reverseOrder()).forEach(path -> { try { Files.delete(path); } catch (IOException e) { Log.warnf(e, "Konnte nicht löschen: %s", path); } }); } } }