# --- Background-job support for the diarization UI -------------------------
#
# NOTE(review): this arrived as a `git format-patch` whose newlines were
# stripped in transit (HTML tags inside the template strings swallowed the
# line breaks). Below is the patch's added/changed Python, reformatted, with
# the garbling bugs fixed:
#   * upload(): the temp-file name contained the literal "(unknown)" where
#     the sanitized upload filename belongs (the `filename` local was unused).
#   * _process_analysis_job(): the LLM prompt used "\\n" (a literal
#     backslash-n) where the inline code it replaced used real newlines.
#   * jobs_page(): error text was "escaped" with .replace('<', '<') — a
#     no-op; restored to .replace('<', '&lt;').
# HTML markup lost in transit (nav link "Hintergrundjobs", job cards,
# redirect snippets) is reconstructed minimally and marked NOTE(review).

JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs"))
# At most two heavy jobs (upload transcription / LLM analysis) run at once.
EXECUTOR = ThreadPoolExecutor(max_workers=2)
JOB_LOCK = threading.Lock()

# Executed from init_db(); one row per queued/running/finished background job.
JOBS_TABLE_DDL = """
CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    kind TEXT NOT NULL,            -- upload|analysis
    status TEXT NOT NULL,          -- queued|running|done|error
    project_id INTEGER,
    document_id INTEGER,
    prompt_id INTEGER,
    title TEXT,
    file_path TEXT,
    error TEXT,
    result_document_id INTEGER,
    created_at TEXT NOT NULL,
    started_at TEXT,
    finished_at TEXT
)
"""


def _job_set(job_id: int, **fields) -> None:
    """Update columns of a job row.

    `fields` keys are column names supplied only by this module (never user
    input), so interpolating them into the SQL is safe; values are bound.
    """
    if not fields:
        return
    cols = ", ".join(f"{k}=?" for k in fields)
    vals = list(fields.values()) + [job_id]
    with db() as c:
        c.execute(f"UPDATE jobs SET {cols} WHERE id=?", vals)


def _process_upload_job(job_id: int) -> None:
    """Worker: send the staged upload to the diarization API and store the
    transcript as a new document. Runs on the EXECUTOR thread pool; a new
    sqlite connection is opened per db() call, so no connection crosses
    threads."""
    with db() as c:
        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if not j:
        return

    _job_set(job_id, status="running", started_at=now_iso())
    try:
        p = Path(j["file_path"])
        data = p.read_bytes()
        filename = p.name
        files = {"file": (filename, data, "application/octet-stream")}
        r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
        r.raise_for_status()
        payload = r.json()

        content_md = payload.get("formatted_text", "")
        doc_title = (j["title"] or "").strip() or filename
        with db() as c:
            cur = c.execute(
                "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
                (
                    j["project_id"],
                    "transcript",
                    doc_title,
                    content_md,
                    json.dumps(payload, ensure_ascii=False),
                    now_iso(),
                ),
            )
            new_doc_id = cur.lastrowid

        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
        # The staged upload is no longer needed; without this, /data/jobs
        # grows without bound (file kept on error for debugging).
        p.unlink(missing_ok=True)
    except Exception as e:  # record the failure instead of killing the worker
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())


def _process_analysis_job(job_id: int) -> None:
    """Worker: run a stored prompt against a document via Ollama and save the
    answer as an 'analysis' document."""
    with db() as c:
        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if not j:
        return

    _job_set(job_id, status="running", started_at=now_iso())
    try:
        with db() as c:
            doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone()
            prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone()
        if not doc or not prm:
            raise RuntimeError("Dokument oder Prompt nicht gefunden")

        # BUGFIX: the garbled patch had "\\n" here; the pre-patch inline
        # implementation of /run used real newlines.
        llm_prompt = (
            "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
            f"AUFTRAG:\n{prm['prompt']}\n\n"
            f"TEXT:\n{doc['content_md']}\n"
        )

        r = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
            timeout=1200,
        )
        r.raise_for_status()
        answer = r.json().get("response", "")

        with db() as c:
            cur = c.execute(
                """
                INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
                VALUES (?,?,?,?,?,?,?,?)
                """,
                (
                    doc["project_id"],
                    "analysis",
                    f"Analyse: {prm['name']} · {doc['title']}",
                    answer,
                    doc["id"],
                    prm["id"],
                    json.dumps({"ollama_response": r.json()}, ensure_ascii=False),
                    now_iso(),
                ),
            )
            new_doc_id = cur.lastrowid

        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
    except Exception as e:
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())


