diff --git a/app.py b/app.py index c4f0f4f..e4ac8fd 100644 --- a/app.py +++ b/app.py @@ -22,6 +22,8 @@ app = FastAPI(title="Diarization UI") JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs")) EXECUTOR = ThreadPoolExecutor(max_workers=2) JOB_LOCK = threading.Lock() +_JOB_STREAMS: dict = {} +_JOB_STREAM_LOCK = threading.Lock() def db(): @@ -100,6 +102,14 @@ def init_db(): c.execute("ALTER TABLE jobs ADD COLUMN user_prompt TEXT") except Exception: pass + try: + c.execute("ALTER TABLE jobs ADD COLUMN llm_prompt TEXT") + except Exception: + pass + try: + c.execute("ALTER TABLE jobs ADD COLUMN llm_response TEXT") + except Exception: + pass # defaults c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso())) @@ -357,13 +367,38 @@ def _process_analysis_job(job_id: int): + f"\\nTEXT:\\n{doc['content_md']}\\n" ) + _job_set(job_id, llm_prompt=llm_prompt) + with _JOB_STREAM_LOCK: + _JOB_STREAMS[job_id] = "" + r = requests.post( f"{OLLAMA_BASE_URL}/api/generate", - json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False}, + json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": True}, + stream=True, timeout=1200, ) r.raise_for_status() - answer = r.json().get("response", "") + + accumulated = "" + ollama_final = {} + chunk_count = 0 + for line in r.iter_lines(): + if not line: + continue + chunk = json.loads(line) + accumulated += chunk.get("response", "") + with _JOB_STREAM_LOCK: + _JOB_STREAMS[job_id] = accumulated + chunk_count += 1 + if chunk_count % 100 == 0: + j_check = _job_get(job_id) + if not j_check or j_check["status"] == "cancelled": + return + if chunk.get("done"): + ollama_final = chunk + break + + answer = accumulated j = _job_get(job_id) if not j or j["status"] == "cancelled": @@ -382,15 +417,18 @@ def _process_analysis_job(job_id: int): answer, doc["id"], prm["id"], - json.dumps({"ollama_response": r.json()}, ensure_ascii=False), + json.dumps({"ollama_response": ollama_final}, ensure_ascii=False), 
now_iso(), ), ) new_doc_id = cur.lastrowid - _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso()) + _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso(), llm_response=answer) except Exception as e: _job_set(job_id, status="error", error=str(e), finished_at=now_iso()) + finally: + with _JOB_STREAM_LOCK: + _JOB_STREAMS.pop(job_id, None) def enqueue_job(kind: str, **kwargs) -> int: @@ -1258,6 +1296,22 @@ def jobs_delete_form(job_id: int): return RedirectResponse(url="/jobs", status_code=303) +@app.get("/jobs/{job_id}/debug-data") +def job_debug_data(job_id: int): + j = _job_get(job_id) + if not j: + raise HTTPException(404, "job not found") + with _JOB_STREAM_LOCK: + live = _JOB_STREAMS.get(job_id) + return { + "status": j["status"], + "kind": j["kind"], + "llm_prompt": j.get("llm_prompt"), + "llm_response": live if live is not None else j.get("llm_response"), + "streaming": live is not None, + } + + @app.get("/jobs", response_class=HTMLResponse) def jobs_page(queued: Optional[int] = None): items = _jobs_payload(200) @@ -1282,6 +1336,8 @@ def jobs_page(queued: Optional[int] = None): actions += f" " if it.get("result_document_id"): actions += f" Ergebnis" + if it["kind"] == "analysis": + actions += f" " err = f"
{str(it['error']).replace('<','&lt;')}
" if it.get("error") else "" cards.append( @@ -1343,6 +1399,8 @@ function renderCard(it) {{ actions += ` `; if(it.result_document_id) actions += ` Ergebnis`; + if(it.kind === 'analysis') + actions += ` `; const err = it.error ? `
${{String(it.error).replace(/` : ''; const meta = [ it.project_name ? 'Projekt: '+it.project_name : '', @@ -1447,7 +1505,86 @@ setInterval(() => document.querySelectorAll('.elapsed').forEach(el => {{ }}), 1000); updateSelectionCount(); schedulePoll(3000); + +let _debugJobId = null; +let _debugPollTimer = null; +async function openDebug(jobId) {{ + _debugJobId = jobId; + document.getElementById('debug-job-id').textContent = jobId; + document.getElementById('debug-prompt-content').textContent = 'Lade …'; + document.getElementById('debug-response-content').textContent = 'Lade …'; + bootstrap.Modal.getOrCreateInstance(document.getElementById('debugModal')).show(); + await refreshDebug(); +}} +async function refreshDebug() {{ + if (!_debugJobId) return; + try {{ + const r = await fetch('/jobs/' + _debugJobId + '/debug-data'); + if (!r.ok) return; + const d = await r.json(); + if (d.kind !== 'analysis') {{ + document.getElementById('debug-prompt-content').textContent = 'Upload-Job – kein LLM-Prompt'; + document.getElementById('debug-response-content').textContent = '-'; + return; + }} + document.getElementById('debug-prompt-content').textContent = d.llm_prompt || '(kein Prompt gespeichert)'; + const respEl = document.getElementById('debug-response-content'); + const atBottom = respEl.scrollHeight - respEl.scrollTop <= respEl.clientHeight + 20; + respEl.textContent = d.llm_response || '(noch keine Antwort)'; + if (atBottom) respEl.scrollTop = respEl.scrollHeight; + const badge = document.getElementById('debug-streaming-badge'); + badge.classList.toggle('d-none', !d.streaming); + clearTimeout(_debugPollTimer); + if (d.streaming || d.status === 'running') {{ + _debugPollTimer = setTimeout(refreshDebug, 500); + }} + }} catch(e) {{ + clearTimeout(_debugPollTimer); + _debugPollTimer = setTimeout(refreshDebug, 2000); + }} +}} +document.addEventListener('DOMContentLoaded', () => {{ + const el = document.getElementById('debugModal'); + if (el) el.addEventListener('hide.bs.modal', 
() => {{ + _debugJobId = null; + clearTimeout(_debugPollTimer); + }}); +}}); + + """ return layout("Jobs", body)