From 068c433a8ff6b7e74d977bb59026392500d51c38 Mon Sep 17 00:00:00 2001 From: OpenClaw Bot Date: Sun, 8 Mar 2026 10:07:26 +0100 Subject: [PATCH] feat: persist generation logs and improve batch timeout handling --- src/server.js | 153 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 134 insertions(+), 19 deletions(-) diff --git a/src/server.js b/src/server.js index 73c1693..fd45897 100644 --- a/src/server.js +++ b/src/server.js @@ -47,6 +47,8 @@ Kurzbeschreibung: 30 bis 50 Worte Tip + Inhalt zusammen 100 bis 150 Worte (Langtext 400 Worte)`; const generationJobs = new Map(); +const generationJobControllers = new Map(); +const generationJobOrder = []; let activeGenerationJobId = null; const pool = new Pool({ @@ -92,11 +94,14 @@ async function savePromptVersion(promptEditable) { await pool.query('INSERT INTO prompt_versions(prompt_editable, prompt_full) VALUES ($1, $2)', [promptEditable, `${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`]); } -async function callOpenAI(systemPrompt, userPrompt, job = null) { +async function callOpenAI(systemPrompt, userPrompt, job = null, jobId = null, timeoutMs = 90000) { if (!OPENAI_API_KEY) throw new Error('OPENAI_API_KEY fehlt'); const controller = new AbortController(); - if (job) job.currentController = controller; - const timeout = setTimeout(() => controller.abort(), 90000); + if (jobId) { + if (!generationJobControllers.has(jobId)) generationJobControllers.set(jobId, new Set()); + generationJobControllers.get(jobId).add(controller); + } + const timeout = setTimeout(() => controller.abort(), timeoutMs); let r; try { r = await fetch('https://api.openai.com/v1/chat/completions', { @@ -117,12 +122,14 @@ async function callOpenAI(systemPrompt, userPrompt, job = null) { } catch (e) { if (e?.name === 'AbortError') { if (job?.cancelRequested) throw new Error('Abgebrochen'); - throw new Error('OpenAI Timeout nach 90s'); + throw new Error(`OpenAI Timeout nach ${Math.round(timeoutMs/1000)}s`); } throw e; } finally { clearTimeout(timeout); - if (job && job.currentController === controller) job.currentController = null; + if (jobId && generationJobControllers.has(jobId)) { + generationJobControllers.get(jobId).delete(controller); + } } const j = await r.json(); if (!r.ok) throw new Error(`OpenAI Fehler ${r.status}: ${JSON.stringify(j).slice(0, 400)}`); @@ -130,6 +137,12 @@ async function callOpenAI(systemPrompt, userPrompt, job = null) { return stripCodeFences(text); } +async function appendJobLog(jobId, level, message, meta = null) { + try { + await pool.query('INSERT INTO generation_logs(job_id, level, message, meta) VALUES ($1,$2,$3,$4::jsonb)', [jobId, level, message, JSON.stringify(meta || {})]); + } catch {} +} + async function initDb() { await pool.query(` CREATE TABLE IF NOT EXISTS exercise_cards ( @@ -153,6 +166,16 @@ async function initDb() { created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); `); + await pool.query(` + CREATE TABLE IF NOT EXISTS generation_logs ( + id BIGSERIAL PRIMARY KEY, + job_id TEXT NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + meta JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + `); const promptCount = await pool.query('SELECT COUNT(*)::int AS c FROM prompt_versions'); if (promptCount.rows[0].c === 0) await savePromptVersion(DEFAULT_PROMPT_EDITABLE); @@ -297,6 +320,38 @@ app.post('/api/cards/test-selection', async (req, res) => { } }); +app.get('/api/generate/recent', async (_req, res) => { + try { + const memItems = generationJobOrder + .slice(-10) + .reverse() + .map((id) => generationJobs.get(id)) + .filter(Boolean) + .map((j) => ({ id: j.id, status: j.status, total: j.total, done: j.done, updated: j.updated, errors: j.errors || [] })); + + const { rows } = await pool.query(` + SELECT job_id, MAX(created_at) AS last_at, COUNT(*)::int AS entries + FROM generation_logs + GROUP BY job_id + ORDER BY last_at DESC + LIMIT 20 + `); + return res.json({ ok: true, items: memItems, persisted: rows }); + } catch (e) { + return res.status(500).json({ ok: false, error: e.message || String(e) }); + } +}); + +app.get('/api/generate/logs/:jobId', async (req, res) => { + try { + const jobId = String(req.params.jobId || ''); + const { rows } = await pool.query('SELECT id, level, message, meta, created_at FROM generation_logs WHERE job_id=$1 ORDER BY id ASC LIMIT 500', [jobId]); + return res.json({ ok: true, jobId, items: rows }); + } catch (e) { + return res.status(500).json({ ok: false, error: e.message || String(e) }); + } +}); + app.get('/api/generate/active', (_req, res) => { if (!activeGenerationJobId) return res.json({ ok: true, active: null }); const job = generationJobs.get(activeGenerationJobId); @@ -314,8 +369,11 @@ app.post('/api/generate/job/:id/cancel', (req, res) => { if (['done', 'failed', 'aborted'].includes(job.status)) return res.json({ ok: true, status: job.status }); job.cancelRequested = true; job.status = 'cancelling'; - if (job.currentController) { - try { job.currentController.abort(); } catch {} + const ctrls = generationJobControllers.get(id); + if (ctrls && ctrls.size) { + for (const c of ctrls) { + try { c.abort(); } catch {} + } } job.updated = Date.now(); res.json({ ok: true, status: job.status }); @@ -338,8 +396,11 @@ app.post('/api/generate/run', async (req, res) => { await savePromptVersion(promptEditable); const jobId = crypto.randomUUID(); - generationJobs.set(jobId, { id: jobId, status: 'running', total: 0, done: 0, errors: [], updated: Date.now(), cancelRequested: false, currentController: null }); + generationJobs.set(jobId, { id: jobId, status: 'running', total: 0, done: 0, errors: [], updated: Date.now(), cancelRequested: false }); + generationJobOrder.push(jobId); + generationJobControllers.set(jobId, new Set()); activeGenerationJobId = jobId; + await appendJobLog(jobId, 'info', 'Job gestartet'); (async () => { const job = generationJobs.get(jobId); @@ -350,29 +411,73 @@ app.post('/api/generate/run', async (req, res) => { const q = ids && ids.length ? await pool.query(filterSql, [ids]) : await pool.query(filterSql); const rows = q.rows || []; job.total = rows.length; - for (const row of rows) { + await appendJobLog(jobId, 'info', 'Karten geladen', { total: rows.length }); + + const chunks = []; + for (let i = 0; i < rows.length; i += 5) chunks.push(rows.slice(i, i + 5)); + + for (const chunk of chunks) { if (job.cancelRequested) { job.status = 'aborted'; break; } - const userPrompt = JSON.stringify({ titel: row.input?.titel || '', card: row.input || {} }, null, 2); + + const payload = chunk.map((row) => ({ + id: row.id, + titel: row.input?.titel || '', + card: row.input || {} + })); + await appendJobLog(jobId, 'info', 'Batch gestartet', { ids: payload.map(p=>p.id), size: payload.length }); let success = false; let lastError = null; + for (let attempt = 1; attempt <= 3; attempt += 1) { if (job.cancelRequested) { job.status = 'aborted'; break; } try { - const raw = await callOpenAI(`${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`, userPrompt, job); + const userPrompt = [ + 'Erzeuge für ALLE übergebenen Karten neue Outputs.', + `Es sind ${payload.length} Karten.`, + 'Antworte als JSON-Array mit exakt gleich vielen Elementen und gleicher Reihenfolge.', + JSON.stringify(payload, null, 2) + ].join('\n\n'); + + const strictSingleCardNotice = [ + 'WICHTIG: Auch wenn mehrere Karten gemeinsam übergeben werden, muss JEDE Karte einzeln verarbeitet werden.', + 'Für JEDE einzelne Karte gelten ALLE Regeln aus dem editierbaren Prompt strikt.', + 'Insbesondere Absatzstruktur/Zeilenumbrüche beibehalten bzw. korrekt gemäß Vorgaben erzeugen (keine verlorenen Absätze).', + 'Keine Regel darf durch Batch-Verarbeitung abgeschwächt werden.' + ].join('\n'); + const batchTimeoutMs = Math.max(90000, payload.length * 90000); + await appendJobLog(jobId, 'info', 'Batch-Request an OpenAI', { attempt, size: payload.length, timeoutMs: batchTimeoutMs }); + const raw = await callOpenAI(`${strictSingleCardNotice}\n\n${promptEditable}\n\n${PROMPT_LOCKED_TAIL}`, userPrompt, job, jobId, batchTimeoutMs); if (job.cancelRequested) { job.status = 'aborted'; break; } + const parsed = JSON.parse(raw); - if (!isValidOutputShape(parsed)) throw new Error('Schema ungültig'); - await pool.query('UPDATE exercise_cards SET output=$2::jsonb, updated_at=NOW() WHERE id=$1', [row.id, JSON.stringify(parsed)]); + const arr = Array.isArray(parsed) ? parsed : Array.isArray(parsed?.items) ? parsed.items : null; + if (!arr) throw new Error('Antwort ist kein JSON-Array'); + if (arr.length !== payload.length) throw new Error(`Antwortgröße passt nicht (${arr.length} statt ${payload.length})`); + + for (let i = 0; i < chunk.length; i++) { + const out = arr[i]; + if (!isValidOutputShape(out)) throw new Error(`Schema ungültig bei Index ${i}`); + } + + for (let i = 0; i < chunk.length; i++) { + const row = chunk[i]; + const out = arr[i]; + await pool.query('UPDATE exercise_cards SET output=$2::jsonb, updated_at=NOW() WHERE id=$1', [row.id, JSON.stringify(out)]); + job.done += 1; + job.updated = Date.now(); + } + await appendJobLog(jobId, 'info', 'Batch erfolgreich', { ids: chunk.map(c=>c.id), done: job.done, total: job.total }); + success = true; break; } catch (e) { @@ -381,23 +486,33 @@ app.post('/api/generate/run', async (req, res) => { break; } lastError = e; + await appendJobLog(jobId, 'warn', 'Batch-Versuch fehlgeschlagen', { attempt, error: e?.message || String(e), ids: chunk.map(c=>c.id) }); } } if (!success && job.status !== 'aborted') { - job.errors.push({ id: row.id, error: (lastError?.message || String(lastError || 'Unbekannter Fehler')) + ' (nach 3 Versuchen)' }); + await appendJobLog(jobId, 'error', 'Batch endgültig fehlgeschlagen', { error: (lastError?.message || String(lastError || 'Unbekannter Fehler')), ids: chunk.map(c=>c.id) }); + for (const row of chunk) { + job.errors.push({ id: row.id, error: (lastError?.message || String(lastError || 'Unbekannter Fehler')) + ' (Batch nach 3 Versuchen)' }); + job.done += 1; + job.updated = Date.now(); + } } - - job.done += 1; - job.updated = Date.now(); } - if (job.status !== 'aborted') job.status = 'done'; + + if (job.status !== 'aborted') { + job.status = 'done'; + await appendJobLog(jobId, 'info', 'Job abgeschlossen', { done: job.done, total: job.total, errors: job.errors.length }); + } else { + await appendJobLog(jobId, 'warn', 'Job abgebrochen', { done: job.done, total: job.total }); + } } catch (e) { job.status = 'failed'; job.errors.push({ error: e.message || String(e) }); + await appendJobLog(jobId, 'error', 'Job fehlgeschlagen', { error: e.message || String(e) }); } finally { job.updated = Date.now(); - job.currentController = null; + generationJobControllers.delete(job.id); if (activeGenerationJobId === job.id) activeGenerationJobId = null; } })();