def enqueue_job(kind: str, **kwargs) -> int:
    """Insert a queued job row and submit it to the worker pool.

    JOB_LOCK serializes insert+submit so jobs reach the (2-worker) pool in
    creation order. Returns the new job id.
    """
    with JOB_LOCK:
        with db() as c:
            cur = c.execute(
                """
                INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,created_at)
                VALUES (?,?,?,?,?,?,?,?)
                """,
                (
                    kind,
                    "queued",
                    kwargs.get("project_id"),
                    kwargs.get("document_id"),
                    kwargs.get("prompt_id"),
                    kwargs.get("title"),
                    kwargs.get("file_path"),
                    now_iso(),
                ),
            )
            job_id = cur.lastrowid

        if kind == "upload":
            EXECUTOR.submit(_process_upload_job, job_id)
        elif kind == "analysis":
            EXECUTOR.submit(_process_analysis_job, job_id)
    return job_id


@app.on_event("startup")
def startup():
    """Create tables and the job staging directory on boot."""
    init_db()
    JOB_DIR.mkdir(parents=True, exist_ok=True)


@app.post("/upload")
async def upload(project_id: int = Form(...), title: str = Form(""), file: UploadFile = File(...)):
    """Stage the uploaded audio on disk, enqueue a background transcription
    job, and redirect to the jobs page immediately."""
    data = await file.read()
    if not data:
        raise HTTPException(400, "Leere Datei")

    JOB_DIR.mkdir(parents=True, exist_ok=True)
    filename = (file.filename or "audio.bin").replace("/", "_")
    # BUGFIX: the garbled patch wrote the literal "(unknown)" here and left
    # `filename` unused, so concurrent uploads in the same second collided.
    temp_path = JOB_DIR / f"{now_iso().replace(':', '-')}_{filename}"
    temp_path.write_bytes(data)

    job_id = enqueue_job(
        "upload",
        project_id=project_id,
        title=(title or "").strip() or filename,
        file_path=str(temp_path),
    )
    # NOTE(review): the original redirect markup was lost in transit;
    # minimal reconstruction — confirm against the app's other redirects.
    return HTMLResponse(f"<script>location.href='/jobs?queued={job_id}'</script>")


@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(queued: Optional[int] = None):
    """List the most recent 200 background jobs; `queued` highlights a job
    that was just enqueued."""
    with db() as c:
        jobs = c.execute(
            """
            SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name
            FROM jobs j
            LEFT JOIN projects p ON p.id=j.project_id
            LEFT JOIN documents d ON d.id=j.document_id
            LEFT JOIN prompts pr ON pr.id=j.prompt_id
            ORDER BY j.id DESC LIMIT 200
            """
        ).fetchall()

    # NOTE(review): the card markup was stripped in transit; this is a
    # minimal reconstruction — align tags/classes and the result-link URL
    # with the rest of the app's templates.
    rows = "".join(
        "<div class='card'>"
        f"<b>Job #{j['id']} [{j['kind']}] · {j['status']}</b><br>"
        f"<small>{j['created_at']}</small><br>"
        + (f"Projekt: {j['project_name']}<br>" if j["project_name"] else "")
        + (f"Dokument: {j['document_title']}<br>" if j["document_title"] else "")
        + (f"Prompt: {j['prompt_name']}<br>" if j["prompt_name"] else "")
        # BUGFIX: escape '<' so error text cannot inject markup — the
        # garbled patch had the no-op replace('<', '<').
        + (
            f"Fehler: <code>{str(j['error']).replace('<', '&lt;')}</code><br>"
            if j["error"]
            else ""
        )
        + (
            f"<a href='/doc/{j['result_document_id']}'>Ergebnis öffnen</a>"
            if j["result_document_id"]
            else ""
        )
        + "</div>"
        for j in jobs
    )

    notice = f"<div class='notice'>Job #{queued} wurde eingereiht.</div>" if queued else ""
    body = f"""
<h2>Hintergrundverarbeitung</h2>
<p>Maximal 2 Jobs laufen gleichzeitig.</p>
{notice}
{rows or "<p>Keine Jobs.</p>"}
"""
    return layout("Jobs", body)


@app.post("/run", response_class=HTMLResponse)
def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...)):
    """Validate ids, enqueue an analysis job (work moved off the request
    thread into _process_analysis_job), and redirect to the jobs page."""
    with db() as c:
        doc = c.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone()
        prm = c.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
    if not doc or not prm:
        raise HTTPException(404, "Dokument oder Prompt nicht gefunden")

    job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id)
    # NOTE(review): redirect markup lost in transit; minimal reconstruction.
    return HTMLResponse(f"<script>location.href='/jobs?queued={job_id}'</script>")