diff --git a/app.py b/app.py
index 54ad4f8..e359730 100644
--- a/app.py
+++ b/app.py
@@ -1,7 +1,10 @@
import json
import os
import sqlite3
+import threading
+from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
+from pathlib import Path
from typing import Optional
import markdown as md
@@ -16,6 +19,10 @@ DB_PATH = os.getenv("DB_PATH", "/data/ui.db")
app = FastAPI(title="Diarization UI")
+JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs"))
+EXECUTOR = ThreadPoolExecutor(max_workers=2)
+JOB_LOCK = threading.Lock()
+
def db():
conn = sqlite3.connect(DB_PATH)
@@ -68,6 +75,25 @@ def init_db():
)
"""
)
+ c.execute(
+ """
+ CREATE TABLE IF NOT EXISTS jobs (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ kind TEXT NOT NULL, -- upload|analysis
+ status TEXT NOT NULL, -- queued|running|done|error
+ project_id INTEGER,
+ document_id INTEGER,
+ prompt_id INTEGER,
+ title TEXT,
+ file_path TEXT,
+ error TEXT,
+ result_document_id INTEGER,
+ created_at TEXT NOT NULL,
+ started_at TEXT,
+ finished_at TEXT
+ )
+ """
+ )
# defaults
c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso()))
@@ -174,6 +200,7 @@ window.uiSelect = async function(title, options, placeholder='') {{
🗂️Datenbank
🧩Prompts
🤖Prompt ausführen
+ ⏳Hintergrundjobs
💚Health
{body}
@@ -197,9 +224,129 @@ def get_prompts():
return c.execute("SELECT id,name,prompt FROM prompts ORDER BY name").fetchall()
+def _job_set(job_id: int, **fields):
+    # Partially update one row of the `jobs` table.
+    #
+    # job_id: primary key of the job row to update.
+    # fields: column=value pairs to SET (e.g. status="running", error=...).
+    # No-op when called without fields.
+    if not fields:
+        return
+    # NOTE(review): column names are interpolated into the SQL via f-string.
+    # Safe only because every caller passes hard-coded keyword names; never
+    # feed user-supplied field names into this helper. Values themselves go
+    # through `?` placeholders.
+    cols = ", ".join([f"{k}=?" for k in fields.keys()])
+    vals = list(fields.values()) + [job_id]
+    with db() as c:
+        c.execute(f"UPDATE jobs SET {cols} WHERE id=?", vals)
+
+
+def _process_upload_job(job_id: int):
+    """Worker: run one queued upload job end-to-end.
+
+    Reads the staged audio file, posts it to the diarization API, stores the
+    returned transcript as a new `documents` row, and records the outcome on
+    the job row. Runs on EXECUTOR threads, so it must never raise.
+    """
+    with db() as c:
+        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
+    if not j:
+        return
+
+    _job_set(job_id, status="running", started_at=now_iso())
+    try:
+        p = Path(j["file_path"])
+        data = p.read_bytes()
+        filename = p.name
+        files = {"file": (filename, data, "application/octet-stream")}
+        # Long timeout: diarization of long recordings is slow.
+        r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
+        r.raise_for_status()
+        payload = r.json()
+
+        content_md = payload.get("formatted_text", "")
+        doc_title = (j["title"] or "").strip() or filename
+        with db() as c:
+            cur = c.execute(
+                "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
+                (j["project_id"], "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
+            )
+            new_doc_id = cur.lastrowid
+
+        # Fix: delete the staged upload once it is safely persisted —
+        # previously nothing ever removed these files, so JOB_DIR grew
+        # without bound. On error the file is kept for inspection/retry.
+        p.unlink(missing_ok=True)
+        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
+    except Exception as e:
+        # Broad catch is intentional: this is the top-level boundary of a
+        # background thread — surface the failure on the job row instead
+        # of letting the executor swallow it.
+        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
+
+
+def _process_analysis_job(job_id: int):
+    """Worker: run one queued LLM analysis job.
+
+    Loads the source document and the prompt, sends both to Ollama, stores
+    the answer as a new 'analysis' document, and records the outcome on the
+    job row. Runs on EXECUTOR threads, so it must never raise.
+    """
+    with db() as c:
+        j = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
+    if not j:
+        return
+
+    _job_set(job_id, status="running", started_at=now_iso())
+    try:
+        with db() as c:
+            doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone()
+            prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone()
+        if not doc or not prm:
+            raise RuntimeError("Dokument oder Prompt nicht gefunden")
+
+        # Fix: use real newlines (was "\\n"). The doubled backslashes sent
+        # the two literal characters '\' 'n' to the model; the synchronous
+        # implementation this job replaces used actual line breaks.
+        llm_prompt = (
+            "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
+            f"AUFTRAG:\n{prm['prompt']}\n\n"
+            f"TEXT:\n{doc['content_md']}\n"
+        )
+
+        r = requests.post(
+            f"{OLLAMA_BASE_URL}/api/generate",
+            json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
+            timeout=1200,
+        )
+        r.raise_for_status()
+        answer = r.json().get("response", "")
+
+        with db() as c:
+            cur = c.execute(
+                """
+                INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
+                VALUES (?,?,?,?,?,?,?,?)
+                """,
+                (
+                    doc["project_id"],
+                    "analysis",
+                    f"Analyse: {prm['name']} · {doc['title']}",
+                    answer,
+                    doc["id"],
+                    prm["id"],
+                    json.dumps({"ollama_response": r.json()}, ensure_ascii=False),
+                    now_iso(),
+                ),
+            )
+            new_doc_id = cur.lastrowid
+
+        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
+    except Exception as e:
+        # Top-level boundary of a background thread: record, don't propagate.
+        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
+
+
+def enqueue_job(kind: str, **kwargs) -> int:
+    """Insert a job row and submit it to the worker pool.
+
+    kind: "upload" or "analysis"; anything else raises ValueError.
+    kwargs: optional project_id, document_id, prompt_id, title, file_path —
+            whichever the job kind needs; missing ones are stored as NULL.
+    Returns the id of the newly created job row.
+    """
+    # Fix: validate before inserting. Previously an unsupported kind fell
+    # through the if/elif silently, leaving an orphaned row stuck in
+    # 'queued' forever with no worker ever picking it up.
+    if kind not in ("upload", "analysis"):
+        raise ValueError(f"Unbekannte Job-Art: {kind}")
+
+    # Serialize insert + lastrowid read against concurrent enqueuers.
+    with JOB_LOCK:
+        with db() as c:
+            cur = c.execute(
+                """
+                INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,created_at)
+                VALUES (?,?,?,?,?,?,?,?)
+                """,
+                (
+                    kind,
+                    "queued",
+                    kwargs.get("project_id"),
+                    kwargs.get("document_id"),
+                    kwargs.get("prompt_id"),
+                    kwargs.get("title"),
+                    kwargs.get("file_path"),
+                    now_iso(),
+                ),
+            )
+            job_id = cur.lastrowid
+
+    # The pool itself (max_workers=2) bounds how many jobs run at once.
+    if kind == "upload":
+        EXECUTOR.submit(_process_upload_job, job_id)
+    else:
+        EXECUTOR.submit(_process_analysis_job, job_id)
+    return job_id
+
+
@app.on_event("startup")
def startup():
    init_db()
+    # Create the upload staging directory before any request can enqueue
+    # an upload job that writes into it. (NOTE(review): @app.on_event is
+    # deprecated in newer FastAPI in favor of lifespan handlers — confirm
+    # the installed version before migrating.)
+    JOB_DIR.mkdir(parents=True, exist_ok=True)
@app.get("/manifest.webmanifest")
@@ -300,23 +447,18 @@ async def upload(project_id: int = Form(...), title: str = Form(""), file: Uploa
if not data:
raise HTTPException(400, "Leere Datei")
- files = {"file": (file.filename or "audio.bin", data, file.content_type or "application/octet-stream")}
- r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
- if r.status_code >= 400:
- raise HTTPException(r.status_code, r.text)
+ JOB_DIR.mkdir(parents=True, exist_ok=True)
+ filename = (file.filename or "audio.bin").replace("/", "_")
+    temp_path = JOB_DIR / f"{now_iso().replace(':','-')}_{filename}"
+ temp_path.write_bytes(data)
- payload = r.json()
- content_md = payload.get("formatted_text", "")
- doc_title = (title or "").strip() or (file.filename or "Transkript")
-
- with db() as c:
- cur = c.execute(
- "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
- (project_id, "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
- )
- doc_id = cur.lastrowid
-
- return HTMLResponse(f"")
+ job_id = enqueue_job(
+ "upload",
+ project_id=project_id,
+ title=(title or "").strip() or filename,
+ file_path=str(temp_path),
+ )
+ return HTMLResponse(f"")
@app.get("/library", response_class=HTMLResponse)
@@ -565,6 +707,41 @@ def prompt_delete(prompt_id: int):
return HTMLResponse("")
+@app.get("/jobs", response_class=HTMLResponse)
+def jobs_page(queued: Optional[int] = None):
+    # Render the background-job overview: the 200 newest jobs with their
+    # project/document/prompt names joined in for display. `queued`, when
+    # set, shows a "job enqueued" notice after a redirect from an enqueue.
+    with db() as c:
+        jobs = c.execute(
+            """
+            SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name
+            FROM jobs j
+            LEFT JOIN projects p ON p.id=j.project_id
+            LEFT JOIN documents d ON d.id=j.document_id
+            LEFT JOIN prompts pr ON pr.id=j.prompt_id
+            ORDER BY j.id DESC LIMIT 200
+            """
+        ).fetchall()
+
+    # NOTE(review): the HTML template below is garbled in this copy of the
+    # diff — markup tags were stripped and the f-strings are split across
+    # lines, so the block as shown is not valid Python. Also
+    # `.replace('<','<')` is a no-op; it was almost certainly
+    # `.replace('<','&lt;')` to HTML-escape the error text before embedding
+    # it in the page. Reconstruct against the original before applying.
+    rows = "".join([
+        f"
Job #{j['id']} [{j['kind']}] ·
{j['status']}"
+        f"
{j['created_at']}"
+        f"{('Projekt: '+str(j['project_name'])+'
') if j['project_name'] else ''}"
+        f"{('Dokument: '+str(j['document_title'])+'
') if j['document_title'] else ''}"
+        f"{('Prompt: '+str(j['prompt_name'])+'
') if j['prompt_name'] else ''}"
+        f"{('Fehler:
'+str(j['error']).replace('<','<')+'') if j['error'] else ''}"
+        f"{(f'
Ergebnis öffnen') if j['result_document_id'] else ''}"
+        f"
" for j in jobs
+    ])
+
+    notice = f"Job #{queued} wurde eingereiht.
" if queued else ""
+    body = f"""
+Hintergrundverarbeitung
+Maximal 2 Jobs laufen gleichzeitig.
+{notice}
+{rows or 'Keine Jobs.
'}
+"""
+    return layout("Jobs", body)
+
+
@app.get("/run", response_class=HTMLResponse)
def run_page():
with db() as c:
@@ -590,43 +767,10 @@ def run_page():
@app.post("/run", response_class=HTMLResponse)
def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...)):
+    # Only existence is validated here; the worker re-reads the full rows.
+    # The heavy LLM call moved off the request path into the job queue.
    with db() as c:
-        doc = c.execute("SELECT * FROM documents WHERE id=?", (document_id,)).fetchone()
-        prm = c.execute("SELECT * FROM prompts WHERE id=?", (prompt_id,)).fetchone()
+        doc = c.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone()
+        prm = c.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
    if not doc or not prm:
        raise HTTPException(404, "Dokument oder Prompt nicht gefunden")
-    llm_prompt = (
-        "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
-        f"AUFTRAG:\n{prm['prompt']}\n\n"
-        f"TEXT:\n{doc['content_md']}\n"
-    )
-
-    r = requests.post(
-        f"{OLLAMA_BASE_URL}/api/generate",
-        json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
-        timeout=1200,
-    )
-    if r.status_code >= 400:
-        raise HTTPException(r.status_code, r.text)
-    answer = r.json().get("response", "")
-
-    with db() as c:
-        cur = c.execute(
-            """
-            INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
-            VALUES (?,?,?,?,?,?,?,?)
-            """,
-            (
-                doc["project_id"],
-                "analysis",
-                f"Analyse: {prm['name']} · {doc['title']}",
-                answer,
-                doc["id"],
-                prm["id"],
-                json.dumps({"ollama_response": r.json()}, ensure_ascii=False),
-                now_iso(),
-            ),
-        )
-        new_id = cur.lastrowid
-
-    return HTMLResponse(f"")
+    job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id)
+    # NOTE(review): the response body below is empty in this copy of the
+    # diff — presumably a redirect (e.g. to /jobs?queued={job_id}) was
+    # stripped in transit; confirm against the original before applying.
+    return HTMLResponse(f"")