import json import os import sqlite3 import threading from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path from typing import List, Optional import markdown as md import requests from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.responses import HTMLResponse, PlainTextResponse, Response, JSONResponse, RedirectResponse API_BASE = os.getenv("API_BASE", "http://gx10.aquantico.lan:8093").rstrip("/") OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://gx10.aquantico.lan:11434").rstrip("/") OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:9b") DB_PATH = os.getenv("DB_PATH", "/data/ui.db") app = FastAPI(title="Diarization UI") JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs")) EXECUTOR = ThreadPoolExecutor(max_workers=2) JOB_LOCK = threading.Lock() def db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def now_iso() -> str: return datetime.utcnow().isoformat() def init_db(): os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) with db() as c: c.execute( """ CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, created_at TEXT NOT NULL ) """ ) c.execute( """ CREATE TABLE IF NOT EXISTS prompts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, prompt TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) c.execute( """ CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL, kind TEXT NOT NULL, -- transcript|analysis title TEXT NOT NULL, content_md TEXT NOT NULL, source_document_id INTEGER, prompt_id INTEGER, raw_json TEXT, created_at TEXT NOT NULL, FOREIGN KEY(project_id) REFERENCES projects(id), FOREIGN KEY(source_document_id) REFERENCES documents(id), FOREIGN KEY(prompt_id) REFERENCES prompts(id) ) """ ) c.execute( """ CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, kind TEXT NOT NULL, -- upload|analysis status TEXT NOT NULL, -- queued|running|done|error project_id INTEGER, document_id INTEGER, prompt_id INTEGER, title TEXT, file_path TEXT, error TEXT, result_document_id INTEGER, created_at TEXT NOT NULL, started_at TEXT, finished_at TEXT ) """ ) # defaults c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso())) c.execute( "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)", ( "Zusammenfassung", "Erstelle eine prägnante Zusammenfassung des Gesprächs in Stichpunkten.", now_iso(), now_iso(), ), ) c.execute( "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)", ( "Aufgaben", "Extrahiere alle Aufgaben. Gib pro Aufgabe: Verantwortlich, Aufgabe, Deadline (falls vorhanden), Priorität.", now_iso(), now_iso(), ), ) def layout(title: str, body: str) -> str: return f""" {title}

{title}

{body}
""" def get_projects(): with db() as c: return c.execute("SELECT id,name FROM projects ORDER BY name").fetchall() def get_project_name(project_id: int) -> str: with db() as c: r = c.execute("SELECT name FROM projects WHERE id=?", (project_id,)).fetchone() return r[0] if r else "" def get_prompts(): with db() as c: return c.execute("SELECT id,name,prompt FROM prompts ORDER BY name").fetchall() def _job_get(job_id: int): with db() as c: return c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone() def _job_set(job_id: int, **fields): if not fields: return cols = ", ".join([f"{k}=?" for k in fields.keys()]) vals = list(fields.values()) + [job_id] with db() as c: c.execute(f"UPDATE jobs SET {cols} WHERE id=?", vals) def _process_upload_job(job_id: int): j = _job_get(job_id) if not j: return if j["status"] == "cancelled": return _job_set(job_id, status="running", started_at=now_iso()) try: j = _job_get(job_id) if not j or j["status"] == "cancelled": return p = Path(j["file_path"]) data = p.read_bytes() filename = p.name files = {"file": (filename, data, "application/octet-stream")} r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800) r.raise_for_status() payload = r.json() j = _job_get(job_id) if not j or j["status"] == "cancelled": return content_md = payload.get("formatted_text", "") doc_title = (j["title"] or "").strip() or filename with db() as c: cur = c.execute( "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)", (j["project_id"], "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()), ) new_doc_id = cur.lastrowid _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso()) except Exception as e: _job_set(job_id, status="error", error=str(e), finished_at=now_iso()) def _process_analysis_job(job_id: int): j = _job_get(job_id) if not j: return if j["status"] == "cancelled": return _job_set(job_id, status="running", started_at=now_iso()) try: j = _job_get(job_id) if not j or j["status"] == "cancelled": return with db() as c: doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone() prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone() if not doc or not prm: raise RuntimeError("Dokument oder Prompt nicht gefunden") llm_prompt = ( "Du bist ein präziser Assistent. Antworte auf Deutsch.\\n" f"AUFTRAG:\\n{prm['prompt']}\\n\\n" f"TEXT:\\n{doc['content_md']}\\n" ) r = requests.post( f"{OLLAMA_BASE_URL}/api/generate", json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False}, timeout=1200, ) r.raise_for_status() answer = r.json().get("response", "") j = _job_get(job_id) if not j or j["status"] == "cancelled": return with db() as c: cur = c.execute( """ INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at) VALUES (?,?,?,?,?,?,?,?) """, ( doc["project_id"], "analysis", f"Analyse: {prm['name']} · {doc['title']}", answer, doc["id"], prm["id"], json.dumps({"ollama_response": r.json()}, ensure_ascii=False), now_iso(), ), ) new_doc_id = cur.lastrowid _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso()) except Exception as e: _job_set(job_id, status="error", error=str(e), finished_at=now_iso()) def enqueue_job(kind: str, **kwargs) -> int: with JOB_LOCK: with db() as c: cur = c.execute( """ INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,created_at) VALUES (?,?,?,?,?,?,?,?) """, ( kind, "queued", kwargs.get("project_id"), kwargs.get("document_id"), kwargs.get("prompt_id"), kwargs.get("title"), kwargs.get("file_path"), now_iso(), ), ) job_id = cur.lastrowid if kind == "upload": EXECUTOR.submit(_process_upload_job, job_id) elif kind == "analysis": EXECUTOR.submit(_process_analysis_job, job_id) return job_id @app.on_event("startup") def startup(): init_db() JOB_DIR.mkdir(parents=True, exist_ok=True) @app.get("/manifest.webmanifest") def manifest(): return { "name": "VoiceLog", "short_name": "VoiceLog", "start_url": "/", "display": "standalone", "background_color": "#020617", "theme_color": "#0f172a", "icons": [ {"src": "/icon.svg", "sizes": "any", "type": "image/svg+xml", "purpose": "any maskable"} ], } @app.get("/icon.svg") def icon_svg(): svg = """🎙️""" return Response(content=svg, media_type="image/svg+xml") @app.get("/sw.js") def sw_js(): js = """ self.addEventListener('install', (event) => { self.skipWaiting(); }); self.addEventListener('activate', (event) => { event.waitUntil(clients.claim()); }); self.addEventListener('fetch', (event) => { if (event.request.method !== 'GET') return; event.respondWith(fetch(event.request).catch(() => new Response('offline', {status: 503}))); }); """ return Response(content=js, media_type="application/javascript") @app.get("/healthz") def healthz(): return {"ok": True, "api_base": API_BASE, "ollama_base_url": OLLAMA_BASE_URL, "ollama_model": OLLAMA_MODEL, "db_path": DB_PATH} @app.get("/", response_class=HTMLResponse) def upload_page(msg: str = ""): projects = get_projects() opts = "".join([f"" for p in projects]) body = f"""

