Quarkus Projekt Struktur und SFTP Zugriff implementiert
This commit is contained in:
@@ -82,33 +82,27 @@ Logs werden via OTLP direkt an Loki geschickt — kein Promtail, kein manuelles
|
|||||||
|
|
||||||
## Umgebungsvariablen
|
## Umgebungsvariablen
|
||||||
|
|
||||||
Für lokale Entwicklung eine `.env`-Datei im Projektverzeichnis anlegen:
|
Quarkus lädt eine `.env`-Datei im Projektverzeichnis **automatisch** — kein `source` nötig.
|
||||||
|
Format: `KEY=VALUE` ohne `export`.
|
||||||
|
|
||||||
```bash
|
```properties
|
||||||
# API-Absicherung des REST Endpoints
|
# API-Absicherung des REST Endpoints
|
||||||
export GALABAU_API_KEY=<lokaler-dev-key>
|
GALABAU_API_KEY=dev-key
|
||||||
|
|
||||||
# SFTP
|
# SFTP
|
||||||
export GALABAU_SFTP_PASSWORD=<sftp-passwort>
|
GALABAU_SFTP_HOST=sftp.lieferant.de
|
||||||
# Alternativ (Public-Key-Auth):
|
GALABAU_SFTP_USERNAME=sftpuser
|
||||||
# export SFTP_KEY_PASSPHRASE=<passphrase>
|
GALABAU_SFTP_PASSWORD=<sftp-passwort>
|
||||||
|
# GALABAU_SFTP_HOST_KEY_FINGERPRINT=SHA256:...
|
||||||
|
|
||||||
# OCI Object Storage Credentials
|
# OCI Object Storage Credentials (erst nötig wenn OCI-Stub durch echte Implementierung ersetzt)
|
||||||
export OCI_TENANCY_ID=ocid1.tenancy.oc1..xxx
|
# OCI_TENANCY_ID=ocid1.tenancy.oc1..xxx
|
||||||
export OCI_USER_ID=ocid1.user.oc1..xxx
|
# OCI_USER_ID=ocid1.user.oc1..xxx
|
||||||
export OCI_FINGERPRINT=aa:bb:cc:dd:...
|
# OCI_FINGERPRINT=aa:bb:cc:dd:...
|
||||||
# Lokal: Pfad zur eigenen OCI Key-Datei
|
# OCI_PRIVATE_KEY_PATH=~/.oci/oci_api_key.pem
|
||||||
export OCI_PRIVATE_KEY_PATH=~/.oci/oci_api_key.pem
|
|
||||||
# In Produktion (Kubernetes): gemountetes Secret, z.B. /etc/oci/private-key.pem
|
|
||||||
|
|
||||||
# ORDS
|
# ORDS (erst nötig wenn ORDS-Stub durch echte Implementierung ersetzt)
|
||||||
export GALABAU_ORDS_API_KEY=<ords-api-key>
|
# GALABAU_ORDS_API_KEY=<ords-api-key>
|
||||||
```
|
|
||||||
|
|
||||||
Importieren:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
source .env
|
|
||||||
```
|
```
|
||||||
|
|
||||||
> **.env niemals committen** — in `.gitignore` eintragen.
|
> **.env niemals committen** — in `.gitignore` eintragen.
|
||||||
|
|||||||
@@ -93,7 +93,8 @@
|
|||||||
<!-- Test -->
|
<!-- Test -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-junit5</artifactId>
|
<artifactId>quarkus-junit</artifactId>
|
||||||
|
<version>3.34.1</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@@ -0,0 +1,59 @@
|
|||||||
|
package de.galabau.dateieingang.api;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.config.ApplicationConfig;
|
||||||
|
import de.galabau.dateieingang.pipeline.FileProcessingPipeline;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.ws.rs.HeaderParam;
|
||||||
|
import jakarta.ws.rs.POST;
|
||||||
|
import jakarta.ws.rs.Path;
|
||||||
|
import jakarta.ws.rs.Produces;
|
||||||
|
import jakarta.ws.rs.core.MediaType;
|
||||||
|
import jakarta.ws.rs.core.Response;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* REST-Endpunkt für den Dateieingang-Trigger.
|
||||||
|
* Wird von der APEX Automation stündlich per HTTP POST aufgerufen (fire & forget).
|
||||||
|
*/
|
||||||
|
@Path("/api/process-incoming")
|
||||||
|
@ApplicationScoped
|
||||||
|
public class FileProcessingResource {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ApplicationConfig config;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
FileProcessingPipeline pipeline;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Nimmt einen Trigger von der APEX Automation entgegen und startet die Pipeline asynchron.
|
||||||
|
* Gibt sofort {@code 202 Accepted} zurück — Fehler in der Pipeline landen im Log.
|
||||||
|
*
|
||||||
|
* @param apiKey API-Key aus dem Header {@code X-Api-Key}
|
||||||
|
* @return 202 bei erfolgreichem Start, 401 bei fehlendem/falschem Key, 409 wenn Pipeline läuft
|
||||||
|
*/
|
||||||
|
@POST
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Response triggerProcessing(@HeaderParam("X-Api-Key") String apiKey) {
|
||||||
|
if (apiKey == null || !config.api().key().equals(apiKey)) {
|
||||||
|
Log.warn("Trigger abgelehnt — ungültiger oder fehlender API-Key");
|
||||||
|
return Response.status(Response.Status.UNAUTHORIZED).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean started = pipeline.tryProcessAllAsync();
|
||||||
|
|
||||||
|
if (!started) {
|
||||||
|
return Response.status(409)
|
||||||
|
.entity(Map.of("message", "Pipeline läuft bereits"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.info("Pipeline-Trigger akzeptiert, Verarbeitung startet im Hintergrund");
|
||||||
|
return Response.accepted()
|
||||||
|
.entity(Map.of("message", "Pipeline gestartet"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package de.galabau.dateieingang.config;
|
||||||
|
|
||||||
|
import io.smallrye.config.ConfigMapping;
|
||||||
|
|
||||||
|
/** Zentrale Konfiguration des Dateieingang-Service. */
|
||||||
|
@ConfigMapping(prefix = "galabau")
|
||||||
|
public interface ApplicationConfig {
|
||||||
|
|
||||||
|
Api api();
|
||||||
|
|
||||||
|
interface Api {
|
||||||
|
/** API-Key für den eingehenden REST-Endpunkt (Header: X-Api-Key). */
|
||||||
|
String key();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package de.galabau.dateieingang.exception;
|
||||||
|
|
||||||
|
/** Fehler beim Upload in OCI Object Storage (Auth, Netzwerk, API-Fehler). */
|
||||||
|
public class OciException extends Exception {
|
||||||
|
|
||||||
|
public OciException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public OciException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package de.galabau.dateieingang.exception;
|
||||||
|
|
||||||
|
/** Fehler beim Aufruf des ORDS-Endpunkts (Netzwerk, HTTP-Fehler). */
|
||||||
|
public class OrdsException extends Exception {
|
||||||
|
|
||||||
|
public OrdsException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public OrdsException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package de.galabau.dateieingang.exception;
|
||||||
|
|
||||||
|
/** Fehler bei SFTP-Operationen (Verbindung, Download, Rename). */
|
||||||
|
public class SftpException extends Exception {
|
||||||
|
|
||||||
|
public SftpException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SftpException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package de.galabau.dateieingang.exception;
|
||||||
|
|
||||||
|
/** Fehler beim Entpacken einer ZIP-Datei (beschädigt, Path-Traversal-Versuch, I/O-Fehler). */
|
||||||
|
public class ZipException extends Exception {
|
||||||
|
|
||||||
|
public ZipException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ZipException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package de.galabau.dateieingang.model;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Repräsentiert eine einzelne Datei aus einer entpackten ZIP.
|
||||||
|
* Das Feld {@code ociKey} wird erst beim OCI-Upload befüllt.
|
||||||
|
*/
|
||||||
|
public class FileEntry {
|
||||||
|
|
||||||
|
/** Relativer Pfad innerhalb der ZIP, z.B. {@code subdir/datei.csv}. */
|
||||||
|
public final String relativePath;
|
||||||
|
|
||||||
|
/** Dateigröße in Bytes. */
|
||||||
|
public final long fileSize;
|
||||||
|
|
||||||
|
/** {@code true} für die Marker-Datei {@code _READY_FOR_DB_PROCESSING_}. */
|
||||||
|
public final boolean isMarker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Vollständiger OCI Object Key, z.B. {@code eingang/export-2026-04-08/subdir/datei.csv}.
|
||||||
|
* Wird von {@code OciUploadService} gesetzt.
|
||||||
|
*/
|
||||||
|
public String ociKey;
|
||||||
|
|
||||||
|
public FileEntry(String relativePath, long fileSize, boolean isMarker) {
|
||||||
|
this.relativePath = relativePath;
|
||||||
|
this.fileSize = fileSize;
|
||||||
|
this.isMarker = isMarker;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package de.galabau.dateieingang.model;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Zustandsobjekt eines einzelnen ZIP-Verarbeitungslaufs.
|
||||||
|
* Wird zu Beginn jeder ZIP-Verarbeitung erstellt und durch die Pipeline-Steps angereichert.
|
||||||
|
*/
|
||||||
|
public class ProcessingContext {
|
||||||
|
|
||||||
|
/** Eindeutige Lauf-ID — wird als MDC-Feld {@code runId} gesetzt. */
|
||||||
|
public final UUID runId;
|
||||||
|
|
||||||
|
/** Originaler ZIP-Dateiname auf dem SFTP-Server, z.B. {@code export_2026-04-08.zip}. */
|
||||||
|
public final String zipFilename;
|
||||||
|
|
||||||
|
/** ZIP-Name ohne Endung, z.B. {@code export_2026-04-08}. Wird als OCI-Unterordner genutzt. */
|
||||||
|
public final String zipNameWithoutExt;
|
||||||
|
|
||||||
|
/** Startzeitpunkt des Laufs. */
|
||||||
|
public final LocalDateTime startTime;
|
||||||
|
|
||||||
|
/** Lokaler Pfad der heruntergeladenen ZIP-Datei. Gesetzt von {@code SftpService}. */
|
||||||
|
public Path localZipPath;
|
||||||
|
|
||||||
|
/** Lokales Verzeichnis mit entpackten Dateien. Gesetzt von {@code ZipExtractionService}. */
|
||||||
|
public Path localExtractDir;
|
||||||
|
|
||||||
|
/** Entpackte Dateien. Gesetzt von {@code ZipExtractionService}. */
|
||||||
|
public List<FileEntry> extractedFiles = new ArrayList<>();
|
||||||
|
|
||||||
|
/** {@code true} wenn der Marker erfolgreich in OCI hochgeladen wurde. */
|
||||||
|
public boolean markerUploaded = false;
|
||||||
|
|
||||||
|
/** Aktueller Verarbeitungsstatus. */
|
||||||
|
public ProcessingStatus status = ProcessingStatus.PENDING;
|
||||||
|
|
||||||
|
public ProcessingContext(UUID runId, String zipFilename) {
|
||||||
|
this.runId = runId;
|
||||||
|
this.zipFilename = zipFilename;
|
||||||
|
this.zipNameWithoutExt = zipFilename.endsWith(".zip")
|
||||||
|
? zipFilename.substring(0, zipFilename.length() - 4)
|
||||||
|
: zipFilename;
|
||||||
|
this.startTime = LocalDateTime.now();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package de.galabau.dateieingang.model;
|
||||||
|
|
||||||
|
/** Status eines ZIP-Verarbeitungslaufs. */
|
||||||
|
public enum ProcessingStatus {
|
||||||
|
PENDING,
|
||||||
|
PARTIALLY_UPLOADED,
|
||||||
|
MARKER_UPLOADED,
|
||||||
|
ORDS_NOTIFIED,
|
||||||
|
FAILED
|
||||||
|
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
package de.galabau.dateieingang.oci;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.exception.OciException;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingContext;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingStatus;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lädt die entpackten Dateien und den Marker in OCI Object Storage hoch.
|
||||||
|
*
|
||||||
|
* <p><b>Stub:</b> OCI-Upload ist noch nicht implementiert.
|
||||||
|
* Der Upload wird übersprungen und der Status auf {@link ProcessingStatus#MARKER_UPLOADED} gesetzt,
|
||||||
|
* damit der Rest der Pipeline (SFTP-Rename, ORDS-Notify) getestet werden kann.
|
||||||
|
*/
|
||||||
|
@ApplicationScoped
|
||||||
|
public class OciUploadService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch.
|
||||||
|
*
|
||||||
|
* @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix
|
||||||
|
* @throws OciException bei persistenten OCI-Fehlern (4xx) nach Retry-Erschöpfung
|
||||||
|
*/
|
||||||
|
public void upload(ProcessingContext context) throws OciException {
|
||||||
|
// TODO: OCI-Upload implementieren (OCI SDK, SimpleAuthenticationDetailsProvider)
|
||||||
|
Log.infof("[STUB] OCI-Upload übersprungen für '%s' (%d Dateien) — wird später implementiert",
|
||||||
|
context.zipNameWithoutExt, context.extractedFiles.size());
|
||||||
|
context.markerUploaded = true;
|
||||||
|
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package de.galabau.dateieingang.ords;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.exception.OrdsException;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingContext;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_files},
|
||||||
|
* damit die DB-Verarbeitung sofort angestoßen wird.
|
||||||
|
*
|
||||||
|
* <p><b>Stub:</b> ORDS-Aufruf ist noch nicht implementiert.
|
||||||
|
* Bei einem Ausfall wäre die Verarbeitung ohnehin durch die APEX Automation abgesichert
|
||||||
|
* (diese findet den Marker beim nächsten Stundenlauf).
|
||||||
|
*/
|
||||||
|
@ApplicationScoped
|
||||||
|
public class OrdsNotificationService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sendet eine Benachrichtigung an den ORDS-Endpunkt.
|
||||||
|
*
|
||||||
|
* @param context enthält {@code zipNameWithoutExt} und {@code runId} für den Request-Body
|
||||||
|
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
||||||
|
*/
|
||||||
|
public void notify(ProcessingContext context) throws OrdsException {
|
||||||
|
// TODO: ORDS REST-Client implementieren (MicroProfile REST Client + @Retry)
|
||||||
|
Log.infof("[STUB] ORDS-Benachrichtigung übersprungen für '%s' — wird später implementiert",
|
||||||
|
context.zipNameWithoutExt);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,188 @@
|
|||||||
|
package de.galabau.dateieingang.pipeline;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.exception.OciException;
|
||||||
|
import de.galabau.dateieingang.exception.OrdsException;
|
||||||
|
import de.galabau.dateieingang.exception.SftpException;
|
||||||
|
import de.galabau.dateieingang.exception.ZipException;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingContext;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingStatus;
|
||||||
|
import de.galabau.dateieingang.oci.OciUploadService;
|
||||||
|
import de.galabau.dateieingang.ords.OrdsNotificationService;
|
||||||
|
import de.galabau.dateieingang.sftp.SftpService;
|
||||||
|
import de.galabau.dateieingang.zip.ZipExtractionService;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.eclipse.microprofile.context.ManagedExecutor;
|
||||||
|
import org.slf4j.MDC;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Orchestriert den gesamten Dateieingang-Workflow: SFTP → ZIP → OCI → ORDS.
|
||||||
|
* Läuft asynchron im Hintergrund, gibt dem Aufrufer sofort zurück.
|
||||||
|
*/
|
||||||
|
@ApplicationScoped
|
||||||
|
public class FileProcessingPipeline {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
SftpService sftpService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ZipExtractionService zipExtractionService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
OciUploadService ociUploadService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
OrdsNotificationService ordsNotificationService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ManagedExecutor executor;
|
||||||
|
|
||||||
|
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Startet die Pipeline asynchron im Hintergrund (fire & forget).
|
||||||
|
* Gibt {@code false} zurück wenn bereits ein Lauf aktiv ist.
|
||||||
|
*
|
||||||
|
* @return {@code true} wenn die Pipeline gestartet wurde, {@code false} bei aktivem Lauf
|
||||||
|
*/
|
||||||
|
public boolean tryProcessAllAsync() {
|
||||||
|
if (!isRunning.compareAndSet(false, true)) {
|
||||||
|
Log.warn("Pipeline läuft bereits — neuer Trigger abgewiesen");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
executor.submit(() -> {
|
||||||
|
try {
|
||||||
|
processAll();
|
||||||
|
} finally {
|
||||||
|
isRunning.set(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void processAll() {
|
||||||
|
Log.info("Pipeline-Lauf gestartet");
|
||||||
|
|
||||||
|
List<String> zipFiles;
|
||||||
|
try {
|
||||||
|
zipFiles = sftpService.listZipFiles();
|
||||||
|
} catch (SftpException e) {
|
||||||
|
Log.errorf(e, "SFTP-Listing fehlgeschlagen — Pipeline-Lauf abgebrochen");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zipFiles.isEmpty()) {
|
||||||
|
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.infof("%d neue ZIP-Datei(en) auf dem SFTP-Server gefunden", zipFiles.size());
|
||||||
|
|
||||||
|
for (String zipFilename : zipFiles) {
|
||||||
|
Log.infof("Datei gefunden: %s", zipFilename);
|
||||||
|
processZip(zipFilename);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log.info("Pipeline-Lauf abgeschlossen");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processZip(String zipFilename) {
|
||||||
|
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
|
||||||
|
MDC.put("runId", context.runId.toString());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// --- Download ---
|
||||||
|
MDC.put("step", "sftp-download");
|
||||||
|
context.localZipPath = sftpService.download(zipFilename);
|
||||||
|
Log.infof("ZIP '%s' heruntergeladen (%d Bytes)", zipFilename,
|
||||||
|
Files.size(context.localZipPath));
|
||||||
|
|
||||||
|
// --- Entpacken ---
|
||||||
|
MDC.put("step", "zip-extract");
|
||||||
|
zipExtractionService.extract(context);
|
||||||
|
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
|
||||||
|
context.extractedFiles.size());
|
||||||
|
|
||||||
|
// --- OCI Upload (Stub) ---
|
||||||
|
MDC.put("step", "oci-upload");
|
||||||
|
ociUploadService.upload(context);
|
||||||
|
|
||||||
|
// --- SFTP Rename → .processed ---
|
||||||
|
MDC.put("step", "sftp-rename");
|
||||||
|
sftpService.renameRemote(zipFilename, zipFilename + ".processed");
|
||||||
|
Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename);
|
||||||
|
|
||||||
|
// --- ORDS Notify (Stub) ---
|
||||||
|
MDC.put("step", "ords-notify");
|
||||||
|
ordsNotificationService.notify(context);
|
||||||
|
|
||||||
|
context.status = ProcessingStatus.ORDS_NOTIFIED;
|
||||||
|
|
||||||
|
} catch (SftpException | ZipException | OciException | OrdsException e) {
|
||||||
|
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage());
|
||||||
|
context.status = ProcessingStatus.FAILED;
|
||||||
|
tryRenameToError(zipFilename);
|
||||||
|
} catch (IOException e) {
|
||||||
|
Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename);
|
||||||
|
context.status = ProcessingStatus.FAILED;
|
||||||
|
tryRenameToError(zipFilename);
|
||||||
|
} finally {
|
||||||
|
cleanup(context);
|
||||||
|
MDC.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryRenameToError(String zipFilename) {
|
||||||
|
try {
|
||||||
|
MDC.put("step", "sftp-rename");
|
||||||
|
sftpService.renameRemote(zipFilename, zipFilename + ".error");
|
||||||
|
Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename);
|
||||||
|
} catch (SftpException e) {
|
||||||
|
Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung",
|
||||||
|
zipFilename);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanup(ProcessingContext context) {
|
||||||
|
MDC.put("step", "cleanup");
|
||||||
|
try {
|
||||||
|
if (context.localZipPath != null) {
|
||||||
|
Files.deleteIfExists(context.localZipPath);
|
||||||
|
Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath);
|
||||||
|
}
|
||||||
|
if (context.localExtractDir != null) {
|
||||||
|
deleteLocalDirectory(context.localExtractDir);
|
||||||
|
Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
|
||||||
|
context.runId,
|
||||||
|
context.localZipPath != null ? context.localZipPath.getParent() : "unbekannt");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteLocalDirectory(Path dir) throws IOException {
|
||||||
|
if (!Files.exists(dir)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try (Stream<Path> walk = Files.walk(dir)) {
|
||||||
|
walk.sorted(Comparator.reverseOrder()).forEach(path -> {
|
||||||
|
try {
|
||||||
|
Files.delete(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
Log.warnf(e, "Konnte nicht löschen: %s", path);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package de.galabau.dateieingang.sftp;
|
||||||
|
|
||||||
|
import io.smallrye.config.ConfigMapping;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/** SFTP-Konfiguration. Credentials kommen ausschließlich aus Umgebungsvariablen. */
|
||||||
|
@ConfigMapping(prefix = "galabau.sftp")
|
||||||
|
public interface SftpConfig {
|
||||||
|
|
||||||
|
/** SFTP-Hostname, z.B. {@code sftp.lieferant.de}. */
|
||||||
|
String host();
|
||||||
|
|
||||||
|
/** SFTP-Port, Standard: 22. */
|
||||||
|
int port();
|
||||||
|
|
||||||
|
/** SFTP-Benutzername. */
|
||||||
|
String username();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SFTP-Passwort. Nur verwendet wenn kein Private Key konfiguriert ist.
|
||||||
|
* Aus Env-Var {@code GALABAU_SFTP_PASSWORD}.
|
||||||
|
*/
|
||||||
|
String password();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SHA256-Fingerprint des SFTP-Host-Keys, z.B. {@code SHA256:AbCdEf...}.
|
||||||
|
* Ermitteln: {@code ssh-keyscan host | ssh-keygen -lf -}
|
||||||
|
* Wenn nicht gesetzt: PromiscuousVerifier (nur Dev — gibt WARN aus).
|
||||||
|
*/
|
||||||
|
Optional<String> hostKeyFingerprint();
|
||||||
|
|
||||||
|
/** Remote-Verzeichnis auf dem SFTP-Server, z.B. {@code /outgoing}. */
|
||||||
|
String remotePath();
|
||||||
|
|
||||||
|
/** Lokales Arbeitsverzeichnis für Downloads, z.B. {@code /tmp/sftp-work}. */
|
||||||
|
String localWorkDir();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pfad zur Private-Key-Datei für Public-Key-Auth (empfohlen für Produktion).
|
||||||
|
* Wenn gesetzt, wird Passwort-Auth ignoriert.
|
||||||
|
*/
|
||||||
|
Optional<String> privateKeyPath();
|
||||||
|
|
||||||
|
/** Passphrase für den Private Key, falls verschlüsselt. */
|
||||||
|
Optional<String> privateKeyPassphrase();
|
||||||
|
}
|
||||||
@@ -0,0 +1,124 @@
|
|||||||
|
package de.galabau.dateieingang.sftp;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.exception.SftpException;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import net.schmizz.sshj.SSHClient;
|
||||||
|
import net.schmizz.sshj.sftp.RemoteResourceInfo;
|
||||||
|
import net.schmizz.sshj.sftp.SFTPClient;
|
||||||
|
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/** Kapselt alle SFTP-Operationen: Auflisten, Download und Umbenennen. */
|
||||||
|
@ApplicationScoped
|
||||||
|
public class SftpService {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
SftpConfig config;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
void init() {
|
||||||
|
try {
|
||||||
|
Files.createDirectories(Path.of(config.localWorkDir()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: "
|
||||||
|
+ config.localWorkDir(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
private interface SftpOperation<T> {
|
||||||
|
T execute(SFTPClient sftp) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Öffnet eine SFTP-Verbindung, führt die Operation aus und trennt danach sauber.
|
||||||
|
* Credentials und Host-Key-Verifikation werden aus {@link SftpConfig} gelesen.
|
||||||
|
*/
|
||||||
|
private <T> T withSftp(SftpOperation<T> operation) throws SftpException {
|
||||||
|
try (SSHClient ssh = new SSHClient()) {
|
||||||
|
configureHostKeyVerification(ssh);
|
||||||
|
ssh.connect(config.host(), config.port());
|
||||||
|
authenticate(ssh);
|
||||||
|
try (SFTPClient sftp = ssh.newSFTPClient()) {
|
||||||
|
return operation.execute(sftp);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new SftpException("SFTP-Operation fehlgeschlagen auf " + config.host()
|
||||||
|
+ ": " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void configureHostKeyVerification(SSHClient ssh) {
|
||||||
|
if (config.hostKeyFingerprint().isPresent()) {
|
||||||
|
ssh.addHostKeyVerifier(config.hostKeyFingerprint().get());
|
||||||
|
} else {
|
||||||
|
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert — PromiscuousVerifier aktiv (nur Dev!)");
|
||||||
|
ssh.addHostKeyVerifier(new PromiscuousVerifier());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void authenticate(SSHClient ssh) throws IOException {
|
||||||
|
if (config.privateKeyPath().isPresent()) {
|
||||||
|
ssh.authPublickey(config.username(), config.privateKeyPath().get());
|
||||||
|
} else {
|
||||||
|
ssh.authPassword(config.username(), config.password());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listet alle {@code *.zip}-Dateien im konfigurierten Remote-Verzeichnis.
|
||||||
|
*
|
||||||
|
* @return Liste der Dateinamen (ohne Pfad), z.B. {@code ["export_2026-04-08.zip"]}
|
||||||
|
* @throws SftpException bei Verbindungs- oder Lesefehler
|
||||||
|
*/
|
||||||
|
public List<String> listZipFiles() throws SftpException {
|
||||||
|
return withSftp(sftp ->
|
||||||
|
sftp.ls(config.remotePath()).stream()
|
||||||
|
.filter(RemoteResourceInfo::isRegularFile)
|
||||||
|
.map(RemoteResourceInfo::getName)
|
||||||
|
.filter(name -> name.endsWith(".zip"))
|
||||||
|
.toList()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lädt eine Datei vom SFTP-Server in das lokale Arbeitsverzeichnis herunter.
|
||||||
|
*
|
||||||
|
* @param filename Dateiname auf dem Remote-Server, z.B. {@code export_2026-04-08.zip}
|
||||||
|
* @return Lokaler Pfad der heruntergeladenen Datei
|
||||||
|
* @throws SftpException bei Verbindungs- oder Downloadfehler
|
||||||
|
*/
|
||||||
|
public Path download(String filename) throws SftpException {
|
||||||
|
Path localFile = Path.of(config.localWorkDir(), filename);
|
||||||
|
withSftp(sftp -> {
|
||||||
|
sftp.get(config.remotePath() + "/" + filename, localFile.toString());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return localFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Benennt eine Datei auf dem Remote-SFTP-Server um.
|
||||||
|
* Wird nach Erfolg ({@code .processed}) oder Fehler ({@code .error}) aufgerufen.
|
||||||
|
*
|
||||||
|
* @param filename aktueller Dateiname, z.B. {@code export_2026-04-08.zip}
|
||||||
|
* @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed}
|
||||||
|
* @throws SftpException bei Verbindungs- oder Umbenennfehler
|
||||||
|
*/
|
||||||
|
public void renameRemote(String filename, String newFilename) throws SftpException {
|
||||||
|
withSftp(sftp -> {
|
||||||
|
sftp.rename(
|
||||||
|
config.remotePath() + "/" + filename,
|
||||||
|
config.remotePath() + "/" + newFilename
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package de.galabau.dateieingang.zip;
|
||||||
|
|
||||||
|
import de.galabau.dateieingang.exception.ZipException;
|
||||||
|
import de.galabau.dateieingang.model.FileEntry;
|
||||||
|
import de.galabau.dateieingang.model.ProcessingContext;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
|
||||||
|
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/** Entpackt ZIP-Dateien und befüllt den {@link ProcessingContext} mit den extrahierten Dateien. */
|
||||||
|
@ApplicationScoped
|
||||||
|
public class ZipExtractionService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis.
|
||||||
|
* Setzt {@code context.localExtractDir} und {@code context.extractedFiles}.
|
||||||
|
* Verzeichnisstruktur innerhalb der ZIP wird beibehalten.
|
||||||
|
*
|
||||||
|
* @param context muss {@code localZipPath} und {@code zipNameWithoutExt} gesetzt haben
|
||||||
|
* @throws ZipException bei beschädigter ZIP, Path-Traversal-Versuch oder I/O-Fehler
|
||||||
|
*/
|
||||||
|
public void extract(ProcessingContext context) throws ZipException {
|
||||||
|
Path extractDir = context.localZipPath.getParent().resolve(context.zipNameWithoutExt);
|
||||||
|
context.localExtractDir = extractDir;
|
||||||
|
|
||||||
|
List<FileEntry> entries = new ArrayList<>();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Files.createDirectories(extractDir);
|
||||||
|
|
||||||
|
try (ZipArchiveInputStream zis = new ZipArchiveInputStream(
|
||||||
|
new BufferedInputStream(Files.newInputStream(context.localZipPath)))) {
|
||||||
|
|
||||||
|
ZipArchiveEntry entry;
|
||||||
|
while ((entry = zis.getNextEntry()) != null) {
|
||||||
|
String entryName = entry.getName();
|
||||||
|
|
||||||
|
if (entry.isDirectory()) {
|
||||||
|
Files.createDirectories(extractDir.resolve(entryName));
|
||||||
|
} else {
|
||||||
|
Path targetFile = resolveAndValidate(extractDir, entryName, context.zipFilename);
|
||||||
|
Files.createDirectories(targetFile.getParent());
|
||||||
|
Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
|
||||||
|
boolean isMarker = Path.of(entryName).getFileName()
|
||||||
|
.toString().equals("_READY_FOR_DB_PROCESSING_");
|
||||||
|
|
||||||
|
entries.add(new FileEntry(entryName, Files.size(targetFile), isMarker));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
context.extractedFiles = entries;
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: "
|
||||||
|
+ e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Löst den Entry-Pfad auf und prüft auf Path-Traversal-Versuche (Zip-Slip-Schutz).
|
||||||
|
*
|
||||||
|
* @throws ZipException wenn der aufgelöste Pfad außerhalb von {@code extractDir} liegt
|
||||||
|
*/
|
||||||
|
private Path resolveAndValidate(Path extractDir, String entryName, String zipFilename)
|
||||||
|
throws ZipException {
|
||||||
|
Path resolved = extractDir.resolve(entryName).normalize();
|
||||||
|
if (!resolved.startsWith(extractDir)) {
|
||||||
|
throw new ZipException("ZIP '" + zipFilename + "' enthält Path-Traversal-Versuch: "
|
||||||
|
+ entryName);
|
||||||
|
}
|
||||||
|
return resolved;
|
||||||
|
}
|
||||||
|
}
|
||||||
44
agent-backend/src/main/resources/application.properties
Normal file
44
agent-backend/src/main/resources/application.properties
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
quarkus.application.name=dateieingang-service
|
||||||
|
|
||||||
|
# ===== API =====
|
||||||
|
# Quarkus liest ${VAR:default} nativ aus Umgebungsvariablen
|
||||||
|
galabau.api.key=${GALABAU_API_KEY:dev-key}
|
||||||
|
|
||||||
|
# ===== SFTP =====
|
||||||
|
galabau.sftp.host=${GALABAU_SFTP_HOST:localhost}
|
||||||
|
galabau.sftp.port=${GALABAU_SFTP_PORT:22}
|
||||||
|
galabau.sftp.username=${GALABAU_SFTP_USERNAME:sftpuser}
|
||||||
|
galabau.sftp.password=${GALABAU_SFTP_PASSWORD:}
|
||||||
|
# Fingerprint auf host: ssh-keyscan <host> | ssh-keygen -lf -
|
||||||
|
galabau.sftp.host-key-fingerprint=${GALABAU_SFTP_HOST_KEY_FINGERPRINT:SHA256:xyz}
|
||||||
|
# Verzeichnis auf dem SFTP-Server, in dem der Lieferant ZIP-Dateien ablegt
|
||||||
|
galabau.sftp.remote-path=${GALABAU_SFTP_REMOTE_PATH:/bundesagenturfuerarbeit/austausch/test/galaeingang}
|
||||||
|
# Temporäres lokales Verzeichnis für Download + Entpacken — wird nach jeder ZIP bereinigt
|
||||||
|
galabau.sftp.local-work-dir=/tmp/sftp-work
|
||||||
|
# galabau.sftp.private-key-path=/etc/secrets/sftp-key
|
||||||
|
# galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE}
|
||||||
|
|
||||||
|
# ===== OCI (Stub — noch nicht aktiv) =====
|
||||||
|
# galabau.oci.namespace=${OCI_NAMESPACE}
|
||||||
|
# galabau.oci.region=${OCI_REGION}
|
||||||
|
# galabau.oci.bucket=${OCI_BUCKET}
|
||||||
|
# galabau.oci.tenant-prefix=mandant_42/
|
||||||
|
# galabau.oci.incoming-prefix=eingang/
|
||||||
|
# galabau.oci.tenancy-id=${OCI_TENANCY_ID}
|
||||||
|
# galabau.oci.user-id=${OCI_USER_ID}
|
||||||
|
# galabau.oci.fingerprint=${OCI_FINGERPRINT}
|
||||||
|
# galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH}
|
||||||
|
|
||||||
|
# ===== ORDS (Stub — noch nicht aktiv) =====
|
||||||
|
# galabau.ords.base-url=${GALABAU_ORDS_BASE_URL:http://ords:8080}
|
||||||
|
# galabau.ords.process-incoming-path=/ords/.../auto_import/process_incoming
|
||||||
|
# galabau.ords.api-key=${GALABAU_ORDS_API_KEY}
|
||||||
|
# quarkus.rest-client.ords-client.url=${galabau.ords.base-url}
|
||||||
|
|
||||||
|
# ===== Observability =====
|
||||||
|
%prod.quarkus.otel.exporter.otlp.endpoint=${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
|
||||||
|
%dev.quarkus.observability.lgtm.grafana-port=3000
|
||||||
|
%dev.quarkus.observability.lgtm.otel-grpc-port=4317
|
||||||
|
%dev.quarkus.otel.logs.enabled=true
|
||||||
|
#%prod.quarkus.otel.logs.enabled=true
|
||||||
|
%prod.quarkus.log.console.json=true
|
||||||
Reference in New Issue
Block a user