2026-04-08 15:20:21 +02:00
package de.galabau.dateieingang.pipeline ;
2026-04-22 09:45:24 +02:00
import de.galabau.dateieingang.config.SftpConfig ;
2026-04-08 15:20:21 +02:00
import de.galabau.dateieingang.exception.OciException ;
import de.galabau.dateieingang.exception.OrdsException ;
import de.galabau.dateieingang.exception.SftpException ;
import de.galabau.dateieingang.exception.ZipException ;
import de.galabau.dateieingang.model.ProcessingContext ;
import de.galabau.dateieingang.model.ProcessingStatus ;
import de.galabau.dateieingang.oci.OciUploadService ;
import de.galabau.dateieingang.ords.OrdsNotificationService ;
import de.galabau.dateieingang.sftp.SftpService ;
import de.galabau.dateieingang.zip.ZipExtractionService ;
import io.quarkus.logging.Log ;
import jakarta.enterprise.context.ApplicationScoped ;
import jakarta.inject.Inject ;
import org.eclipse.microprofile.context.ManagedExecutor ;
import org.slf4j.MDC ;
import java.io.IOException ;
import java.nio.file.Files ;
import java.nio.file.Path ;
2026-04-22 09:45:24 +02:00
import java.time.Duration ;
import java.time.LocalDateTime ;
2026-04-08 15:20:21 +02:00
import java.util.Comparator ;
import java.util.List ;
import java.util.UUID ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.stream.Stream ;
/ * *
* Orchestriert den gesamten Dateieingang - Workflow : SFTP → ZIP → OCI → ORDS .
* Läuft asynchron im Hintergrund , gibt dem Aufrufer sofort zurück .
* /
@ApplicationScoped
public class FileProcessingPipeline {
@Inject
SftpService sftpService ;
@Inject
ZipExtractionService zipExtractionService ;
@Inject
OciUploadService ociUploadService ;
@Inject
OrdsNotificationService ordsNotificationService ;
2026-04-22 09:45:24 +02:00
@Inject
SftpConfig sftpConfig ;
2026-04-08 15:20:21 +02:00
@Inject
ManagedExecutor executor ;
private final AtomicBoolean isRunning = new AtomicBoolean ( false ) ;
/ * *
* Startet die Pipeline asynchron im Hintergrund ( fire & forget ) .
* Gibt { @code false } zurück wenn bereits ein Lauf aktiv ist .
*
* @return { @code true } wenn die Pipeline gestartet wurde , { @code false } bei aktivem Lauf
* /
public boolean tryProcessAllAsync ( ) {
if ( ! isRunning . compareAndSet ( false , true ) ) {
Log . warn ( " Pipeline läuft bereits — neuer Trigger abgewiesen " ) ;
return false ;
}
executor . submit ( ( ) - > {
try {
processAll ( ) ;
2026-04-22 15:25:24 +02:00
} catch ( Throwable e ) { // nicht exception catchen, weil Error in OCI SDK auftreten können, die Throwable aber nicht Excption sind. Die würden sonst nicht geloggt
2026-04-22 14:04:46 +02:00
Log . errorf ( e , " Unerwarteter Fehler im Pipeline-Lauf " ) ;
2026-04-08 15:20:21 +02:00
} finally {
isRunning . set ( false ) ;
}
} ) ;
return true ;
}
2026-04-22 09:45:24 +02:00
private void processAll ( ) {
2026-05-04 08:41:03 +02:00
UUID pipelineRunId = UUID . randomUUID ( ) ;
MDC . put ( " pipelineRunId " , pipelineRunId . toString ( ) ) ;
Log . infof ( " Pipeline-Lauf gestartet [pipelineRunId=%s] " , pipelineRunId ) ;
2026-04-08 15:20:21 +02:00
try {
2026-05-04 08:41:03 +02:00
preProcessingCleanup ( ) ;
2026-04-08 15:20:21 +02:00
2026-05-04 08:41:03 +02:00
List < String > zipFiles ;
try {
zipFiles = sftpService . listZipFiles ( ) ;
} catch ( SftpException e ) {
Log . errorf ( e , " SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen " ) ;
return ;
}
2026-04-08 15:20:21 +02:00
2026-05-04 08:41:03 +02:00
if ( zipFiles . isEmpty ( ) ) {
Log . info ( " Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden " ) ;
return ;
}
2026-04-08 15:20:21 +02:00
2026-05-04 08:41:03 +02:00
Log . infof ( " %d neue ZIP-Datei(en) auf dem SFTP-Server gefunden " , zipFiles . size ( ) ) ;
for ( String zipFilename : zipFiles ) {
Log . infof ( " ZIP-Datei gefunden: %s " , zipFilename ) ;
processZip ( zipFilename ) ;
}
2026-04-08 15:20:21 +02:00
2026-05-04 08:41:03 +02:00
MDC . put ( " step " , " ords-notify " ) ;
try {
ordsNotificationService . triggerDbProcessing ( ) ;
} catch ( OrdsException e ) {
Log . errorf ( e , " ORDS-Benachrichtigung fehlgeschlagen — DB-Verarbeitung wird beim nächsten Lauf ausgelöst " ) ;
}
} finally {
Log . infof ( " Pipeline-Lauf abgeschlossen [pipelineRunId=%s] " , pipelineRunId ) ;
MDC . clear ( ) ;
}
2026-04-08 15:20:21 +02:00
}
private void processZip ( String zipFilename ) {
ProcessingContext context = new ProcessingContext ( UUID . randomUUID ( ) , zipFilename ) ;
2026-05-04 08:41:03 +02:00
MDC . put ( " fileId " , context . fileId . toString ( ) ) ;
Log . infof ( " Starte Verarbeitung von '%s' [fileId=%s] " , zipFilename , context . fileId ) ;
2026-04-08 15:20:21 +02:00
try {
// --- Download ---
MDC . put ( " step " , " sftp-download " ) ;
context . localZipPath = sftpService . download ( zipFilename ) ;
Log . infof ( " ZIP '%s' heruntergeladen (%d Bytes) " , zipFilename ,
Files . size ( context . localZipPath ) ) ;
// --- Entpacken ---
MDC . put ( " step " , " zip-extract " ) ;
zipExtractionService . extract ( context ) ;
Log . infof ( " ZIP '%s' entpackt: %d Datei(en) " , zipFilename ,
context . extractedFiles . size ( ) ) ;
2026-04-22 14:04:46 +02:00
// --- OCI Upload (Dateien, noch kein Marker) ---
2026-04-08 15:20:21 +02:00
MDC . put ( " step " , " oci-upload " ) ;
2026-04-22 09:45:24 +02:00
context . status = ProcessingStatus . PARTIALLY_UPLOADED ;
2026-04-22 13:07:01 +02:00
Log . info ( " Starte OCI-Upload " ) ;
2026-04-22 14:04:46 +02:00
ociUploadService . uploadFiles ( context ) ;
2026-04-08 15:20:21 +02:00
2026-04-30 16:12:46 +02:00
// --- SFTP Delete ---
2026-04-22 14:04:46 +02:00
// Erst nach erfolgreichem Datei-Upload — Marker kommt danach,
2026-04-30 16:12:46 +02:00
// damit Marker-Präsenz in OCI ↔ ZIP bereits vom SFTP gelöscht ist.
MDC . put ( " step " , " sftp-delete " ) ;
sftpService . deleteFile ( zipFilename ) ;
2026-04-08 15:20:21 +02:00
2026-04-22 14:04:46 +02:00
// --- OCI Marker ---
// Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist.
MDC . put ( " step " , " oci-marker " ) ;
ociUploadService . uploadMarker ( context ) ;
context . status = ProcessingStatus . MARKER_UPLOADED ;
2026-04-22 09:45:24 +02:00
Log . infof ( " Verarbeitung erfolgreich abgeschlossen: '%s' " , zipFilename ) ;
2026-04-08 15:20:21 +02:00
2026-04-22 14:04:46 +02:00
} catch ( ZipException e ) {
Log . errorf ( e , " Ungültige ZIP-Datei '%s' — wird zu .error umbenannt " , zipFilename ) ;
2026-04-08 15:20:21 +02:00
context . status = ProcessingStatus . FAILED ;
tryRenameToError ( zipFilename ) ;
2026-05-04 08:41:03 +02:00
} catch ( SftpException | OciException e ) {
2026-04-22 14:04:46 +02:00
Log . errorf ( e , " Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s " , zipFilename , e . getMessage ( ) ) ;
context . status = ProcessingStatus . FAILED ;
2026-04-08 15:20:21 +02:00
} catch ( IOException e ) {
Log . errorf ( e , " I/O-Fehler bei der Verarbeitung von '%s' " , zipFilename ) ;
context . status = ProcessingStatus . FAILED ;
2026-04-22 14:04:46 +02:00
} catch ( RuntimeException e ) {
Log . errorf ( e , " Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s' " , zipFilename ) ;
context . status = ProcessingStatus . FAILED ;
2026-04-08 15:20:21 +02:00
} finally {
2026-04-22 09:45:24 +02:00
postProcessingCleanup ( context ) ;
2026-04-22 16:20:05 +02:00
long duration = Duration . between ( context . startTime , LocalDateTime . now ( ) ) . toMillis ( ) ;
2026-05-04 08:41:03 +02:00
Log . infof ( " Datei %s abgeschlossen — Status: %s, Dauer: %dms [fileId=%s] " ,
zipFilename , context . status , duration , context . fileId ) ;
2026-04-22 16:20:05 +02:00
Log . info ( " ----------------------------------------------------------------------------------------------------- " ) ;
2026-05-04 08:41:03 +02:00
MDC . remove ( " fileId " ) ;
MDC . remove ( " step " ) ;
2026-04-08 15:20:21 +02:00
}
}
private void tryRenameToError ( String zipFilename ) {
try {
MDC . put ( " step " , " sftp-rename " ) ;
2026-04-22 09:45:24 +02:00
sftpService . renameFile ( zipFilename , zipFilename + " .error " ) ;
2026-04-08 15:20:21 +02:00
} catch ( SftpException e ) {
Log . warnf ( e , " Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung " ,
zipFilename ) ;
}
}
2026-04-22 09:45:24 +02:00
/ * *
* Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen .
*
* < p > Wird einmal pro Pipeline - Lauf < em > vor < / em > dem SFTP - Listing aufgerufen .
* Notwendig weil { @link # postProcessingCleanup } zwar im { @code finally } - Block läuft ,
* aber bei I / O - Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP - Dateien
* und Entpack - Verzeichnisse im Arbeitsverzeichnis zurück . Ohne diesen Schritt würden
* sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben .
*
* < p > Ein Zeitstempel - Schwellwert ist nicht nötig : der { @code AtomicBoolean } - Guard in
* { @link # tryProcessAllAsync } stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind .
* Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt , ist daher garantiert
* ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs .
* /
private void preProcessingCleanup ( ) {
MDC . put ( " step " , " pre-cleanup " ) ;
Path workDir = Path . of ( sftpConfig . localWorkDir ( ) ) ;
if ( ! Files . exists ( workDir ) ) {
return ;
}
try ( Stream < Path > entries = Files . list ( workDir ) ) {
entries . forEach ( path - > {
try {
if ( Files . isDirectory ( path ) ) {
deleteLocalDirectory ( path ) ;
Log . warnf ( " Verwaistes Entpack-Verzeichnis gelöscht: %s " , path ) ;
} else {
Files . delete ( path ) ;
Log . warnf ( " Verwaiste Datei gelöscht: %s " , path ) ;
}
} catch ( IOException e ) {
Log . warnf ( e , " Pre-Cleanup fehlgeschlagen für: %s " , path ) ;
}
} ) ;
} catch ( IOException e ) {
Log . warnf ( e , " Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s " , workDir ) ;
}
}
/ * *
* Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs ( ZIP + Entpack - Verzeichnis ) .
*
* < p > Wird im { @code finally } - Block von { @link # processZip } aufgerufen , also sowohl nach
* erfolgreichem Abschluss als auch nach Fehlern . Schlägt dieser Cleanup bei I / O - Problemen
* fehl , verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten
* Pipeline - Lauf durch { @link # preProcessingCleanup } entfernt .
*
* @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack - Verzeichnisses
* /
private void postProcessingCleanup ( ProcessingContext context ) {
MDC . put ( " step " , " post-cleanup " ) ;
2026-05-04 08:41:03 +02:00
Log . infof ( " Cleanup gestartet für Datei '%s' " , context . zipFilename ) ;
2026-04-08 15:20:21 +02:00
try {
if ( context . localZipPath ! = null ) {
Files . deleteIfExists ( context . localZipPath ) ;
2026-04-22 09:45:24 +02:00
Log . infof ( " Lokale ZIP gelöscht: %s " , context . localZipPath ) ;
2026-04-08 15:20:21 +02:00
}
if ( context . localExtractDir ! = null ) {
deleteLocalDirectory ( context . localExtractDir ) ;
2026-04-22 09:45:24 +02:00
Log . infof ( " Lokales Entpack-Verzeichnis gelöscht: %s " , context . localExtractDir ) ;
2026-04-08 15:20:21 +02:00
}
} catch ( IOException e ) {
2026-05-04 08:41:03 +02:00
Log . warnf ( e , " Cleanup für '%s' fehlgeschlagen — lokale Dateien verbleiben ggf. in %s " ,
context . zipFilename ,
2026-04-08 15:20:21 +02:00
context . localZipPath ! = null ? context . localZipPath . getParent ( ) : " unbekannt " ) ;
}
}
private void deleteLocalDirectory ( Path dir ) throws IOException {
if ( ! Files . exists ( dir ) ) {
return ;
}
try ( Stream < Path > walk = Files . walk ( dir ) ) {
walk . sorted ( Comparator . reverseOrder ( ) ) . forEach ( path - > {
try {
Files . delete ( path ) ;
} catch ( IOException e ) {
Log . warnf ( e , " Konnte nicht löschen: %s " , path ) ;
}
} ) ;
}
}
}