Compare commits

..

10 Commits

20 changed files with 607 additions and 114 deletions

View File

@@ -7,7 +7,12 @@
"Bash(sed -n '465,478p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")", "Bash(sed -n '465,478p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
"Bash(sed -n '523,535p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")", "Bash(sed -n '523,535p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
"Bash(sed -n '582,600p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")", "Bash(sed -n '582,600p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
"WebFetch(domain:docs.public.oneportal.content.oci.oraclecloud.com)" "WebFetch(domain:docs.public.oneportal.content.oci.oraclecloud.com)",
"Bash(./mvnw compile *)",
"WebFetch(domain:medium.com)",
"WebFetch(domain:quarkus.io)",
"WebFetch(domain:github.com)",
"WebFetch(domain:walidhajeri.hashnode.dev)"
] ]
} }
} }

View File

@@ -61,9 +61,11 @@ Alle zur Laufzeit via `pck_system.f_get_par_wert_by_programmid`:
| `NETSTORE_MARKER_SB` | Name der Marker-Datei im Object Store, der von DB beim Verarbeiten angelegt wird, wenn eine oder mehrere Dateien eines ZIPs nicht automatisiert importiert werden konnten. Das soll signalisieren, dass ein Sachbearbeiter die Dateien in diesem Unterordner manuell prüfen und importieren muss. z.B.: `_BITTE_PRÜFEN_` | | `NETSTORE_MARKER_SB` | Name der Marker-Datei im Object Store, der von DB beim Verarbeiten angelegt wird, wenn eine oder mehrere Dateien eines ZIPs nicht automatisiert importiert werden konnten. Das soll signalisieren, dass ein Sachbearbeiter die Dateien in diesem Unterordner manuell prüfen und importieren muss. z.B.: `_BITTE_PRÜFEN_` |
| `NETSTORE_BA_PREFIX` | Pfad in Object Storage, wo BA-Daten liegen. Muss mit einem `/` enden, z.B. `BA/Eingang/` | | `NETSTORE_BA_PREFIX` | Pfad in Object Storage, wo BA-Daten liegen. Muss mit einem `/` enden, z.B. `BA/Eingang/` |
| `NETSTORE_BA_IMPORT` | Name des Unterordners von NETSTORE_BA_PREFIX im Object Storage, wo entpackte Dateien, die noch importiert werden müssen, zwischengespeichert werden. | | `NETSTORE_BA_IMPORT` | Name des Unterordners von NETSTORE_BA_PREFIX im Object Storage, wo entpackte Dateien, die noch importiert werden müssen, zwischengespeichert werden. |
| `NETSTORE_BA_ARCHIV` | Der Basis-Name des Unterordners von NETSTORE_BA_PREFIX im Object Storage, wo verarbeitete BA-Imports abgelegt werden. Der Name darf nicht mit / enden oder beginnen, z.B. "Verarbeitet". Beim Import wird an diesen Namen immer die aktuelle Jahreszahl angehangen, sodass der finale Ordner z.B. "Verarbeitet 2026" heißt. |
| `BA_IMPORT_SB_MIT_ID` | Mitarbeiter-ID für Import von BA Daten (z.B. Korrespondenzen). Diese Mitarbeiter-ID bekommt eine Wiedervorlage, für jede Datei, die nicht automatisch importiert werden konnte. | | `BA_IMPORT_SB_MIT_ID` | Mitarbeiter-ID für Import von BA Daten (z.B. Korrespondenzen). Diese Mitarbeiter-ID bekommt eine Wiedervorlage, für jede Datei, die nicht automatisch importiert werden konnte. |
| `AUTOMATON_BASE_URL` | Base-URL des Quarkus Dateieingang Service, z.B. `http://dateieingang:8080` | | `AUTOMATON_BASE_URL` | Base-URL des Quarkus Dateieingang Service, z.B. `http://dateieingang:8080` |
| `AUTOMATON_API_KEY` | API-Key für den Quarkus Dateieingang Service (Header `X-Api-Key`) | | `AUTOMATON_API_KEY` | API-Key für den Quarkus Dateieingang Service (Header `X-Api-Key`) |
| `NETSTORE_ORDS_APIKEY` | API-Key, den der Quarkus Server nutzt, um den ORDS aufzurufen |
--- ---

View File

@@ -22,13 +22,14 @@ create or replace package body pck_auto_import as
l_response clob; l_response clob;
l_http_status number; l_http_status number;
l_log_action constant varchar2(512 char) := 'BA_KORRESPONDENZEN_DATEIEINGANG_AUTOMATION'; l_log_action constant varchar2(512 char) := 'BA_KORRESPONDENZEN_DATEIEINGANG_AUTOMATION';
l_automaton_endpoint constant varchar2(256 char) := 'api/process-incoming-ba-korrespondenz';
begin begin
-- Schritt 1: Offene Batches in OCI verarbeiten -- Schritt 1: Offene Batches in OCI verarbeiten
p_process_incoming_ba_data; p_process_incoming_ba_data;
-- Schritt 2: Quarkus anstoßen — Fehler werden geloggt, nicht eskaliert -- Schritt 2: Quarkus anstoßen — Fehler werden geloggt, nicht eskaliert
begin begin
l_service_url := pck_system.f_get_par_wert_by_programmid('AUTOMATON_BASE_URL') || '/api/process-incoming'; l_service_url := pck_system.f_get_par_wert_by_programmid('AUTOMATON_BASE_URL') || l_automaton_endpoint;
l_api_key := pck_system.f_get_par_wert_by_programmid('AUTOMATON_API_KEY'); l_api_key := pck_system.f_get_par_wert_by_programmid('AUTOMATON_API_KEY');
apex_web_service.g_request_headers.delete; apex_web_service.g_request_headers.delete;
@@ -179,7 +180,7 @@ create or replace package body pck_auto_import as
l_log_action varchar2(512 char) := 'IMPORT_BA_DATA'; l_log_action varchar2(512 char) := 'IMPORT_BA_DATA';
begin begin
-- Zielordner Name zusammenstellen -- Zielordner Name zusammenstellen
l_target_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || 'Verarbeitet ' || to_char(sysdate, 'YYYY') || '/'; l_target_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_ARCHIV') || ' ' || to_char(sysdate, 'YYYY') || '/';
l_eingang_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_IMPORT'); l_eingang_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_IMPORT');
-- Unterordner in eingangs-ordner auflisten (es gibt einen Ordner für jeden entpackte ZIP-Datei) -- Unterordner in eingangs-ordner auflisten (es gibt einen Ordner für jeden entpackte ZIP-Datei)

View File

@@ -1,2 +1,3 @@
.env .env
target target
*private-key.pem

View File

@@ -1,3 +1,3 @@
{ {
"java.configuration.updateBuildConfiguration": "interactive" "java.configuration.updateBuildConfiguration": "disabled"
} }

View File

