Logging und refactoring verbesserungen
This commit is contained in:
@@ -43,6 +43,7 @@ public class FileProcessingResource {
|
||||
return Response.status(Response.Status.UNAUTHORIZED).build();
|
||||
}
|
||||
|
||||
Log.info("API-Key valide, Pipeline-Trigger wird verarbeitet");
|
||||
boolean started = pipeline.tryProcessAllAsync();
|
||||
|
||||
if (!started) {
|
||||
|
||||
@@ -42,4 +42,11 @@ public interface OciConfig {
|
||||
* Dev: relativer Pfad zum Projektverzeichnis (Default: {@code oci-private-key.pem}).
|
||||
*/
|
||||
String privateKeyPath();
|
||||
|
||||
/**
|
||||
* Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird.
|
||||
* Default: {@code _READY_FOR_DB_PROCESSING_}.
|
||||
* Muss mit der APEX Automation und dem ORDS-Package abgestimmt sein.
|
||||
*/
|
||||
String markerFilenameDbProcessing();
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import de.galabau.dateieingang.config.OciConfig;
|
||||
import de.galabau.dateieingang.exception.OciException;
|
||||
import de.galabau.dateieingang.model.FileEntry;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import de.galabau.dateieingang.model.ProcessingStatus;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
@@ -50,6 +49,7 @@ public class OciUploadService {
|
||||
.build();
|
||||
|
||||
this.client = ObjectStorageClient.builder().build(auth);
|
||||
Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -61,24 +61,24 @@ public class OciUploadService {
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void upload(ProcessingContext context) throws OciException {
|
||||
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
|
||||
|
||||
List<FileEntry> files = context.extractedFiles.stream()
|
||||
.filter(e -> !e.isMarker)
|
||||
.toList();
|
||||
|
||||
Log.infof("OCI-Upload gestartet: %d Datei(en) für '%s'", files.size(), context.zipNameWithoutExt);
|
||||
|
||||
for (FileEntry entry : files) {
|
||||
String key = buildKey(context.zipNameWithoutExt, entry.relativePath);
|
||||
entry.ociKey = key;
|
||||
putFile(key, context.localExtractDir.resolve(entry.relativePath), entry.fileSize);
|
||||
Log.debugf("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
|
||||
Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
|
||||
}
|
||||
|
||||
String markerKey = buildKey(context.zipNameWithoutExt, "_READY_FOR_DB_PROCESSING_");
|
||||
putEmpty(markerKey);
|
||||
String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing());
|
||||
Log.infof("Lade Marker hoch: '%s'", markerKey);
|
||||
uploadMarker(markerKey);
|
||||
|
||||
context.markerUploaded = true;
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
Log.infof("OCI-Upload abgeschlossen: %d Datei(en) + Marker in '%s'",
|
||||
files.size(), buildPrefix(context.zipNameWithoutExt));
|
||||
}
|
||||
@@ -105,7 +105,7 @@ public class OciUploadService {
|
||||
}
|
||||
}
|
||||
|
||||
private void putEmpty(String key) throws OciException {
|
||||
private void uploadMarker(String key) throws OciException {
|
||||
try (InputStream is = InputStream.nullInputStream()) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
|
||||
@@ -31,8 +31,9 @@ public class OrdsNotificationService {
|
||||
OrdsConfig config;
|
||||
|
||||
/**
|
||||
* Sendet eine Benachrichtigung an den ORDS-Endpunkt.
|
||||
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff).
|
||||
* Löst die DB-Verarbeitung via ORDS aus ({@code pck_auto_import.p_process_incoming_ba_data}).
|
||||
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
|
||||
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
|
||||
*
|
||||
* @param context enthält {@code zipNameWithoutExt} für das Log
|
||||
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
||||
@@ -40,7 +41,8 @@ public class OrdsNotificationService {
|
||||
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
|
||||
retryOn = OrdsException.class)
|
||||
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
|
||||
public void notify(ProcessingContext context) throws OrdsException {
|
||||
public void triggerDbProcessing(ProcessingContext context) throws OrdsException {
|
||||
Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt);
|
||||
Response response;
|
||||
try {
|
||||
response = ordsClient.processIncomingBaData(config.apiKey());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package de.galabau.dateieingang.pipeline;
|
||||
|
||||
import de.galabau.dateieingang.config.SftpConfig;
|
||||
import de.galabau.dateieingang.exception.OciException;
|
||||
import de.galabau.dateieingang.exception.OrdsException;
|
||||
import de.galabau.dateieingang.exception.SftpException;
|
||||
@@ -19,6 +20,8 @@ import org.slf4j.MDC;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@@ -44,6 +47,9 @@ public class FileProcessingPipeline {
|
||||
@Inject
|
||||
OrdsNotificationService ordsNotificationService;
|
||||
|
||||
@Inject
|
||||
SftpConfig sftpConfig;
|
||||
|
||||
@Inject
|
||||
ManagedExecutor executor;
|
||||
|
||||
@@ -70,9 +76,11 @@ public class FileProcessingPipeline {
|
||||
return true;
|
||||
}
|
||||
|
||||
void processAll() {
|
||||
private void processAll() {
|
||||
Log.info("Pipeline-Lauf gestartet");
|
||||
|
||||
preProcessingCleanup();
|
||||
|
||||
List<String> zipFiles;
|
||||
try {
|
||||
zipFiles = sftpService.listZipFiles();
|
||||
@@ -99,6 +107,7 @@ public class FileProcessingPipeline {
|
||||
private void processZip(String zipFilename) {
|
||||
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
|
||||
MDC.put("runId", context.runId.toString());
|
||||
Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId);
|
||||
|
||||
try {
|
||||
// --- Download ---
|
||||
@@ -115,18 +124,20 @@ public class FileProcessingPipeline {
|
||||
|
||||
// --- OCI Upload ---
|
||||
MDC.put("step", "oci-upload");
|
||||
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
|
||||
ociUploadService.upload(context);
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
|
||||
// --- SFTP Rename → .processed ---
|
||||
MDC.put("step", "sftp-rename");
|
||||
sftpService.renameRemote(zipFilename, zipFilename + ".processed");
|
||||
Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename);
|
||||
sftpService.renameFile(zipFilename, zipFilename + ".processed");
|
||||
|
||||
// --- ORDS Notify ---
|
||||
MDC.put("step", "ords-notify");
|
||||
ordsNotificationService.notify(context);
|
||||
ordsNotificationService.triggerDbProcessing(context);
|
||||
|
||||
context.status = ProcessingStatus.ORDS_NOTIFIED;
|
||||
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
|
||||
|
||||
} catch (SftpException | ZipException | OciException | OrdsException e) {
|
||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage());
|
||||
@@ -137,7 +148,10 @@ public class FileProcessingPipeline {
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
tryRenameToError(zipFilename);
|
||||
} finally {
|
||||
cleanup(context);
|
||||
postProcessingCleanup(context);
|
||||
long durationSeconds = Duration.between(context.startTime, LocalDateTime.now()).toSeconds();
|
||||
Log.infof("Lauf %s abgeschlossen — Status: %s, Dauer: %ds",
|
||||
context.runId, context.status, durationSeconds);
|
||||
MDC.clear();
|
||||
}
|
||||
}
|
||||
@@ -145,24 +159,73 @@ public class FileProcessingPipeline {
|
||||
private void tryRenameToError(String zipFilename) {
|
||||
try {
|
||||
MDC.put("step", "sftp-rename");
|
||||
sftpService.renameRemote(zipFilename, zipFilename + ".error");
|
||||
Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename);
|
||||
sftpService.renameFile(zipFilename, zipFilename + ".error");
|
||||
} catch (SftpException e) {
|
||||
Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung",
|
||||
zipFilename);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanup(ProcessingContext context) {
|
||||
MDC.put("step", "cleanup");
|
||||
/**
|
||||
* Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen.
|
||||
*
|
||||
* <p>Wird einmal pro Pipeline-Lauf <em>vor</em> dem SFTP-Listing aufgerufen.
|
||||
* Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft,
|
||||
* aber bei I/O-Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP-Dateien
|
||||
* und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden
|
||||
* sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben.
|
||||
*
|
||||
* <p>Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in
|
||||
* {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind.
|
||||
* Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert
|
||||
* ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs.
|
||||
*/
|
||||
private void preProcessingCleanup() {
|
||||
MDC.put("step", "pre-cleanup");
|
||||
Path workDir = Path.of(sftpConfig.localWorkDir());
|
||||
if (!Files.exists(workDir)) {
|
||||
return;
|
||||
}
|
||||
try (Stream<Path> entries = Files.list(workDir)) {
|
||||
entries.forEach(path -> {
|
||||
try {
|
||||
if (Files.isDirectory(path)) {
|
||||
deleteLocalDirectory(path);
|
||||
Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path);
|
||||
} else {
|
||||
Files.delete(path);
|
||||
Log.warnf("Verwaiste Datei gelöscht: %s", path);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis).
|
||||
*
|
||||
* <p>Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach
|
||||
* erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen
|
||||
* fehl, verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten
|
||||
* Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt.
|
||||
*
|
||||
* @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses
|
||||
*/
|
||||
private void postProcessingCleanup(ProcessingContext context) {
|
||||
MDC.put("step", "post-cleanup");
|
||||
Log.infof("Cleanup gestartet für Lauf %s", context.runId);
|
||||
try {
|
||||
if (context.localZipPath != null) {
|
||||
Files.deleteIfExists(context.localZipPath);
|
||||
Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath);
|
||||
Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath);
|
||||
}
|
||||
if (context.localExtractDir != null) {
|
||||
deleteLocalDirectory(context.localExtractDir);
|
||||
Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
|
||||
|
||||
@@ -9,7 +9,6 @@ import jakarta.inject.Inject;
|
||||
import net.schmizz.sshj.SSHClient;
|
||||
import net.schmizz.sshj.sftp.RemoteResourceInfo;
|
||||
import net.schmizz.sshj.sftp.SFTPClient;
|
||||
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -27,6 +26,7 @@ public class SftpService {
|
||||
void init() {
|
||||
try {
|
||||
Files.createDirectories(Path.of(config.localWorkDir()));
|
||||
Log.infof("Lokales Arbeitsverzeichnis: %s", config.localWorkDir());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: "
|
||||
+ config.localWorkDir(), e);
|
||||
@@ -45,8 +45,10 @@ public class SftpService {
|
||||
private <T> T withSftp(SftpOperation<T> operation) throws SftpException {
|
||||
try (SSHClient ssh = new SSHClient()) {
|
||||
configureHostKeyVerification(ssh);
|
||||
Log.infof("Verbinde zu SFTP %s:%d", config.host(), config.port());
|
||||
ssh.connect(config.host(), config.port());
|
||||
authenticate(ssh);
|
||||
Log.infof("SFTP-Verbindung hergestellt");
|
||||
try (SFTPClient sftp = ssh.newSFTPClient()) {
|
||||
return operation.execute(sftp);
|
||||
}
|
||||
@@ -56,21 +58,20 @@ public class SftpService {
|
||||
}
|
||||
}
|
||||
|
||||
private void configureHostKeyVerification(SSHClient ssh) {
|
||||
private void configureHostKeyVerification(SSHClient ssh) throws SftpException {
|
||||
if (config.hostKeyFingerprint().isPresent()) {
|
||||
ssh.addHostKeyVerifier(config.hostKeyFingerprint().get());
|
||||
} else {
|
||||
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert");
|
||||
throw new IllegalStateException(
|
||||
"SFTP Host-Key-Fingerprint muss konfiguriert sein!"
|
||||
);
|
||||
throw new SftpException("SFTP Host-Key-Fingerprint nicht konfiguriert — Verbindung abgelehnt");
|
||||
}
|
||||
}
|
||||
|
||||
private void authenticate(SSHClient ssh) throws IOException {
|
||||
if (config.privateKeyPath().isPresent()) {
|
||||
Log.infof("SFTP-Authentifizierung via Public-Key für Benutzer '%s'", config.username());
|
||||
ssh.authPublickey(config.username(), config.privateKeyPath().get());
|
||||
} else {
|
||||
Log.infof("SFTP-Authentifizierung via Passwort für Benutzer '%s'", config.username());
|
||||
ssh.authPassword(config.username(), config.password());
|
||||
}
|
||||
}
|
||||
@@ -82,6 +83,7 @@ public class SftpService {
|
||||
* @throws SftpException bei Verbindungs- oder Lesefehler
|
||||
*/
|
||||
public List<String> listZipFiles() throws SftpException {
|
||||
Log.infof("Lese SFTP-Verzeichnis '%s'", config.remotePath());
|
||||
return withSftp(sftp ->
|
||||
sftp.ls(config.remotePath()).stream()
|
||||
.filter(RemoteResourceInfo::isRegularFile)
|
||||
@@ -100,6 +102,7 @@ public class SftpService {
|
||||
*/
|
||||
public Path download(String filename) throws SftpException {
|
||||
Path localFile = Path.of(config.localWorkDir(), filename);
|
||||
Log.infof("Starte Download: '%s'", filename);
|
||||
withSftp(sftp -> {
|
||||
sftp.get(config.remotePath() + "/" + filename, localFile.toString());
|
||||
return null;
|
||||
@@ -115,7 +118,8 @@ public class SftpService {
|
||||
* @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed}
|
||||
* @throws SftpException bei Verbindungs- oder Umbenennfehler
|
||||
*/
|
||||
public void renameRemote(String filename, String newFilename) throws SftpException {
|
||||
public void renameFile(String filename, String newFilename) throws SftpException {
|
||||
Log.infof("SFTP Rename: '%s' → '%s'", filename, newFilename);
|
||||
withSftp(sftp -> {
|
||||
sftp.rename(
|
||||
config.remotePath() + "/" + filename,
|
||||
@@ -123,5 +127,6 @@ public class SftpService {
|
||||
);
|
||||
return null;
|
||||
});
|
||||
Log.infof("SFTP Rename erfolgreich: '%s'", newFilename);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package de.galabau.dateieingang.zip;
|
||||
|
||||
import de.galabau.dateieingang.config.OciConfig;
|
||||
import de.galabau.dateieingang.exception.ZipException;
|
||||
import de.galabau.dateieingang.model.FileEntry;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
|
||||
|
||||
@@ -19,6 +22,9 @@ import java.util.List;
|
||||
@ApplicationScoped
|
||||
public class ZipExtractionService {
|
||||
|
||||
@Inject
|
||||
OciConfig ociConfig;
|
||||
|
||||
/**
|
||||
* Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis.
|
||||
* Setzt {@code context.localExtractDir} und {@code context.extractedFiles}.
|
||||
@@ -35,6 +41,7 @@ public class ZipExtractionService {
|
||||
|
||||
try {
|
||||
Files.createDirectories(extractDir);
|
||||
Log.infof("Entpacke ZIP '%s'", context.zipFilename);
|
||||
|
||||
try (ZipArchiveInputStream zis = new ZipArchiveInputStream(
|
||||
new BufferedInputStream(Files.newInputStream(context.localZipPath)))) {
|
||||
@@ -51,14 +58,20 @@ public class ZipExtractionService {
|
||||
Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING);
|
||||
|
||||
boolean isMarker = Path.of(entryName).getFileName()
|
||||
.toString().equals("_READY_FOR_DB_PROCESSING_");
|
||||
.toString().equals(ociConfig.markerFilenameDbProcessing());
|
||||
|
||||
entries.add(new FileEntry(entryName, Files.size(targetFile), isMarker));
|
||||
FileEntry fileEntry = new FileEntry(entryName, Files.size(targetFile), isMarker);
|
||||
entries.add(fileEntry);
|
||||
Log.infof("Extrahiert: '%s' (%d Bytes)", entryName, fileEntry.fileSize);
|
||||
if (fileEntry.isMarker) {
|
||||
Log.infof("Marker-Datei gefunden: '%s'", entryName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
context.extractedFiles = entries;
|
||||
Log.infof("Extraktion abgeschlossen: %d Datei(en) aus '%s'", entries.size(), context.zipFilename);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: "
|
||||
|
||||
@@ -19,6 +19,9 @@ galabau.sftp.local-work-dir=/tmp/sftp-work
|
||||
# galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE}
|
||||
|
||||
# ===== OCI Object Storage =====
|
||||
# Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird
|
||||
galabau.oci.marker-filename-db-processing=${OCI_MARKER_FILENAME_DB_PROCESSING:_READY_FOR_DB_PROCESSING_}
|
||||
|
||||
galabau.oci.namespace=${OCI_NAMESPACE}
|
||||
galabau.oci.region=${OCI_REGION}
|
||||
galabau.oci.bucket=${OCI_BUCKET}
|
||||
|
||||
Reference in New Issue
Block a user