OCI / ORDS Implementierung hinzugefügt
This commit is contained in:
@@ -0,0 +1,45 @@
|
||||
package de.galabau.dateieingang.config;
|
||||
|
||||
import io.smallrye.config.ConfigMapping;
|
||||
|
||||
/** OCI Object Storage Konfiguration. Credentials kommen ausschließlich aus Umgebungsvariablen. */
|
||||
@ConfigMapping(prefix = "galabau.oci")
|
||||
public interface OciConfig {
|
||||
|
||||
/** OCI Object Storage Namespace, z.B. {@code frhqaxi5sgcg}. */
|
||||
String namespace();
|
||||
|
||||
/** OCI Region, z.B. {@code eu-frankfurt-1}. */
|
||||
String region();
|
||||
|
||||
/** OCI Bucket-Name. */
|
||||
String bucket();
|
||||
|
||||
/**
|
||||
* Root-Prefix für alle Objekte im Bucket, z.B. {@code mandant_42/}.
|
||||
* Muss mit {@code /} enden.
|
||||
*/
|
||||
String tenantPrefix();
|
||||
|
||||
/**
|
||||
* Prefix für eingehende Dateien unterhalb von {@code tenantPrefix},
|
||||
* z.B. {@code eingang/}. Muss mit {@code /} enden.
|
||||
*/
|
||||
String incomingPrefix();
|
||||
|
||||
/** OCI Tenancy OCID. Aus Env-Var {@code OCI_TENANCY_ID}. */
|
||||
String tenancyId();
|
||||
|
||||
/** OCI User OCID. Aus Env-Var {@code OCI_USER_ID}. */
|
||||
String userId();
|
||||
|
||||
/** API Key Fingerprint, z.B. {@code aa:bb:cc:...}. Aus Env-Var {@code OCI_FINGERPRINT}. */
|
||||
String fingerprint();
|
||||
|
||||
/**
|
||||
* Dateisystempfad zur PEM-Datei des OCI API Keys.
|
||||
* Produktion: absoluter Pfad zum Kubernetes Secret Volume Mount.
|
||||
* Dev: relativer Pfad zum Projektverzeichnis (Default: {@code oci-private-key.pem}).
|
||||
*/
|
||||
String privateKeyPath();
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package de.galabau.dateieingang.config;
|
||||
|
||||
import io.smallrye.config.ConfigMapping;
|
||||
|
||||
/** ORDS-Konfiguration für den Dateieingang-Endpunkt. */
|
||||
@ConfigMapping(prefix = "galabau.ords")
|
||||
public interface OrdsConfig {
|
||||
|
||||
/**
|
||||
* Base-URL des ORDS-Moduls bis einschließlich Modul-Pfad,
|
||||
* z.B. {@code https://apex.example.com/ords/myschema/auto_import}.
|
||||
* Wird direkt als {@code quarkus.rest-client.ords-client.url} verwendet.
|
||||
*/
|
||||
String baseUrl();
|
||||
|
||||
/**
|
||||
* API-Key für den ORDS-Endpunkt (Header: {@code X-Api-Key}).
|
||||
* Aus Env-Var {@code GALABAU_ORDS_API_KEY}.
|
||||
*/
|
||||
String apiKey();
|
||||
}
|
||||
@@ -1,32 +1,121 @@
|
||||
package de.galabau.dateieingang.oci;
|
||||
|
||||
import com.oracle.bmc.auth.SimpleAuthenticationDetailsProvider;
|
||||
import com.oracle.bmc.objectstorage.ObjectStorage;
|
||||
import com.oracle.bmc.objectstorage.ObjectStorageClient;
|
||||
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
|
||||
import de.galabau.dateieingang.config.OciConfig;
|
||||
import de.galabau.dateieingang.exception.OciException;
|
||||
import de.galabau.dateieingang.model.FileEntry;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import de.galabau.dateieingang.model.ProcessingStatus;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Lädt die entpackten Dateien und den Marker in OCI Object Storage hoch.
|
||||
*
|
||||
* <p><b>Stub:</b> OCI-Upload ist noch nicht implementiert.
|
||||
* Der Upload wird übersprungen und der Status auf {@link ProcessingStatus#MARKER_UPLOADED} gesetzt,
|
||||
* damit der Rest der Pipeline (SFTP-Rename, ORDS-Notify) getestet werden kann.
|
||||
* Authentifizierung via OCI HTTP Signature V1 (entspricht APEX Web Credential vom Typ OCI).
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class OciUploadService {
|
||||
|
||||
@Inject
|
||||
OciConfig config;
|
||||
|
||||
private ObjectStorage client;
|
||||
|
||||
@PostConstruct
|
||||
void init() {
|
||||
SimpleAuthenticationDetailsProvider auth = SimpleAuthenticationDetailsProvider.builder()
|
||||
.tenantId(config.tenancyId())
|
||||
.userId(config.userId())
|
||||
.fingerprint(config.fingerprint())
|
||||
.region(com.oracle.bmc.Region.fromRegionId(config.region()))
|
||||
.privateKeySupplier(() -> {
|
||||
try {
|
||||
return Files.newInputStream(Path.of(config.privateKeyPath()));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("OCI Private Key nicht lesbar: "
|
||||
+ config.privateKeyPath(), e);
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
this.client = ObjectStorageClient.builder().build(auth);
|
||||
}
|
||||
|
||||
/**
|
||||
* Lädt alle Dateien aus {@code context.extractedFiles} sowie den Marker in OCI hoch.
|
||||
* Dateien mit {@code isMarker = true} werden übersprungen — der Marker wird separat
|
||||
* am Ende hochgeladen, um sicherzustellen dass er erst nach allen Dateien erscheint.
|
||||
*
|
||||
* @param context enthält die Liste der hochzuladenden Dateien und den Ziel-Prefix
|
||||
* @throws OciException bei persistenten OCI-Fehlern (4xx) nach Retry-Erschöpfung
|
||||
* @throws OciException bei Verbindungs- oder Upload-Fehlern
|
||||
*/
|
||||
public void upload(ProcessingContext context) throws OciException {
|
||||
// TODO: OCI-Upload implementieren (OCI SDK, SimpleAuthenticationDetailsProvider)
|
||||
Log.infof("[STUB] OCI-Upload übersprungen für '%s' (%d Dateien) — wird später implementiert",
|
||||
context.zipNameWithoutExt, context.extractedFiles.size());
|
||||
context.status = ProcessingStatus.PARTIALLY_UPLOADED;
|
||||
|
||||
List<FileEntry> files = context.extractedFiles.stream()
|
||||
.filter(e -> !e.isMarker)
|
||||
.toList();
|
||||
|
||||
for (FileEntry entry : files) {
|
||||
String key = buildKey(context.zipNameWithoutExt, entry.relativePath);
|
||||
entry.ociKey = key;
|
||||
putFile(key, context.localExtractDir.resolve(entry.relativePath), entry.fileSize);
|
||||
Log.debugf("Datei hochgeladen: %s (%d Bytes)", key, entry.fileSize);
|
||||
}
|
||||
|
||||
String markerKey = buildKey(context.zipNameWithoutExt, "_READY_FOR_DB_PROCESSING_");
|
||||
putEmpty(markerKey);
|
||||
|
||||
context.markerUploaded = true;
|
||||
context.status = ProcessingStatus.MARKER_UPLOADED;
|
||||
Log.infof("OCI-Upload abgeschlossen: %d Datei(en) + Marker in '%s'",
|
||||
files.size(), buildPrefix(context.zipNameWithoutExt));
|
||||
}
|
||||
|
||||
private String buildPrefix(String zipNameWithoutExt) {
|
||||
return config.tenantPrefix() + config.incomingPrefix() + zipNameWithoutExt + "/";
|
||||
}
|
||||
|
||||
private String buildKey(String zipNameWithoutExt, String relativePath) {
|
||||
return buildPrefix(zipNameWithoutExt) + relativePath;
|
||||
}
|
||||
|
||||
private void putFile(String key, Path localFile, long fileSize) throws OciException {
|
||||
try (InputStream is = Files.newInputStream(localFile)) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
.bucketName(config.bucket())
|
||||
.objectName(key)
|
||||
.putObjectBody(is)
|
||||
.contentLength(fileSize)
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
throw new OciException("OCI-Upload fehlgeschlagen für '" + key + "'", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void putEmpty(String key) throws OciException {
|
||||
try (InputStream is = InputStream.nullInputStream()) {
|
||||
client.putObject(PutObjectRequest.builder()
|
||||
.namespaceName(config.namespace())
|
||||
.bucketName(config.bucket())
|
||||
.objectName(key)
|
||||
.putObjectBody(is)
|
||||
.contentLength(0L)
|
||||
.build());
|
||||
} catch (Exception e) {
|
||||
throw new OciException("OCI-Upload Marker fehlgeschlagen für '" + key + "'", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package de.galabau.dateieingang.ords;
|
||||
|
||||
import jakarta.ws.rs.HeaderParam;
|
||||
import jakarta.ws.rs.POST;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
|
||||
|
||||
/**
|
||||
* MicroProfile REST Client für den ORDS-Endpunkt des Dateieingang-Service.
|
||||
* Base-URL wird über {@code quarkus.rest-client.ords-client.url} konfiguriert
|
||||
* und zeigt auf den ORDS-Modul-Pfad (z.B. {@code .../ords/myschema/auto_import}).
|
||||
* Weitere Endpunkte zukünftiger Pipelines werden hier als neue Methoden ergänzt.
|
||||
*/
|
||||
@RegisterRestClient(configKey = "ords-client")
|
||||
public interface OrdsClient {
|
||||
|
||||
/**
|
||||
* Ruft {@code pck_auto_import.p_process_incoming_ba_data} über ORDS auf.
|
||||
* Die Prozedur verarbeitet alle offenen OCI-Batches (Unterordner mit Marker).
|
||||
*
|
||||
* @param apiKey API-Key aus {@code galabau.ords.api-key} (Header: {@code X-Api-Key})
|
||||
* @return ORDS-Response (erwartet: 2xx)
|
||||
*/
|
||||
@POST
|
||||
@Path("/process_incoming_ba_data")
|
||||
Response processIncomingBaData(@HeaderParam("X-Api-Key") String apiKey);
|
||||
}
|
||||
@@ -1,30 +1,60 @@
|
||||
package de.galabau.dateieingang.ords;
|
||||
|
||||
import de.galabau.dateieingang.config.OrdsConfig;
|
||||
import de.galabau.dateieingang.exception.OrdsException;
|
||||
import de.galabau.dateieingang.model.ProcessingContext;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import org.eclipse.microprofile.faulttolerance.Retry;
|
||||
import org.eclipse.microprofile.faulttolerance.Timeout;
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient;
|
||||
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
/**
|
||||
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_files},
|
||||
* Benachrichtigt den ORDS-Endpunkt {@code pck_auto_import.p_process_incoming_ba_data},
|
||||
* damit die DB-Verarbeitung sofort angestoßen wird.
|
||||
*
|
||||
* <p><b>Stub:</b> ORDS-Aufruf ist noch nicht implementiert.
|
||||
* Bei einem Ausfall wäre die Verarbeitung ohnehin durch die APEX Automation abgesichert
|
||||
* (diese findet den Marker beim nächsten Stundenlauf).
|
||||
* <p>Bei Ausfall ist die Verarbeitung durch die APEX Automation abgesichert:
|
||||
* Sie findet den Marker beim nächsten Stundenlauf und ruft die Prozedur selbst auf.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class OrdsNotificationService {
|
||||
|
||||
@Inject
|
||||
@RestClient
|
||||
OrdsClient ordsClient;
|
||||
|
||||
@Inject
|
||||
OrdsConfig config;
|
||||
|
||||
/**
|
||||
* Sendet eine Benachrichtigung an den ORDS-Endpunkt.
|
||||
* Wird bei transienten Fehlern bis zu 3-mal wiederholt (1s Backoff).
|
||||
*
|
||||
* @param context enthält {@code zipNameWithoutExt} und {@code runId} für den Request-Body
|
||||
* @param context enthält {@code zipNameWithoutExt} für das Log
|
||||
* @throws OrdsException wenn der ORDS-Aufruf nach allen Retries fehlschlägt
|
||||
*/
|
||||
@Retry(maxRetries = 3, delay = 1000, delayUnit = ChronoUnit.MILLIS,
|
||||
retryOn = OrdsException.class)
|
||||
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
|
||||
public void notify(ProcessingContext context) throws OrdsException {
|
||||
// TODO: ORDS REST-Client implementieren (MicroProfile REST Client + @Retry)
|
||||
Log.infof("[STUB] ORDS-Benachrichtigung übersprungen für '%s' — wird später implementiert",
|
||||
context.zipNameWithoutExt);
|
||||
Response response;
|
||||
try {
|
||||
response = ordsClient.processIncomingBaData(config.apiKey());
|
||||
} catch (Exception e) {
|
||||
throw new OrdsException("ORDS-Verbindung fehlgeschlagen für '"
|
||||
+ context.zipNameWithoutExt + "'", e);
|
||||
}
|
||||
|
||||
int status = response.getStatus();
|
||||
if (status >= 400) {
|
||||
throw new OrdsException("ORDS antwortete mit HTTP " + status
|
||||
+ " für '" + context.zipNameWithoutExt + "'");
|
||||
}
|
||||
|
||||
Log.infof("ORDS-Endpunkt aufgerufen, HTTP %d für '%s'", status, context.zipNameWithoutExt);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,7 +113,7 @@ public class FileProcessingPipeline {
|
||||
Log.infof("ZIP '%s' entpackt: %d Datei(en)", zipFilename,
|
||||
context.extractedFiles.size());
|
||||
|
||||
// --- OCI Upload (Stub) ---
|
||||
// --- OCI Upload ---
|
||||
MDC.put("step", "oci-upload");
|
||||
ociUploadService.upload(context);
|
||||
|
||||
@@ -122,7 +122,7 @@ public class FileProcessingPipeline {
|
||||
sftpService.renameRemote(zipFilename, zipFilename + ".processed");
|
||||
Log.infof("SFTP Rename: '%s' → '%s.processed'", zipFilename, zipFilename);
|
||||
|
||||
// --- ORDS Notify (Stub) ---
|
||||
// --- ORDS Notify ---
|
||||
MDC.put("step", "ords-notify");
|
||||
ordsNotificationService.notify(context);
|
||||
|
||||
|
||||
@@ -60,8 +60,10 @@ public class SftpService {
|
||||
if (config.hostKeyFingerprint().isPresent()) {
|
||||
ssh.addHostKeyVerifier(config.hostKeyFingerprint().get());
|
||||
} else {
|
||||
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert — PromiscuousVerifier aktiv (nur Dev!)");
|
||||
ssh.addHostKeyVerifier(new PromiscuousVerifier());
|
||||
Log.warn("SFTP Host-Key-Fingerprint nicht konfiguriert");
|
||||
throw new IllegalStateException(
|
||||
"SFTP Host-Key-Fingerprint muss konfiguriert sein!"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user