# Plan: n8n Workflow → Quarkus Backend ersetzen **Stand:** 2026-04-08 **Basis:** Avise-Tool Quarkus Backend (agent-backend/) — angepasst für SFTP → OCI Object Storage → DB-Workflow --- ## Ziel Ersetze den n8n-Workflow (derzeit: SFTP-Poll → ZIP-Entpacken → OCI-Upload → ORDS-Trigger) durch einen nativen Quarkus-Backend-Service. **Vorteil:** - Keine externe Workflow-Engine-Abhängigkeit (n8n) - Single-Responsibility: Ein Service für eine Aufgabe (Dateieingang) - Bessere Observability (OTLP → Loki/Grafana) - Volle Kontrolle über Fehlerbehandlung und Retry-Logik - Einfacheres Deployment (JAR statt n8n-Instanz) --- ## Architektur-Überblick Das Scheduling übernimmt weiterhin die APEX Automation (stündlich). Sie ruft den Quarkus-Service per HTTP POST auf — wie bisher den n8n-Webhook. Der Service selbst hat keinen eigenen Scheduler. ``` APEX Automation (stündlich) │ ▼ HTTP POST /api/process-incoming (Header: X-Api-Key) FileProcessingResource [Quarkus REST Endpoint] │ gibt sofort 202 Accepted zurück (async, fire & forget) ▼ FileProcessingPipeline [Hintergrund-Thread — für jede neue ZIP] │ ├─→ SftpService [SSHJ — list, download, rename] │ ├─→ ZipExtractionService [Apache Commons Compress] │ ├─→ OciUploadService [OCI SDK — Instance Principal Auth] │ └─→ OrdsNotificationService [MicroProfile REST Client] │ ▼ Oracle DB (eingang/, zielordner) ``` --- ## Projektstruktur (Maven) ``` backend-n8n-replacement/ ├── pom.xml (Java 25, Quarkus 3.x LTS) │ ├── src/main/java/de/galabau/ │ │ │ ├── api/ │ │ └── FileProcessingResource.java (@Path("/api/process-incoming"), @POST, API-Key-Check) │ │ │ ├── sftp/ │ │ ├── SftpConfig.java (@ConfigMapping) │ │ └── SftpService.java (SSHJ — list, download, rename) │ │ │ ├── zip/ │ │ └── ZipExtractionService.java (Apache Commons Compress) │ │ │ ├── oci/ │ │ ├── OciConfig.java │ │ └── OciUploadService.java (OCI SDK, Instance Principal, Marker-Handling) │ │ │ ├── ords/ │ │ ├── OrdsClient.java (MicroProfile REST Client) │ │ └── OrdsNotificationService.java (Retry via @Retry annotation) │ │ │ ├── pipeline/ │ │ └── FileProcessingPipeline.java (Extract → Upload → Notify → Cleanup) │ │ │ ├── model/ │ │ ├── ProcessingContext.java (runId, Dateien, Status) │ │ └── OrdsRequest.java (DTO) │ │ │ ├── config/ │ │ └── ApplicationConfig.java (zentrale @ConfigMapping) │ │ │ └── exception/ │ ├── SftpException.java │ ├── ZipException.java │ ├── OciException.java │ └── OrdsException.java │ ├── src/main/resources/ │ ├── application.properties │ └── application-dev.properties │ └── docs/ ├── SETUP.md ├── Architecture.md ├── Configuration.md └── ErrorHandling.md ``` --- ## Pipeline-Steps | Step | Komponente | Bibliothek | Beschreibung | |---|---|---|---| | `api-trigger` | `FileProcessingResource` | Quarkus REST | Empfängt POST von APEX Automation, prüft API-Key, startet Pipeline async | | `sftp-list` | `SftpService` | SSHJ | Listet `*.zip`-Dateien im Remote-Verzeichnis | | `sftp-download` | `SftpService` | SSHJ | Lädt ZIP in lokales Arbeitsverzeichnis | | `zip-extract` | `ZipExtractionService` | Apache Commons Compress | Entpackt ZIP, preserviert Ordnerstruktur | | `oci-upload` | `OciUploadService` | OCI SDK | Lädt Dateien + Marker zu OCI Object Storage | | `sftp-rename` | `SftpService` | SSHJ | Remote-Rename zu `.processed` oder `.error` | | `ords-notify` | `OrdsNotificationService` | MicroProfile REST Client | Ruft ORDS-Endpunkt auf | | `cleanup` | `FileProcessingPipeline` | pure Java | Löscht lokale Arbeitsdateien (ZIP + entpackte Dateien) | --- ## Konfiguration (`application.properties`) ```properties # API-Absicherung galabau.api.key=${GALABAU_API_KEY} # SFTP galabau.sftp.host=sftp.lieferant.de galabau.sftp.port=22 galabau.sftp.username=sftp_user galabau.sftp.password=${GALABAU_SFTP_PASSWORD} galabau.sftp.host-key-fingerprint=SHA256:AbCdEfGh... # ssh-keyscan host | ssh-keygen -lf - galabau.sftp.remote-path=/outgoing galabau.sftp.local-work-dir=/tmp/sftp-work # Alternative: Public-Key-Auth (empfohlen für Produktion) # galabau.sftp.private-key-path=/etc/secrets/sftp-key # galabau.sftp.private-key-passphrase=${SFTP_KEY_PASSPHRASE} # OCI Object Storage — Auth via Credentials (Key als gemountetes Kubernetes Secret) galabau.oci.namespace=mycompany galabau.oci.region=eu-frankfurt-1 galabau.oci.bucket=my-bucket galabau.oci.tenant-prefix=mandant_42/ galabau.oci.incoming-prefix=eingang/ # Scalar Credentials aus Env-Vars: galabau.oci.tenancy-id=${OCI_TENANCY_ID} galabau.oci.user-id=${OCI_USER_ID} galabau.oci.fingerprint=${OCI_FINGERPRINT} # Private Key als gemountetes Kubernetes Secret (Dateipfad, kein Env-Var-String): galabau.oci.private-key-path=${OCI_PRIVATE_KEY_PATH} # z.B. /etc/oci/private-key.pem # ORDS galabau.ords.base-url=http://ords:8080 galabau.ords.process-incoming-path=/ords/.../net_storage/process_incoming galabau.ords.api-key=${GALABAU_ORDS_API_KEY} # Fehlerbehandlung galabau.retry.max-attempts=3 galabau.retry.backoff-ms=1000 ``` **Credential-Strategie:** - Alle Passwörter und API-Keys über `${ENV_VAR_NAME}` — Quarkus liest Umgebungsvariablen nativ - OCI-Auth via **Instance Principal** — kein Key-Management, keine Rotation (empfohlen auf OKE) - Deployment: Kubernetes Secrets → Environment Injection für Passwörter/API-Keys --- ## REST Endpoint ```java @Path("/api/process-incoming") @ApplicationScoped public class FileProcessingResource { @Inject ApplicationConfig config; @Inject FileProcessingPipeline pipeline; @POST @Produces(MediaType.APPLICATION_JSON) public Response triggerProcessing(@HeaderParam("X-Api-Key") String apiKey) { if (!config.api().key().equals(apiKey)) { return Response.status(Response.Status.UNAUTHORIZED).build(); } // Async: Pipeline im Hintergrund, APEX bekommt sofort 202 pipeline.processAllAsync(); return Response.accepted().build(); } } ``` **Aufruf durch APEX Automation:** ```sql apex_web_service.make_rest_request( p_url => 'http://backend:8080/api/process-incoming', p_http_method => 'POST', p_http_headers => apex_util.string_to_table('X-Api-Key:' || ) ); ``` **Concurrency Guard:** `FileProcessingPipeline` hält ein `AtomicBoolean isRunning`-Flag. Kommt ein zweiter Trigger während eine Pipeline läuft → 409 Conflict zurückgeben, kein doppelter Lauf. --- ## Async-Verhalten ```java @ApplicationScoped public class FileProcessingPipeline { private final AtomicBoolean isRunning = new AtomicBoolean(false); @Inject ManagedExecutor executor; // Quarkus Managed Executor public Response processAllAsync() { if (!isRunning.compareAndSet(false, true)) { return Response.status(409).entity("Pipeline already running").build(); } executor.submit(() -> { try { processAll(); } finally { isRunning.set(false); } }); return Response.accepted().build(); } void processAll() { ... } // eigentliche Logik } ``` Fehler im Hintergrund-Thread landen im Log (OTLP → Grafana) — APEX bekommt sie nicht als HTTP-Fehler, da die Antwort bereits weg ist. Das entspricht dem bisherigen n8n fire-and-forget-Verhalten. --- ## Fehlerbehandlung & Retry ### Fehlerklassen | Fehler | Typ | Retry | Verhalten | |---|---|---|---| | SFTP-Verbindung fehlgeschlagen | transient | nein | Nächster APEX-Lauf (1h) versucht es | | ZIP beschädigt | persistent | nein | ZIP auf SFTP umbenennen zu `.error`, Log | | OCI-Verbindung fehlgeschlagen (z.B. 503) | transient | ja (exponential backoff) | @Retry | | OCI-Upload einer Datei schlägt fehl | persistent | nein | Bereits hochgeladene löschen, ZIP belassen, Log | | ORDS-Aufruf schlägt fehl | transient | ja (2-3x) | Marker liegt vor → APEX Automation schlägt beim nächsten Lauf ein | | Allgemein technischer Fehler | fallabhängig | siehe SmallRye Fault Tolerance | Exception-Log | ### Retry-Strategie (SmallRye Fault Tolerance) ```java @Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS) @Timeout(value = 10, unit = ChronoUnit.SECONDS) public void notifyOrds(ProcessingContext context) { ... } ``` --- ## Datenmodelle ### `ProcessingContext` ```java public class ProcessingContext { public UUID runId; // eindeutige Lauf-ID public String zipFilename; // z.B. "export_2026-04-08.zip" public String zipNameWithoutExt; // z.B. "export_2026-04-08" public Path localZipPath; // lokale ZIP-Datei (für Cleanup) public Path localExtractDir; // lokales Entpack-Verzeichnis (für Cleanup) public List extractedFiles; // Verzeichnis + Datei-Namen public boolean markerUploaded; public LocalDateTime startTime; public ProcessingStatus status; } public enum ProcessingStatus { PENDING, PARTIALLY_UPLOADED, MARKER_UPLOADED, ORDS_NOTIFIED, FAILED } ``` ### `FileEntry` ```java public class FileEntry { public String relativePath; // "subdir/file.csv" public String ociKey; // "eingang/export_2026-04-08/subdir/file.csv" public long fileSize; public boolean isMarker; // true nur für "_READY_FOR_DB_PROCESSING_" } ``` --- ## SFTP-Verarbeitung mit SSHJ Siehe `plan_camel_integration.md` für Details zur SSHJ-Integration (Host-Key-Verification, Credentials, Fehlerbehandlung). ### Verarbeitungslogik ``` Pipeline.processAll(): 1. SftpService.listZipFiles() → ["export_2026-04-08.zip", ...] 2. für jede ZIP: a. SftpService.download(zip) → lokale Datei b. ZipExtractionService.extract() → ProcessingContext mit FileEntry-Liste c. OciUploadService.upload() → Dateien + Marker in OCI d. SftpService.renameRemote(.processed oder .error) e. OrdsNotificationService.notify() f. cleanup: lokale ZIP + Entpack-Verzeichnis löschen ← immer, auch bei Fehler ``` **Cleanup (Schritt f) läuft immer** — in einem `finally`-Block — damit kein Disk-Vollaufen bei Fehlern oder großen ZIPs. --- ## OCI-Authentifizierung (SimpleAuthenticationDetailsProvider) Credentials kommen aus Umgebungsvariablen (skalare Werte) und einem gemounteten Kubernetes Secret (Private Key als PEM-Datei). ```java @ApplicationScoped public class OciUploadService { private final ObjectStorage client; @Inject OciConfig config; @PostConstruct void init() { SimpleAuthenticationDetailsProvider auth = SimpleAuthenticationDetailsProvider.builder() .tenantId(config.tenancyId()) .userId(config.userId()) .fingerprint(config.fingerprint()) .region(Region.fromRegionId(config.region())) .privateKeySupplier(new FilePrivateKeySupplier(config.privateKeyPath())) .build(); this.client = ObjectStorageClient.builder().build(auth); } } ``` **Kubernetes Deployment:** - Skalare Werte (`tenancyId`, `userId`, `fingerprint`) → Kubernetes Secret → Env-Vars - Private Key PEM-Datei → Kubernetes Secret → Volume Mount unter `/etc/oci/private-key.pem` - `OCI_PRIVATE_KEY_PATH=/etc/oci/private-key.pem` als Env-Var --- ## ORDS-Integration ### REST-Client ```java @RegisterRestClient(configKey = "ords-client") public interface OrdsClient { @POST @Path("${galabau.ords.process-incoming-path}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) Response processIncoming(ProcessIncomingRequest request); } // application.properties: // quarkus.rest-client.ords-client.url=${galabau.ords.base-url} public class ProcessIncomingRequest { public String zipName; // "export_2026-04-08" public String runId; // für Logging-Korrelation public LocalDateTime uploadedAt; } ``` ### Fehlerbehandlung - Schlägt der ORDS-Aufruf fehl → Marker liegt trotzdem vor - APEX Automation findet beim nächsten Stundenlauf den Marker und verarbeitet - **Keine** Versuche, den Fehler lokal zu beheben --- ## Abhängigkeiten (pom.xml) ```xml 25 io.quarkus.platform quarkus-bom ${quarkus.platform.version} pom import io.quarkus quarkus-arc io.quarkus quarkus-rest io.quarkus quarkus-rest-client-jackson io.quarkus quarkus-smallrye-context-propagation com.hierynomus sshj 0.38.0 com.oracle.oci.sdk oci-java-sdk-objectstorage 3.44.0 org.apache.commons commons-compress 1.26.1 io.quarkus quarkus-opentelemetry io.quarkus quarkus-smallrye-fault-tolerance io.quarkus quarkus-junit5 test ``` **Hinweis:** `oci-java-sdk-objectstorage` und `sshj` liegen nicht im Quarkus BOM — Versionen beim Projektstart auf Maven Central prüfen. Quarkus-eigene Artefakte (`quarkus-*`) brauchen keine ``, da das BOM sie bereitstellt. --- ## Observability & Logging ### MDC-Hierarchie ``` runId → ein ZIP-Verarbeitungslauf (gesetzt am Anfang jeder ZIP) step → aktueller Pipeline-Step fileIdx → welche Datei in diesem Schritt (optional) ``` MDC nach jeder ZIP mit `MDC.clear()` leeren — damit kein Kontext in die nächste ZIP sickert. ### Log-Level | Schritt | Level | Beispiel | |---|---|---| | `api-trigger` | INFO | "Processing triggered by APEX, starting async pipeline" | | `sftp-list` | INFO | "5 new ZIP files found on SFTP" | | `zip-extract` | INFO | "ZIP 'export_2026-04-08.zip' extracted: 12 files, 5 directories" | | `oci-upload` | INFO | "Uploaded 12 files + marker to OCI in 3.2s" | | `ords-notify` | INFO | "ORDS endpoint called, HTTP 200" | | `cleanup` | DEBUG | "Local files deleted: /tmp/sftp-work/export_2026-04-08.zip" | | Fehler | ERROR | Mit vollem MDC-Context und Exception | **Stack:** OTLP → Loki/Grafana (Entwicklung: Dev Services, Produktion: externer Collector) --- ## Migrations-Schritte 1. **Phase 1: Entwicklung** - Projekt-Setup (Java 25, Quarkus 3.x, SSHJ) - REST Endpoint (`FileProcessingResource`) mit API-Key-Auth + Concurrency Guard - SFTP-Operationen (SSHJ: list, download, rename) mit Fingerprint-Verification - ZIP-Entpacken (Apache Commons Compress) - OCI-Upload (OCI SDK, Instance Principal, Marker-Handling) - ORDS-Benachrichtigung (REST Client + Retry) - Lokaler Cleanup (always in finally) - Fehlerbehandlung + OTLP-Logging - Package `pck_auto_import` mit `p_process_incoming_files` auf DB-Seite implementieren - Unit-Tests + Integration-Tests 2. **Phase 2: Testing & Deployment** - E2E-Tests mit echtem SFTP / OCI / ORDS - Parallel-Run: n8n und Quarkus-Backend gleichzeitig aktiv - **APEX Automation aktualisieren:** URL von n8n-Webhook auf `http://backend:8080/api/process-incoming` ändern, `X-Api-Key`-Header ergänzen - Grafana Dashboards + Alerts aufsetzen 3. **Phase 3: Cutover** - n8n deaktivieren / entfernen - Monitoring etablieren --- ## Deployment-Constraints - **Einzelinstanz:** Der Service läuft als Einzelinstanz. Kein Clustering. Mehrere Instanzen würden dieselben SFTP-Dateien gleichzeitig verarbeiten. Der `AtomicBoolean`-Guard schützt nur innerhalb einer Instanz. Solange APEX Automation den Webhook nur einmal pro Lauf aufruft, kein Problem. - **Idempotenz:** OCI PUT ist idempotent. Bei abgebrochenem Lauf ohne Marker: nächster Trigger lädt ZIP erneut, Dateien werden überschrieben — kein korrupter Zustand. - **Disk-Space:** Lokales Arbeitsverzeichnis `/tmp/sftp-work` — Cleanup läuft immer im finally. Maximale gleichzeitige Belegung: eine ZIP + entpackte Dateien (sequenzielle Verarbeitung). Capacity Planning erforderlich (→ offene Frage). --- ## Vorteile gegenüber n8n | Aspekt | n8n | Quarkus Backend | |---|---|---| | **Abhängigkeiten** | Externe SaaS/self-hosted | Ein JAR | | **Fehlerbehandlung** | UI-basiert konfigurieren | Java Code, testbar | | **Observability** | n8n-Logs + externe Integrationen | OTLP → Loki/Grafana nativ | | **Sicherheit** | Credentials in n8n-DB | Env-Variablen / Secrets | | **Trigger** | n8n-Webhook | REST Endpoint (identisches Muster) | | **Wartbarkeit** | Workflow-Änderungen in UI | Code-Reviews, Git-History | | **Skalierung** | Horizontale Skalierung komplex | Standard Quarkus/Kubernetes | --- ## Architektur-Entscheidungen ✅ - [x] Trigger: **REST Endpoint** (von APEX Automation aufgerufen, identisch zu bisherigem n8n-Webhook) - [x] Scheduling: **APEX Automation** (bleibt unverändert — kein eigener Scheduler im Service) - [x] Async: **fire & forget** (202 Accepted sofort, Pipeline im Hintergrund — wie n8n) - [x] Endpoint-Auth: **X-Api-Key Header** (einfach, funktioniert mit APEX Web Service Calls) - [x] Concurrency Guard: **AtomicBoolean** (kein Doppellauf bei schnellen APEX-Retries) - [x] SFTP-Lösung: **SSHJ** (`com.hierynomus:sshj`) — leichtgewichtiger SFTP-Client - [x] SFTP Host-Key: **Fingerprint-basiert** via `galabau.sftp.host-key-fingerprint` in Config - [x] OCI Auth: **SimpleAuthenticationDetailsProvider** (Credentials via Env-Vars + gemountetes Kubernetes Secret für Private Key) - [x] OCI SDK: **OCI Java SDK** (HTTP Signature V1 automatisch) - [x] ZIP: **Apache Commons Compress** - [x] REST Clients: **MicroProfile REST Client** (configKey-basiert) - [x] Credentials: **`${ENV_VAR}`** in application.properties — Quarkus liest nativ aus Umgebungsvariablen - [x] Java: **25** - [x] Cleanup: **immer im finally** nach jeder ZIP ## Offene Fragen - [ ] Disk-Space: Wie groß dürfen ZIPs maximal sein? → Capacity Planning - [ ] SFTP-Auth: Password oder Public-Key für Produktion? → Public-Key empfohlen - [ ] Monitoring: Welche Grafana-Alerts sind Pflicht? → TBD mit OPS-Team