Compare commits
10 Commits
25e854d427
...
b159bdd351
| Author | SHA1 | Date | |
|---|---|---|---|
| b159bdd351 | |||
| d36b346f98 | |||
| 1c303f1376 | |||
| aa0ed5d763 | |||
| e7fb09069c | |||
| f7a9113a57 | |||
| 9a445288f8 | |||
| cbcc6922a4 | |||
| 599912ef94 | |||
| 8f7fd949f4 |
@@ -7,7 +7,12 @@
|
||||
"Bash(sed -n '465,478p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
|
||||
"Bash(sed -n '523,535p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
|
||||
"Bash(sed -n '582,600p' \"C:\\\\src\\\\Galabau\\\\glb-spielwiese\\\\database\\\\packages\\\\pck_net_storage.pkb\")",
|
||||
"WebFetch(domain:docs.public.oneportal.content.oci.oraclecloud.com)"
|
||||
"WebFetch(domain:docs.public.oneportal.content.oci.oraclecloud.com)",
|
||||
"Bash(./mvnw compile *)",
|
||||
"WebFetch(domain:medium.com)",
|
||||
"WebFetch(domain:quarkus.io)",
|
||||
"WebFetch(domain:github.com)",
|
||||
"WebFetch(domain:walidhajeri.hashnode.dev)"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,9 +61,11 @@ Alle zur Laufzeit via `pck_system.f_get_par_wert_by_programmid`:
|
||||
| `NETSTORE_MARKER_SB` | Name der Marker-Datei im Object Store, der von DB beim Verarbeiten angelegt wird, wenn eine oder mehrere Dateien eines ZIPs nicht automatisiert importiert werden konnten. Das soll signalisieren, dass ein Sachbearbeiter die Dateien in diesem Unterordner manuell prüfen und importieren muss. z.B.: `_BITTE_PRÜFEN_` |
|
||||
| `NETSTORE_BA_PREFIX` | Pfad in Object Storage, wo BA-Daten liegen. Muss mit einem `/` enden, z.B. `BA/Eingang/` |
|
||||
| `NETSTORE_BA_IMPORT` | Name des Unterordners von NETSTORE_BA_PREFIX im Object Storage, wo entpackte Dateien, die noch importiert werden müssen, zwischengespeichert werden. |
|
||||
| `NETSTORE_BA_ARCHIV` | Der Basis-Name des Unterordners von NETSTORE_BA_PREFIX im Object Storage, wo verarbeitete BA-Imports abgelegt werden. Der Name darf nicht mit / enden oder beginnen, z.B. "Verarbeitet". Beim Import wird an diesen Namen immer die aktuelle Jahreszahl angehangen, sodass der finale Ordner z.B. "Verarbeitet 2026" heißt. |
|
||||
| `BA_IMPORT_SB_MIT_ID` | Mitarbeiter-ID für Import von BA Daten (z.B. Korrespondenzen). Diese Mitarbeiter-ID bekommt eine Wiedervorlage, für jede Datei, die nicht automatisch importiert werden konnte. |
|
||||
| `AUTOMATON_BASE_URL` | Base-URL des Quarkus Dateieingang Service, z.B. `http://dateieingang:8080` |
|
||||
| `AUTOMATON_API_KEY` | API-Key für den Quarkus Dateieingang Service (Header `X-Api-Key`) |
|
||||
| `NETSTORE_ORDS_APIKEY` | API-Key, den der Quarkus Server nutzt, um den ORDS aufzurufen |
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -22,13 +22,14 @@ create or replace package body pck_auto_import as
|
||||
l_response clob;
|
||||
l_http_status number;
|
||||
l_log_action constant varchar2(512 char) := 'BA_KORRESPONDENZEN_DATEIEINGANG_AUTOMATION';
|
||||
l_automaton_endpoint constant varchar2(256 char) := 'api/process-incoming-ba-korrespondenz';
|
||||
begin
|
||||
-- Schritt 1: Offene Batches in OCI verarbeiten
|
||||
p_process_incoming_ba_data;
|
||||
|
||||
-- Schritt 2: Quarkus anstoßen — Fehler werden geloggt, nicht eskaliert
|
||||
begin
|
||||
l_service_url := pck_system.f_get_par_wert_by_programmid('AUTOMATON_BASE_URL') || '/api/process-incoming';
|
||||
l_service_url := pck_system.f_get_par_wert_by_programmid('AUTOMATON_BASE_URL') || l_automaton_endpoint;
|
||||
l_api_key := pck_system.f_get_par_wert_by_programmid('AUTOMATON_API_KEY');
|
||||
|
||||
apex_web_service.g_request_headers.delete;
|
||||
@@ -179,7 +180,7 @@ create or replace package body pck_auto_import as
|
||||
l_log_action varchar2(512 char) := 'IMPORT_BA_DATA';
|
||||
begin
|
||||
-- Zielordner Name zusammenstellen
|
||||
l_target_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || 'Verarbeitet ' || to_char(sysdate, 'YYYY') || '/';
|
||||
l_target_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_ARCHIV') || ' ' || to_char(sysdate, 'YYYY') || '/';
|
||||
l_eingang_prefix := pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_PREFIX') || pck_system.f_get_par_wert_by_programmid('NETSTORE_BA_IMPORT');
|
||||
|
||||
-- Unterordner in eingangs-ordner auflisten (es gibt einen Ordner für jeden entpackte ZIP-Datei)
|
||||
|
||||
1
quarkus-automaton/.gitignore
vendored
1
quarkus-automaton/.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
.env
|
||||
target
|
||||
*private-key.pem
|
||||
2
quarkus-automaton/.vscode/settings.json
vendored
2
quarkus-automaton/.vscode/settings.json
vendored
@@ -1,3 +1,3 @@
|
||||
{
|
||||
"java.configuration.updateBuildConfiguration": "interactive"
|
||||
"java.configuration.updateBuildConfiguration": "disabled"
|
||||
}
|
||||
@@ -110,7 +110,7 @@ quarkus-automaton/
|
||||
| `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis |
|
||||
| `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur |
|
||||
| `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage |
|
||||
| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` oder `.error` |
|
||||
| `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` (bei Erfolg) oder `.error` (nur bei ungültiger ZIP) |
|
||||
| `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf |
|
||||
| `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) |
|
||||
|
||||
@@ -244,14 +244,14 @@ n8n fire-and-forget-Verhalten.
|
||||
|
||||
### Fehlerklassen
|
||||
|
||||
| Fehler | Typ | Retry | Verhalten |
|
||||
| Fehler | Typ | Umbenennung | Verhalten |
|
||||
|---|---|---|---|
|
||||
| SFTP-Verbindung fehlgeschlagen | transient | nein | Nächster APEX-Lauf (1h) versucht es |
|
||||
| ZIP beschädigt | persistent | nein | ZIP auf SFTP umbenennen zu `.error`, Log |
|
||||
| OCI-Verbindung fehlgeschlagen (z.B. 503) | transient | ja (exponential backoff) | @Retry |
|
||||
| OCI-Upload einer Datei schlägt fehl | persistent | nein | SFTP-Rename zu `.error`, Log — bereits hochgeladene OCI-Dateien bleiben (idempotent) |
|
||||
| ORDS-Aufruf schlägt fehl | transient | ja (2-3x) | Marker liegt vor → APEX Automation schlägt beim nächsten Lauf ein |
|
||||
| Allgemein technischer Fehler | fallabhängig | siehe SmallRye Fault Tolerance | Exception-Log |
|
||||
| SFTP-Verbindung / Download fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster APEX-Lauf (1h) versucht es |
|
||||
| ZIP beschädigt / ungültig | persistent | → `.error` | Datei ist defekt, manuelle Prüfung nötig |
|
||||
| OCI-Verbindung fehlgeschlagen | transient | keine | Datei bleibt auf SFTP — nächster Lauf versucht erneut (OCI PUT idempotent) |
|
||||
| SFTP-Rename zu `.processed` fehlgeschlagen | transient | keine | ORDS wurde noch nicht aufgerufen (kommt danach) — kein Doppelimport; nächster Lauf wiederholt den Schritt |
|
||||
| ORDS-Aufruf schlägt fehl | transient | keine (`.processed` bereits gesetzt) | Marker liegt in OCI vor — APEX Automation findet ihn beim nächsten Lauf |
|
||||
| Unerwarteter Laufzeitfehler | fallabhängig | keine | Exception wird geloggt, Datei bleibt auf SFTP |
|
||||
|
||||
### Retry-Strategie (SmallRye Fault Tolerance)
|
||||
|
||||
@@ -311,15 +311,21 @@ Pipeline.processAll():
|
||||
2. für jede ZIP:
|
||||
a. SftpService.download(zip) → lokale Datei
|
||||
b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste
|
||||
c. OciUploadService.upload() → Dateien + Marker in OCI
|
||||
d. SftpService.renameRemote(.processed oder .error)
|
||||
e. OrdsNotificationService.notify()
|
||||
↳ ZipException → Rename zu .error, Abbruch
|
||||
c. OciUploadService.uploadFiles() → Dateien in OCI (noch kein Marker)
|
||||
d. SftpService.renameRemote(.processed)
|
||||
e. OciUploadService.uploadMarker() → Marker in OCI (erst nach Rename — siehe Invariante)
|
||||
f. OrdsNotificationService.notify()
|
||||
f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler
|
||||
```
|
||||
|
||||
**Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen
|
||||
bei Fehlern oder großen ZIPs.
|
||||
|
||||
**Umbenennung zu `.error`** erfolgt ausschließlich bei `ZipException` (defekte/ungültige Datei).
|
||||
Bei Infrastrukturfehlern (SFTP, OCI, ORDS) bleibt die Datei unverändert auf dem SFTP und wird
|
||||
beim nächsten Lauf automatisch erneut verarbeitet.
|
||||
|
||||
---
|
||||
|
||||
## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider)
|
||||
@@ -343,11 +349,13 @@ public class OciUploadService {
|
||||
.tenantId(config.tenancyId())
|
||||
.userId(config.userId())
|
||||
.fingerprint(config.fingerprint())
|
||||
.region(Region.fromRegionId(config.region()))
|
||||
.privateKeySupplier(new FilePrivateKeySupplier(config.privateKeyPath()))
|
||||
.privateKeySupplier(() -> Files.newInputStream(Path.of(config.privateKeyPath())))
|
||||
.build();
|
||||
|
||||
this.client = ObjectStorageClient.builder().build(auth);
|
||||
// Endpoint explizit setzen — verhindert blockierenden HTTP-Discovery-Call im SDK
|
||||
client = ObjectStorageClient.builder()
|
||||
.endpoint("https://objectstorage." + config.region() + ".oraclecloud.com")
|
||||
.build(auth);
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -452,6 +460,12 @@ public class ProcessIncomingRequest {
|
||||
<artifactId>oci-java-sdk-objectstorage</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
<!-- HTTP-Provider für OCI SDK (jersey3 = Jakarta EE 9+, kompatibel mit Quarkus) -->
|
||||
<dependency>
|
||||
<groupId>com.oracle.oci.sdk</groupId>
|
||||
<artifactId>oci-java-sdk-common-httpclient-jersey3</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- ZIP -->
|
||||
<dependency>
|
||||
|
||||
@@ -75,11 +75,11 @@
|
||||
<version>0.38.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- OCI Object Storage SDK -->
|
||||
<!-- Aktuelle Version: https://mvnrepository.com/artifact/com.oracle.oci.sdk/oci-java-sdk-objectstorage -->
|
||||
<!-- OCI Object Storage SDK — Shaded Full JAR: Jersey und alle internen Abhängigkeiten sind unter
|
||||
shaded.com.oracle.oci.javasdk.* relokiert, sodass Quarkus RESTEasy die OCI-Provider nicht scannt -->
|
||||
<dependency>
|
||||
<groupId>com.oracle.oci.sdk</groupId>
|
||||
<artifactId>oci-java-sdk-objectstorage</artifactId>
|
||||
<artifactId>oci-java-sdk-shaded-full</artifactId>
|
||||
<version>3.44.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ import java.util.Map;
|
||||
* REST-Endpunkt für den Dateieingang-Trigger.
|
||||
* Wird von der APEX Automation stündlich per HTTP POST aufgerufen (fire & forget).
|
||||
*/
|
||||
@Path("/api/process-incoming")
|
||||
@Path("/api/process-incoming-ba-korrespondenz")
|
||||
@ApplicationScoped
|
||||
public class FileProcessingResource {
|
||||
|
||||
@@ -38,11 +38,15 @@ public class FileProcessingResource {
|
||||
@POST
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response triggerProcessing(@HeaderParam("X-Api-Key") String apiKey) {
|
||||
//Log.infof("API-key correct: %s", config.api().key());
|
||||
//Log.infof("API-key received: %s", apiKey);
|
||||
|
||||
if (apiKey == null || !config.api().key().equals(apiKey)) {
|
||||
Log.warn("Trigger abgelehnt — ungültiger oder fehlender API-Key");
|
||||
return Response.status(Response.Status.UNAUTHORIZED).build();
|
||||
}
|
||||
|
||||
Log.info("API-Key valide, Pipeline-Trigger wird verarbeitet");
|
||||
boolean started = pipeline.tryProcessAllAsync();
|
||||
|
||||
if (!started) {
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
package de.galabau.dateieingang.config;
|
||||
|
||||
import io.smallrye.config.ConfigMapping;
|
||||
|
||||
/** OCI Object Storage Konfiguration. Credentials kommen ausschließlich aus Umgebungsvariablen. */
|
||||
@ConfigMapping(prefix = "galabau.oci")
|
||||
public interface OciConfig {
|
||||
|
||||
/** OCI Object Storage Namespace, z.B. {@code frhqaxi5sgcg}. */
|
||||
String namespace();
|
||||
|
||||
/** OCI Region, z.B. {@code eu-frankfurt-1}. */
|
||||
String region();
|
||||
|
||||
/** OCI Bucket-Name. */
|
||||
String bucket();
|
||||
|
||||
/**
|
||||
* Root-Prefix für alle Objekte im Bucket, z.B. {@code mandant_42/}.
|
||||
* Muss mit {@code /} enden.
|
||||
*/
|
||||
String tenantPrefix();
|
||||
|
||||
/**
|
||||
* Prefix für eingehende Dateien unterhalb von {@code tenantPrefix},
|
||||
* z.B. {@code eingang/}. Muss mit {@code /} enden.
|
||||
*/
|
||||
String incomingPrefix();
|
||||
|
||||
/** OCI Tenancy OCID. Aus Env-Var {@code OCI_TENANCY_ID}. */
|
||||
String tenancyId();
|
||||
|
||||
/** OCI User OCID. Aus Env-Var {@code OCI_USER_ID}. */
|
||||
String userId();
|
||||
|
||||
/** API Key Fingerprint, z.B. {@code aa:bb:cc:...}. Aus Env-Var {@code OCI_FINGERPRINT}. */
|
||||
String fingerprint();
|
||||
|
||||
/**
|
||||
* Dateisystempfad zur PEM-Datei des OCI API Keys.
|
||||
* Produktion: absoluter Pfad zum Kubernetes Secret Volume Mount.
|
||||
* Dev: relativer Pfad zum Projektverzeichnis (Default: {@code oci-private-key.pem}).
|
||||
*/
|
||||
String privateKeyPath();
|
||||
|
||||
/**
|
||||
* Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird.
|
||||
* Default: {@code _READY_FOR_DB_PROCESSING_}.
|
||||
* Muss mit der APEX Automation und dem ORDS-Package abgestimmt sein.
|
||||
*/
|
||||
String markerFilenameDbProcessing();
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package de.galabau.dateieingang.config;
|
||||
|
||||
import io.smallrye.config.ConfigMapping;
|
||||
|
||||
/** ORDS-Konfiguration für den Dateieingang-Endpunkt. */
|
||||
@ConfigMapping(prefix = "galabau.ords")
|
||||
public interface OrdsConfig {
|
||||
|
||||
/**
|
||||
* Base-URL des ORDS-Moduls bis einschließlich Modul-Pfad,
|
||||
* z.B. {@code https://apex.example.com/ords/myschema/auto_import}.
|
||||
* Wird direkt als {@code quarkus.rest-client.ords-client.url} verwendet.
|
||||
*/
|
||||
String baseUrl();
|
||||
|
||||
/**
|
||||
* API-Key für den ORDS-Endpunkt (Header: {@code X-Api-Key}).
|
||||
* Aus Env-Var {@code GALABAU_ORDS_API_KEY}.
|
||||
*/
|
||||
String apiKey();
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package de.galabau.dateieingang.sftp;
|
||||
package de.galabau.dateieingang.config;
|
||||
|
||||
import io.smallrye.config.ConfigMapping;
|
||||
|
||||
@@ -1,32 +1,143 @@
|
||||
package de.galabau.dateieingang.oci;
|
||||
|
||||
import com.oracle.bmc.Region;
|
||||
import com.oracle.bmc.auth.SimpleAuthenticationDetailsProvider;
|
||||
import com.oracle.bmc.objectstorage.ObjectStorage;
|
||||
import com.oracle.bmc.objectstorage.ObjectStorageClient;
|
||||
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
|
||||
import de.galabau.dateieingang.config.OciConfig;
|
||||
import de.galabau.dateieingang.exception.OciException;
|
||||
import de.galabau.dateieingang.model.FileEntry;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import de.galabau.dateieingang.model.ProcessingStatus;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Lädt die entpackten Dateien und den Marker in OCI Object Storage hoch.
|
||||
*
|
||||
* <p><b>Stub:</b> OCI-Upload ist noch nicht implementiert.
|
||||
* Der Upload wird übersprungen und der Status auf {@link ProcessingStatus#MARKER_UPLOADED} gesetzt,
|
||||
* damit der Rest der Pipeline (SFTP-Rename, ORDS-Notify) getestet werden kann.
|
||||
* Authentifizierung via OCI HTTP Signature V1 (entspricht APEX Web Credential vom Typ OCI).
|
||||
*/
|
||||
//@Startup
|
||||
@ApplicationScoped
|
||||
public class OciUploadService {
|
||||
|
||||
@Inject
|
||||
OciConfig config;
|
||||
|
||||
private ObjectStorage client;
|
||||
|
||||
@PostConstruct
|
||||
void init() {
|
||||
Log.info("Initialisiere OCI ObjectStorage-Client...");
|
||||
try {
|
||||
SimpleAuthenticationDetailsProvider auth = SimpleAuthenticationDetailsProvider.builder()
|
||||
.tenantId(config.tenancyId())
|
||||
.userId(config.userId())
|
||||
.fingerprint(config.fingerprint())
|
||||
.region(Region.fromRegionId(config.region()))
|
||||
.privateKeySupplier(() -> {
|
||||
try {
|
||||
return Files.newInputStream(Path.of(config.privateKeyPath()));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("OCI Private Key nicht lesbar: "
|
||||
+ config.privateKeyPath(), e);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
Log.info("Authentifizierung...");
|
||||
client = ObjectStorageClient.builder()
|
||||
.build(auth);
|
||||
Log.infof("OCI ObjectStorage-Client initialisiert (Region: %s, Bucket: %s)", config.region(), config.bucket());
|
||||
} catch (Throwable e) {
|
||||
Log.errorf(e, "OCI ObjectStorage-Client Initialisierung fehlgeschlagen");
|
||||
throw new RuntimeException("OCI-Client konnte nicht initialisiert werden", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch.
|
||||
* Lädt alle Nutzdateien aus {@code context.extractedFiles} in OCI hoch — ohne Marker.
|
||||
* Der Marker wird erst nach dem SFTP-Rename zu {@code .processed} gesetzt (siehe
|
||||
* {@link #uploadMarker}), damit APEX Automation den Batch nie verarbeitet bevor die
|
||||
* ZIP-Datei auf dem SFTP als verarbeitet markiert ist.
|
||||
*
|
||||
* @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix
|
||||
* @throws OciException bei persistenten OCI-Fehlern (4xx) nach Retry-Erschöpfung
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void upload(ProcessingContext context) throws OciException {
|
||||
// TODO: OCI-Upload implementieren (OCI SDK, SimpleAuthenticationDetailsProvider)
|
||||
Log.infof("[STUB] OCI-Upload übersprungen für '%s' (%d Dateien) — wird später implementiert",
|
||||
context.zipNameWithoutExt, context.extractedFiles.size());
|
||||
public void uploadFiles(ProcessingContext context) throws OciException {
|
||||
List<FileEntry> files = context.extractedFiles.stream()
|
||||
.filter(e -> !e.isMarker)
|
||||
.toList();
|
||||
|
||||
Log.infof("OCI-Upload: %d Datei(en) für '%s'", files.size(), context.zipNameWithoutExt);
|
||||
|
||||
for (FileEntry entry : files) {
|
||||
String key = buildKey(context.zipNameWithoutExt, entry.relativePath);
|
||||
entry.ociKey = key;
|
||||
putFile(key, context.localExtractDir.resolve(entry.relativePath), entry.fileSize);
|
||||
Log.infof("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
|
||||
}
|
||||
|
||||
Log.infof("OCI-Upload Dateien abgeschlossen: %d Datei(en) in '%s'",
|
||||
files.size(), buildPrefix(context.zipNameWithoutExt));
|
||||
}
|
||||
|
||||
/**
|
||||
* Setzt den Marker in OCI — signalisiert der DB-Verarbeitung, dass der Batch vollständig ist.
|
||||
* Wird erst nach dem SFTP-Rename zu {@code .processed} aufgerufen, damit Marker und
|
||||
* SFTP-Zustand immer konsistent sind: Marker vorhanden ↔ ZIP bereits als verarbeitet markiert.
|
||||
*
|
||||
* @param context enthält den Ziel-Prefix für den Marker-Key
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void uploadMarker(ProcessingContext context) throws OciException {
|
||||
String markerKey = buildKey(context.zipNameWithoutExt, config.markerFilenameDbProcessing());
|
||||
Log.infof("Lade Marker hoch: '%s'", markerKey);
|
||||
putMarker(markerKey);
|
||||
context.markerUploaded = true;
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
Log.infof("Marker hochgeladen: '%s'", markerKey);
|
||||
}
|
||||
|
||||
private String buildPrefix(String zipNameWithoutExt) {
|
||||
return config.tenantPrefix() + config.incomingPrefix() + zipNameWithoutExt + "/";
|
||||
}
|
||||
|
||||
private String buildKey(String zipNameWithoutExt, String relativePath) {
|
||||
return buildPrefix(zipNameWithoutExt) + relativePath;
|
||||
}
|
||||
|
||||
private void putFile(String key, Path localFile, long fileSize) throws OciException {
|
||||
try (InputStream is = Files.newInputStream(localFile)) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
.bucketName(config.bucket())
|
||||
.objectName(key)
|
||||
.putObjectBody(is)
|
||||
.contentLength(fileSize)
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
throw new OciException("OCI-Upload fehlgeschlagen für '" + key + "'", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void putMarker(String key) throws OciException {
|
||||
try (InputStream is = InputStream.nullInputStream()) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
.bucketName(config.bucket())
|
||||
.objectName(key)
|
||||
.putObjectBody(is)
|
||||
.contentLength(0L)
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
throw new OciException("OCI-Upload Marker fehlgeschlagen für '" + key + "'", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package de.galabau.dateieingang.ords;
|
||||
|
||||
import jakarta.ws.rs.HeaderParam;
|
||||
import jakarta.ws.rs.POST;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import org.eclipse.microprofile.rest.client.annotation.RegisterProvider;
|
||||
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
|
||||
|
||||
/**
|
||||
* MicroProfile REST Client für den ORDS-Endpunkt des Dateieingang-Service.
|
||||
* Base-URL wird über {@code quarkus.rest-client.ords-client.url} konfiguriert
|
||||
* und zeigt auf den ORDS-Modul-Pfad (z.B. {@code .../ords/myschema/auto_import}).
|
||||
* Weitere Endpunkte zukünftiger Pipelines werden hier als neue Methoden ergänzt.
|
||||
*/
|
||||
@RegisterRestClient(configKey = "ords-client")
|
||||
@RegisterProvider(OrdsLoggingFilter.class)
|
||||
public interface OrdsClient {
|
||||
|
||||
/**
|
||||
* Ruft {@code pck_auto_import.p_process_incoming_ba_data} über ORDS auf.
|
||||
* Die Prozedur verarbeitet alle offenen OCI-Batches (Unterordner mit Marker).
|
||||
*
|
||||
* @param apiKey API-Key aus {@code galabau.ords.api-key} (Header: {@code X-Api-Key})
|
||||
* @return ORDS-Response (erwartet: 2xx)
|
||||
*/
|
||||
@POST
|
||||
@Path("/process_incoming_ba_data")
|
||||
Response processIncomingBaData(@HeaderParam("X-Api-Key") String apiKey);
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package de.galabau.dateieingang.ords;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.ws.rs.client.ClientRequestContext;
|
||||
import jakarta.ws.rs.client.ClientRequestFilter;
|
||||
import jakarta.ws.rs.client.ClientResponseContext;
|
||||
import jakarta.ws.rs.client.ClientResponseFilter;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/** Loggt ORDS-Requests und -Responses im selben Format wie der DefaultClientLogger, maskiert jedoch den X-Api-Key-Header. */
|
||||
public class OrdsLoggingFilter implements ClientRequestFilter, ClientResponseFilter {
|
||||
|
||||
@Override
|
||||
public void filter(ClientRequestContext ctx) {
|
||||
String headers = ctx.getHeaders().entrySet().stream()
|
||||
.map(e -> e.getKey() + "=" + (
|
||||
"X-Api-Key".equals(e.getKey())
|
||||
? mask(String.valueOf(e.getValue().getFirst()))
|
||||
: e.getValue().getFirst()
|
||||
))
|
||||
.collect(Collectors.joining(" "));
|
||||
|
||||
String body = ctx.hasEntity() ? "<body>" : "Empty body";
|
||||
|
||||
Log.infof("Request: %s %s Headers[%s], %s", ctx.getMethod(), ctx.getUri(), headers, body);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void filter(ClientRequestContext req, ClientResponseContext res) throws IOException {
|
||||
String headers = res.getHeaders().entrySet().stream()
|
||||
.map(e -> e.getKey() + "=" + e.getValue().getFirst())
|
||||
.collect(Collectors.joining(" "));
|
||||
|
||||
String body = "";
|
||||
if (res.hasEntity()) {
|
||||
byte[] bytes = res.getEntityStream().readAllBytes();
|
||||
body = new String(bytes, StandardCharsets.UTF_8);
|
||||
res.setEntityStream(new ByteArrayInputStream(bytes));
|
||||
}
|
||||
|
||||
Log.infof("Response: %s %s, Status[%d %s], Headers[%s], Body:\n%s",
|
||||
req.getMethod(), req.getUri(),
|
||||
res.getStatus(), res.getStatusInfo().getReasonPhrase(),
|
||||
headers, body);
|
||||
}
|
||||
|
||||
private static String mask(String value) {
|
||||
return value.substring(0, Math.min(4, value.length())) + "***";
|
||||
}
|
||||
}
|
||||
@@ -1,30 +1,62 @@
|
||||
package de.galabau.dateieingang.ords;
|
||||
|
||||
import de.galabau.dateieingang.config.OrdsConfig;
|
||||
import de.galabau.dateieingang.exception.OrdsException;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import org.eclipse.microprofile.faulttolerance.Retry;
|
||||
import org.eclipse.microprofile.faulttolerance.Timeout;
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient;
|
||||
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
/**
|
||||
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_files},
|
||||
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_ba_data},
|
||||
* damit die DB-Verarbeitung sofort angestoßen wird.
|
||||
*
|
||||
* <p><b>Stub:</b> ORDS-Aufruf ist noch nicht implementiert.
|
||||
* Bei einem Ausfall wäre die Verarbeitung ohnehin durch die APEX Automation abgesichert
|
||||
* (diese findet den Marker beim nächsten Stundenlauf).
|
||||
* <p>Bei Ausfall ist die Verarbeitung durch die APEX Automation abgesichert:
|
||||
* Sie findet den Marker beim nächsten Stundenlauf und ruft die Prozedur selbst auf.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class OrdsNotificationService {
|
||||
|
||||
@Inject
|
||||
@RestClient
|
||||
OrdsClient ordsClient;
|
||||
|
||||
@Inject
|
||||
OrdsConfig config;
|
||||
|
||||
/**
|
||||
* Sendet eine Benachrichtigung an den ORDS-Endpunkt.
|
||||
* Löst die DB-Verarbeitung via ORDS aus ({@code pck_auto_import.p_process_incoming_ba_data}).
|
||||
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff, 10s Timeout je Versuch).
|
||||
* Maximale Wartezeit: ca. 33 Sekunden (3 × 10s + 3 × 1s Backoff).
|
||||
*
|
||||
* @param context enthält {@code zipNameWithoutExt} und {@code runId} für den Request-Body
|
||||
* @param context enthält {@code zipNameWithoutExt} für das Log
|
||||
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
||||
*/
|
||||
public void notify(ProcessingContext context) throws OrdsException {
|
||||
// TODO: ORDS REST-Client implementieren (MicroProfile REST Client + @Retry)
|
||||
Log.infof("[STUB] ORDS-Benachrichtigung übersprungen für '%s' — wird später implementiert",
|
||||
context.zipNameWithoutExt);
|
||||
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
|
||||
retryOn = OrdsException.class)
|
||||
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
|
||||
public void triggerDbProcessing(ProcessingContext context) throws OrdsException {
|
||||
Log.infof("Rufe ORDS-Endpunkt auf für '%s'", context.zipNameWithoutExt);
|
||||
Response response;
|
||||
try {
|
||||
response = ordsClient.processIncomingBaData(config.apiKey());
|
||||
} catch (Exception e) {
|
||||
throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '"
|
||||
+ context.zipNameWithoutExt + "'", e);
|
||||
}
|
||||
|
||||
int status = response.getStatus();
|
||||
if (status >= 400) {
|
||||
throw new OrdsException("ORDS antwortete mit HTTP " + status
|
||||
+ " für '" + context.zipNameWithoutExt + "'");
|
||||
}
|
||||
|
||||
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package de.galabau.dateieingang.pipeline;
|
||||
|
||||
import de.galabau.dateieingang.config.SftpConfig;
|
||||
import de.galabau.dateieingang.exception.OciException;
|
||||
import de.galabau.dateieingang.exception.OrdsException;
|
||||
import de.galabau.dateieingang.exception.SftpException;
|
||||
@@ -19,6 +20,8 @@ import org.slf4j.MDC;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@@ -44,6 +47,9 @@ public class FileProcessingPipeline {
|
||||
@Inject
|
||||
OrdsNotificationService ordsNotificationService;
|
||||
|
||||
@Inject
|
||||
SftpConfig sftpConfig;
|
||||
|
||||
@Inject
|
||||
ManagedExecutor executor;
|
||||
|
||||
@@ -63,6 +69,8 @@ public class FileProcessingPipeline {
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
processAll();
|
||||
} catch (Throwable e) { // nicht exception catchen, weil Error in OCI SDK auftreten können, die Throwable aber nicht Excption sind. Die würden sonst nicht geloggt
|
||||
Log.errorf(e, "Unerwarteter Fehler im Pipeline-Lauf");
|
||||
} finally {
|
||||
isRunning.set(false);
|
||||
}
|
||||
@@ -70,9 +78,11 @@ public class FileProcessingPipeline {
|
||||
return true;
|
||||
}
|
||||
|
||||
void processAll() {
|
||||
private void processAll() {
|
||||
Log.info("Pipeline-Lauf gestartet");
|
||||
|
||||
preProcessingCleanup();
|
||||
|
||||
List<String> zipFiles;
|
||||
try {
|
||||
zipFiles = sftpService.listZipFiles();
|
||||
@@ -83,6 +93,7 @@ public class FileProcessingPipeline {
|
||||
|
||||
if (zipFiles.isEmpty()) {
|
||||
Log.info("Keine neuen ZIP-Dateien auf dem SFTP-Server gefunden");
|
||||
Log.info("Pipeline-Lauf abgeschlossen");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -99,6 +110,7 @@ public class FileProcessingPipeline {
|
||||
private void processZip(String zipFilename) {
|
||||
ProcessingContext context = new ProcessingContext(UUID.randomUUID(), zipFilename);
|
||||
MDC.put("runId", context.runId.toString());
|
||||
Log.infof("Starte Verarbeitung von '%s' [runId=%s]", zipFilename, context.runId);
|
||||
|
||||
try {
|
||||
// --- Download ---
|
||||
@@ -113,31 +125,50 @@ public class FileProcessingPipeline {
|
||||
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
|
||||
context.extractedFiles.size());
|
||||
|
||||
// --- OCI Upload (Stub) ---
|
||||
// --- OCI Upload (Dateien, noch kein Marker) ---
|
||||
MDC.put("step", "oci-upload");
|
||||
ociUploadService.upload(context);
|
||||
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
|
||||
Log.info("Starte OCI-Upload");
|
||||
ociUploadService.uploadFiles(context);
|
||||
|
||||
// --- SFTP Rename → .processed ---
|
||||
// Erst nach erfolgreichem Datei-Upload — Marker kommt danach,
|
||||
// damit Marker-Präsenz in OCI ↔ ZIP bereits .processed auf SFTP.
|
||||
MDC.put("step", "sftp-rename");
|
||||
sftpService.renameRemote(zipFilename, zipFilename + ".processed");
|
||||
Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename);
|
||||
sftpService.renameFile(zipFilename, zipFilename + ".processed");
|
||||
|
||||
// --- ORDS Notify (Stub) ---
|
||||
// --- OCI Marker ---
|
||||
// Signalisiert der DB-Verarbeitung, dass der Batch vollständig hochgeladen ist.
|
||||
MDC.put("step", "oci-marker");
|
||||
ociUploadService.uploadMarker(context);
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
|
||||
// --- ORDS Notify ---
|
||||
MDC.put("step", "ords-notify");
|
||||
ordsNotificationService.notify(context);
|
||||
ordsNotificationService.triggerDbProcessing(context);
|
||||
|
||||
context.status = ProcessingStatus.ORDS_NOTIFIED;
|
||||
Log.infof("Verarbeitung erfolgreich abgeschlossen: '%s'", zipFilename);
|
||||
|
||||
} catch (SftpException | ZipException | OciException | OrdsException e) {
|
||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen: %s", zipFilename, e.getMessage());
|
||||
} catch (ZipException e) {
|
||||
Log.errorf(e, "Ungültige ZIP-Datei '%s' — wird zu .error umbenannt", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
tryRenameToError(zipFilename);
|
||||
} catch (SftpException | OciException | OrdsException e) {
|
||||
Log.errorf(e, "Verarbeitung von '%s' fehlgeschlagen (Infrastruktur): %s", zipFilename, e.getMessage());
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
} catch (IOException e) {
|
||||
Log.errorf(e, "I/O-Fehler bei der Verarbeitung von '%s'", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
tryRenameToError(zipFilename);
|
||||
} catch (RuntimeException e) {
|
||||
Log.errorf(e, "Unerwarteter Laufzeitfehler bei der Verarbeitung von '%s'", zipFilename);
|
||||
context.status = ProcessingStatus.FAILED;
|
||||
} finally {
|
||||
cleanup(context);
|
||||
postProcessingCleanup(context);
|
||||
long duration = Duration.between(context.startTime, LocalDateTime.now()).toMillis();
|
||||
Log.infof("Lauf %s für Datei %s abgeschlossen — Status: %s, Dauer: %dms",
|
||||
context.runId, zipFilename, context.status, duration);
|
||||
Log.info("-----------------------------------------------------------------------------------------------------");
|
||||
MDC.clear();
|
||||
}
|
||||
}
|
||||
@@ -145,24 +176,73 @@ public class FileProcessingPipeline {
|
||||
private void tryRenameToError(String zipFilename) {
|
||||
try {
|
||||
MDC.put("step", "sftp-rename");
|
||||
sftpService.renameRemote(zipFilename, zipFilename + ".error");
|
||||
Log.infof("SFTP Rename: '%s' → '%s.error'", zipFilename, zipFilename);
|
||||
sftpService.renameFile(zipFilename, zipFilename + ".error");
|
||||
} catch (SftpException e) {
|
||||
Log.warnf(e, "Umbenennen zu .error fehlgeschlagen für '%s' — Datei bleibt auf SFTP zur manuellen Prüfung",
|
||||
zipFilename);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanup(ProcessingContext context) {
|
||||
MDC.put("step", "cleanup");
|
||||
/**
|
||||
* Bereinigt verwaiste lokale Arbeitsdateien aus fehlgeschlagenen Vorläufen.
|
||||
*
|
||||
* <p>Wird einmal pro Pipeline-Lauf <em>vor</em> dem SFTP-Listing aufgerufen.
|
||||
* Notwendig weil {@link #postProcessingCleanup} zwar im {@code finally}-Block läuft,
|
||||
* aber bei I/O-Fehlern selbst fehlschlagen kann — in diesem Fall bleiben ZIP-Dateien
|
||||
* und Entpack-Verzeichnisse im Arbeitsverzeichnis zurück. Ohne diesen Schritt würden
|
||||
* sich diese Reste akkumulieren und das Arbeitsverzeichnis über Zeit vollschreiben.
|
||||
*
|
||||
* <p>Ein Zeitstempel-Schwellwert ist nicht nötig: der {@code AtomicBoolean}-Guard in
|
||||
* {@link #tryProcessAllAsync} stellt sicher dass nie zwei Läufe gleichzeitig aktiv sind.
|
||||
* Alles was beim Start eines Laufs im Arbeitsverzeichnis liegt, ist daher garantiert
|
||||
* ein Überrest eines abgeschlossenen oder abgebrochenen Vorlaufs.
|
||||
*/
|
||||
private void preProcessingCleanup() {
|
||||
MDC.put("step", "pre-cleanup");
|
||||
Path workDir = Path.of(sftpConfig.localWorkDir());
|
||||
if (!Files.exists(workDir)) {
|
||||
return;
|
||||
}
|
||||
try (Stream<Path> entries = Files.list(workDir)) {
|
||||
entries.forEach(path -> {
|
||||
try {
|
||||
if (Files.isDirectory(path)) {
|
||||
deleteLocalDirectory(path);
|
||||
Log.warnf("Verwaistes Entpack-Verzeichnis gelöscht: %s", path);
|
||||
} else {
|
||||
Files.delete(path);
|
||||
Log.warnf("Verwaiste Datei gelöscht: %s", path);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Pre-Cleanup fehlgeschlagen für: %s", path);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Pre-Cleanup: Arbeitsverzeichnis konnte nicht gelesen werden: %s", workDir);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bereinigt die lokalen Arbeitsdateien eines abgeschlossenen Laufs (ZIP + Entpack-Verzeichnis).
|
||||
*
|
||||
* <p>Wird im {@code finally}-Block von {@link #processZip} aufgerufen, also sowohl nach
|
||||
* erfolgreichem Abschluss als auch nach Fehlern. Schlägt dieser Cleanup bei I/O-Problemen
|
||||
* fehl, verbleiben die Dateien im Arbeitsverzeichnis — sie werden dann beim nächsten
|
||||
* Pipeline-Lauf durch {@link #preProcessingCleanup} entfernt.
|
||||
*
|
||||
* @param context enthält die Pfade der zu löschenden lokalen ZIP und des Entpack-Verzeichnisses
|
||||
*/
|
||||
private void postProcessingCleanup(ProcessingContext context) {
|
||||
MDC.put("step", "post-cleanup");
|
||||
Log.infof("Cleanup gestartet für Lauf %s", context.runId);
|
||||
try {
|
||||
if (context.localZipPath != null) {
|
||||
Files.deleteIfExists(context.localZipPath);
|
||||
Log.debugf("Lokale ZIP gelöscht: %s", context.localZipPath);
|
||||
Log.infof("Lokale ZIP gelöscht: %s", context.localZipPath);
|
||||
}
|
||||
if (context.localExtractDir != null) {
|
||||
deleteLocalDirectory(context.localExtractDir);
|
||||
Log.debugf("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||
Log.infof("Lokales Entpack-Verzeichnis gelöscht: %s", context.localExtractDir);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.warnf(e, "Cleanup für Lauf %s fehlgeschlagen — lokale Dateien verbleiben ggf. in %s",
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package de.galabau.dateieingang.sftp;
|
||||
|
||||
import de.galabau.dateieingang.config.SftpConfig;
|
||||
import de.galabau.dateieingang.exception.SftpException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
@@ -8,7 +9,6 @@ import jakarta.inject.Inject;
|
||||
import net.schmizz.sshj.SSHClient;
|
||||
import net.schmizz.sshj.sftp.RemoteResourceInfo;
|
||||
import net.schmizz.sshj.sftp.SFTPClient;
|
||||
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -26,6 +26,7 @@ public class SftpService {
|
||||
void init() {
|
||||
try {
|
||||
Files.createDirectories(Path.of(config.localWorkDir()));
|
||||
Log.infof("Lokales Arbeitsverzeichnis: %s", config.localWorkDir());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Lokales Arbeitsverzeichnis konnte nicht erstellt werden: "
|
||||
+ config.localWorkDir(), e);
|
||||
@@ -44,8 +45,10 @@ public class SftpService {
|
||||
private <T> T withSftp(SftpOperation<T> operation) throws SftpException {
|
||||
try (SSHClient ssh = new SSHClient()) {
|
||||
configureHostKeyVerification(ssh);
|
||||
Log.infof("Verbinde zu SFTP %s:%d", config.host(), config.port());
|
||||
ssh.connect(config.host(), config.port());
|
||||
authenticate(ssh);
|
||||
Log.infof("SFTP-Verbindung hergestellt");
|
||||
try (SFTPClient sftp = ssh.newSFTPClient()) {
|
||||
return operation.execute(sftp);
|
||||
}
|
||||
@@ -55,19 +58,20 @@ public class SftpService {
|
||||
}
|
||||
}
|
||||
|
||||
private void configureHostKeyVerification(SSHClient ssh) {
|
||||
private void configureHostKeyVerification(SSHClient ssh) throws SftpException {
|
||||
if (config.hostKeyFingerprint().isPresent()) {
|
||||
ssh.addHostKeyVerifier(config.hostKeyFingerprint().get());
|
||||
} else {
|
||||
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert — PromiscuousVerifier aktiv (nur Dev!)");
|
||||
ssh.addHostKeyVerifier(new PromiscuousVerifier());
|
||||
throw new SftpException("SFTP Host-Key-Fingerprint nicht konfiguriert — Verbindung abgelehnt");
|
||||
}
|
||||
}
|
||||
|
||||
private void authenticate(SSHClient ssh) throws IOException {
|
||||
if (config.privateKeyPath().isPresent()) {
|
||||
Log.infof("SFTP-Authentifizierung via Public-Key für Benutzer '%s'", config.username());
|
||||
ssh.authPublickey(config.username(), config.privateKeyPath().get());
|
||||
} else {
|
||||
Log.infof("SFTP-Authentifizierung via Passwort für Benutzer '%s'", config.username());
|
||||
ssh.authPassword(config.username(), config.password());
|
||||
}
|
||||
}
|
||||
@@ -79,6 +83,7 @@ public class SftpService {
|
||||
* @throws SftpException bei Verbindungs- oder Lesefehler
|
||||
*/
|
||||
public List<String> listZipFiles() throws SftpException {
|
||||
Log.infof("Lese SFTP-Verzeichnis '%s'", config.remotePath());
|
||||
return withSftp(sftp ->
|
||||
sftp.ls(config.remotePath()).stream()
|
||||
.filter(RemoteResourceInfo::isRegularFile)
|
||||
@@ -97,6 +102,7 @@ public class SftpService {
|
||||
*/
|
||||
public Path download(String filename) throws SftpException {
|
||||
Path localFile = Path.of(config.localWorkDir(), filename);
|
||||
Log.infof("Starte Download: '%s'", filename);
|
||||
withSftp(sftp -> {
|
||||
sftp.get(config.remotePath() + "/" + filename, localFile.toString());
|
||||
return null;
|
||||
@@ -112,7 +118,8 @@ public class SftpService {
|
||||
* @param newFilename neuer Dateiname, z.B. {@code export_2026-04-08.zip.processed}
|
||||
* @throws SftpException bei Verbindungs- oder Umbenennfehler
|
||||
*/
|
||||
public void renameRemote(String filename, String newFilename) throws SftpException {
|
||||
public void renameFile(String filename, String newFilename) throws SftpException {
|
||||
Log.infof("SFTP Rename: '%s' → '%s'", filename, newFilename);
|
||||
withSftp(sftp -> {
|
||||
sftp.rename(
|
||||
config.remotePath() + "/" + filename,
|
||||
@@ -120,5 +127,6 @@ public class SftpService {
|
||||
);
|
||||
return null;
|
||||
});
|
||||
Log.infof("SFTP Rename erfolgreich: '%s'", newFilename);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package de.galabau.dateieingang.zip;
|
||||
|
||||
import de.galabau.dateieingang.config.OciConfig;
|
||||
import de.galabau.dateieingang.exception.ZipException;
|
||||
import de.galabau.dateieingang.model.FileEntry;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
|
||||
|
||||
@@ -19,6 +22,9 @@ import java.util.List;
|
||||
@ApplicationScoped
|
||||
public class ZipExtractionService {
|
||||
|
||||
@Inject
|
||||
OciConfig ociConfig;
|
||||
|
||||
/**
|
||||
* Entpackt die ZIP-Datei aus {@code context.localZipPath} in ein gleichnamiges Unterverzeichnis.
|
||||
* Setzt {@code context.localExtractDir} und {@code context.extractedFiles}.
|
||||
@@ -35,6 +41,7 @@ public class ZipExtractionService {
|
||||
|
||||
try {
|
||||
Files.createDirectories(extractDir);
|
||||
Log.infof("Entpacke ZIP '%s'", context.zipFilename);
|
||||
|
||||
try (ZipArchiveInputStream zis = new ZipArchiveInputStream(
|
||||
new BufferedInputStream(Files.newInputStream(context.localZipPath)))) {
|
||||
@@ -51,14 +58,20 @@ public class ZipExtractionService {
|
||||
Files.copy(zis, targetFile, StandardCopyOption.REPLACE_EXISTING);
|
||||
|
||||
boolean isMarker = Path.of(entryName).getFileName()
|
||||
.toString().equals("_READY_FOR_DB_PROCESSING_");
|
||||
.toString().equals(ociConfig.markerFilenameDbProcessing());
|
||||
|
||||
entries.add(new FileEntry(entryName, Files.size(targetFile), isMarker));
|
||||
FileEntry fileEntry = new FileEntry(entryName, Files.size(targetFile), isMarker);
|
||||
entries.add(fileEntry);
|
||||
Log.infof("Extrahiert: '%s' (%d Bytes)", entryName, fileEntry.fileSize);
|
||||
if (fileEntry.isMarker) {
|
||||
Log.infof("Marker-Datei gefunden: '%s'", entryName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
context.extractedFiles = entries;
|
||||
Log.infof("Extraktion abgeschlossen: %d Datei(en) aus '%s'", entries.size(), context.zipFilename);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new ZipException("ZIP '" + context.zipFilename + "' konnte nicht entpackt werden: "
|
||||
|
||||
@@ -18,27 +18,34 @@ galabau.sftp.local-work-dir=/tmp/sftp-work
|
||||
# galabau.sftp.private-key-path=/etc/secrets/sftp-key
|
||||
# galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE}
|
||||
|
||||
# ===== OCI (Stub — noch nicht aktiv) =====
|
||||
# galabau.oci.namespace=${OCI_NAMESPACE}
|
||||
# galabau.oci.region=${OCI_REGION}
|
||||
# galabau.oci.bucket=${OCI_BUCKET}
|
||||
# galabau.oci.tenant-prefix=mandant_42/
|
||||
# galabau.oci.incoming-prefix=eingang/
|
||||
# galabau.oci.tenancy-id=${OCI_TENANCY_ID}
|
||||
# galabau.oci.user-id=${OCI_USER_ID}
|
||||
# galabau.oci.fingerprint=${OCI_FINGERPRINT}
|
||||
# galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH}
|
||||
# ===== OCI Object Storage =====
|
||||
# Dateiname des DB-Processing-Markers, der nach dem Upload aller Nutzdateien in OCI abgelegt wird
|
||||
galabau.oci.marker-filename-db-processing=${OCI_MARKER_FILENAME_DB_PROCESSING:_READY_FOR_DB_PROCESSING_}
|
||||
|
||||
# ===== ORDS (Stub — noch nicht aktiv) =====
|
||||
# galabau.ords.base-url=${GALABAU_ORDS_BASE_URL:http://ords:8080}
|
||||
# galabau.ords.process-incoming-path=/ords/.../auto_import/process_incoming
|
||||
# galabau.ords.api-key=${GALABAU_ORDS_API_KEY}
|
||||
# quarkus.rest-client.ords-client.url=${galabau.ords.base-url}
|
||||
galabau.oci.namespace=${OCI_NAMESPACE}
|
||||
galabau.oci.region=${OCI_REGION}
|
||||
galabau.oci.bucket=${OCI_BUCKET}
|
||||
# Root-Prefix im Bucket, muss mit / enden
|
||||
galabau.oci.tenant-prefix=${OCI_TENANT_PREFIX:testmandant-42/}
|
||||
# Eingangs-Prefix unterhalb von tenant-prefix, muss mit / enden
|
||||
galabau.oci.incoming-prefix=${OCI_INCOMING_FILES_PATH:BA/Eingang/Import/}
|
||||
galabau.oci.tenancy-id=${OCI_TENANCY_ID}
|
||||
galabau.oci.user-id=${OCI_USER_ID}
|
||||
galabau.oci.fingerprint=${OCI_FINGERPRINT}
|
||||
%prod.galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH}
|
||||
%dev.galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH:oci-private-key.pem}
|
||||
|
||||
# ===== ORDS =====
|
||||
# Base-URL bis einschließlich Modul-Pfad, z.B. https://apex.example.com/ords/myschema/auto_import
|
||||
galabau.ords.base-url=${GALABAU_ORDS_BASE_URL}
|
||||
galabau.ords.api-key=${GALABAU_ORDS_API_KEY}
|
||||
# MicroProfile REST Client liest die URL aus dieser Property. Das hier ist eine einfache weiterleitung auf die env variable GALABAU_ORDS_BASE_URL (s.o.)
|
||||
quarkus.rest-client.ords-client.url=${galabau.ords.base-url}
|
||||
|
||||
# ===== Observability =====
|
||||
%prod.quarkus.otel.exporter.otlp.endpoint=${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
|
||||
%dev.quarkus.observability.lgtm.grafana-port=3000
|
||||
%dev.quarkus.observability.lgtm.otel-grpc-port=4317
|
||||
%dev.quarkus.otel.logs.enabled=true
|
||||
#%dev.quarkus.observability.lgtm.grafana-port=3000
|
||||
#%dev.quarkus.observability.lgtm.otel-grpc-port=4317
|
||||
quarkus.otel.logs.enabled=true
|
||||
#%prod.quarkus.otel.logs.enabled=true
|
||||
%prod.quarkus.log.console.json=true
|
||||
#%prod.quarkus.log.console.json=true
|
||||
|
||||
@@ -30,7 +30,8 @@ Details zur DB-Verarbeitung: `database/docs/plan_pck_net_storage.md`
|
||||
│ im letzten Quarkus-Lauf fehlgeschlagen) │
|
||||
│ │
|
||||
│ 2. Dateieingang Service aufrufen (fire & forget) │
|
||||
│ HTTP POST /api/process-incoming (Header: X-Api-Key) │
|
||||
│ HTTP POST /api/process-incoming-ba-korrespondenz |
|
||||
| (Header: X-Api-Key) │
|
||||
└────────────────────────────┬────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
@@ -42,11 +43,13 @@ Details zur DB-Verarbeitung: `database/docs/plan_pck_net_storage.md`
|
||||
│ 3c. Alle Dateien in OCI eingang/<zip-name>/ hochladen │
|
||||
│ (Unterordner aus der ZIP werden beibehalten) │
|
||||
│ → Fehler stoppt Verarbeitung dieser ZIP │
|
||||
│ 3d. Marker eingang/<zip-name>/_READY_FOR_DB_PROCESSING_ │
|
||||
│ hochladen │
|
||||
│ 3e. ZIP auf SFTP umbenennen (.processed oder .error) │
|
||||
│ → erst NACH erfolgreichem Marker-Upload │
|
||||
│ 3f. ORDS-Endpunkt aufrufen (pck_auto_import.p_process_incoming_ba_data)│
|
||||
│ 3d. ZIP auf SFTP umbenennen zu .processed │
|
||||
│ → bei ungültiger ZIP: .error (manuelle Prüfung nötig) │
|
||||
│ → bei Infrastrukturfehlern: keine Umbenennung, Retry │
|
||||
│ 3e. Marker eingang/<zip-name>/_READY_FOR_DB_PROCESSING_ │
|
||||
│ hochladen — ERST NACH dem SFTP-Rename (siehe unten) │
|
||||
│ 3f. ORDS-Endpunkt aufrufen |
|
||||
| (pck_auto_import.p_process_incoming_ba_data) │
|
||||
│ 3g. Lokale Arbeitsdateien löschen │
|
||||
└────────────────────────────┬────────────────────────────────────┘
|
||||
│
|
||||
@@ -95,30 +98,85 @@ Daran können die Sachbearbeiter erkennen, dass der Ordner nicht mehr automatisc
|
||||
|
||||
## Fehlerfall-Verhalten
|
||||
|
||||
**Service: Upload einer Datei schlägt fehl**
|
||||
- Verarbeitung dieser ZIP stoppt sofort
|
||||
- Kein Marker wird geschrieben, ZIP auf SFTP wird zu `.error` umbenannt
|
||||
- ORDS wird nicht aufgerufen
|
||||
- Bereits hochgeladene Dateien werden beim nächsten Trigger überschrieben (OCI PUT idempotent)
|
||||
**Service: ZIP ist beschädigt oder ungültig**
|
||||
- SFTP: ZIP → `.error` (manuelle Prüfung nötig)
|
||||
- OCI: kein Upload, kein Marker
|
||||
- DB: wird nicht aufgerufen
|
||||
|
||||
**Service: ORDS-Aufruf schlägt fehl**
|
||||
- Marker liegt in `eingang/<zip-name>/`, Dateien sind vollständig hochgeladen
|
||||
- Beim nächsten Stundenlauf findet APEX Automation den Marker und verarbeitet
|
||||
**Service: SFTP-Download fehlgeschlagen**
|
||||
- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
|
||||
- OCI: kein Upload, kein Marker
|
||||
- DB: wird nicht aufgerufen
|
||||
|
||||
**Service: OCI-Upload (Dateien) fehlgeschlagen**
|
||||
- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
|
||||
- OCI: teilweise hochgeladene Dateien bleiben liegen (kein Marker → DB ignoriert den Ordner); beim Retry werden sie überschrieben (OCI PUT ist idempotent)
|
||||
- DB: wird nicht aufgerufen
|
||||
|
||||
**Service: SFTP-Rename zu `.processed` fehlgeschlagen**
|
||||
- SFTP: ZIP bleibt unverändert, wird beim nächsten Stundenlauf erneut versucht
|
||||
- OCI: Dateien hochgeladen, noch kein Marker (Marker kommt erst nach dem Rename)
|
||||
- DB: wird nicht aufgerufen
|
||||
- beim nächsten Stundenlauf werden die Dateien aber nicht importiert, da APEX Automation ohne Marker nichts findet
|
||||
- d.h. erst nachdem die ZIP Datei erneut abgearbeitet und komplett in OCI hochgeladen wurde (diesmal mit .processed-Umbennung auf SFTP & Marker in OCI) werden die Dateien abgearbeitet
|
||||
|
||||
|
||||
**Service: OCI-Marker-Upload fehlgeschlagen**
|
||||
- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf
|
||||
- OCI: Dateien vollständig hochgeladen, Marker fehlt → DB-Verarbeitung wird nicht ausgelöst
|
||||
- DB wird die Dateien wegen dem fehlendem Marker nie automatisiert abarbeiten, aber man sieht das recht einfach über den OCI Dateibrowser in Apex
|
||||
|
||||
- DB: wird nicht aufgerufen
|
||||
- **Manueller Fix:** Marker-Datei `eingang/<zip-name>/_READY_FOR_DB_PROCESSING_` in OCI von Hand anlegen (leere Datei) — APEX Automation verarbeitet den Batch dann beim nächsten Stundenlauf
|
||||
|
||||
**Service: ORDS-Aufruf fehlgeschlagen**
|
||||
- SFTP: ZIP ist bereits `.processed` — Quarkus greift sie nie wieder auf
|
||||
- OCI: Dateien + Marker vollständig hochgeladen
|
||||
- DB: APEX Automation findet den Marker beim nächsten Stundenlauf und verarbeitet ihn (Schritt 1) — kein Doppelimport, da Quarkus die `.processed`-Datei nicht erneut verarbeitet
|
||||
|
||||
**DB: Verarbeitung einer einzelnen Datei schlägt fehl**
|
||||
- Rollback — Datei bleibt in `eingang/<zip-name>/`
|
||||
- ERROR in `lg_app_log` mit `log_object_ref = eingang/<zip-name>/datei.csv`
|
||||
- Nächste Dateien im Batch werden weiterverarbeitet
|
||||
- OCI `eingang/`: Datei bleibt in `eingang/<zip-name>/` (Rollback)
|
||||
- OCI `zielordner/`: keine Änderung
|
||||
- DB: Rollback, ERROR in `lg_app_log` mit `log_object_ref = eingang/<zip-name>/datei.csv`, nächste Dateien im Batch werden weiterverarbeitet
|
||||
|
||||
**DB: Batch-Abschluss (nach dem Datei-Loop)**
|
||||
- DB-Marker (`_READY_FOR_DB_PROCESSING_`) wird **immer gelöscht** — kein automatischer Retry
|
||||
- Liegen noch Dateien im Unterordner: SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen
|
||||
- Alle Dateien erfolgreich: INFO in `lg_app_log`, Unterordner ist leer
|
||||
- Alle Dateien erfolgreich: `eingang/<zip-name>/` ist leer, Marker wird gelöscht
|
||||
- Noch Dateien übrig: Marker wird gelöscht, SB-Marker (`_BITTE_PRÜFEN_`) wird angelegt → Sachbearbeiter müssen manuell eingreifen
|
||||
|
||||
**DB: p_move_object schlägt nach erfolgreichem Import fehl**
|
||||
- Rollback des Imports → sauberer Ausgangszustand
|
||||
- Datei bleibt in `eingang/<zip-name>/`
|
||||
- DB-Marker wird trotzdem am Ende des Loops gelöscht; falls noch Dateien übrig → SB-Marker
|
||||
- OCI `eingang/`: Datei bleibt in `eingang/<zip-name>/` (Rollback des gesamten Imports)
|
||||
- OCI `zielordner/`: keine Änderung
|
||||
- DB: Marker wird am Ende des Loops trotzdem gelöscht; falls noch Dateien übrig → SB-Marker
|
||||
|
||||
---
|
||||
|
||||
## Design-Entscheidung: Marker wird nach dem SFTP-Rename gesetzt
|
||||
|
||||
Der OCI-Marker `_READY_FOR_DB_PROCESSING_` wird bewusst **nach** dem SFTP-Rename zu `.processed`
|
||||
hochgeladen — nicht davor. Das erzeugt eine harte Invariante:
|
||||
|
||||
> **Marker in OCI vorhanden ↔ ZIP auf SFTP bereits `.processed`**
|
||||
|
||||
### Warum ist das wichtig?
|
||||
|
||||
APEX Automation ruft `p_process_incoming_ba_data` in jedem Stundenlauf einmal direkt auf
|
||||
(Schritt 1, Fallback), und Quarkus ruft dieselbe Funktion via ORDS auf (Schritt 3f, schneller Pfad).
|
||||
Ohne die Invariante könnte folgender Race entstehen:
|
||||
|
||||
1. Quarkus lädt Dateien + Marker hoch, schlägt dann beim SFTP-Rename fehl
|
||||
2. APEX Schritt 1 findet den Marker → importiert Daten
|
||||
3. Quarkus wiederholt den Lauf, ruft ORDS auf → zweiter Import derselben Daten
|
||||
|
||||
Mit der Invariante ist dieser Fall ausgeschlossen: APEX Schritt 1 findet nur dann einen Marker,
|
||||
wenn die ZIP auf dem SFTP bereits `.processed` ist. Ist sie das, greift Quarkus sie im Retry
|
||||
nicht mehr an — `listZipFiles()` gibt nur `.zip`-Dateien zurück.
|
||||
|
||||
### Einzig verbleibender manueller Fehlerfall
|
||||
|
||||
Schlägt der Marker-Upload fehl (nach erfolgreichem SFTP-Rename), ist der Zustand eindeutig
|
||||
erkennbar: `.processed` auf SFTP, Dateien in OCI ohne Marker. Manueller Fix: Marker-Datei
|
||||
in OCI von Hand anlegen. Dieser Fall erfordert keine DB-seitige Idempotenz, da Quarkus
|
||||
die Datei nicht erneut verarbeitet und ORDS nicht aufruft.
|
||||
|
||||
---
|
||||
|
||||
|
||||
Reference in New Issue
Block a user