feat: persist generation logs and improve batch timeout handling
This commit is contained in:
153
src/server.js
153
src/server.js
@@ -47,6 +47,8 @@ Kurzbeschreibung: 30 bis 50 Worte
|
||||
Tip + Inhalt zusammen 100 bis 150 Worte (Langtext 400 Worte)`;
|
||||
|
||||
// In-memory registry of generation jobs, keyed by job id.
const generationJobs = new Map();
// Per-job Sets of AbortControllers for in-flight OpenAI requests, so a
// cancel request can abort every outstanding call for that job.
const generationJobControllers = new Map();
// Job ids in creation order; used to list the most recent jobs.
const generationJobOrder = [];
// Id of the currently running job, or null when no job is active.
let activeGenerationJobId = null;
|
||||
|
||||
const pool = new Pool({
|
||||
@@ -92,11 +94,14 @@ async function savePromptVersion(promptEditable) {
|
||||
await pool.query('INSERT INTO prompt_versions(prompt_editable, prompt_full) VALUES ($1, $2)', [promptEditable, `${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`]);
|
||||
}
|
||||
|
||||
async function callOpenAI(systemPrompt, userPrompt, job = null) {
|
||||
async function callOpenAI(systemPrompt, userPrompt, job = null, jobId = null, timeoutMs = 90000) {
|
||||
if (!OPENAI_API_KEY) throw new Error('OPENAI_API_KEY fehlt');
|
||||
const controller = new AbortController();
|
||||
if (job) job.currentController = controller;
|
||||
const timeout = setTimeout(() => controller.abort(), 90000);
|
||||
if (jobId) {
|
||||
if (!generationJobControllers.has(jobId)) generationJobControllers.set(jobId, new Set());
|
||||
generationJobControllers.get(jobId).add(controller);
|
||||
}
|
||||
const timeout = setTimeout(() => controller.abort(), timeoutMs);
|
||||
let r;
|
||||
try {
|
||||
r = await fetch('https://api.openai.com/v1/chat/completions', {
|
||||
@@ -117,12 +122,14 @@ async function callOpenAI(systemPrompt, userPrompt, job = null) {
|
||||
} catch (e) {
|
||||
if (e?.name === 'AbortError') {
|
||||
if (job?.cancelRequested) throw new Error('Abgebrochen');
|
||||
throw new Error('OpenAI Timeout nach 90s');
|
||||
throw new Error(`OpenAI Timeout nach ${Math.round(timeoutMs/1000)}s`);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
if (job && job.currentController === controller) job.currentController = null;
|
||||
if (jobId && generationJobControllers.has(jobId)) {
|
||||
generationJobControllers.get(jobId).delete(controller);
|
||||
}
|
||||
}
|
||||
const j = await r.json();
|
||||
if (!r.ok) throw new Error(`OpenAI Fehler ${r.status}: ${JSON.stringify(j).slice(0, 400)}`);
|
||||
@@ -130,6 +137,12 @@ async function callOpenAI(systemPrompt, userPrompt, job = null) {
|
||||
return stripCodeFences(text);
|
||||
}
|
||||
|
||||
/**
 * Best-effort persistence of one job log entry into generation_logs.
 *
 * @param {string} jobId - Id of the generation job the entry belongs to.
 * @param {string} level - Log level; callers use 'info' | 'warn' | 'error'.
 * @param {string} message - Human-readable log message.
 * @param {object|null} [meta=null] - Optional structured details, stored as JSONB.
 * @returns {Promise<void>}
 */
async function appendJobLog(jobId, level, message, meta = null) {
  try {
    // `??` (not `||`) so only null/undefined fall back to the empty object.
    await pool.query(
      'INSERT INTO generation_logs(job_id, level, message, meta) VALUES ($1,$2,$3,$4::jsonb)',
      [jobId, level, message, JSON.stringify(meta ?? {})]
    );
  } catch (e) {
    // Deliberately best-effort: a logging failure must never fail the job.
    // But do not swallow it silently — surface it on the console.
    console.warn(`appendJobLog failed for job ${jobId}:`, e?.message || String(e));
  }
}
|
||||
|
||||
async function initDb() {
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS exercise_cards (
|
||||
@@ -153,6 +166,16 @@ async function initDb() {
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
`);
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS generation_logs (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
job_id TEXT NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
meta JSONB,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
`);
|
||||
const promptCount = await pool.query('SELECT COUNT(*)::int AS c FROM prompt_versions');
|
||||
if (promptCount.rows[0].c === 0) await savePromptVersion(DEFAULT_PROMPT_EDITABLE);
|
||||
|
||||
@@ -297,6 +320,38 @@ app.post('/api/cards/test-selection', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// GET /api/generate/recent
// Returns the last in-memory jobs plus a persisted per-job log summary.
app.get('/api/generate/recent', async (_req, res) => {
  try {
    // Walk the newest ten job ids from newest to oldest.
    const recentIds = generationJobOrder.slice(-10);
    const inMemory = [];
    for (let i = recentIds.length - 1; i >= 0; i -= 1) {
      const job = generationJobs.get(recentIds[i]);
      if (!job) continue;
      inMemory.push({
        id: job.id,
        status: job.status,
        total: job.total,
        done: job.done,
        updated: job.updated,
        errors: job.errors || [],
      });
    }

    // Aggregate persisted log entries per job (newest activity first).
    const result = await pool.query(`
      SELECT job_id, MAX(created_at) AS last_at, COUNT(*)::int AS entries
      FROM generation_logs
      GROUP BY job_id
      ORDER BY last_at DESC
      LIMIT 20
    `);
    return res.json({ ok: true, items: inMemory, persisted: result.rows });
  } catch (e) {
    return res.status(500).json({ ok: false, error: e.message || String(e) });
  }
});
|
||||
|
||||
// GET /api/generate/logs/:jobId
// Returns up to 500 persisted log entries for one job, oldest first.
app.get('/api/generate/logs/:jobId', async (req, res) => {
  const jobId = String(req.params.jobId || '');
  try {
    const result = await pool.query(
      'SELECT id, level, message, meta, created_at FROM generation_logs WHERE job_id=$1 ORDER BY id ASC LIMIT 500',
      [jobId]
    );
    return res.json({ ok: true, jobId, items: result.rows });
  } catch (e) {
    return res.status(500).json({ ok: false, error: e.message || String(e) });
  }
});
|
||||
|
||||
app.get('/api/generate/active', (_req, res) => {
|
||||
if (!activeGenerationJobId) return res.json({ ok: true, active: null });
|
||||
const job = generationJobs.get(activeGenerationJobId);
|
||||
@@ -314,8 +369,11 @@ app.post('/api/generate/job/:id/cancel', (req, res) => {
|
||||
if (['done', 'failed', 'aborted'].includes(job.status)) return res.json({ ok: true, status: job.status });
|
||||
job.cancelRequested = true;
|
||||
job.status = 'cancelling';
|
||||
if (job.currentController) {
|
||||
try { job.currentController.abort(); } catch {}
|
||||
const ctrls = generationJobControllers.get(id);
|
||||
if (ctrls && ctrls.size) {
|
||||
for (const c of ctrls) {
|
||||
try { c.abort(); } catch {}
|
||||
}
|
||||
}
|
||||
job.updated = Date.now();
|
||||
res.json({ ok: true, status: job.status });
|
||||
@@ -338,8 +396,11 @@ app.post('/api/generate/run', async (req, res) => {
|
||||
await savePromptVersion(promptEditable);
|
||||
|
||||
const jobId = crypto.randomUUID();
|
||||
generationJobs.set(jobId, { id: jobId, status: 'running', total: 0, done: 0, errors: [], updated: Date.now(), cancelRequested: false, currentController: null });
|
||||
generationJobs.set(jobId, { id: jobId, status: 'running', total: 0, done: 0, errors: [], updated: Date.now(), cancelRequested: false });
|
||||
generationJobOrder.push(jobId);
|
||||
generationJobControllers.set(jobId, new Set());
|
||||
activeGenerationJobId = jobId;
|
||||
await appendJobLog(jobId, 'info', 'Job gestartet');
|
||||
|
||||
(async () => {
|
||||
const job = generationJobs.get(jobId);
|
||||
@@ -350,29 +411,73 @@ app.post('/api/generate/run', async (req, res) => {
|
||||
const q = ids && ids.length ? await pool.query(filterSql, [ids]) : await pool.query(filterSql);
|
||||
const rows = q.rows || [];
|
||||
job.total = rows.length;
|
||||
for (const row of rows) {
|
||||
await appendJobLog(jobId, 'info', 'Karten geladen', { total: rows.length });
|
||||
|
||||
const chunks = [];
|
||||
for (let i = 0; i < rows.length; i += 5) chunks.push(rows.slice(i, i + 5));
|
||||
|
||||
for (const chunk of chunks) {
|
||||
if (job.cancelRequested) {
|
||||
job.status = 'aborted';
|
||||
break;
|
||||
}
|
||||
const userPrompt = JSON.stringify({ titel: row.input?.titel || '', card: row.input || {} }, null, 2);
|
||||
|
||||
const payload = chunk.map((row) => ({
|
||||
id: row.id,
|
||||
titel: row.input?.titel || '',
|
||||
card: row.input || {}
|
||||
}));
|
||||
await appendJobLog(jobId, 'info', 'Batch gestartet', { ids: payload.map(p=>p.id), size: payload.length });
|
||||
|
||||
let success = false;
|
||||
let lastError = null;
|
||||
|
||||
for (let attempt = 1; attempt <= 3; attempt += 1) {
|
||||
if (job.cancelRequested) {
|
||||
job.status = 'aborted';
|
||||
break;
|
||||
}
|
||||
try {
|
||||
const raw = await callOpenAI(`${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`, userPrompt, job);
|
||||
const userPrompt = [
|
||||
'Erzeuge für ALLE übergebenen Karten neue Outputs.',
|
||||
`Es sind ${payload.length} Karten.`,
|
||||
'Antworte als JSON-Array mit exakt gleich vielen Elementen und gleicher Reihenfolge.',
|
||||
JSON.stringify(payload, null, 2)
|
||||
].join('\n\n');
|
||||
|
||||
const strictSingleCardNotice = [
|
||||
'WICHTIG: Auch wenn mehrere Karten gemeinsam übergeben werden, muss JEDE Karte einzeln verarbeitet werden.',
|
||||
'Für JEDE einzelne Karte gelten ALLE Regeln aus dem editierbaren Prompt strikt.',
|
||||
'Insbesondere Absatzstruktur/Zeilenumbrüche beibehalten bzw. korrekt gemäß Vorgaben erzeugen (keine verlorenen Absätze).',
|
||||
'Keine Regel darf durch Batch-Verarbeitung abgeschwächt werden.'
|
||||
].join('\n');
|
||||
const batchTimeoutMs = Math.max(90000, payload.length * 90000);
|
||||
await appendJobLog(jobId, 'info', 'Batch-Request an OpenAI', { attempt, size: payload.length, timeoutMs: batchTimeoutMs });
|
||||
const raw = await callOpenAI(`${strictSingleCardNotice}\n\n${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`, userPrompt, job, jobId, batchTimeoutMs);
|
||||
if (job.cancelRequested) {
|
||||
job.status = 'aborted';
|
||||
break;
|
||||
}
|
||||
|
||||
const parsed = JSON.parse(raw);
|
||||
if (!isValidOutputShape(parsed)) throw new Error('Schema ungültig');
|
||||
await pool.query('UPDATE exercise_cards SET output=$2::jsonb, updated_at=NOW() WHERE id=$1', [row.id, JSON.stringify(parsed)]);
|
||||
const arr = Array.isArray(parsed) ? parsed : Array.isArray(parsed?.items) ? parsed.items : null;
|
||||
if (!arr) throw new Error('Antwort ist kein JSON-Array');
|
||||
if (arr.length !== payload.length) throw new Error(`Antwortgröße passt nicht (${arr.length} statt ${payload.length})`);
|
||||
|
||||
for (let i = 0; i < chunk.length; i++) {
|
||||
const out = arr[i];
|
||||
if (!isValidOutputShape(out)) throw new Error(`Schema ungültig bei Index ${i}`);
|
||||
}
|
||||
|
||||
for (let i = 0; i < chunk.length; i++) {
|
||||
const row = chunk[i];
|
||||
const out = arr[i];
|
||||
await pool.query('UPDATE exercise_cards SET output=$2::jsonb, updated_at=NOW() WHERE id=$1', [row.id, JSON.stringify(out)]);
|
||||
job.done += 1;
|
||||
job.updated = Date.now();
|
||||
}
|
||||
await appendJobLog(jobId, 'info', 'Batch erfolgreich', { ids: chunk.map(c=>c.id), done: job.done, total: job.total });
|
||||
|
||||
success = true;
|
||||
break;
|
||||
} catch (e) {
|
||||
@@ -381,23 +486,33 @@ app.post('/api/generate/run', async (req, res) => {
|
||||
break;
|
||||
}
|
||||
lastError = e;
|
||||
await appendJobLog(jobId, 'warn', 'Batch-Versuch fehlgeschlagen', { attempt, error: e?.message || String(e), ids: chunk.map(c=>c.id) });
|
||||
}
|
||||
}
|
||||
|
||||
if (!success && job.status !== 'aborted') {
|
||||
job.errors.push({ id: row.id, error: (lastError?.message || String(lastError || 'Unbekannter Fehler')) + ' (nach 3 Versuchen)' });
|
||||
await appendJobLog(jobId, 'error', 'Batch endgültig fehlgeschlagen', { error: (lastError?.message || String(lastError || 'Unbekannter Fehler')), ids: chunk.map(c=>c.id) });
|
||||
for (const row of chunk) {
|
||||
job.errors.push({ id: row.id, error: (lastError?.message || String(lastError || 'Unbekannter Fehler')) + ' (Batch nach 3 Versuchen)' });
|
||||
job.done += 1;
|
||||
job.updated = Date.now();
|
||||
}
|
||||
}
|
||||
|
||||
job.done += 1;
|
||||
job.updated = Date.now();
|
||||
}
|
||||
if (job.status !== 'aborted') job.status = 'done';
|
||||
|
||||
if (job.status !== 'aborted') {
|
||||
job.status = 'done';
|
||||
await appendJobLog(jobId, 'info', 'Job abgeschlossen', { done: job.done, total: job.total, errors: job.errors.length });
|
||||
} else {
|
||||
await appendJobLog(jobId, 'warn', 'Job abgebrochen', { done: job.done, total: job.total });
|
||||
}
|
||||
} catch (e) {
|
||||
job.status = 'failed';
|
||||
job.errors.push({ error: e.message || String(e) });
|
||||
await appendJobLog(jobId, 'error', 'Job fehlgeschlagen', { error: e.message || String(e) });
|
||||
} finally {
|
||||
job.updated = Date.now();
|
||||
job.currentController = null;
|
||||
generationJobControllers.delete(job.id);
|
||||
if (activeGenerationJobId === job.id) activeGenerationJobId = null;
|
||||
}
|
||||
})();
|
||||
|
||||
Reference in New Issue
Block a user