feat(diarization-ui): run upload/analysis as background jobs with max 2 workers and add jobs page

This commit is contained in:
2026-03-21 15:00:21 +01:00
parent 49f597f612
commit 648a24303a

248
app.py
View File

@@ -1,7 +1,10 @@
import html
import json
import os
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Optional

import markdown as md
@@ -16,6 +19,10 @@ DB_PATH = os.getenv("DB_PATH", "/data/ui.db")
app = FastAPI(title="Diarization UI")
JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs"))
EXECUTOR = ThreadPoolExecutor(max_workers=2)
JOB_LOCK = threading.Lock()
def db():
conn = sqlite3.connect(DB_PATH)
@@ -68,6 +75,25 @@ def init_db():
)
"""
)
c.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
kind TEXT NOT NULL, -- upload|analysis
status TEXT NOT NULL, -- queued|running|done|error
project_id INTEGER,
document_id INTEGER,
prompt_id INTEGER,
title TEXT,
file_path TEXT,
error TEXT,
result_document_id INTEGER,
created_at TEXT NOT NULL,
started_at TEXT,
finished_at TEXT
)
"""
)
# defaults
c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso()))
@@ -174,6 +200,7 @@ window.uiSelect = async function(title, options, placeholder='') {{
<a href='/library'><span>🗂️</span><span>Datenbank</span></a>
<a href='/prompts'><span>🧩</span><span>Prompts</span></a>
<a href='/run'><span>🤖</span><span>Prompt ausführen</span></a>
<a href='/jobs'><span>⏳</span><span>Hintergrundjobs</span></a>
<a href='/healthz'><span>💚</span><span>Health</span></a>
</nav>
<main>{body}</main>
@@ -197,9 +224,129 @@ def get_prompts():
return c.execute("SELECT id,name,prompt FROM prompts ORDER BY name").fetchall()
def _job_set(job_id: int, **fields):
    """Update the given columns of the job row; no-op when nothing to set.

    Column names come from internal callers only (status/error/timestamps),
    so building the SET clause from the kwarg names is safe here.
    """
    if not fields:
        return
    assignments = ", ".join(f"{name}=?" for name in fields)
    params = [*fields.values(), job_id]
    with db() as c:
        c.execute(f"UPDATE jobs SET {assignments} WHERE id=?", params)
def _process_upload_job(job_id: int):
    """Worker: transcribe/diarize a staged upload and store the transcript.

    Loads the job row, POSTs the staged file to the diarization API, saves
    the formatted transcript as a new ``documents`` row, then marks the job
    done (or error, with the exception message recorded on the job).
    """
    with db() as c:
        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if not j:
        return
    _job_set(job_id, status="running", started_at=now_iso())
    try:
        p = Path(j["file_path"])
        data = p.read_bytes()
        filename = p.name
        files = {"file": (filename, data, "application/octet-stream")}
        # Long timeout: diarization of large recordings can take many minutes.
        r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
        r.raise_for_status()
        payload = r.json()
        content_md = payload.get("formatted_text", "")
        doc_title = (j["title"] or "").strip() or filename
        with db() as c:
            cur = c.execute(
                "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
                (j["project_id"], "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
        # Fix: remove the staged upload on success so JOB_DIR does not grow
        # without bound; on error the file is kept for inspection/retry.
        try:
            p.unlink()
        except OSError:
            pass
    except Exception as e:
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
def _process_analysis_job(job_id: int):
    """Worker: run a stored prompt over a document via Ollama, save the answer.

    Mirrors the former synchronous /run handler: builds a German instruction
    prompt from the stored prompt text plus the document body, calls Ollama's
    /api/generate, and stores the response as an ``analysis`` document.
    """
    with db() as c:
        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if not j:
        return
    _job_set(job_id, status="running", started_at=now_iso())
    try:
        with db() as c:
            doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone()
            prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone()
        if not doc or not prm:
            raise RuntimeError("Dokument oder Prompt nicht gefunden")
        # Fix: use real newlines, not literal backslash-n ("\\n"); the
        # synchronous implementation this job replaces separated the
        # AUFTRAG/TEXT sections with actual line breaks.
        llm_prompt = (
            "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
            f"AUFTRAG:\n{prm['prompt']}\n\n"
            f"TEXT:\n{doc['content_md']}\n"
        )
        r = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
            timeout=1200,
        )
        r.raise_for_status()
        answer = r.json().get("response", "")
        with db() as c:
            cur = c.execute(
                """
                INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
                VALUES (?,?,?,?,?,?,?,?)
                """,
                (
                    doc["project_id"],
                    "analysis",
                    f"Analyse: {prm['name']} · {doc['title']}",
                    answer,
                    doc["id"],
                    prm["id"],
                    json.dumps({"ollama_response": r.json()}, ensure_ascii=False),
                    now_iso(),
                ),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
    except Exception as e:
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
def enqueue_job(kind: str, **kwargs) -> int:
    """Insert a job row and schedule it on the shared 2-worker executor.

    Returns the new job id. The executor task is submitted only AFTER the
    ``with db()`` context has exited (sqlite3 commits on context exit), so
    the worker thread — which opens its own connection — is guaranteed to
    see the freshly inserted row instead of racing an uncommitted INSERT.
    """
    with JOB_LOCK:
        with db() as c:
            cur = c.execute(
                """
                INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,created_at)
                VALUES (?,?,?,?,?,?,?,?)
                """,
                (
                    kind,
                    "queued",
                    kwargs.get("project_id"),
                    kwargs.get("document_id"),
                    kwargs.get("prompt_id"),
                    kwargs.get("title"),
                    kwargs.get("file_path"),
                    now_iso(),
                ),
            )
            job_id = cur.lastrowid
        # Row is committed at this point; safe to hand off to a worker.
        if kind == "upload":
            EXECUTOR.submit(_process_upload_job, job_id)
        elif kind == "analysis":
            EXECUTOR.submit(_process_analysis_job, job_id)
        return job_id
@app.on_event("startup")
def startup():
    """Create the DB schema and the upload staging directory before serving."""
    init_db()
    JOB_DIR.mkdir(parents=True, exist_ok=True)
@app.get("/manifest.webmanifest")
@@ -300,23 +447,18 @@ async def upload(project_id: int = Form(...), title: str = Form(""), file: Uploa
if not data:
raise HTTPException(400, "Leere Datei")
files = {"file": (file.filename or "audio.bin", data, file.content_type or "application/octet-stream")}
r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
if r.status_code >= 400:
raise HTTPException(r.status_code, r.text)
JOB_DIR.mkdir(parents=True, exist_ok=True)
filename = (file.filename or "audio.bin").replace("/", "_")
temp_path = JOB_DIR / f"{now_iso().replace(':','-')}_{filename}"
temp_path.write_bytes(data)
payload = r.json()
content_md = payload.get("formatted_text", "")
doc_title = (title or "").strip() or (file.filename or "Transkript")
with db() as c:
cur = c.execute(
"INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
(project_id, "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
job_id = enqueue_job(
"upload",
project_id=project_id,
title=(title or "").strip() or filename,
file_path=str(temp_path),
)
doc_id = cur.lastrowid
return HTMLResponse(f"<meta http-equiv='refresh' content='0; url=/document/{doc_id}'>")
return HTMLResponse(f"<meta http-equiv='refresh' content='0; url=/jobs?queued={job_id}'>")
@app.get("/library", response_class=HTMLResponse)
@@ -565,6 +707,41 @@ def prompt_delete(prompt_id: int):
return HTMLResponse("<meta http-equiv='refresh' content='0; url=/prompts'>")
@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(queued: Optional[int] = None):
    """Render the background-job list (newest first, capped at 200 rows).

    ``queued`` carries the id of a job that was just enqueued so a short
    confirmation notice is shown at the top of the page.
    """
    with db() as c:
        jobs = c.execute(
            """
            SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name
            FROM jobs j
            LEFT JOIN projects p ON p.id=j.project_id
            LEFT JOIN documents d ON d.id=j.document_id
            LEFT JOIN prompts pr ON pr.id=j.prompt_id
            ORDER BY j.id DESC LIMIT 200
            """
        ).fetchall()
    cards = []
    for j in jobs:
        # Fix: HTML-escape every user-controlled value (titles/names/errors)
        # before embedding it in markup; previously only '<' in the error
        # text was replaced, leaving the other fields open to injection.
        parts = [
            f"<div class='card'><b>Job #{j['id']}</b> [{html.escape(str(j['kind']))}] "
            f"· <b>{html.escape(str(j['status']))}</b><br>",
            f"<small>{j['created_at']}</small><br>",
        ]
        if j["project_name"]:
            parts.append(f"Projekt: {html.escape(str(j['project_name']))}<br>")
        if j["document_title"]:
            parts.append(f"Dokument: {html.escape(str(j['document_title']))}<br>")
        if j["prompt_name"]:
            parts.append(f"Prompt: {html.escape(str(j['prompt_name']))}<br>")
        if j["error"]:
            parts.append(f"Fehler: <pre>{html.escape(str(j['error']))}</pre>")
        if j["result_document_id"]:
            # Fix: built with distinct quote styles — nesting same-quote
            # f-strings is a SyntaxError before Python 3.12.
            parts.append(f"<a href='/document/{j['result_document_id']}'>Ergebnis öffnen</a>")
        parts.append("</div>")
        cards.append("".join(parts))
    rows = "".join(cards)
    notice = f"<p><b>Job #{queued} wurde eingereiht.</b></p>" if queued else ""
    body = f"""
    <h2>Hintergrundverarbeitung</h2>
    <p class='hint'>Maximal 2 Jobs laufen gleichzeitig.</p>
    {notice}
    {rows or '<p>Keine Jobs.</p>'}
    """
    return layout("Jobs", body)
@app.get("/run", response_class=HTMLResponse)
def run_page():
with db() as c:
@@ -590,43 +767,10 @@ def run_page():
@app.post("/run", response_class=HTMLResponse)
def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...)):
with db() as c:
doc = c.execute("SELECT * FROM documents WHERE id=?", (document_id,)).fetchone()
prm = c.execute("SELECT * FROM prompts WHERE id=?", (prompt_id,)).fetchone()
doc = c.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone()
prm = c.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
if not doc or not prm:
raise HTTPException(404, "Dokument oder Prompt nicht gefunden")
llm_prompt = (
"Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
f"AUFTRAG:\n{prm['prompt']}\n\n"
f"TEXT:\n{doc['content_md']}\n"
)
r = requests.post(
f"{OLLAMA_BASE_URL}/api/generate",
json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
timeout=1200,
)
if r.status_code >= 400:
raise HTTPException(r.status_code, r.text)
answer = r.json().get("response", "")
with db() as c:
cur = c.execute(
"""
INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
VALUES (?,?,?,?,?,?,?,?)
""",
(
doc["project_id"],
"analysis",
f"Analyse: {prm['name']} · {doc['title']}",
answer,
doc["id"],
prm["id"],
json.dumps({"ollama_response": r.json()}, ensure_ascii=False),
now_iso(),
),
)
new_id = cur.lastrowid
return HTMLResponse(f"<meta http-equiv='refresh' content='0; url=/document/{new_id}'>")
job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id)
return HTMLResponse(f"<meta http-equiv='refresh' content='0; url=/jobs?queued={job_id}'>")