Audio Upload

Audio wird transkribiert + mit Sprechern angereichert und als Dokument gespeichert.

{f"

{msg}

" if msg else ""}
""" return layout("Upload", body) @app.post("/projects", response_class=HTMLResponse) def add_project(name: str = Form(...)): with db() as c: c.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", (name.strip(), now_iso())) return HTMLResponse("") @app.post("/projects/update", response_class=HTMLResponse) def rename_project(id: int = Form(...), name: str = Form(...)): with db() as c: c.execute("UPDATE projects SET name=? WHERE id=?", (name.strip(), id)) return HTMLResponse("") @app.post("/projects/{project_id}/delete", response_class=HTMLResponse) def delete_project(project_id: int): with db() as c: default = c.execute("SELECT id FROM projects WHERE name='Default'").fetchone() if not default: c.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso())) default_id = c.execute("SELECT id FROM projects WHERE name='Default'").fetchone()[0] else: default_id = default[0] if project_id == default_id: return HTMLResponse("") c.execute("UPDATE documents SET project_id=? WHERE project_id=?", (default_id, project_id)) c.execute("DELETE FROM projects WHERE id=?", (project_id,)) return HTMLResponse("") @app.post("/upload", response_class=HTMLResponse) async def upload(project_id: int = Form(...), title: str = Form(""), file: UploadFile = File(...)): data = await file.read() if not data: raise HTTPException(400, "Leere Datei") JOB_DIR.mkdir(parents=True, exist_ok=True) filename = (file.filename or "audio.bin").replace("/", "_") temp_path = JOB_DIR / f"{now_iso().replace(':','-')}_{filename}" temp_path.write_bytes(data) job_id = enqueue_job( "upload", project_id=project_id, title=(title or "").strip() or filename, file_path=str(temp_path), ) return HTMLResponse(f"") @app.get("/library", response_class=HTMLResponse) def library(project_id: Optional[str] = None, q_title: str = "", q_content: str = ""): title_q = (q_title or "").strip() content_q = (q_content or "").strip() project_id_int = int(project_id) if (project_id and str(project_id).strip()) else None where = [] params = [] if project_id_int: where.append("d.project_id=?") params.append(project_id_int) if title_q: where.append("LOWER(d.title) LIKE LOWER(?)") params.append(f"%{title_q}%") if content_q: where.append("LOWER(d.content_md) LIKE LOWER(?)") params.append(f"%{content_q}%") where_sql = ("WHERE " + " AND ".join(where)) if where else "" with db() as c: projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall() docs = c.execute( f""" SELECT d.id,d.kind,d.title,d.created_at,p.name AS project FROM documents d JOIN projects p ON p.id=d.project_id {where_sql} ORDER BY d.id DESC LIMIT 500 """, tuple(params), ).fetchall() p_opts = "" + "".join( [f"" for p in projects] ) rows = "".join( [ f"" f"" f"#{d['id']}" f"{d['title']}" f"{d['kind']}" f"{d['project']}" f"{d['created_at']}" f"" f"👁️" f"⬇️" f"✏️" f"📁" f"🗑️" f"" f"" for d in docs ] ) project_js = json.dumps([{"value": p["id"], "label": p["name"]} for p in projects], ensure_ascii=False) body = f"""

Projekte · Dokumente

Ansicht im Projektlisten-Stil mit Schnellaktionen.
{len(docs)} Treffer
0 ausgewählt
{rows or ""}
IDTitelTypProjektErstelltAktionen
Keine Einträge.
""" return layout("Library", body) @app.get("/document/{doc_id}", response_class=HTMLResponse) def view_document(doc_id: int): with db() as c: d = c.execute( """ SELECT d.*, p.name AS project, pr.name AS prompt_name FROM documents d JOIN projects p ON p.id=d.project_id LEFT JOIN prompts pr ON pr.id=d.prompt_id WHERE d.id=? """, (doc_id,), ).fetchone() if not d: raise HTTPException(404, "not found") rendered = md.markdown(d["content_md"] or "", extensions=["fenced_code", "tables", "nl2br"]) projects = get_projects() body = f"""

Dokument #{d['id']} – {d['title']}

Projekt: {d['project']} · Typ: {d['kind']} · {d['created_at']}    ⬇️ ✏️ 📁 🗑️

{rendered}
""" return layout("Dokument", body) @app.get("/document/{doc_id}/download.md", response_class=PlainTextResponse) def download_md(doc_id: int): with db() as c: d = c.execute("SELECT title,content_md FROM documents WHERE id=?", (doc_id,)).fetchone() if not d: raise HTTPException(404, "not found") base = (d["title"] or f"document_{doc_id}").strip() safe = "".join(ch if ch.isalnum() or ch in ("-", "_", " ") else "_" for ch in base).strip() safe = safe.replace(" ", "_") or f"document_{doc_id}" filename = f"{safe}.md" return PlainTextResponse( d["content_md"], headers={"Content-Disposition": f"attachment; filename={filename}"}, ) @app.post("/document/{doc_id}/rename", response_class=HTMLResponse) def rename_document(doc_id: int, title: str = Form(...)): with db() as c: c.execute("UPDATE documents SET title=? WHERE id=?", (title.strip(), doc_id)) return HTMLResponse("") @app.post("/document/{doc_id}/move", response_class=HTMLResponse) def move_document(doc_id: int, project_id: int = Form(...)): with db() as c: c.execute("UPDATE documents SET project_id=? WHERE id=?", (project_id, doc_id)) return HTMLResponse("") @app.post("/document/{doc_id}/delete", response_class=HTMLResponse) def delete_document(doc_id: int): with db() as c: c.execute("DELETE FROM documents WHERE id=?", (doc_id,)) return HTMLResponse("") @app.post("/documents/bulk-move", response_class=HTMLResponse) def bulk_move_documents(ids: List[int] = Form(...), project_id: int = Form(...)): with db() as c: c.executemany("UPDATE documents SET project_id=? WHERE id=?", [(project_id, i) for i in ids]) return HTMLResponse("") @app.post("/documents/bulk-delete", response_class=HTMLResponse) def bulk_delete_documents(ids: List[int] = Form(...)): with db() as c: c.executemany("DELETE FROM documents WHERE id=?", [(i,) for i in ids]) return HTMLResponse("") @app.get("/prompts", response_class=HTMLResponse) def prompts_page(): with db() as c: prompts = c.execute("SELECT * FROM prompts ORDER BY name").fetchall() projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall() p_list = "".join( [ f"
{p['name']}
{(p['prompt'] or '').replace('<','<')}
" f"


" f"
" f"
" for p in prompts ] ) project_opts = "".join([f"" for p in projects]) project_list = "".join([ f"
{p['name']}" f"
" f"
" f"
" f"
" for p in projects ]) body = f"""

Prompt-Konfiguration

Neuer Prompt



Neues Projekt

{project_opts}

Projekte

{project_list}

Prompts

{p_list} """ return layout("Prompts", body) @app.post("/prompts/add", response_class=HTMLResponse) def prompt_add(name: str = Form(...), prompt: str = Form(...)): with db() as c: c.execute( "INSERT INTO prompts(name,prompt,created_at,updated_at) VALUES (?,?,?,?)", (name.strip(), prompt.strip(), now_iso(), now_iso()), ) return HTMLResponse("") @app.post("/prompts/update", response_class=HTMLResponse) def prompt_update(id: int = Form(...), name: str = Form(...), prompt: str = Form(...)): with db() as c: c.execute("UPDATE prompts SET name=?, prompt=?, updated_at=? WHERE id=?", (name.strip(), prompt.strip(), now_iso(), id)) return HTMLResponse("") @app.post("/prompts/{prompt_id}/delete", response_class=HTMLResponse) def prompt_delete(prompt_id: int): with db() as c: c.execute("DELETE FROM prompts WHERE id=?", (prompt_id,)) return HTMLResponse("") def _parse_utcish(ts: Optional[str]) -> Optional[datetime]: if not ts: return None try: return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).replace(tzinfo=None) except Exception: try: return datetime.fromisoformat(str(ts)) except Exception: return None def _fmt_elapsed(start_iso: Optional[str], end_iso: Optional[str] = None) -> str: s = _parse_utcish(start_iso) if not s: return "-" try: e = _parse_utcish(end_iso) if end_iso else datetime.utcnow() if not e: e = datetime.utcnow() sec = max(0, int((e - s).total_seconds())) if sec < 60: return f"{sec}s" m, s2 = divmod(sec, 60) if m < 60: return f"{m}m {s2}s" h, m2 = divmod(m, 60) return f"{h}h {m2}m" except Exception: return "-" def _jobs_payload(limit: int = 200): with db() as c: jobs = c.execute( """ SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name FROM jobs j LEFT JOIN projects p ON p.id=j.project_id LEFT JOIN documents d ON d.id=j.document_id LEFT JOIN prompts pr ON pr.id=j.prompt_id ORDER BY j.id DESC LIMIT ? """, (limit,), ).fetchall() return [dict(j) for j in jobs] @app.get("/jobs/data") def jobs_data(limit: int = 200): return {"items": _jobs_payload(limit)} @app.post("/jobs/{job_id}/cancel") def jobs_cancel(job_id: int): j = _job_get(job_id) if not j: raise HTTPException(404, "job not found") if j["status"] not in ("done", "error", "cancelled"): _job_set(job_id, status="cancelled", finished_at=now_iso(), error="Cancelled by user") return {"ok": True, "status": "cancelled"} @app.post("/jobs/{job_id}/cancel-form") def jobs_cancel_form(job_id: int): jobs_cancel(job_id) return RedirectResponse(url="/jobs", status_code=303) @app.post("/jobs/{job_id}/delete") def jobs_delete(job_id: int): with db() as c: c.execute("DELETE FROM jobs WHERE id=?", (job_id,)) return {"ok": True} @app.post("/jobs/{job_id}/delete-form") def jobs_delete_form(job_id: int): jobs_delete(job_id) return RedirectResponse(url="/jobs", status_code=303) @app.get("/jobs", response_class=HTMLResponse) def jobs_page(queued: Optional[int] = None): items = _jobs_payload(200) pre = "".join([ ( f"
Job #{it['id']} [{it['kind']}] · {it['status']} · läuft: {_fmt_elapsed(it.get('started_at') or it.get('created_at'), it.get('finished_at'))}
" f"{it['created_at']}
" f"
" + ( "" if it["status"] in ("done", "error", "cancelled") else ( f"
" f"
" ) ) + ( f"
" f"
" ) + ( f"Ergebnis öffnen" if it.get("result_document_id") else "" ) + "
" ) for it in items ]) or "

Keine Jobs.

" notice = f"

Job #{queued} wurde eingereiht.

" if queued else "" body = f"""

Hintergrundverarbeitung

Maximal 2 Jobs gleichzeitig. Seite aktualisiert automatisch.

{notice}
Live-Update aktiv …
{pre}
""" return layout("Jobs", body) @app.get("/run", response_class=HTMLResponse) def run_page(): with db() as c: docs = c.execute("SELECT id,title,kind,created_at FROM documents ORDER BY id DESC LIMIT 200").fetchall() prompts = c.execute("SELECT id,name FROM prompts ORDER BY name").fetchall() d_opts = "".join([f"" for d in docs]) p_opts = "".join([f"" for p in prompts]) body = f"""

Prompt ausführen







""" return layout("Run", body) @app.post("/run", response_class=HTMLResponse) def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...)): with db() as c: doc = c.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone() prm = c.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone() if not doc or not prm: raise HTTPException(404, "Dokument oder Prompt nicht gefunden") job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id) return HTMLResponse(f"")