import json
import os
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Optional
import markdown as md
import requests
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, PlainTextResponse, Response, JSONResponse
# Base URL of the transcription/diarization service; the upload worker POSTs
# audio to f"{API_BASE}/transcribe-diarize". Trailing "/" is removed so paths
# can be appended directly.
API_BASE = os.getenv("API_BASE", "http://gx10.aquantico.lan:8093").rstrip("/")
# Base URL of the Ollama server; the analysis worker POSTs to
# f"{OLLAMA_BASE_URL}/api/generate".
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://gx10.aquantico.lan:11434").rstrip("/")
# Model name sent as "model" in the Ollama generate request.
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:9b")
# Path of the SQLite database file opened by db().
DB_PATH = os.getenv("DB_PATH", "/data/ui.db")
app = FastAPI(title="Diarization UI")
# Directory where uploaded audio files are stored for the background jobs.
JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs"))
# Background worker pool: at most two jobs are processed concurrently.
EXECUTOR = ThreadPoolExecutor(max_workers=2)
# Held by enqueue_job() while a job row is inserted and submitted.
JOB_LOCK = threading.Lock()
class _AutoClosingConnection(sqlite3.Connection):
    """SQLite connection whose ``with`` block also closes the connection.

    The stock ``sqlite3.Connection`` context manager only commits or rolls
    back; it never closes the handle. Every caller in this file uses the
    one-shot pattern ``with db() as c: ...``, so closing on exit stops the
    file handle from lingering until garbage collection.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, exactly as the base class.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def db():
    """Open a new connection to the SQLite database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name or
    index. Intended use is ``with db() as c: ...``: the transaction is
    committed (or rolled back on an exception) and the connection is closed
    when the block exits.

    Returns:
        A ``sqlite3.Connection`` (sub-class) with ``row_factory`` set.
    """
    conn = sqlite3.connect(DB_PATH, factory=_AutoClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn
def now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12. The aware
    ``datetime.now(timezone.utc)`` is used instead, and the tzinfo is dropped
    so the text format (no ``+00:00`` suffix) stays identical to values
    already stored in the database and used in upload file names.
    """
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
def init_db():
    """Create the database schema and seed default rows if they are missing.

    Creates the ``projects``, ``prompts``, ``documents`` and ``jobs`` tables
    (``IF NOT EXISTS``), then inserts the "Default" project and the two stock
    prompts with ``INSERT OR IGNORE`` so repeated calls are harmless.
    """
    # os.path.dirname() is "" for a bare file name (e.g. DB_PATH="ui.db") and
    # os.makedirs("") raises FileNotFoundError, so only create a real directory.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    with db() as c:
        c.execute(
            """
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_at TEXT NOT NULL
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
prompt TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL,
kind TEXT NOT NULL, -- transcript|analysis
title TEXT NOT NULL,
content_md TEXT NOT NULL,
source_document_id INTEGER,
prompt_id INTEGER,
raw_json TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY(project_id) REFERENCES projects(id),
FOREIGN KEY(source_document_id) REFERENCES documents(id),
FOREIGN KEY(prompt_id) REFERENCES prompts(id)
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
kind TEXT NOT NULL, -- upload|analysis
status TEXT NOT NULL, -- queued|running|done|error
project_id INTEGER,
document_id INTEGER,
prompt_id INTEGER,
title TEXT,
file_path TEXT,
error TEXT,
result_document_id INTEGER,
created_at TEXT NOT NULL,
started_at TEXT,
finished_at TEXT
)
"""
        )
        # Seed defaults; the UNIQUE name columns make OR IGNORE a no-op on reruns.
        c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso()))
        c.execute(
            "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)",
            (
                "Zusammenfassung",
                "Erstelle eine prägnante Zusammenfassung des Gesprächs in Stichpunkten.",
                now_iso(),
                now_iso(),
            ),
        )
        c.execute(
            "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)",
            (
                "Aufgaben",
                "Extrahiere alle Aufgaben. Gib pro Aufgabe: Verantwortlich, Aufgabe, Deadline (falls vorhanden), Priorität.",
                now_iso(),
                now_iso(),
            ),
        )