@@ -110,7 +110,7 @@ quarkus-automaton/
| `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis | | `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis |
| `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur | | `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur |
| `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage | | `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage |
| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` oder `.error` | | `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` (bei Erfolg) oder `.error` (nur bei ungültiger ZIP) |
| `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf | | `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf |
| `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) | | `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) |
@@ -244,14 +244,14 @@ n8n fire-and-forget-Verhalten.
### Fehlerklassen ### Fehlerklassen
| Fehler | Typ | Retry | Verhalten | | Fehler | Typ | Umbenennung | Verhalten |
|---|---|---|---| |---|---|---|---|
| SFTP-Verbindung fehlgeschlagen | transient | nein | Nächster APEX-Lauf (1h) versucht es | | SFTP-Verbindung / Download fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster APEX-Lauf (1h) versucht es |
| ZIP beschädigt | persistent | nein | ZIP auf SFTP umbenennen zu `.error`, Log | | ZIP beschädigt / ungültig | persistent | `.error` | Datei ist defekt, manuelle Prüfung nötig |
| OCI-Verbindung fehlgeschlagen (z.B. 503) | transient | ja (exponential backoff) | @Retry | | OCI-Verbindung fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster Lauf versucht erneut (OCI PUT idempotent) |
| OCI-Upload einer Datei schlägt fehl | persistent | nein | SFTP-Rename zu `.error`, Log — bereits hochgeladene OCI-Dateien bleiben (idempotent) | | SFTP-Rename zu `.processed` fehlgeschlagen | transient | keine | ORDS wurde noch nicht aufgerufen (kommt danach) — kein Doppelimport; nächster Lauf wiederholt den Schritt |
| ORDS-Aufruf schlägt fehl | transient | ja (2-3x) | Marker liegt vor APEX Automation schlägt beim nächsten Lauf ein | | ORDS-Aufruf schlägt fehl | transient | keine (`.processed` bereits gesetzt) | Marker liegt in OCI vor APEX Automation findet ihn beim nächsten Lauf |
| Allgemein technischer Fehler | fallabhängig | siehe SmallRye Fault Tolerance | Exception-Log | | Unerwarteter Laufzeitfehler | fallabhängig | keine | Exception wird geloggt, Datei bleibt auf SFTP |
### Retry-Strategie (SmallRye Fault Tolerance) ### Retry-Strategie (SmallRye Fault Tolerance)
@@ -309,17 +309,23 @@ Credentials, Fehlerbehandlung).
Pipeline.processAll(): Pipeline.processAll():
1. SftpService.listZipFiles() → ["export_2026-04-08.zip", ...] 1. SftpService.listZipFiles() → ["export_2026-04-08.zip", ...]
2. für jede ZIP: 2. für jede ZIP:
a. SftpService.download(zip) → lokale Datei a. SftpService.download(zip) → lokale Datei
b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste
c. OciUploadService.upload() → Dateien + Marker in OCI ↳ ZipException → Rename zu .error, Abbruch
d. SftpService.renameRemote(.processed oder .error) c. OciUploadService.uploadFiles() → Dateien in OCI (noch kein Marker)
e. OrdsNotificationService.notify() d. SftpService.renameRemote(.processed)
e. OciUploadService.uploadMarker() → Marker in OCI (erst nach Rename — siehe Invariante)
f. OrdsNotificationService.notify()
f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler
``` ```
**Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen **Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen
bei Fehlern oder großen ZIPs. bei Fehlern oder großen ZIPs.
**Umbenennung zu `.error`** erfolgt ausschließlich bei `ZipException` (defekte/ungültige Datei).
Bei Infrastrukturfehlern (SFTP, OCI, ORDS) bleibt die Datei unverändert auf dem SFTP und wird
beim nächsten Lauf automatisch erneut verarbeitet.
--- ---
## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider) ## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider)
@@ -343,11 +349,13 @@ public class OciUploadService {
.tenantId(config.tenancyId()) .tenantId(config.tenancyId())
.userId(config.userId()) .userId(config.userId())
.fingerprint(config.fingerprint()) .fingerprint(config.fingerprint())
.region(Region.fromRegionId(config.region())) .privateKeySupplier(() -> Files.newInputStream(Path.of(config.privateKeyPath())))
.privateKeySupplier(new FilePrivateKeySupplier(config.privateKeyPath()))
.build(); .build();
this.client = ObjectStorageClient.builder().build(auth); // Endpoint explizit setzen — verhindert blockierenden HTTP-Discovery-Call im SDK
client = ObjectStorageClient.builder()
.endpoint("https://objectstorage." + config.region() + ".oraclecloud.com")
.build(auth);
} }
} }
``` ```
@@ -452,6 +460,12 @@ public class ProcessIncomingRequest {
<artifactId>oci-java-sdk-objectstorage</artifactId> <artifactId>oci-java-sdk-objectstorage</artifactId>
<version>3.44.0</version> <version>3.44.0</version>
</dependency> </dependency>
<!-- HTTP-Provider für OCI SDK (jersey3 = Jakarta EE 9+, kompatibel mit Quarkus) -->
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common-httpclient-jersey3</artifactId>
<version>3.44.0</version>
</dependency>
<!-- ZIP --> <!-- ZIP -->
<dependency> <dependency>

View File

@@ -75,11 +75,11 @@
<version>0.38.0</version> <version>0.38.0</version>
</dependency> </dependency>
<!-- OCI Object Storage SDK --> <!-- OCI Object Storage SDK — Shaded Full JAR: Jersey und alle internen Abhängigkeiten sind unter
<!-- Aktuelle Version: https://mvnrepository.com/artifact/com.oracle.oci.sdk/oci-java-sdk-objectstorage --> shaded.com.oracle.oci.javasdk.* relokiert, sodass Quarkus RESTEasy die OCI-Provider nicht scannt -->
<dependency> <dependency>
<groupId>com.oracle.oci.sdk</groupId> <groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-objectstorage</artifactId> <artifactId>oci-java-sdk-shaded-full</artifactId>
<version>3.44.0</version> <version>3.44.0</version>
</dependency> </dependency>

View File

@@ -18,7 +18,7 @@ import java.util.Map;
* REST-Endpunkt für den Dateieingang-Trigger. * REST-Endpunkt für den Dateieingang-Trigger.
* Wird von der APEX Automation stündlich per HTTP POST aufgerufen (fire & forget). * Wird von der APEX Automation stündlich per HTTP POST aufgerufen (fire & forget).
*/ */
@Path("/api/process-incoming") @Path("/api/process-incoming-ba-korrespondenz")
@ApplicationScoped @ApplicationScoped
public class FileProcessingResource { public class FileProcessingResource {
@@ -38,11 +38,15 @@ public class FileProcessingResource {
@POST @POST
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Response triggerProcessing(@HeaderParam("X-Api-Key") String apiKey) { public Response triggerProcessing(@HeaderParam("X-Api-Key") String apiKey) {
//Log.infof("API-key correct: %s", config.api().key());
//Log.infof("API-key received: %s", apiKey);
if (apiKey == null || !config.api().key().equals(apiKey)) { if (apiKey == null || !config.api().key().equals(apiKey)) {
Log.warn("Trigger abgelehnt — ungültiger oder fehlender API-Key"); Log.warn("Trigger abgelehnt — ungültiger oder fehlender API-Key");
return Response.status(Response.Status.UNAUTHORIZED).build(); return Response.status(Response.Status.UNAUTHORIZED).build();
} }
Log.info("API-Key valide, Pipeline-Trigger wird verarbeitet");
boolean started = pipeline.tryProcessAllAsync(); boolean started = pipeline.tryProcessAllAsync();
if (!started) { if (!started) {

View File

@@ -0,0 +1,52 @@
package de.galabau.dateieingang.config;
import io.smallrye.config.ConfigMapping;
/** OCI Object Storage Konfiguration. Credentials kommen ausschließlich aus Umgebungsvariablen. */
@ConfigMapping(prefix = "galabau.oci")
public interface OciConfig {
/** OCI Object Storage Namespace, z.B. {@code frhqaxi5sgcg}. */
String namespace();
/** OCI Region, z.B. {@code eu-frankfurt-1}. */
String region();
/** OCI Bucket-Name. */
String bucket();
/**
* Root-Prefix für alle Objekte im Bucket, z.B. {@code mandant_42/}.
* Muss mit {@code /} enden.
*/
String tenantPrefix();
/**
* Prefix für eingehende Dateien unterhalb von {@code tenantPrefix},
* z.B. {@code eingang/}. Muss mit {@code /} enden.
*/
String incomingPrefix();
/** OCI Tenancy OCID. Aus Env-Var {@code OCI_TENANCY_ID}. */
String tenancyId();
/** OCI User OCID. Aus Env-Var {@code OCI_USER_ID}. */
String userId();
/** API Key Fingerprint, z.B. {@code aa:bb:cc:...}. Aus Env-Var {@code OCI_FINGERPRINT}. */
String fingerprint();
/**
* Dateisystempfad zur PEM-Datei des OCI API Keys.
* Produktion: absoluter Pfad zum Kubernetes Secret Volume Mount.
* Dev: relativer Pfad zum Projektverzeichnis (Default: {@code oci-private-key.pem}).
*/
String privateKeyPath();
/**
* Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird.
* Default: {@code _READY_FOR_DB_PROCESSING_}.
* Muss mit der APEX Automation und dem ORDS-Package abgestimmt sein.
*/
String markerFilenameDbProcessing();
}

View File

@@ -0,0 +1,21 @@
package de.galabau.dateieingang.config;
import io.smallrye.config.ConfigMapping;
/** ORDS-Konfiguration für den Dateieingang-Endpunkt. */
@ConfigMapping(prefix = "galabau.ords")
public interface OrdsConfig {
/**
* Base-URL des ORDS-Moduls bis einschließlich Modul-Pfad,
* z.B. {@code https://apex.example.com/ords/myschema/auto_import}.
* Wird direkt als {@code quarkus.rest-client.ords-client.url} verwendet.
*/
String baseUrl();
/**
* API-Key für den ORDS-Endpunkt (Header: {@code X-Api-Key}).
* Aus Env-Var {@code GALABAU_ORDS_API_KEY}.
*/
String apiKey();
}

View File

@@ -1,4 +1,4 @@
package de.galabau.dateieingang.sftp; package de.galabau.dateieingang.config;
import io.smallrye.config.ConfigMapping; import io.smallrye.config.ConfigMapping;

View File

@@ -1,32 +1,143 @@
package de.galabau.dateieingang.oci; package de.galabau.dateieingang.oci;
import com.oracle.bmc.Region;
import com.oracle.bmc.auth.SimpleAuthenticationDetailsProvider;
import com.oracle.bmc.objectstorage.ObjectStorage;
import com.oracle.bmc.objectstorage.ObjectStorageClient;
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
import de.galabau.dateieingang.config.OciConfig;
import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.exception.OciException;
import de.galabau.dateieingang.model.FileEntry;
import de.galabau.dateieingang.model.ProcessingContext; import de.galabau.dateieingang.model.ProcessingContext;
import de.galabau.dateieingang.model.ProcessingStatus;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
/** /**
* Lädt die entpackten Dateien und den Marker in OCI Object Storage hoch. * Lädt die entpackten Dateien und den Marker in OCI Object Storage hoch.
* * Authentifizierung via OCI HTTP Signature V1 (entspricht APEX Web Credential vom Typ OCI).
* <p><b>Stub:</b> OCI-Upload ist noch nicht implementiert.
* Der Upload wird übersprungen und der Status auf {@link ProcessingStatus#MARKER_UPLOADED} gesetzt,
* damit der Rest der Pipeline (SFTP-Rename, ORDS-Notify) getestet werden kann.
*/ */
//@Startup
@ApplicationScoped @ApplicationScoped
public class OciUploadService { public class OciUploadService {
@Inject
OciConfig config;
private ObjectStorage client;
@PostConstruct
void init() {
Log.info("Initialisiere OCI ObjectStorage-Client...");
try {
SimpleAuthenticationDetailsProvider auth = SimpleAuthenticationDetailsProvider.builder()
.tenantId(config.tenancyId())
.userId(config.userId())
.fingerprint(config.fingerprint())
.region(Region.fromRegionId(config.region()))
.privateKeySupplier(() -> {
try {
return Files.newInputStream(Path.of(config.privateKeyPath()));
} catch (IOException e) {
throw new RuntimeException("OCI Private Key nicht lesbar: "
+ config.privateKeyPath(), e);
}
})
.build();
Log.info("Authentifizierung...");
client = ObjectStorageClient.builder()
.build(auth);
Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket());
} catch (Throwable e) {
Log.errorf(e, "OCI ObjectStorage-Client Initialisierung fehlgeschlagen");
throw new RuntimeException("OCI-Client konnte nicht initialisiert werden", e);
}
}
/** /**
* Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch. * Lädt alle Nutzdateien aus {@code context.extractedFiles} in OCI hoch — ohne Marker.
* Der Marker wird erst nach dem SFTP-Rename zu {@code .processed} gesetzt (siehe
* {@link #uploadMarker}), damit APEX Automation den Batch nie verarbeitet bevor die
* ZIP-Datei auf dem SFTP als verarbeitet markiert ist.
* *
* @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix * @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix
* @throws OciException bei persistenten OCI-Fehlern (4xx) nach Retry-Erschöpfung * @throws OciException bei Verbindungs- oder Upload-Fehlern
*/ */
public void upload(ProcessingContext context) throws OciException { public void uploadFiles(ProcessingContext context) throws OciException {
// TODO: OCI-Upload implementieren (OCI SDK, SimpleAuthenticationDetailsProvider) List<FileEntry> files = context.extractedFiles.stream()
Log.infof("[STUB] OCI-Upload übersprungen für '%s' (%d Dateien) — wird später implementiert", .filter(e -> !e.isMarker)
context.zipNameWithoutExt, context.extractedFiles.size()); .toList();
Log.infof("OCI-Upload: %d Datei(en) für '%s'", files.size(), context.zipNameWithoutExt);
for (FileEntry entry : files) {
String key = buildKey(context.zipNameWithoutExt, entry.relativePath);
entry.ociKey = key;
putFile(key, context.localExtractDir.resolve(entry.relativePath), entry.fileSize);
Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
}
Log.infof("OCI-Upload Dateien abgeschlossen: %d Datei(en) in '%s'",
files.size(), buildPrefix(context.zipNameWithoutExt));
}
/**
* Setzt den Marker in OCI — signalisiert der DB-Verarbeitung, dass der Batch vollständig ist.
* Wird erst nach dem SFTP-Rename zu {@code .processed} aufgerufen, damit Marker und
* SFTP-Zustand immer konsistent sind: Marker vorhanden ↔ ZIP bereits als verarbeitet markiert.
*
* @param context enthält den Ziel-Prefix für den Marker-Key
* @throws OciException bei Verbindungs- oder Upload-Fehlern
*/
public void uploadMarker(ProcessingContext context) throws OciException {
String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing());
Log.infof("Lade Marker hoch: '%s'", markerKey);
putMarker(markerKey);
context.markerUploaded = true; context.markerUploaded = true;
context.status = ProcessingStatus.MARKER_UPLOADED; Log.infof("Marker hochgeladen: '%s'", markerKey);
}
private String buildPrefix(String zipNameWithoutExt) {
return config.tenantPrefix() + config.incomingPrefix() + zipNameWithoutExt + "/";
}
private String buildKey(String zipNameWithoutExt, String relativePath) {
return buildPrefix(zipNameWithoutExt) + relativePath;
}
private void putFile(String key, Path localFile, long fileSize) throws OciException {
try (InputStream is = Files.newInputStream(localFile)) {
client.putObject(PutObjectRequest.builder()
.namespaceName(config.namespace())
.bucketName(config.bucket())
.objectName(key)
.putObjectBody(is)
.contentLength(fileSize)
.build());
} catch (Exception e) {
throw new OciException("OCI-Upload fehlgeschlagen für '" + key + "'", e);
}
}
private void putMarker(String key) throws OciException {
try (InputStream is = InputStream.nullInputStream()) {
client.putObject(PutObjectRequest.builder()
.namespaceName(config.namespace())
.bucketName(config.bucket())
.objectName(key)
.putObjectBody(is)
.contentLength(0L)
.build());
} catch (Exception e) {
throw new OciException("OCI-Upload Marker fehlgeschlagen für '" + key + "'", e);
}
} }
} }

View File

@@ -0,0 +1,30 @@
package de.galabau.dateieingang.ords;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import org.eclipse.microprofile.rest.client.annotation.RegisterProvider;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
/**
* MicroProfile REST Client für den ORDS-Endpunkt des Dateieingang-Service.
* Base-URL wird über {@code quarkus.rest-client.ords-client.url} konfiguriert
* und zeigt auf den ORDS-Modul-Pfad (z.B. {@code .../ords/myschema/auto_import}).
* Weitere Endpunkte zukünftiger Pipelines werden hier als neue Methoden ergänzt.
*/
@RegisterRestClient(configKey = "ords-client")
@RegisterProvider(OrdsLoggingFilter.class)
public interface OrdsClient {
/**
* Ruft {@code pck_auto_import.p_process_incoming_ba_data} über ORDS auf.
* Die Prozedur verarbeitet alle offenen OCI-Batches (Unterordner mit Marker).
*
* @param apiKey API-Key aus {@code galabau.ords.api-key} (Header: {@code X-Api-Key})
* @return ORDS-Response (erwartet: 2xx)
*/
@POST
@Path("/process_incoming_ba_data")
Response processIncomingBaData(@HeaderParam("X-Api-Key") String apiKey);
}

View File

@@ -0,0 +1,54 @@
package de.galabau.dateieingang.ords;
import io.quarkus.logging.Log;
import jakarta.ws.rs.client.ClientRequestContext;
import jakarta.ws.rs.client.ClientRequestFilter;
import jakarta.ws.rs.client.ClientResponseContext;
import jakarta.ws.rs.client.ClientResponseFilter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
/** Loggt ORDS-Requests und -Responses im selben Format wie der DefaultClientLogger, maskiert jedoch den X-Api-Key-Header. */
public class OrdsLoggingFilter implements ClientRequestFilter, ClientResponseFilter {
@Override
public void filter(ClientRequestContext ctx) {
String headers = ctx.getHeaders().entrySet().stream()
.map(e -> e.getKey() + "=" + (
"X-Api-Key".equals(e.getKey())
? mask(String.valueOf(e.getValue().getFirst()))
: e.getValue().getFirst()
))
.collect(Collectors.joining(" "));
String body = ctx.hasEntity() ? "<body>" : "Empty body";
Log.infof("Request: %s %s Headers[%s], %s", ctx.getMethod(), ctx.getUri(), headers, body);
}
@Override
public void filter(ClientRequestContext req, ClientResponseContext res) throws IOException {
String headers = res.getHeaders().entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue().getFirst())
.collect(Collectors.joining(" "));
String body = "";
if (res.hasEntity()) {
byte[] bytes = res.getEntityStream().readAllBytes();
body = new String(bytes, StandardCharsets.UTF_8);
res.setEntityStream(new ByteArrayInputStream(bytes));
}
Log.infof("Response: %s %s, Status[%d %s], Headers[%s], Body:\n%s",
req.getMethod(), req.getUri(),
res.getStatus(), res.getStatusInfo().getReasonPhrase(),
headers, body);
}
private static String mask(String value) {
return value.substring(0, Math.min(4, value.length())) + "***";
}
}

View File

@@ -1,30 +1,62 @@
package de.galabau.dateieingang.ords; package de.galabau.dateieingang.ords;
import de.galabau.dateieingang.config.OrdsConfig;
import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.OrdsException;
import de.galabau.dateieingang.model.ProcessingContext; import de.galabau.dateieingang.model.ProcessingContext;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import java.time.temporal.ChronoUnit;
/** /**
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_files}, * Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_ba_data},
* damit die DB-Verarbeitung sofort angestoßen wird. * damit die DB-Verarbeitung sofort angestoßen wird.
* *
* <p><b>Stub:</b> ORDS-Aufruf ist noch nicht implementiert. * <p>Bei Ausfall ist die Verarbeitung durch die APEX Automation abgesichert:
* Bei einem Ausfall wäre die Verarbeitung ohnehin durch die APEX Automation abgesichert * Sie findet den Marker beim nächsten Stundenlauf und ruft die Prozedur selbst auf.
* (diese findet den Marker beim nächsten Stundenlauf).
*/ */
@ApplicationScoped @ApplicationScoped
public class OrdsNotificationService { public class OrdsNotificationService {
@Inject
@RestClient
OrdsClient ordsClient;
@Inject
OrdsConfig config;
/** /**
* Sendet eine Benachrichtigung an den ORDS-Endpunkt. * Löst die DB-Verarbeitung via ORDS aus ({@code pck_auto_import.p_process_incoming_ba_data}).
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
* *
* @param context enthält {@code zipNameWithoutExt} und {@code runId} für den Request-Body * @param context enthält {@code zipNameWithoutExt} für das Log
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt * @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
*/ */
public void notify(ProcessingContext context) throws OrdsException { @Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
// TODO: ORDS REST-Client implementieren (MicroProfile REST Client + @Retry) retryOn = OrdsException.class)
Log.infof("[STUB] ORDS-Benachrichtigung übersprungen für '%s' — wird später implementiert", @Timeout(value = 10, unit = ChronoUnit.SECONDS)
context.zipNameWithoutExt); public void triggerDbProcessing(ProcessingContext context) throws OrdsException {
Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt);
Response response;
try {
response = ordsClient.processIncomingBaData(config.apiKey());
} catch (Exception e) {
throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '"
+ context.zipNameWithoutExt + "'", e);
}
int status = response.getStatus();
if (status >= 400) {
throw new OrdsException("ORDS antwortete mit HTTP " + status
+ " für '" + context.zipNameWithoutExt + "'");
}
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt);
} }
} }

View File

@@ -1,5 +1,6 @@
package de.galabau.dateieingang.pipeline; package de.galabau.dateieingang.pipeline;
import de.galabau.dateieingang.config.SftpConfig;
import de.galabau.dateieingang.exception.OciException; import de.galabau.dateieingang.exception.OciException;
import de.galabau.dateieingang.exception.OrdsException; import de.galabau.dateieingang.exception.OrdsException;
import de.galabau.dateieingang.exception.SftpException; import de.galabau.dateieingang.exception.SftpException;
@@ -19,6 +20,8 @@ import org.slf4j.MDC;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@@ -44,6 +47,9 @@ public class FileProcessingPipeline {
@Inject @Inject
OrdsNotificationService ordsNotificationService; OrdsNotificationService ordsNotificationService;
@Inject
SftpConfig sftpConfig;
@Inject @Inject
ManagedExecutor executor; ManagedExecutor executor;
@@ -63,6 +69,8 @@ public class FileProcessingPipeline {
executor.submit(() -> { executor.submit(() -> {
try { try {
processAll(); processAll();
} catch (Throwable e) { // nicht exception catchen, weil Error in OCI SDK auftreten können, die Throwable aber nicht Excption sind. Die würden sonst nicht geloggt
Log.errorf(e, "Unerwarteter Fehler im Pipeline-Lauf");
} finally { } finally {
isRunning.set(false); isRunning.set(false);
} }
@@ -70,9 +78,11 @@ public class FileProcessingPipeline {
return true; return true;
} }
void processAll() { private void processAll() {
Log.info("Pipeline-Lauf gestartet"); Log.info("Pipeline-Lauf gestartet");
preProcessingCleanup();
List<String> zipFiles; List<String> zipFiles;
try { try {
zipFiles = sftpService.listZipFiles(); zipFiles = sftpService.listZipFiles();
@@ -83,6 +93,7 @@ public class FileProcessingPipeline {
if (zipFiles.isEmpty()) { if (zipFiles.isEmpty()) {
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden"); Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
Log.info("Pipeline-Lauf abgeschlossen");
return; return;
} }
@@ -99,6 +110,7 @@ public class FileProcessingPipeline {
private void processZip(String zipFilename) { private void processZip(String zipFilename) {
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename); ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
MDC.put("runId", context.runId.toString()); MDC.put("runId", context.runId.toString());
Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId);
try { try {
// --- Download --- // --- Download ---
@@ -113,31 +125,50 @@ public class FileProcessingPipeline {
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename, Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
context.extractedFiles.size()); context.extractedFiles.size());
// --- OCI Upload (Stub) --- // --- OCI Upload (Dateien, noch kein Marker) ---
MDC.put("step", "oci-upload"); MDC.put("step", "oci-upload");
ociUploadService.upload(context); context.status = ProcessingStatus.PARTIALLY_UPLOADED;
Log.info("Starte OCI-Upload");
ociUploadService.uploadFiles(context);
// --- SFTP Rename → .processed --- // --- SFTP Rename → .processed ---
// Erst nach erfolgreichem Datei-Upload — Marker kommt danach,
// damit Marker-Präsenz in OCI ↔ ZIP bereits .processed auf SFTP.
MDC.put("step", "sftp-rename"); MDC.put("step", "sftp-rename");
sftpService.renameRemote(zipFilename, zipFilename + ".processed"); sftpService.renameFile(zipFilename, zipFilename + ".processed");
Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename);
// --- ORDS Notify (Stub) --- // --- OCI Marker ---
// Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist.
MDC.put("step", "oci-marker");
ociUploadService.uploadMarker(context);
context.status = ProcessingStatus.MARKER_UPLOADED;
// --- ORDS Notify ---
MDC.put("step", "ords-notify"); MDC.put("step", "ords-notify");
ordsNotificationService.notify(context); ordsNotificationService.triggerDbProcessing(context);
context.status = ProcessingStatus.ORDS_NOTIFIED; context.status = ProcessingStatus.ORDS_NOTIFIED;
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
} catch (SftpException | ZipException | OciException | OrdsException e) { } catch (ZipException e) {
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage()); Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
context.status = ProcessingStatus.FAILED; context.status = ProcessingStatus.FAILED;
tryRenameToError(zipFilename); tryRenameToError(zipFilename);
} catch (SftpException | OciException | OrdsException e) {
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
context.status = ProcessingStatus.FAILED;
} catch (IOException e) { } catch (IOException e) {
Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename); Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename);
context.status = ProcessingStatus.FAILED; context.status = ProcessingStatus.FAILED;
tryRenameToError(zipFilename); } catch (RuntimeException e) {
Log.errorf(e, "Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s'", zipFilename);
context.status = ProcessingStatus.FAILED;
} finally { } finally {
cleanup(context); postProcessingCleanup(context);
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
Log.infof("Lauf %s für Datei %s abgeschlossen — Status: %s, Dauer: %dms",
context.runId, zipFilename, context.status, duration);
Log.info("-----------------------------------------------------------------------------------------------------");
MDC.clear(); MDC.clear();
} }
} }
@@ -145,24 +176,73 @@ public class FileProcessingPipeline {
private void tryRenameToError(String zipFilename) { private void tryRenameToError(String zipFilename) {
try { try {
MDC.put("step", "sftp-rename"); MDC.put("step", "sftp-rename");
sftpService.renameRemote(zipFilename, zipFilename + ".error"); sftpService.renameFile(zipFilename, zipFilename + ".error");
Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename);
} catch (SftpException e) { } catch (SftpException e) {
Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung", Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung",
zipFilename); zipFilename);
} }
} }
private void cleanup(ProcessingContext context) { /**
MDC.put("step", "cleanup"); * Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen.
*
* <p>Wird einmal pro Pipeline-Lauf <em>vor</em> dem SFTP-Listing aufgerufen.
* Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft,
* aber bei I/O-Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP-Dateien
* und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden
* sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben.
*
* <p>Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in
* {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind.
* Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert
* ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs.
*/
private void preProcessingCleanup() {
MDC.put("step", "pre-cleanup");
Path workDir = Path.of(sftpConfig.localWorkDir());
if (!Files.exists(workDir)) {
return;
}
try (Stream<Path> entries = Files.list(workDir)) {
entries.forEach(path -> {
try {
if (Files.isDirectory(path)) {
deleteLocalDirectory(path);
Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path);
} else {
Files.delete(path);
Log.warnf("Verwaiste Datei gelöscht: %s", path);
}
} catch (IOException e) {
Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path);
}
});
} catch (IOException e) {
Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir);
}
}
/**
* Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis).
*
* <p>Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach
* erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen
* fehl, verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten
* Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt.
*
* @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses
*/
private void postProcessingCleanup(ProcessingContext context) {
MDC.put("step", "post-cleanup");
Log.infof("Cleanup gestartet für Lauf %s", context.runId);
try { try {
if (context.localZipPath != null) { if (context.localZipPath != null) {
Files.deleteIfExists(context.localZipPath); Files.deleteIfExists(context.localZipPath);
Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath); Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath);
} }
if (context.localExtractDir != null) { if (context.localExtractDir != null) {
deleteLocalDirectory(context.localExtractDir); deleteLocalDirectory(context.localExtractDir);
Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir); Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
} }
} catch (IOException e) { } catch (IOException e) {
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s", Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",

View File

@@ -1,5 +1,6 @@
package de.galabau.dateieingang.sftp; package de.galabau.dateieingang.sftp;
import de.galabau.dateieingang.config.SftpConfig;
import de.galabau.dateieingang.exception.SftpException; import de.galabau.dateieingang.exception.SftpException;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
@@ -8,7 +9,6 @@ import jakarta.inject.Inject;
import net.schmizz.sshj.SSHClient; import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.RemoteResourceInfo; import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.SFTPClient; import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@@ -26,6 +26,7 @@ public class SftpService {
void init() { void init() {
try { try {
Files.createDirectories(Path.of(config.localWorkDir())); Files.createDirectories(Path.of(config.localWorkDir()));
Log.infof("Lokales Arbeitsverzeichnis: %s", config.localWorkDir());
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: " throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: "
+ config.localWorkDir(), e); + config.localWorkDir(), e);
@@ -44,8 +45,10 @@ public class SftpService {
private <T> T withSftp(SftpOperation<T> operation) throws SftpException { private <T> T withSftp(SftpOperation<T> operation) throws SftpException {
try (SSHClient ssh = new SSHClient()) { try (SSHClient ssh = new SSHClient()) {
configureHostKeyVerification(ssh); configureHostKeyVerification(ssh);
Log.infof("Verbinde zu SFTP %s:%d", config.host(), config.port());
ssh.connect(config.host(), config.port()); ssh.connect(config.host(), config.port());
authenticate(ssh); authenticate(ssh);
Log.infof("SFTP-Verbindung hergestellt");
try (SFTPClient sftp = ssh.newSFTPClient()) { try (SFTPClient sftp = ssh.newSFTPClient()) {
return operation.execute(sftp); return operation.execute(sftp);
} }
@@ -55,19 +58,20 @@ public class SftpService {
} }
} }
private void configureHostKeyVerification(SSHClient ssh) { private void configureHostKeyVerification(SSHClient ssh) throws SftpException {
if (config.hostKeyFingerprint().isPresent()) { if (config.hostKeyFingerprint().isPresent()) {
ssh.addHostKeyVerifier(config.hostKeyFingerprint().get()); ssh.addHostKeyVerifier(config.hostKeyFingerprint().get());
} else { } else {
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert — PromiscuousVerifier aktiv (nur Dev!)"); throw new SftpException("SFTP Host-Key-Fingerprint nicht konfiguriert — Verbindung abgelehnt");
ssh.addHostKeyVerifier(new PromiscuousVerifier());
} }
} }
private void authenticate(SSHClient ssh) throws IOException { private void authenticate(SSHClient ssh) throws IOException {
if (config.privateKeyPath().isPresent()) { if (config.privateKeyPath().isPresent()) {
Log.infof("SFTP-Authentifizierung via Public-Key für Benutzer '%s'", config.username());
ssh.authPublickey(config.username(), config.privateKeyPath().get()); ssh.authPublickey(config.username(), config.privateKeyPath().get());
} else { } else {
Log.infof("SFTP-Authentifizierung via Passwort für Benutzer '%s'", config.username());
ssh.authPassword(config.username(), config.password()); ssh.authPassword(config.username(), config.password());
} }
} }
@@ -79,6 +83,7 @@ public class SftpService {
* @throws SftpException bei Verbindungs- oder Lesefehler * @throws SftpException bei Verbindungs- oder Lesefehler
*/ */
public List<String> listZipFiles() throws SftpException { public List<String> listZipFiles() throws SftpException {
Log.infof("Lese SFTP-Verzeichnis '%s'", config.remotePath());
return withSftp(sftp -> return withSftp(sftp ->
sftp.ls(config.remotePath()).stream() sftp.ls(config.remotePath()).stream()
.filter(RemoteResourceInfo::isRegularFile) .filter(RemoteResourceInfo::isRegularFile)
@@ -97,6 +102,7 @@ public class SftpService {
*/ */
public Path download(String filename) throws SftpException { public Path download(String filename) throws SftpException {
Path localFile = Path.of(config.localWorkDir(), filename); Path localFile = Path.of(config.localWorkDir(), filename);
Log.infof("Starte Download: '%s'", filename);
withSftp(sftp -> { withSftp(sftp -> {
sftp.get(config.remotePath() + "/" + filename, localFile.toString()); sftp.get(config.remotePath() + "/" + filename, localFile.toString());
return null; return null;
@@ -112,7 +118,8 @@ public class SftpService {
* @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed} * @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed}
* @throws SftpException bei Verbindungs- oder Umbenennfehler * @throws SftpException bei Verbindungs- oder Umbenennfehler
*/ */
public void renameRemote(String filename, String newFilename) throws SftpException { public void renameFile(String filename, String newFilename) throws SftpException {
Log.infof("SFTP Rename: '%s' → '%s'", filename, newFilename);
withSftp(sftp -> { withSftp(sftp -> {
sftp.rename( sftp.rename(
config.remotePath() + "/" + filename, config.remotePath() + "/" + filename,
@@ -120,5 +127,6 @@ public class SftpService {
); );
return null; return null;
}); });
Log.infof("SFTP Rename erfolgreich: '%s'", newFilename);
} }
} }

View File

@@ -1,9 +1,12 @@
package de.galabau.dateieingang.zip; package de.galabau.dateieingang.zip;
import de.galabau.dateieingang.config.OciConfig;
import de.galabau.dateieingang.exception.ZipException; import de.galabau.dateieingang.exception.ZipException;
import de.galabau.dateieingang.model.FileEntry; import de.galabau.dateieingang.model.FileEntry;
import de.galabau.dateieingang.model.ProcessingContext; import de.galabau.dateieingang.model.ProcessingContext;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
@@ -19,6 +22,9 @@ import java.util.List;
@ApplicationScoped @ApplicationScoped
public class ZipExtractionService { public class ZipExtractionService {
@Inject
OciConfig ociConfig;
/** /**
* Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis. * Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis.
* Setzt {@code context.localExtractDir} und {@code context.extractedFiles}. * Setzt {@code context.localExtractDir} und {@code context.extractedFiles}.
@@ -35,6 +41,7 @@ public class ZipExtractionService {
try { try {
Files.createDirectories(extractDir); Files.createDirectories(extractDir);
Log.infof("Entpacke ZIP '%s'", context.zipFilename);
try (ZipArchiveInputStream zis = new ZipArchiveInputStream( try (ZipArchiveInputStream zis = new ZipArchiveInputStream(
new BufferedInputStream(Files.newInputStream(context.localZipPath)))) { new BufferedInputStream(Files.newInputStream(context.localZipPath)))) {
@@ -51,14 +58,20 @@ public class ZipExtractionService {
Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING); Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING);
boolean isMarker = Path.of(entryName).getFileName() boolean isMarker = Path.of(entryName).getFileName()
.toString().equals("_READY_FOR_DB_PROCESSING_"); .toString().equals(ociConfig.markerFilenameDbProcessing());
entries.add(new FileEntry(entryName, Files.size(targetFile), isMarker)); FileEntry fileEntry = new FileEntry(entryName, Files.size(targetFile), isMarker);
entries.add(fileEntry);
Log.infof("Extrahiert: '%s' (%d Bytes)", entryName, fileEntry.fileSize);
if (fileEntry.isMarker) {
Log.infof("Marker-Datei gefunden: '%s'", entryName);
}
} }
} }
} }
context.extractedFiles = entries; context.extractedFiles = entries;
Log.infof("Extraktion abgeschlossen: %d Datei(en) aus '%s'", entries.size(), context.zipFilename);
} catch (IOException e) { } catch (IOException e) {
throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: " throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: "

View File

@@ -18,27 +18,34 @@ galabau.sftp.local-work-dir=/tmp/sftp-work
# galabau.sftp.private-key-path=/etc/secrets/sftp-key # galabau.sftp.private-key-path=/etc/secrets/sftp-key
# galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE} # galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE}
# ===== OCI (Stub — noch nicht aktiv) ===== # ===== OCI Object Storage =====
# galabau.oci.namespace=${OCI_NAMESPACE} # Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird
# galabau.oci.region=${OCI_REGION} galabau.oci.marker-filename-db-processing=${OCI_MARKER_FILENAME_DB_PROCESSING:_READY_FOR_DB_PROCESSING_}
# galabau.oci.bucket=${OCI_BUCKET}
# galabau.oci.tenant-prefix=mandant_42/
# galabau.oci.incoming-prefix=eingang/
# galabau.oci.tenancy-id=${OCI_TENANCY_ID}
# galabau.oci.user-id=${OCI_USER_ID}
# galabau.oci.fingerprint=${OCI_FINGERPRINT}
# galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH}
# ===== ORDS (Stub — noch nicht aktiv) ===== galabau.oci.namespace=${OCI_NAMESPACE}
# galabau.ords.base-url=${GALABAU_ORDS_BASE_URL:http://ords:8080} galabau.oci.region=${OCI_REGION}
# galabau.ords.process-incoming-path=/ords/.../auto_import/process_incoming galabau.oci.bucket=${OCI_BUCKET}
# galabau.ords.api-key=${GALABAU_ORDS_API_KEY} # Root-Prefix im Bucket, muss mit / enden
# quarkus.rest-client.ords-client.url=${galabau.ords.base-url} galabau.oci.tenant-prefix=${OCI_TENANT_PREFIX:testmandant-42/}
# Eingangs-Prefix unterhalb von tenant-prefix, muss mit / enden
galabau.oci.incoming-prefix=${OCI_INCOMING_FILES_PATH:BA/Eingang/Import/}
galabau.oci.tenancy-id=${OCI_TENANCY_ID}
galabau.oci.user-id=${OCI_USER_ID}
galabau.oci.fingerprint=${OCI_FINGERPRINT}
%prod.galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH}
%dev.galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH:oci-private-key.pem}
# ===== ORDS =====
# Base-URL bis einschließlich Modul-Pfad, z.B. https://apex.example.com/ords/myschema/auto_import
galabau.ords.base-url=${GALABAU_ORDS_BASE_URL}
galabau.ords.api-key=${GALABAU_ORDS_API_KEY}
# MicroProfile REST Client liest die URL aus dieser Property. Das hier ist eine einfache weiterleitung auf die env variable GALABAU_ORDS_BASE_URL (s.o.)
quarkus.rest-client.ords-client.url=${galabau.ords.base-url}
# ===== Observability ===== # ===== Observability =====
%prod.quarkus.otel.exporter.otlp.endpoint=${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317} %prod.quarkus.otel.exporter.otlp.endpoint=${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
%dev.quarkus.observability.lgtm.grafana-port=3000 #%dev.quarkus.observability.lgtm.grafana-port=3000
%dev.quarkus.observability.lgtm.otel-grpc-port=4317 #%dev.quarkus.observability.lgtm.otel-grpc-port=4317
%dev.quarkus.otel.logs.enabled=true quarkus.otel.logs.enabled=true
#%prod.quarkus.otel.logs.enabled=true #%prod.quarkus.otel.logs.enabled=true
%prod.quarkus.log.console.json=true #%prod.quarkus.log.console.json=true

View File

@@ -30,7 +30,8 @@ Details zur DB-Verarbeitung: `database/docs/plan_pck_net_storage.md`
│ im letzten Quarkus-Lauf fehlgeschlagen) │ │ im letzten Quarkus-Lauf fehlgeschlagen) │
│ │ │ │
│ 2. Dateieingang Service aufrufen (fire & forget) │ │ 2. Dateieingang Service aufrufen (fire & forget) │
│ HTTP POST /api/process-incoming (Header: X-Api-Key) │ │ HTTP POST /api/process-incoming-ba-korrespondenz |
| (Header: X-Api-Key) │
└────────────────────────────┬────────────────────────────────────┘ └────────────────────────────┬────────────────────────────────────┘
@@ -42,11 +43,13 @@ Details zur DB-Verarbeitung: `database/docs/plan_pck_net_storage.md`
│ 3c. Alle Dateien in OCI eingang/<zip-name>/ hochladen │ │ 3c. Alle Dateien in OCI eingang/<zip-name>/ hochladen │
│ (Unterordner aus der ZIP werden beibehalten) │ │ (Unterordner aus der ZIP werden beibehalten) │
│ → Fehler stoppt Verarbeitung dieser ZIP │ │ → Fehler stoppt Verarbeitung dieser ZIP │
│ 3d. Marker eingang/<zip-name>/_READY_FOR_DB_PROCESSING_ │ 3d. ZIP auf SFTP umbenennen zu .processed
hochladen → bei ungültiger ZIP: .error (manuelle Prüfung nötig)
3e. ZIP auf SFTP umbenennen (.processed oder .error) → bei Infrastrukturfehlern: keine Umbenennung, Retry
→ erst NACH erfolgreichem Marker-Upload 3e. Marker eingang/<zip-name>/_READY_FOR_DB_PROCESSING_
3f. ORDS-Endpunkt aufrufen (pck_auto_import.p_process_incoming_ba_data) hochladen — ERST NACH dem SFTP-Rename (siehe unten)
│ 3f. ORDS-Endpunkt aufrufen |
| (pck_auto_import.p_process_incoming_ba_data) │
│ 3g. Lokale Arbeitsdateien löschen │ │ 3g. Lokale Arbeitsdateien löschen │
└────────────────────────────┬────────────────────────────────────┘ └────────────────────────────┬────────────────────────────────────┘
@@ -95,30 +98,85 @@ Daran können die Sachbearbeiter erkennen, dass der Ordner nicht mehr automatisc
## Fehlerfall-Verhalten ## Fehlerfall-Verhalten
**Service: Upload einer Datei schlägt fehl** **Service: ZIP ist beschädigt oder ungültig**
- Verarbeitung dieser ZIP stoppt sofort - SFTP: ZIP → `.error` (manuelle Prüfung nötig)
- Kein Marker wird geschrieben, ZIP auf SFTP wird zu `.error` umbenannt - OCI: kein Upload, kein Marker
- ORDS wird nicht aufgerufen - DB: wird nicht aufgerufen
- Bereits hochgeladene Dateien werden beim nächsten Trigger überschrieben (OCI PUT idempotent)
**Service: ORDS-Aufruf schlägt fehl** **Service: SFTP-Download fehlgeschlagen**
- Marker liegt in `eingang/<zip-name>/`, Dateien sind vollständig hochgeladen - SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
- Beim nächsten Stundenlauf findet APEX Automation den Marker und verarbeitet - OCI: kein Upload, kein Marker
- DB: wird nicht aufgerufen
**Service: OCI-Upload (Dateien) fehlgeschlagen**
- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
- OCI: teilweise hochgeladene Dateien bleiben liegen (kein Marker → DB ignoriert den Ordner); beim Retry werden sie überschrieben (OCI PUT ist idempotent)
- DB: wird nicht aufgerufen
**Service: SFTP-Rename zu `.processed` fehlgeschlagen**
- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
- OCI: Dateien hochgeladen, noch kein Marker (Marker kommt erst nach dem Rename)
- DB: wird nicht aufgerufen
- beim nächsten Stundenlauf werden die Dateien aber nicht importiert, da APEX Automation ohne Marker nichts findet
- d.h. erst nachdem die ZIP Datei erneut abgearbeitet und komplett in OCI hochgeladen wurde (diesmal mit .processed-Umbennung auf SFTP & Marker in OCI) werden die Dateien abgearbeitet
**Service: OCI-Marker-Upload fehlgeschlagen**
- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf
- OCI: Dateien vollständig hochgeladen, Marker fehlt → DB-Verarbeitung wird nicht ausgelöst
- DB wird die Dateien wegen dem fehlendem Marker nie automatisiert abarbeiten, aber man sieht das recht einfach über den OCI Dateibrowser in Apex
- DB: wird nicht aufgerufen
- **Manueller Fix:** Marker-Datei `eingang/<zip-name>/_READY_FOR_DB_PROCESSING_` in OCI von Hand anlegen (leere Datei) — APEX Automation verarbeitet den Batch dann beim nächsten Stundenlauf
**Service: ORDS-Aufruf fehlgeschlagen**
- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf
- OCI: Dateien + Marker vollständig hochgeladen
- DB: APEX Automation findet den Marker beim nächsten Stundenlauf und verarbeitet ihn (Schritt 1) — kein Doppelimport, da Quarkus die `.processed`-Datei nicht erneut verarbeitet
**DB: Verarbeitung einer einzelnen Datei schlägt fehl** **DB: Verarbeitung einer einzelnen Datei schlägt fehl**
- Rollback — Datei bleibt in `eingang/<zip-name>/` - OCI `eingang/`: Datei bleibt in `eingang/<zip-name>/` (Rollback)
- ERROR in `lg_app_log` mit `log_object_ref = eingang/<zip-name>/datei.csv` - OCI `zielordner/`: keine Änderung
- Nächste Dateien im Batch werden weiterverarbeitet - DB: Rollback, ERROR in `lg_app_log` mit `log_object_ref = eingang/<zip-name>/datei.csv`, nächste Dateien im Batch werden weiterverarbeitet
**DB: Batch-Abschluss (nach dem Datei-Loop)** **DB: Batch-Abschluss (nach dem Datei-Loop)**
- DB-Marker (`_READY_FOR_DB_PROCESSING_`) wird **immer gelöscht** — kein automatischer Retry - Alle Dateien erfolgreich: `eingang/<zip-name>/` ist leer, Marker wird gelöscht
- Liegen noch Dateien im Unterordner: SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen - Noch Dateien übrig: Marker wird gelöscht, SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen
- Alle Dateien erfolgreich: INFO in `lg_app_log`, Unterordner ist leer
**DB: p_move_object schlägt nach erfolgreichem Import fehl** **DB: p_move_object schlägt nach erfolgreichem Import fehl**
- Rollback des Imports → sauberer Ausgangszustand - OCI `eingang/`: Datei bleibt in `eingang/<zip-name>/` (Rollback des gesamten Imports)
- Datei bleibt in `eingang/<zip-name>/` - OCI `zielordner/`: keine Änderung
- DB-Marker wird trotzdem am Ende des Loops gelöscht; falls noch Dateien übrig → SB-Marker - DB: Marker wird am Ende des Loops trotzdem gelöscht; falls noch Dateien übrig → SB-Marker
---
## Design-Entscheidung: Marker wird nach dem SFTP-Rename gesetzt
Der OCI-Marker `_READY_FOR_DB_PROCESSING_` wird bewusst **nach** dem SFTP-Rename zu `.processed`
hochgeladen — nicht davor. Das erzeugt eine harte Invariante:
> **Marker in OCI vorhanden ↔ ZIP auf SFTP bereits `.processed`**
### Warum ist das wichtig?
APEX Automation ruft `p_process_incoming_ba_data` in jedem Stundenlauf einmal direkt auf
(Schritt 1, Fallback), und Quarkus ruft dieselbe Funktion via ORDS auf (Schritt 3f, schneller Pfad).
Ohne die Invariante könnte folgender Race entstehen:
1. Quarkus lädt Dateien + Marker hoch, schlägt dann beim SFTP-Rename fehl
2. APEX Schritt 1 findet den Marker → importiert Daten
3. Quarkus wiederholt den Lauf, ruft ORDS auf → zweiter Import derselben Daten
Mit der Invariante ist dieser Fall ausgeschlossen: APEX Schritt 1 findet nur dann einen Marker,
wenn die ZIP auf dem SFTP bereits `.processed` ist. Ist sie das, greift Quarkus sie im Retry
nicht mehr an — `listZipFiles()` gibt nur `.zip`-Dateien zurück.
### Einzig verbleibender manueller Fehlerfall
Schlägt der Marker-Upload fehl (nach erfolgreichem SFTP-Rename), ist der Zustand eindeutig
erkennbar: `.processed` auf SFTP, Dateien in OCI ohne Marker. Manueller Fix: Marker-Datei
in OCI von Hand anlegen. Dieser Fall erfordert keine DB-seitige Idempotenz, da Quarkus
die Datei nicht erneut verarbeitet und ORDS nicht aufruft.
--- ---