diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/api/FileProcessingResource.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/api/FileProcessingResource.java index f08855a..53cad99 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/api/FileProcessingResource.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/api/FileProcessingResource.java @@ -43,6 +43,7 @@ public class FileProcessingResource { return Response.status(Response.Status.UNAUTHORIZED).build(); } + Log.info("API-Key valide, Pipeline-Trigger wird verarbeitet"); boolean started = pipeline.tryProcessAllAsync(); if (!started) { diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/config/OciConfig.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/config/OciConfig.java index 18b1519..0529d9a 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/config/OciConfig.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/config/OciConfig.java @@ -42,4 +42,11 @@ public interface OciConfig { * Dev: relativer Pfad zum Projektverzeichnis (Default: {@code oci-private-key.pem}). */ String privateKeyPath(); + + /** + * Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird. + * Default: {@code _READY_FOR_DB_PROCESSING_}. + * Muss mit der APEX Automation und dem ORDS-Package abgestimmt sein. + */ + String markerFilenameDbProcessing(); } diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java index 189f288..cf3f2b9 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/oci/OciUploadService.java @@ -8,7 +8,6 @@ import de.galabau.dateieingang.config.OciConfig; import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.model.FileEntry; import de.galabau.dateieingang.model.ProcessingContext; -import de.galabau.dateieingang.model.ProcessingStatus; import io.quarkus.logging.Log; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; @@ -50,6 +49,7 @@ public class OciUploadService { .build(); this.client = ObjectStorageClient.builder().build(auth); + Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket()); } /** @@ -61,24 +61,24 @@ public class OciUploadService { * @throws OciException bei Verbindungs- oder Upload-Fehlern */ public void upload(ProcessingContext context) throws OciException { - context.status = ProcessingStatus.PARTIALLY_UPLOADED; - List files = context.extractedFiles.stream() .filter(e -> !e.isMarker) .toList(); + Log.infof("OCI-Upload gestartet: %d Datei(en) für '%s'", files.size(), context.zipNameWithoutExt); + for (FileEntry entry : files) { String key = buildKey(context.zipNameWithoutExt, entry.relativePath); entry.ociKey = key; putFile(key, context.localExtractDir.resolve(entry.relativePath), entry.fileSize); - Log.debugf("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize); + Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize); } - String markerKey = buildKey(context.zipNameWithoutExt, "_READY_FOR_DB_PROCESSING_"); - putEmpty(markerKey); + String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing()); + Log.infof("Lade Marker hoch: '%s'", markerKey); + uploadMarker(markerKey); context.markerUploaded = true; - context.status = ProcessingStatus.MARKER_UPLOADED; Log.infof("OCI-Upload abgeschlossen: %d Datei(en) + Marker in '%s'", files.size(), buildPrefix(context.zipNameWithoutExt)); } @@ -105,7 +105,7 @@ public class OciUploadService { } } - private void putEmpty(String key) throws OciException { + private void uploadMarker(String key) throws OciException { try (InputStream is = InputStream.nullInputStream()) { client.putObject(PutObjectRequest.builder() .namespaceName(config.namespace()) diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java index dd30bcc..9240a8e 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/ords/OrdsNotificationService.java @@ -31,8 +31,9 @@ public class OrdsNotificationService { OrdsConfig config; /** - * Sendet eine Benachrichtigung an den ORDS-Endpunkt. - * Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff). + * Löst die DB-Verarbeitung via ORDS aus ({@code pck_auto_import.p_process_incoming_ba_data}). + * Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch). + * Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff). * * @param context enthält {@code zipNameWithoutExt} für das Log * @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt @@ -40,7 +41,8 @@ public class OrdsNotificationService { @Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS, retryOn = OrdsException.class) @Timeout(value = 10, unit = ChronoUnit.SECONDS) - public void notify(ProcessingContext context) throws OrdsException { + public void triggerDbProcessing(ProcessingContext context) throws OrdsException { + Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt); Response response; try { response = ordsClient.processIncomingBaData(config.apiKey()); diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java index 6376f5f..20949de 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/pipeline/FileProcessingPipeline.java @@ -1,5 +1,6 @@ package de.galabau.dateieingang.pipeline; +import de.galabau.dateieingang.config.SftpConfig; import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.SftpException; @@ -19,6 +20,8 @@ import org.slf4j.MDC; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.Comparator; import java.util.List; import java.util.UUID; @@ -44,6 +47,9 @@ public class FileProcessingPipeline { @Inject OrdsNotificationService ordsNotificationService; + @Inject + SftpConfig sftpConfig; + @Inject ManagedExecutor executor; @@ -70,9 +76,11 @@ public class FileProcessingPipeline { return true; } - void processAll() { + private void processAll() { Log.info("Pipeline-Lauf gestartet"); + preProcessingCleanup(); + List zipFiles; try { zipFiles = sftpService.listZipFiles(); @@ -99,6 +107,7 @@ public class FileProcessingPipeline { private void processZip(String zipFilename) { ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); MDC.put("runId", context.runId.toString()); + Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId); try { // --- Download --- @@ -115,18 +124,20 @@ public class FileProcessingPipeline { // --- OCI Upload --- MDC.put("step", "oci-upload"); + context.status = ProcessingStatus.PARTIALLY_UPLOADED; ociUploadService.upload(context); + context.status = ProcessingStatus.MARKER_UPLOADED; // --- SFTP Rename → .processed --- MDC.put("step", "sftp-rename"); - sftpService.renameRemote(zipFilename, zipFilename + ".processed"); - Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename); + sftpService.renameFile(zipFilename, zipFilename + ".processed"); // --- ORDS Notify --- MDC.put("step", "ords-notify"); - ordsNotificationService.notify(context); + ordsNotificationService.triggerDbProcessing(context); context.status = ProcessingStatus.ORDS_NOTIFIED; + Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename); } catch (SftpException | ZipException | OciException | OrdsException e) { Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage()); @@ -137,7 +148,10 @@ public class FileProcessingPipeline { context.status = ProcessingStatus.FAILED; tryRenameToError(zipFilename); } finally { - cleanup(context); + postProcessingCleanup(context); + long durationSeconds = Duration.between(context.startTime, LocalDateTime.now()).toSeconds(); + Log.infof("Lauf %s abgeschlossen — Status: %s, Dauer: %ds", + context.runId, context.status, durationSeconds); MDC.clear(); } } @@ -145,24 +159,73 @@ public class FileProcessingPipeline { private void tryRenameToError(String zipFilename) { try { MDC.put("step", "sftp-rename"); - sftpService.renameRemote(zipFilename, zipFilename + ".error"); - Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename); + sftpService.renameFile(zipFilename, zipFilename + ".error"); } catch (SftpException e) { Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung", zipFilename); } } - private void cleanup(ProcessingContext context) { - MDC.put("step", "cleanup"); + /** + * Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen. + * + *

Wird einmal pro Pipeline-Lauf vor dem SFTP-Listing aufgerufen. + * Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft, + * aber bei I/O-Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP-Dateien + * und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden + * sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben. + * + *

Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in + * {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind. + * Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert + * ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs. + */ + private void preProcessingCleanup() { + MDC.put("step", "pre-cleanup"); + Path workDir = Path.of(sftpConfig.localWorkDir()); + if (!Files.exists(workDir)) { + return; + } + try (Stream entries = Files.list(workDir)) { + entries.forEach(path -> { + try { + if (Files.isDirectory(path)) { + deleteLocalDirectory(path); + Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path); + } else { + Files.delete(path); + Log.warnf("Verwaiste Datei gelöscht: %s", path); + } + } catch (IOException e) { + Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path); + } + }); + } catch (IOException e) { + Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir); + } + } + + /** + * Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis). + * + *

Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach + * erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen + * fehl, verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten + * Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt. + * + * @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses + */ + private void postProcessingCleanup(ProcessingContext context) { + MDC.put("step", "post-cleanup"); + Log.infof("Cleanup gestartet für Lauf %s", context.runId); try { if (context.localZipPath != null) { Files.deleteIfExists(context.localZipPath); - Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath); + Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath); } if (context.localExtractDir != null) { deleteLocalDirectory(context.localExtractDir); - Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); + Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); } } catch (IOException e) { Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/sftp/SftpService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/sftp/SftpService.java index 42db230..0bafbaf 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/sftp/SftpService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/sftp/SftpService.java @@ -9,7 +9,6 @@ import jakarta.inject.Inject; import net.schmizz.sshj.SSHClient; import net.schmizz.sshj.sftp.RemoteResourceInfo; import net.schmizz.sshj.sftp.SFTPClient; -import net.schmizz.sshj.transport.verification.PromiscuousVerifier; import java.io.IOException; import java.nio.file.Files; @@ -27,6 +26,7 @@ public class SftpService { void init() { try { Files.createDirectories(Path.of(config.localWorkDir())); + Log.infof("Lokales Arbeitsverzeichnis: %s", config.localWorkDir()); } catch (IOException e) { throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: " + config.localWorkDir(), e); @@ -45,8 +45,10 @@ public class SftpService { private T withSftp(SftpOperation operation) throws SftpException { try (SSHClient ssh = new SSHClient()) { configureHostKeyVerification(ssh); + Log.infof("Verbinde zu SFTP %s:%d", config.host(), config.port()); ssh.connect(config.host(), config.port()); authenticate(ssh); + Log.infof("SFTP-Verbindung hergestellt"); try (SFTPClient sftp = ssh.newSFTPClient()) { return operation.execute(sftp); } @@ -56,21 +58,20 @@ public class SftpService { } } - private void configureHostKeyVerification(SSHClient ssh) { + private void configureHostKeyVerification(SSHClient ssh) throws SftpException { if (config.hostKeyFingerprint().isPresent()) { ssh.addHostKeyVerifier(config.hostKeyFingerprint().get()); } else { - Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert"); - throw new IllegalStateException( - "SFTP Host-Key-Fingerprint muss konfiguriert sein!" - ); + throw new SftpException("SFTP Host-Key-Fingerprint nicht konfiguriert — Verbindung abgelehnt"); } } private void authenticate(SSHClient ssh) throws IOException { if (config.privateKeyPath().isPresent()) { + Log.infof("SFTP-Authentifizierung via Public-Key für Benutzer '%s'", config.username()); ssh.authPublickey(config.username(), config.privateKeyPath().get()); } else { + Log.infof("SFTP-Authentifizierung via Passwort für Benutzer '%s'", config.username()); ssh.authPassword(config.username(), config.password()); } } @@ -82,6 +83,7 @@ public class SftpService { * @throws SftpException bei Verbindungs- oder Lesefehler */ public List listZipFiles() throws SftpException { + Log.infof("Lese SFTP-Verzeichnis '%s'", config.remotePath()); return withSftp(sftp -> sftp.ls(config.remotePath()).stream() .filter(RemoteResourceInfo::isRegularFile) @@ -100,6 +102,7 @@ public class SftpService { */ public Path download(String filename) throws SftpException { Path localFile = Path.of(config.localWorkDir(), filename); + Log.infof("Starte Download: '%s'", filename); withSftp(sftp -> { sftp.get(config.remotePath() + "/" + filename, localFile.toString()); return null; @@ -115,7 +118,8 @@ public class SftpService { * @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed} * @throws SftpException bei Verbindungs- oder Umbenennfehler */ - public void renameRemote(String filename, String newFilename) throws SftpException { + public void renameFile(String filename, String newFilename) throws SftpException { + Log.infof("SFTP Rename: '%s' → '%s'", filename, newFilename); withSftp(sftp -> { sftp.rename( config.remotePath() + "/" + filename, @@ -123,5 +127,6 @@ public class SftpService { ); return null; }); + Log.infof("SFTP Rename erfolgreich: '%s'", newFilename); } } diff --git a/quarkus-automaton/src/main/java/de/galabau/dateieingang/zip/ZipExtractionService.java b/quarkus-automaton/src/main/java/de/galabau/dateieingang/zip/ZipExtractionService.java index 3981c50..dc3f0e3 100644 --- a/quarkus-automaton/src/main/java/de/galabau/dateieingang/zip/ZipExtractionService.java +++ b/quarkus-automaton/src/main/java/de/galabau/dateieingang/zip/ZipExtractionService.java @@ -1,9 +1,12 @@ package de.galabau.dateieingang.zip; +import de.galabau.dateieingang.config.OciConfig; import de.galabau.dateieingang.exception.ZipException; import de.galabau.dateieingang.model.FileEntry; import de.galabau.dateieingang.model.ProcessingContext; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; @@ -19,6 +22,9 @@ import java.util.List; @ApplicationScoped public class ZipExtractionService { + @Inject + OciConfig ociConfig; + /** * Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis. * Setzt {@code context.localExtractDir} und {@code context.extractedFiles}. @@ -35,6 +41,7 @@ public class ZipExtractionService { try { Files.createDirectories(extractDir); + Log.infof("Entpacke ZIP '%s'", context.zipFilename); try (ZipArchiveInputStream zis = new ZipArchiveInputStream( new BufferedInputStream(Files.newInputStream(context.localZipPath)))) { @@ -51,14 +58,20 @@ public class ZipExtractionService { Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING); boolean isMarker = Path.of(entryName).getFileName() - .toString().equals("_READY_FOR_DB_PROCESSING_"); + .toString().equals(ociConfig.markerFilenameDbProcessing()); - entries.add(new FileEntry(entryName, Files.size(targetFile), isMarker)); + FileEntry fileEntry = new FileEntry(entryName, Files.size(targetFile), isMarker); + entries.add(fileEntry); + Log.infof("Extrahiert: '%s' (%d Bytes)", entryName, fileEntry.fileSize); + if (fileEntry.isMarker) { + Log.infof("Marker-Datei gefunden: '%s'", entryName); + } } } } context.extractedFiles = entries; + Log.infof("Extraktion abgeschlossen: %d Datei(en) aus '%s'", entries.size(), context.zipFilename); } catch (IOException e) { throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: " diff --git a/quarkus-automaton/src/main/resources/application.properties b/quarkus-automaton/src/main/resources/application.properties index 2b9db08..0ad6d5c 100644 --- a/quarkus-automaton/src/main/resources/application.properties +++ b/quarkus-automaton/src/main/resources/application.properties @@ -19,6 +19,9 @@ galabau.sftp.local-work-dir=/tmp/sftp-work # galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE} # ===== OCI Object Storage ===== +# Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird +galabau.oci.marker-filename-db-processing=${OCI_MARKER_FILENAME_DB_PROCESSING:_READY_FOR_DB_PROCESSING_} + galabau.oci.namespace=${OCI_NAMESPACE} galabau.oci.region=${OCI_REGION} galabau.oci.bucket=${OCI_BUCKET}