def layout(title: str, body: str) -> str:
    """Wrap *title* and *body* into the page template.

    The template places a newline before the title, between title and body,
    and after the body.
    """
    template = "\n{}\n{}\n"
    return template.format(title, body)
def get_projects():
    """Return every project as an ``(id, name)`` row, sorted by name."""
    query = "SELECT id,name FROM projects ORDER BY name"
    with db() as conn:
        rows = conn.execute(query).fetchall()
    return rows
def get_project_name(project_id: int) -> str:
    """Return the name of project *project_id*, or ``""`` if it does not exist."""
    with db() as conn:
        row = conn.execute("SELECT name FROM projects WHERE id=?", (project_id,)).fetchone()
    if not row:
        return ""
    return row[0]
def get_prompts():
    """Return every prompt as an ``(id, name, prompt)`` row, sorted by name."""
    query = "SELECT id,name,prompt FROM prompts ORDER BY name"
    with db() as conn:
        rows = conn.execute(query).fetchall()
    return rows
def _job_get(job_id: int):
    """Fetch the full ``jobs`` row for *job_id*, or ``None`` when absent."""
    with db() as conn:
        cursor = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,))
        job_row = cursor.fetchone()
    return job_row
def _job_set(job_id: int, **fields):
if not fields:
return
cols = ", ".join([f"{k}=?" for k in fields.keys()])
vals = list(fields.values()) + [job_id]
with db() as c:
c.execute(f"UPDATE jobs SET {cols} WHERE id=?", vals)
def _process_upload_job(job_id: int):
    """Background worker: transcribe an uploaded audio file into a document.

    Steps:
      1. Atomically mark the job ``running`` unless it is missing or cancelled.
      2. POST the stored file to ``{API_BASE}/transcribe-diarize``.
      3. Store the returned ``formatted_text`` as a ``transcript`` document
         (the full JSON payload goes into ``raw_json``).
      4. Mark the job ``done`` with ``result_document_id``.

    Cancellation is re-checked after claiming and after the remote call.
    Any exception marks the job ``error`` with the exception text, unless
    the job has been cancelled in the meantime.
    """
    # Claim with one conditional UPDATE. A separate "read status, then set
    # running" could overwrite a cancel that arrives between the two steps.
    with db() as c:
        claimed = c.execute(
            "UPDATE jobs SET status=?, started_at=? WHERE id=? AND status<>?",
            ("running", now_iso(), job_id, "cancelled"),
        ).rowcount
    if not claimed:
        return  # job row is gone or was cancelled before it started
    try:
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        p = Path(j["file_path"])
        data = p.read_bytes()
        filename = p.name
        files = {"file": (filename, data, "application/octet-stream")}
        r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
        r.raise_for_status()
        payload = r.json()
        # The request can take a long time; honour a cancel issued meanwhile.
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        # `or ""` also covers an explicit null, which would otherwise violate
        # the NOT NULL constraint on documents.content_md.
        content_md = payload.get("formatted_text") or ""
        doc_title = (j["title"] or "").strip() or filename
        with db() as c:
            cur = c.execute(
                "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
                (j["project_id"], "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
    except Exception as e:
        # Worker boundary: record the failure on the job row, but never turn
        # a user-cancelled job into an "error" one.
        with db() as c:
            c.execute(
                "UPDATE jobs SET status=?, error=?, finished_at=? WHERE id=? AND status<>?",
                ("error", str(e), now_iso(), job_id, "cancelled"),
            )
def _process_analysis_job(job_id: int):
    """Background worker: run a stored prompt over a document via Ollama.

    Steps:
      1. Atomically mark the job ``running`` unless it is missing or cancelled.
      2. Load the source document and the prompt referenced by the job.
      3. POST the combined prompt to ``{OLLAMA_BASE_URL}/api/generate``
         (non-streaming) and take the ``response`` field as the answer.
      4. Store the answer as an ``analysis`` document linked to the source
         document and prompt, then mark the job ``done``.

    Cancellation is re-checked after claiming and after the remote call.
    Any exception marks the job ``error`` with the exception text, unless
    the job has been cancelled in the meantime.
    """
    # Claim with one conditional UPDATE so a concurrent cancel is not overwritten.
    with db() as c:
        claimed = c.execute(
            "UPDATE jobs SET status=?, started_at=? WHERE id=? AND status<>?",
            ("running", now_iso(), job_id, "cancelled"),
        ).rowcount
    if not claimed:
        return  # job row is gone or was cancelled before it started
    try:
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        with db() as c:
            doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone()
            prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone()
        if not doc or not prm:
            raise RuntimeError("Dokument oder Prompt nicht gefunden")
        # Real newlines separate the sections. The previous "\\n" escapes put
        # a literal backslash followed by "n" into the prompt text.
        llm_prompt = (
            "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
            f"AUFTRAG:\n{prm['prompt']}\n\n"
            f"TEXT:\n{doc['content_md']}\n"
        )
        r = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": False},
            timeout=1200,
        )
        r.raise_for_status()
        # Parse the body once and reuse it for the answer and for raw_json.
        response_json = r.json()
        # `or ""` also covers an explicit null (content_md is NOT NULL).
        answer = response_json.get("response") or ""
        # The request can take a long time; honour a cancel issued meanwhile.
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        with db() as c:
            cur = c.execute(
                """
INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
VALUES (?,?,?,?,?,?,?,?)
""",
                (
                    doc["project_id"],
                    "analysis",
                    f"Analyse: {prm['name']} · {doc['title']}",
                    answer,
                    doc["id"],
                    prm["id"],
                    json.dumps({"ollama_response": response_json}, ensure_ascii=False),
                    now_iso(),
                ),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
    except Exception as e:
        # Worker boundary: record the failure on the job row, but never turn
        # a user-cancelled job into an "error" one.
        with db() as c:
            c.execute(
                "UPDATE jobs SET status=?, error=?, finished_at=? WHERE id=? AND status<>?",
                ("error", str(e), now_iso(), job_id, "cancelled"),
            )
def enqueue_job(kind: str, **kwargs) -> int:
    """Insert a new ``queued`` job row and hand it to the worker pool.

    Args:
        kind: ``"upload"`` or ``"analysis"``.
        **kwargs: Optional job columns: ``project_id``, ``document_id``,
            ``prompt_id``, ``title``, ``file_path``. Missing ones are stored
            as NULL.

    Returns:
        The id of the newly created job row.

    Raises:
        ValueError: If *kind* is not a known job kind. Previously such a job
            was inserted but never submitted, so it stayed "queued" forever.
    """
    if kind not in ("upload", "analysis"):
        raise ValueError(f"unknown job kind: {kind!r}")
    worker = _process_upload_job if kind == "upload" else _process_analysis_job
    with JOB_LOCK:
        with db() as c:
            cur = c.execute(
                """
INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,created_at)
VALUES (?,?,?,?,?,?,?,?)
""",
                (
                    kind,
                    "queued",
                    kwargs.get("project_id"),
                    kwargs.get("document_id"),
                    kwargs.get("prompt_id"),
                    kwargs.get("title"),
                    kwargs.get("file_path"),
                    now_iso(),
                ),
            )
            job_id = cur.lastrowid
        # Submit only after the INSERT is committed (the `with db()` block has
        # exited); otherwise the worker thread may not see the row yet.
        EXECUTOR.submit(worker, job_id)
    return job_id
@app.on_event("startup")
def startup():
    """Application startup hook: prepare the database and the job directory."""
    # Create tables and default rows if they do not exist yet.
    init_db()
    # Make sure the directory for uploaded files exists.
    JOB_DIR.mkdir(parents=True, exist_ok=True)
@app.get("/manifest.webmanifest")
def manifest():
    """Serve the web-app manifest describing the installable PWA."""
    icon = {"src": "/icon.svg", "sizes": "any", "type": "image/svg+xml", "purpose": "any maskable"}
    return dict(
        name="Audio Copilot",
        short_name="Copilot",
        start_url="/",
        display="standalone",
        background_color="#020617",
        theme_color="#0f172a",
        icons=[icon],
    )
@app.get("/icon.svg")
def icon_svg():
    """Serve the application icon with the ``image/svg+xml`` media type."""
    # NOTE(review): the SVG literal is empty as seen here, so the response body
    # is empty — confirm whether the markup was lost.
    svg = """"""
    return Response(content=svg, media_type="image/svg+xml")
@app.get("/sw.js")
def sw_js():
    """Serve the service-worker script.

    The worker activates immediately and passes GET requests through to the
    network, answering with a 503 "offline" response when the fetch fails.
    """
    worker_source = """
self.addEventListener('install', (event) => { self.skipWaiting(); });
self.addEventListener('activate', (event) => { event.waitUntil(clients.claim()); });
self.addEventListener('fetch', (event) => {
if (event.request.method !== 'GET') return;
event.respondWith(fetch(event.request).catch(() => new Response('offline', {status: 503})));
});
"""
    return Response(media_type="application/javascript", content=worker_source)
@app.get("/healthz")
def healthz():
    """Report liveness together with the effective configuration values."""
    status = {"ok": True}
    status.update(
        api_base=API_BASE,
        ollama_base_url=OLLAMA_BASE_URL,
        ollama_model=OLLAMA_MODEL,
        db_path=DB_PATH,
    )
    return status
@app.get("/", response_class=HTMLResponse)
def upload_page(msg: str = ""):
    """Render the upload page.

    Args:
        msg: Optional notice text taken from the query string; it is only
            included in the page when non-empty.
    """
    projects = get_projects()
    # One entry per project for the project selector.
    # NOTE(review): the f-string is empty as seen here — the markup appears to
    # have been lost; confirm against the original file.
    opts = "".join([f"" for p in projects])
    # NOTE(review): `msg` is interpolated into the page without escaping; if it
    # can be set from the URL this is a reflected-XSS vector — confirm and escape.
    body = f"""
Audio Upload
Audio wird transkribiert + mit Sprechern angereichert und als Dokument gespeichert.
{f"
{msg}
" if msg else ""}
"""
    return layout("Upload", body)
@app.post("/projects", response_class=HTMLResponse)
def add_project(name: str = Form(...)):
    """Create a project with the given (whitespace-trimmed) name.

    Raises:
        HTTPException: 400 if the name is empty after trimming; 409 if a
            project with that name already exists (``projects.name`` is
            UNIQUE, which previously surfaced as an unhandled 500).
    """
    clean_name = name.strip()
    if not clean_name:
        raise HTTPException(400, "Projektname darf nicht leer sein")
    try:
        with db() as c:
            c.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", (clean_name, now_iso()))
    except sqlite3.IntegrityError:
        raise HTTPException(409, "Projekt existiert bereits") from None
    return HTMLResponse("")
@app.post("/projects/update", response_class=HTMLResponse)
def rename_project(id: int = Form(...), name: str = Form(...)):
    """Rename project *id* to the (whitespace-trimmed) *name*.

    The parameter is called ``id`` because it is the form field name.

    Raises:
        HTTPException: 400 if the name is empty after trimming; 409 if
            another project already uses that name (``projects.name`` is
            UNIQUE, which previously surfaced as an unhandled 500).
    """
    clean_name = name.strip()
    if not clean_name:
        raise HTTPException(400, "Projektname darf nicht leer sein")
    try:
        with db() as c:
            c.execute("UPDATE projects SET name=? WHERE id=?", (clean_name, id))
    except sqlite3.IntegrityError:
        raise HTTPException(409, "Projekt existiert bereits") from None
    return HTMLResponse("")
@app.post("/projects/{project_id}/delete", response_class=HTMLResponse)
def delete_project(project_id: int):
    """Delete a project, moving its documents and jobs to "Default".

    The "Default" project is created if it is missing and is never deleted
    itself. Everything happens in one transaction.
    """
    with db() as c:
        default = c.execute("SELECT id FROM projects WHERE name='Default'").fetchone()
        if default:
            default_id = default[0]
        else:
            default_id = c.execute(
                "INSERT INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso())
            ).lastrowid
        if project_id == default_id:
            # The fallback project must stay.
            return HTMLResponse("")
        c.execute("UPDATE documents SET project_id=? WHERE project_id=?", (default_id, project_id))
        # Jobs carry a project_id too. A pending upload job would otherwise
        # store its transcript under the deleted project, where the library's
        # JOIN on projects would hide it.
        c.execute("UPDATE jobs SET project_id=? WHERE project_id=?", (default_id, project_id))
        c.execute("DELETE FROM projects WHERE id=?", (project_id,))
    return HTMLResponse("")
@app.post("/upload", response_class=HTMLResponse)
async def upload(project_id: int = Form(...), title: str = Form(""), file: UploadFile = File(...)):
    """Accept an audio upload, store it in ``JOB_DIR`` and queue a transcription job.

    Args:
        project_id: Project the resulting transcript will belong to.
        title: Optional document title; falls back to the file name.
        file: The uploaded audio file.

    Raises:
        HTTPException: 404 if the project does not exist; 400 if the
            uploaded file is empty.
    """
    # Validate the target project up front, as run_prompt() does for its ids.
    # A transcript stored under an unknown project would be hidden by the
    # library's JOIN on projects.
    with db() as c:
        project = c.execute("SELECT id FROM projects WHERE id=?", (project_id,)).fetchone()
    if not project:
        raise HTTPException(404, "Projekt nicht gefunden")
    data = await file.read()
    if not data:
        raise HTTPException(400, "Leere Datei")
    JOB_DIR.mkdir(parents=True, exist_ok=True)
    # Replace "/" so the client-supplied name cannot address another directory.
    filename = (file.filename or "audio.bin").replace("/", "_")
    # Timestamp prefix keeps names unique; ":" is replaced with "-".
    temp_path = JOB_DIR / f"{now_iso().replace(':','-')}_{filename}"
    temp_path.write_bytes(data)
    job_id = enqueue_job(
        "upload",
        project_id=project_id,
        title=(title or "").strip() or filename,
        file_path=str(temp_path),
    )
    return HTMLResponse(f"")
@app.get("/library", response_class=HTMLResponse)
def library(project_id: Optional[int] = None):
    """Render the document library, optionally filtered to one project.

    Args:
        project_id: When given (and non-zero), only documents of that
            project are listed; otherwise the 200 newest documents.
    """
    with db() as c:
        projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall()
        if project_id:
            # Filtered view: all documents of the project, newest first.
            docs = c.execute(
                """
SELECT d.id,d.kind,d.title,d.created_at,p.name AS project
FROM documents d JOIN projects p ON p.id=d.project_id
WHERE d.project_id=? ORDER BY d.id DESC
""",
                (project_id,),
            ).fetchall()
        else:
            # Unfiltered view: newest first, capped at 200 rows.
            docs = c.execute(
                """
SELECT d.id,d.kind,d.title,d.created_at,p.name AS project
FROM documents d JOIN projects p ON p.id=d.project_id
ORDER BY d.id DESC LIMIT 200
"""
            ).fetchall()
    # NOTE(review): the string literals below are empty or broken as seen here —
    # the markup appears to have been lost; confirm against the original file.
    p_opts = "" + "".join(
        [f"" for p in projects]
    )
    items = "".join(
        [
f"
"
            for d in docs
        ]
    )
    # Projects serialised as a JSON list of {"value": id, "label": name}.
    project_js = json.dumps([{"value": p["id"], "label": p["name"]} for p in projects], ensure_ascii=False)
    body = f"""
Datenbank / Dokumente
{items or '
Keine Einträge.
'}
"""
    return layout("Library", body)
@app.get("/document/{doc_id}", response_class=HTMLResponse)
def view_document(doc_id: int):
    """Render a single document page.

    Raises:
        HTTPException: 404 if no document with *doc_id* exists.
    """
    with db() as c:
        # Document plus its project name and, if any, the prompt name.
        d = c.execute(
            """
SELECT d.*, p.name AS project, pr.name AS prompt_name
FROM documents d
JOIN projects p ON p.id=d.project_id
LEFT JOIN prompts pr ON pr.id=d.prompt_id
WHERE d.id=?
""",
            (doc_id,),
        ).fetchone()
    if not d:
        raise HTTPException(404, "not found")
    # Convert the stored Markdown to HTML (fenced code, tables, newline-to-<br>).
    rendered = md.markdown(d["content_md"] or "", extensions=["fenced_code", "tables", "nl2br"])
    projects = get_projects()
    # NOTE(review): the body template is empty as seen here, so `rendered` and
    # `projects` are unused — the markup appears to have been lost; confirm.
    body = f"""
"""
    return layout("Dokument", body)
@app.get("/document/{doc_id}/download.md", response_class=PlainTextResponse)
def download_md(doc_id: int):
    """Return a document's Markdown as a file download.

    The file name is derived from the title: characters other than
    alphanumerics, ``-``, ``_`` and space become ``_``, spaces become ``_``,
    and ``document_<id>`` is used when nothing is left.

    Raises:
        HTTPException: 404 if no document with *doc_id* exists.
    """
    from urllib.parse import quote

    with db() as c:
        d = c.execute("SELECT title,content_md FROM documents WHERE id=?", (doc_id,)).fetchone()
    if not d:
        raise HTTPException(404, "not found")
    base = (d["title"] or f"document_{doc_id}").strip()
    safe = "".join(ch if ch.isalnum() or ch in ("-", "_", " ") else "_" for ch in base).strip()
    safe = safe.replace(" ", "_") or f"document_{doc_id}"
    filename = f"{safe}.md"
    # str.isalnum() accepts non-ASCII letters, but header values are encoded
    # as latin-1, so e.g. a Cyrillic or CJK title used to make the response
    # fail. Send an ASCII fallback plus the exact name via RFC 5987 `filename*`.
    ascii_name = filename.encode("ascii", "replace").decode("ascii").replace("?", "_")
    disposition = 'attachment; filename="{}"; filename*=UTF-8\'\'{}'.format(
        ascii_name, quote(filename, safe="")
    )
    return PlainTextResponse(
        d["content_md"],
        headers={"Content-Disposition": disposition},
    )
@app.post("/document/{doc_id}/rename", response_class=HTMLResponse)
def rename_document(doc_id: int, title: str = Form(...)):
    """Set the title of document *doc_id* to the whitespace-trimmed *title*."""
    new_title = title.strip()
    params = (new_title, doc_id)
    with db() as conn:
        conn.execute("UPDATE documents SET title=? WHERE id=?", params)
    return HTMLResponse("")
@app.post("/document/{doc_id}/move", response_class=HTMLResponse)
def move_document(doc_id: int, project_id: int = Form(...)):
    """Move document *doc_id* into project *project_id*.

    Raises:
        HTTPException: 404 if the target project does not exist. Without
            this check the document would point at a missing project and be
            hidden by the library's JOIN on projects.
    """
    with db() as c:
        target = c.execute("SELECT id FROM projects WHERE id=?", (project_id,)).fetchone()
        if not target:
            raise HTTPException(404, "Projekt nicht gefunden")
        c.execute("UPDATE documents SET project_id=? WHERE id=?", (project_id, doc_id))
    return HTMLResponse("")
@app.post("/document/{doc_id}/delete", response_class=HTMLResponse)
def delete_document(doc_id: int):
    """Remove the document row with id *doc_id* (no-op if it does not exist)."""
    statement = "DELETE FROM documents WHERE id=?"
    with db() as conn:
        conn.execute(statement, (doc_id,))
    return HTMLResponse("")
@app.get("/prompts", response_class=HTMLResponse)
def prompts_page():
    """Render the configuration page listing projects and prompts."""
    with db() as c:
        prompts = c.execute("SELECT * FROM prompts ORDER BY name").fetchall()
        projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall()
    # NOTE(review): several string literals below are empty or broken as seen
    # here — the markup appears to have been lost; confirm against the original.
    # NOTE(review): as written, .replace('<','<') is a no-op; presumably it was
    # meant to escape "<" in the prompt text — confirm.
    p_list = "".join(
        [
f"
{p['name']}
{(p['prompt'] or '').replace('<','<')}
"
f""
f""
f"
"
            for p in prompts
        ]
    )
    project_opts = "".join([f"" for p in projects])
    project_list = "".join([
f"
{p['name']}"
f""
f""
f"
" for p in projects
    ])
    body = f"""
Prompt-Konfiguration
Projekte
{project_list}
Prompts
{p_list}
"""
    return layout("Prompts", body)
@app.post("/prompts/add", response_class=HTMLResponse)
def prompt_add(name: str = Form(...), prompt: str = Form(...)):
    """Create a prompt from the whitespace-trimmed *name* and *prompt* text.

    Raises:
        HTTPException: 400 if name or prompt text is empty after trimming;
            409 if a prompt with that name already exists (``prompts.name``
            is UNIQUE, which previously surfaced as an unhandled 500).
    """
    clean_name = name.strip()
    clean_prompt = prompt.strip()
    if not clean_name or not clean_prompt:
        raise HTTPException(400, "Name und Prompt dürfen nicht leer sein")
    try:
        with db() as c:
            c.execute(
                "INSERT INTO prompts(name,prompt,created_at,updated_at) VALUES (?,?,?,?)",
                (clean_name, clean_prompt, now_iso(), now_iso()),
            )
    except sqlite3.IntegrityError:
        raise HTTPException(409, "Prompt existiert bereits") from None
    return HTMLResponse("")
@app.post("/prompts/update", response_class=HTMLResponse)
def prompt_update(id: int = Form(...), name: str = Form(...), prompt: str = Form(...)):
    """Update name and text of prompt *id* and refresh its ``updated_at``.

    The parameter is called ``id`` because it is the form field name.

    Raises:
        HTTPException: 400 if name or prompt text is empty after trimming;
            409 if another prompt already uses that name (``prompts.name``
            is UNIQUE, which previously surfaced as an unhandled 500).
    """
    clean_name = name.strip()
    clean_prompt = prompt.strip()
    if not clean_name or not clean_prompt:
        raise HTTPException(400, "Name und Prompt dürfen nicht leer sein")
    try:
        with db() as c:
            c.execute(
                "UPDATE prompts SET name=?, prompt=?, updated_at=? WHERE id=?",
                (clean_name, clean_prompt, now_iso(), id),
            )
    except sqlite3.IntegrityError:
        raise HTTPException(409, "Prompt existiert bereits") from None
    return HTMLResponse("")
@app.post("/prompts/{prompt_id}/delete", response_class=HTMLResponse)
def prompt_delete(prompt_id: int):
    """Remove the prompt row with id *prompt_id* (no-op if it does not exist)."""
    statement = "DELETE FROM prompts WHERE id=?"
    with db() as conn:
        conn.execute(statement, (prompt_id,))
    return HTMLResponse("")
def _jobs_payload(limit: int = 200):
    """Return the newest jobs as plain dicts for JSON output.

    Each dict holds all ``jobs`` columns plus ``project_name``,
    ``document_title`` and ``prompt_name`` resolved through LEFT JOINs
    (``None`` where the referenced row is missing).

    Args:
        limit: Maximum number of jobs, newest first.
    """
    query = """
SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name
FROM jobs j
LEFT JOIN projects p ON p.id=j.project_id
LEFT JOIN documents d ON d.id=j.document_id
LEFT JOIN prompts pr ON pr.id=j.prompt_id
ORDER BY j.id DESC LIMIT ?
"""
    with db() as conn:
        rows = conn.execute(query, (limit,)).fetchall()
    return list(map(dict, rows))
@app.get("/jobs/data")
def jobs_data(limit: int = 200):
    """Return up to *limit* of the newest jobs as ``{"items": [...]}``."""
    items = _jobs_payload(limit)
    return {"items": items}
@app.post("/jobs/{job_id}/cancel")
def jobs_cancel(job_id: int):
    """Mark a job as cancelled unless it has already finished.

    Jobs in state ``done``, ``error`` or ``cancelled`` are left untouched and
    their current status is reported back.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    job = _job_get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    current = job["status"]
    finished_states = ("done", "error", "cancelled")
    if current not in finished_states:
        _job_set(job_id, status="cancelled", finished_at=now_iso(), error="Cancelled by user")
        current = "cancelled"
    return {"ok": True, "status": current}
@app.post("/jobs/{job_id}/delete")
def jobs_delete(job_id: int):
    """Remove the job row with id *job_id*; always answers ``{"ok": True}``."""
    statement = "DELETE FROM jobs WHERE id=?"
    with db() as conn:
        conn.execute(statement, (job_id,))
    return {"ok": True}
@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(queued: Optional[int] = None):
    """Render the background-jobs page.

    Args:
        queued: Optional id of a job that was just enqueued; when set, a
            notice mentioning that job number is included in the page.
    """
    # NOTE(review): the notice literal is broken across lines as seen here —
    # the surrounding markup appears to have been lost; confirm.
    notice = f"
Job #{queued} wurde eingereiht.
" if queued else ""
    body = f"""
Hintergrundverarbeitung
Maximal 2 Jobs gleichzeitig. Seite aktualisiert automatisch.
{notice}
"""
    return layout("Jobs", body)
@app.get("/run", response_class=HTMLResponse)
def run_page():
    """Render the page for running a prompt against a document."""
    with db() as c:
        # The 200 newest documents and all prompts feed the two selectors.
        docs = c.execute("SELECT id,title,kind,created_at FROM documents ORDER BY id DESC LIMIT 200").fetchall()
        prompts = c.execute("SELECT id,name FROM prompts ORDER BY name").fetchall()
    # NOTE(review): both f-strings are empty as seen here, so d_opts/p_opts are
    # always "" — the markup appears to have been lost; confirm.
    d_opts = "".join([f"" for d in docs])
    p_opts = "".join([f"" for p in prompts])
    body = f"""
Prompt ausführen
"""
    return layout("Run", body)
@app.post("/run", response_class=HTMLResponse)
def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...)):
    """Queue an analysis job that applies a prompt to a document.

    Raises:
        HTTPException: 404 if the document or the prompt does not exist.
    """
    with db() as conn:
        document_row = conn.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone()
        prompt_row = conn.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
    both_found = bool(document_row) and bool(prompt_row)
    if not both_found:
        raise HTTPException(404, "Dokument oder Prompt nicht gefunden")
    job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id)
    return HTMLResponse(f"")