import json
import os
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import markdown as md
import requests
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import HTMLResponse, PlainTextResponse, Response, JSONResponse, RedirectResponse
# --- Runtime configuration (all values overridable via environment variables) ---

# Base URL of the transcription/diarization backend service.
API_BASE = os.getenv("API_BASE", "http://gx10.aquantico.lan:8093").rstrip("/")
# Base URL of the Ollama server used for LLM analysis jobs.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://gx10.aquantico.lan:11434").rstrip("/")
# Model tag passed to Ollama's /api/generate.
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:9b")
# Upper bound on tokens the model may generate per analysis.
OLLAMA_NUM_PREDICT = int(os.getenv("OLLAMA_NUM_PREDICT", "16384"))
# Whether to request the model's "thinking" stream (truthy strings enable it).
OLLAMA_THINK = os.getenv("OLLAMA_THINK", "true").lower() in ("1", "true", "yes")
# SQLite database file holding projects, prompts, documents and jobs.
DB_PATH = os.getenv("DB_PATH", "/data/ui.db")
app = FastAPI(title="Diarization UI")
# Spool directory for uploaded audio files awaiting transcription.
JOB_DIR = Path(os.getenv("JOB_DIR", "/data/jobs"))
# At most two background jobs run concurrently.
EXECUTOR = ThreadPoolExecutor(max_workers=2)
# Serializes job-row inserts in enqueue_job.
JOB_LOCK = threading.Lock()
# job_id -> {"thinking": str, "response": str} live LLM stream buffers,
# guarded by _JOB_STREAM_LOCK; entries exist only while a job streams.
_JOB_STREAMS: dict = {}
_JOB_STREAM_LOCK = threading.Lock()
def db():
    """Open a fresh SQLite connection to DB_PATH with name-based row access.

    Callers use ``with db() as c`` for transaction scoping. Note the sqlite3
    context manager commits/rolls back but does not close; connections are
    reclaimed by garbage collection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def now_iso() -> str:
    """Current UTC wall-clock time as a naive ISO-8601 string."""
    current = datetime.utcnow()
    return current.isoformat()
def _estimate_num_ctx(prompt: str) -> int:
needed = len(prompt) // 3 + 2048 # rough token estimate + response buffer
for ctx in (4096, 8192, 16384, 32768, 65536):
if needed <= ctx:
return ctx
return 65536
def init_db():
    """Create the schema, apply additive column migrations, seed default rows.

    Safe to run on every startup: CREATE TABLE uses IF NOT EXISTS, the
    ALTER TABLE migrations tolerate already-existing columns, and the seed
    inserts use INSERT OR IGNORE.
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    with db() as c:
        c.execute(
            """
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_at TEXT NOT NULL
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
prompt TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL,
kind TEXT NOT NULL, -- transcript|analysis
title TEXT NOT NULL,
content_md TEXT NOT NULL,
source_document_id INTEGER,
prompt_id INTEGER,
raw_json TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY(project_id) REFERENCES projects(id),
FOREIGN KEY(source_document_id) REFERENCES documents(id),
FOREIGN KEY(prompt_id) REFERENCES prompts(id)
)
"""
        )
        c.execute(
            """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
kind TEXT NOT NULL, -- upload|analysis
status TEXT NOT NULL, -- queued|running|done|error
project_id INTEGER,
document_id INTEGER,
prompt_id INTEGER,
title TEXT,
file_path TEXT,
error TEXT,
result_document_id INTEGER,
created_at TEXT NOT NULL,
started_at TEXT,
finished_at TEXT
)
"""
        )
        # Migrations: columns added after the first release. SQLite has no
        # "ADD COLUMN IF NOT EXISTS", so a re-run raises OperationalError
        # ("duplicate column name"), which is the only error we tolerate —
        # previously a bare `except Exception` hid genuine DB failures too.
        for ddl in (
            "ALTER TABLE jobs ADD COLUMN user_prompt TEXT",
            "ALTER TABLE jobs ADD COLUMN llm_prompt TEXT",
            "ALTER TABLE jobs ADD COLUMN llm_response TEXT",
            "ALTER TABLE jobs ADD COLUMN llm_thinking TEXT",
        ):
            try:
                c.execute(ddl)
            except sqlite3.OperationalError:
                pass  # column already exists
        # Seed defaults (no-ops once present).
        c.execute("INSERT OR IGNORE INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso()))
        c.execute(
            "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)",
            (
                "Zusammenfassung",
                "Erstelle eine prägnante Zusammenfassung des Gesprächs in Stichpunkten.",
                now_iso(),
                now_iso(),
            ),
        )
        c.execute(
            "INSERT OR IGNORE INTO prompts(name, prompt, created_at, updated_at) VALUES (?,?,?,?)",
            (
                "Aufgaben",
                "Extrahiere alle Aufgaben. Gib pro Aufgabe: Verantwortlich, Aufgabe, Deadline (falls vorhanden), Priorität.",
                now_iso(),
                now_iso(),
            ),
        )
def layout(title: str, body: str) -> str:
    """Wrap *body* in the shared page shell (template appears reduced in this copy)."""
    parts = ["", title, title, body, ""]
    return "\n".join(parts)
def get_projects():
    """All projects as sqlite3.Row objects (id, name), ordered by name."""
    query = "SELECT id,name FROM projects ORDER BY name"
    with db() as conn:
        return conn.execute(query).fetchall()
def get_project_name(project_id: int) -> str:
    """Name of the project with *project_id*, or "" when it does not exist."""
    with db() as conn:
        row = conn.execute("SELECT name FROM projects WHERE id=?", (project_id,)).fetchone()
    return row[0] if row else ""
def get_prompts():
    """All stored prompts (id, name, prompt) ordered alphabetically."""
    query = "SELECT id,name,prompt FROM prompts ORDER BY name"
    with db() as conn:
        return conn.execute(query).fetchall()
def _job_get(job_id: int):
    """Fetch one jobs row as a plain dict, or None when the id is unknown."""
    with db() as conn:
        found = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    return dict(found) if found else None
def _job_set(job_id: int, **fields):
if not fields:
return
cols = ", ".join([f"{k}=?" for k in fields.keys()])
vals = list(fields.values()) + [job_id]
with db() as c:
c.execute(f"UPDATE jobs SET {cols} WHERE id=?", vals)
def _process_upload_job(job_id: int):
    """Background worker: send a job's audio file to the transcription API and
    store the result as a 'transcript' document.

    The job row is re-read before each expensive step so a user cancellation
    (status == 'cancelled') is honored between phases. Any failure is recorded
    on the job row as status='error'.
    """
    j = _job_get(job_id)
    if not j:
        return
    if j["status"] == "cancelled":
        return
    _job_set(job_id, status="running", started_at=now_iso())
    try:
        # Re-check after the status write; a cancel may have raced in.
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        p = Path(j["file_path"])
        data = p.read_bytes()
        filename = p.name
        files = {"file": (filename, data, "application/octet-stream")}
        # Long-running call: up to 30 minutes for transcription + diarization.
        r = requests.post(f"{API_BASE}/transcribe-diarize", files=files, timeout=1800)
        r.raise_for_status()
        payload = r.json()
        # Last cancellation check before persisting the result.
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        content_md = payload.get("formatted_text", "")
        # Fall back to the uploaded filename when no title was supplied.
        doc_title = (j["title"] or "").strip() or filename
        with db() as c:
            cur = c.execute(
                "INSERT INTO documents(project_id, kind, title, content_md, raw_json, created_at) VALUES (?,?,?,?,?,?)",
                (j["project_id"], "transcript", doc_title, content_md, json.dumps(payload, ensure_ascii=False), now_iso()),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso())
    except Exception as e:
        # Any failure (network, HTTP, JSON, DB) is surfaced on the job row.
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
def _process_analysis_job(job_id: int):
    """Background worker: run a stored prompt over a document via Ollama and
    save the streamed answer as an 'analysis' document.

    Cancellation (status == 'cancelled') is checked before each expensive
    phase and every ~100 stream chunks. Live partial output is published in
    _JOB_STREAMS for the debug endpoint and removed in the finally block.
    """
    j = _job_get(job_id)
    if not j:
        return
    if j["status"] == "cancelled":
        return
    _job_set(job_id, status="running", started_at=now_iso())
    try:
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        with db() as c:
            doc = c.execute("SELECT * FROM documents WHERE id=?", (j["document_id"],)).fetchone()
            prm = c.execute("SELECT * FROM prompts WHERE id=?", (j["prompt_id"],)).fetchone()
        if not doc or not prm:
            raise RuntimeError("Dokument oder Prompt nicht gefunden")
        if not (doc["content_md"] or "").strip():
            raise RuntimeError("Dokument hat keinen Inhalt – bitte zuerst das Transkript prüfen")
        user_extra = (j.get("user_prompt") or "").strip()
        # Bug fix: the section separators were written as "\\n" (a literal
        # backslash-n inside the prompt text). Use real newlines, consistent
        # with the "\n\n" used for the stored llm_prompt below.
        llm_prompt = (
            "Du bist ein präziser Assistent. Antworte auf Deutsch.\n"
            f"AUFTRAG:\n{prm['prompt']}\n"
            + (f"\nZUSATZINFOS:\n{user_extra}\n" if user_extra else "")
            + f"\nTEXT:\n{doc['content_md']}\n"
        )
        num_ctx = _estimate_num_ctx(llm_prompt)
        _job_set(job_id, llm_prompt=f"[num_ctx={num_ctx}]\n\n{llm_prompt}")
        with _JOB_STREAM_LOCK:
            _JOB_STREAMS[job_id] = {"thinking": "", "response": ""}
        # Stream the generation so partial output is visible while running.
        r = requests.post(
            f"{OLLAMA_BASE_URL}/api/generate",
            json={"model": OLLAMA_MODEL, "prompt": llm_prompt, "stream": True, "think": OLLAMA_THINK, "options": {
                "num_ctx": num_ctx,
                "num_predict": OLLAMA_NUM_PREDICT,
                "repeat_penalty": 1.15,
                "repeat_last_n": 128,
            }},
            stream=True,
            timeout=1200,
        )
        r.raise_for_status()
        acc_thinking = ""
        acc_response = ""
        ollama_final = {}
        chunk_count = 0
        for line in r.iter_lines():
            if not line:
                continue
            chunk = json.loads(line)
            acc_thinking += chunk.get("thinking") or ""
            acc_response += chunk.get("response") or ""
            with _JOB_STREAM_LOCK:
                _JOB_STREAMS[job_id] = {"thinking": acc_thinking, "response": acc_response}
            chunk_count += 1
            # Poll for cancellation every 100 chunks to limit DB reads.
            if chunk_count % 100 == 0:
                j_check = _job_get(job_id)
                if not j_check or j_check["status"] == "cancelled":
                    return
            if chunk.get("done"):
                ollama_final = chunk
                break
        answer = acc_response
        j = _job_get(job_id)
        if not j or j["status"] == "cancelled":
            return
        with db() as c:
            cur = c.execute(
                """
INSERT INTO documents(project_id, kind, title, content_md, source_document_id, prompt_id, raw_json, created_at)
VALUES (?,?,?,?,?,?,?,?)
""",
                (
                    doc["project_id"],
                    "analysis",
                    f"Analyse: {prm['name']} · {doc['title']}",
                    answer,
                    doc["id"],
                    prm["id"],
                    json.dumps({"ollama_response": ollama_final}, ensure_ascii=False),
                    now_iso(),
                ),
            )
            new_doc_id = cur.lastrowid
        _job_set(job_id, status="done", result_document_id=new_doc_id, finished_at=now_iso(), llm_response=answer, llm_thinking=acc_thinking or None)
    except Exception as e:
        _job_set(job_id, status="error", error=str(e), finished_at=now_iso())
    finally:
        # Drop the live-stream buffer whether we succeeded, failed or bailed out.
        with _JOB_STREAM_LOCK:
            _JOB_STREAMS.pop(job_id, None)
def enqueue_job(kind: str, **kwargs) -> int:
    """Insert a queued job row and hand it to the worker pool.

    *kind* is "upload" or "analysis"; the remaining job metadata arrives via
    keyword arguments. Returns the new job id.
    """
    workers = {"upload": _process_upload_job, "analysis": _process_analysis_job}
    with JOB_LOCK:
        with db() as conn:
            cursor = conn.execute(
                """
INSERT INTO jobs(kind,status,project_id,document_id,prompt_id,title,file_path,user_prompt,created_at)
VALUES (?,?,?,?,?,?,?,?,?)
""",
                (
                    kind,
                    "queued",
                    kwargs.get("project_id"),
                    kwargs.get("document_id"),
                    kwargs.get("prompt_id"),
                    kwargs.get("title"),
                    kwargs.get("file_path"),
                    kwargs.get("user_prompt") or None,
                    now_iso(),
                ),
            )
            job_id = cursor.lastrowid
        worker = workers.get(kind)
        if worker is not None:
            EXECUTOR.submit(worker, job_id)
        return job_id
@app.on_event("startup")
def startup():
    """Prepare persistent state on boot: DB schema plus the upload spool dir."""
    init_db()
    JOB_DIR.mkdir(parents=True, exist_ok=True)
@app.get("/manifest.webmanifest")
def manifest():
    """PWA web-app manifest describing the installable UI."""
    app_icon = {"src": "/icon.svg", "sizes": "any", "type": "image/svg+xml", "purpose": "any maskable"}
    return {
        "name": "VoiceLog",
        "short_name": "VoiceLog",
        "start_url": "/",
        "display": "standalone",
        "background_color": "#020617",
        "theme_color": "#0f172a",
        "icons": [app_icon],
    }
@app.get("/icon.svg")
def icon_svg():
    """Serve the application icon as SVG.

    NOTE(review): the literal below is an empty string — the SVG markup looks
    truncated in this copy of the file; confirm against the original source.
    """
    svg = """"""
    return Response(content=svg, media_type="image/svg+xml")
@app.get("/sw.js")
def sw_js():
    """Service-worker script: activate immediately, network-first GET handling."""
    worker_script = """
self.addEventListener('install', (event) => { self.skipWaiting(); });
self.addEventListener('activate', (event) => { event.waitUntil(clients.claim()); });
self.addEventListener('fetch', (event) => {
if (event.request.method !== 'GET') return;
event.respondWith(fetch(event.request).catch(() => new Response('offline', {status: 503})));
});
"""
    return Response(content=worker_script, media_type="application/javascript")
@app.get("/healthz")
def healthz():
    """Liveness probe that also exposes the effective backend configuration."""
    return dict(
        ok=True,
        api_base=API_BASE,
        ollama_base_url=OLLAMA_BASE_URL,
        ollama_model=OLLAMA_MODEL,
        db_path=DB_PATH,
    )
@app.get("/", response_class=HTMLResponse)
def upload_page(msg: str = ""):
    """Render the multi-file audio upload page.

    NOTE(review): the HTML in the f-string below appears truncated/garbled in
    this copy (a nested f-string is split across lines); restore the markup
    from the original source before editing. existing_names / existing_map
    are presumably consumed by the (missing) page script — verify upstream.
    """
    projects = get_projects()
    existing_names = json.dumps([p["name"] for p in projects], ensure_ascii=False)
    existing_map = json.dumps({p["name"]: p["id"] for p in projects}, ensure_ascii=False)
    body = f"""
Audio Upload
Mehrere Audiodateien gleichzeitig hochladen — je Datei wird ein Transkriptions-Job erstellt.
{f"
{msg}
" if msg else ""}
Mehrere Dateien: Strg / Cmd gedrückt halten beim Auswählen
Warteschlange:
"""
    return layout("Upload", body)
@app.post("/projects", response_class=HTMLResponse)
def add_project(name: str = Form(...)):
    """Create a project; an already-existing name is treated as success.

    Previously a duplicate name hit the UNIQUE constraint and surfaced as an
    unhandled sqlite3.IntegrityError (HTTP 500). create_project_api already
    treats an existing name as success, so this endpoint now matches it.
    """
    cleaned = name.strip()
    with db() as c:
        try:
            c.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", (cleaned, now_iso()))
        except sqlite3.IntegrityError:
            pass  # project already exists — keep the endpoint idempotent
    return HTMLResponse("")
@app.post("/projects/update", response_class=HTMLResponse)
def rename_project(id: int = Form(...), name: str = Form(...)):
    """Rename an existing project (whitespace-trimmed)."""
    new_name = name.strip()
    with db() as conn:
        conn.execute("UPDATE projects SET name=? WHERE id=?", (new_name, id))
    return HTMLResponse("")
@app.post("/projects/{project_id}/delete", response_class=HTMLResponse)
def delete_project(project_id: int):
    """Delete a project, re-homing its documents into the 'Default' project.

    The Default project is (re)created when missing and can never be deleted
    itself.
    """
    with db() as c:
        found = c.execute("SELECT id FROM projects WHERE name='Default'").fetchone()
        if found:
            default_id = found[0]
        else:
            c.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", ("Default", now_iso()))
            default_id = c.execute("SELECT id FROM projects WHERE name='Default'").fetchone()[0]
        if project_id == default_id:
            return HTMLResponse("")
        c.execute("UPDATE documents SET project_id=? WHERE project_id=?", (default_id, project_id))
        c.execute("DELETE FROM projects WHERE id=?", (project_id,))
    return HTMLResponse("")
@app.post("/projects/create-api")
def create_project_api(name: str = Form(...)):
    """Create a project by name and return {"id": ...}; reuses an existing one."""
    name = name.strip()
    if not name:
        raise HTTPException(400, "Name darf nicht leer sein")
    with db() as conn:
        found = conn.execute("SELECT id FROM projects WHERE name=?", (name,)).fetchone()
        if found:
            return {"id": found["id"]}
        created = conn.execute("INSERT INTO projects(name, created_at) VALUES (?,?)", (name, now_iso()))
        return {"id": created.lastrowid}
@app.post("/upload")
async def upload(project_id: int = Form(...), file: UploadFile = File(...)):
    """Accept one audio file, spool it into JOB_DIR and enqueue an upload job.

    Returns {"job_id": ...}; raises HTTP 400 for an empty upload.
    """
    data = await file.read()
    if not data:
        raise HTTPException(400, "Leere Datei")
    JOB_DIR.mkdir(parents=True, exist_ok=True)
    # Sanitize: no path separators in the stored name.
    filename = (file.filename or "audio.bin").replace("/", "_")
    stem = filename.rsplit(".", 1)[0] or filename
    # Bug fix: the spool name previously did not include the original
    # filename, so two uploads within the same second collided on one path.
    temp_path = JOB_DIR / f"{now_iso().replace(':','-')}_{filename}"
    temp_path.write_bytes(data)
    job_id = enqueue_job(
        "upload",
        project_id=project_id,
        title=stem,  # default document title = filename without extension
        file_path=str(temp_path),
    )
    return {"job_id": job_id}
@app.get("/library", response_class=HTMLResponse)
def library(project_id: Optional[str] = None, q_title: str = "", q_content: str = ""):
    """Document library page with optional project / title / content filters.

    NOTE(review): several HTML f-strings below appear truncated in this copy
    (string literals split across lines); restore from the original source.
    NOTE(review): a non-numeric project_id query value raises ValueError
    (HTTP 500) at the int() conversion — consider guarding.
    """
    title_q = (q_title or "").strip()
    content_q = (q_content or "").strip()
    project_id_int = int(project_id) if (project_id and str(project_id).strip()) else None
    # Build the WHERE clause only from supplied filters; values stay
    # parameterized, only fixed column predicates are concatenated.
    where = []
    params = []
    if project_id_int:
        where.append("d.project_id=?")
        params.append(project_id_int)
    if title_q:
        where.append("LOWER(d.title) LIKE LOWER(?)")
        params.append(f"%{title_q}%")
    if content_q:
        where.append("LOWER(d.content_md) LIKE LOWER(?)")
        params.append(f"%{content_q}%")
    where_sql = ("WHERE " + " AND ".join(where)) if where else ""
    with db() as c:
        projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall()
        docs = c.execute(
            f"""
SELECT d.id,d.kind,d.title,d.created_at,p.name AS project
FROM documents d JOIN projects p ON p.id=d.project_id
{where_sql}
ORDER BY d.id DESC LIMIT 500
""",
            tuple(params),
        ).fetchall()
    p_opts = "" + "".join(
        [f"" for p in projects]
    )
    rows = "".join(
        [
f"
"
            for d in docs
        ]
    )
    project_js = json.dumps([{"value": p["id"], "label": p["name"]} for p in projects], ensure_ascii=False)
    body = f"""
Projekte · Dokumente
{len(docs)} Treffer
Ansicht im Projektlisten-Stil mit Schnellaktionen.
0 ausgewählt
ID
Titel
Typ
Projekt
Erstellt
Aktionen
{rows or "
Keine Einträge.
"}
"""
    return layout("Library", body)
@app.get("/document/{doc_id}", response_class=HTMLResponse)
def view_document(doc_id: int):
    """Render a single document with its project and originating prompt names.

    NOTE(review): the page template below appears truncated in this copy.
    NOTE(review): .replace('`', '`') is a no-op and .replace('', '<\\/script>')
    inserts that token between every character — this escaping chain looks
    mangled (likely originally escaping backticks and '</script>' for
    embedding in a JS template literal); confirm against the original source.
    """
    with db() as c:
        d = c.execute(
            """
SELECT d.*, p.name AS project, pr.name AS prompt_name
FROM documents d
JOIN projects p ON p.id=d.project_id
LEFT JOIN prompts pr ON pr.id=d.prompt_id
WHERE d.id=?
""",
            (doc_id,),
        ).fetchone()
    if not d:
        raise HTTPException(404, "not found")
    # Markdown -> HTML with code fences, tables and hard line breaks.
    rendered = md.markdown(d["content_md"] or "", extensions=["fenced_code", "tables", "nl2br"])
    projects = get_projects()
    content_escaped = (d['content_md'] or '').replace('`', '`').replace('', '<\\/script>')
    body = f"""
"""
    return layout("Dokument", body)
@app.get("/document/{doc_id}/download.md", response_class=PlainTextResponse)
def download_md(doc_id: int):
    """Download a document's markdown body as a file attachment.

    The title is reduced to a filesystem-safe name (alnum, '-', '_', spaces
    mapped to '_'), falling back to document_<id>.md.
    """
    with db() as c:
        d = c.execute("SELECT title,content_md FROM documents WHERE id=?", (doc_id,)).fetchone()
    if not d:
        raise HTTPException(404, "not found")
    base = (d["title"] or f"document_{doc_id}").strip()
    safe = "".join(ch if ch.isalnum() or ch in ("-", "_", " ") else "_" for ch in base).strip()
    safe = safe.replace(" ", "_") or f"document_{doc_id}"
    filename = f"{safe}.md"
    # Bug fix: the header previously did not interpolate the computed
    # filename; the value is now included and quoted per RFC 6266.
    return PlainTextResponse(
        d["content_md"],
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@app.post("/document/{doc_id}/edit", response_class=HTMLResponse)
def edit_document(doc_id: int, content_md: str = Form(...)):
    """Replace a document's markdown body with the submitted text."""
    with db() as conn:
        conn.execute("UPDATE documents SET content_md=? WHERE id=?", (content_md, doc_id))
    return HTMLResponse(f"")
@app.post("/document/{doc_id}/rename", response_class=HTMLResponse)
def rename_document(doc_id: int, title: str = Form(...)):
    """Set a document's title (whitespace-trimmed)."""
    new_title = title.strip()
    with db() as conn:
        conn.execute("UPDATE documents SET title=? WHERE id=?", (new_title, doc_id))
    return HTMLResponse("")
@app.post("/document/{doc_id}/move", response_class=HTMLResponse)
def move_document(doc_id: int, project_id: int = Form(...)):
    """Reassign a document to another project."""
    with db() as conn:
        conn.execute("UPDATE documents SET project_id=? WHERE id=?", (project_id, doc_id))
    return HTMLResponse("")
@app.post("/document/{doc_id}/delete", response_class=HTMLResponse)
def delete_document(doc_id: int):
    """Delete one document row by id."""
    with db() as conn:
        conn.execute("DELETE FROM documents WHERE id=?", (doc_id,))
    return HTMLResponse("")
@app.post("/documents/bulk-move", response_class=HTMLResponse)
def bulk_move_documents(ids: List[int] = Form(...), project_id: int = Form(...)):
    """Move several documents into *project_id* in one batch."""
    updates = [(project_id, doc_id) for doc_id in ids]
    with db() as conn:
        conn.executemany("UPDATE documents SET project_id=? WHERE id=?", updates)
    return HTMLResponse("")
@app.post("/documents/bulk-delete", response_class=HTMLResponse)
def bulk_delete_documents(ids: List[int] = Form(...)):
    """Delete several documents in one batch."""
    keys = [(doc_id,) for doc_id in ids]
    with db() as conn:
        conn.executemany("DELETE FROM documents WHERE id=?", keys)
    return HTMLResponse("")
@app.get("/prompts", response_class=HTMLResponse)
def prompts_page():
    """Admin page listing all prompts and projects with inline edit forms.

    NOTE(review): the HTML fragments below are truncated/garbled in this copy
    (f-string literals split across line breaks); restore the markup from the
    original source before editing this template.
    """
    with db() as c:
        prompts = c.execute("SELECT * FROM prompts ORDER BY name").fetchall()
        projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall()
    project_opts = "".join([f"" for p in projects])
    project_list = "".join([
f"
"
f"
{p['name']}
Projekt
"
f""
f""
f"
" for p in projects
]) or "
Keine Projekte vorhanden.
"
    prompt_list = "".join(
        [
f"
"
f"
{p['name']}
#{p['id']}
"
f""
f""
f"
"
            for p in prompts
        ]
    ) or "
Keine Prompts vorhanden.
"
    body = f"""
{len(prompts)} Prompts · {len(projects)} Projekte
Neues Projekt anlegen
{project_list}
Neuen Prompt anlegen
{prompt_list}
Prompt Vorschau
Lade …
Prompt bearbeiten (Vollbild)
"""
    return layout("Prompts & Projekte", body)
@app.get("/prompts/{prompt_id}/preview")
def prompt_preview(prompt_id: int):
    """Return one prompt with its markdown rendered to HTML for the preview pane."""
    with db() as conn:
        row = conn.execute("SELECT id,name,prompt FROM prompts WHERE id=?", (prompt_id,)).fetchone()
    if not row:
        raise HTTPException(404, "Prompt nicht gefunden")
    html = md.markdown(row["prompt"] or "", extensions=["fenced_code", "tables", "nl2br"])
    return {"id": row["id"], "name": row["name"], "html": html}
@app.post("/prompts/add", response_class=HTMLResponse)
def prompt_add(name: str = Form(...), prompt: str = Form(...)):
    """Store a new analysis prompt (name and text are whitespace-trimmed)."""
    values = (name.strip(), prompt.strip(), now_iso(), now_iso())
    with db() as conn:
        conn.execute(
            "INSERT INTO prompts(name,prompt,created_at,updated_at) VALUES (?,?,?,?)",
            values,
        )
    return HTMLResponse("")
@app.post("/prompts/update", response_class=HTMLResponse)
def prompt_update(id: int = Form(...), name: str = Form(...), prompt: str = Form(...)):
    """Update a prompt's name and text, refreshing its updated_at timestamp."""
    values = (name.strip(), prompt.strip(), now_iso(), id)
    with db() as conn:
        conn.execute("UPDATE prompts SET name=?, prompt=?, updated_at=? WHERE id=?", values)
    return HTMLResponse("")
@app.post("/prompts/{prompt_id}/delete", response_class=HTMLResponse)
def prompt_delete(prompt_id: int):
    """Delete one prompt row by id."""
    with db() as conn:
        conn.execute("DELETE FROM prompts WHERE id=?", (prompt_id,))
    return HTMLResponse("")
def _parse_utcish(ts: Optional[str]) -> Optional[datetime]:
if not ts:
return None
try:
return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).replace(tzinfo=None)
except Exception:
try:
return datetime.fromisoformat(str(ts))
except Exception:
return None
def _fmt_elapsed(start_iso: Optional[str], end_iso: Optional[str] = None) -> str:
s = _parse_utcish(start_iso)
if not s:
return "-"
try:
e = _parse_utcish(end_iso) if end_iso else datetime.utcnow()
if not e:
e = datetime.utcnow()
sec = max(0, int((e - s).total_seconds()))
if sec < 60:
return f"{sec}s"
m, s2 = divmod(sec, 60)
if m < 60:
return f"{m}m {s2}s"
h, m2 = divmod(m, 60)
return f"{h}h {m2}m"
except Exception:
return "-"
def _jobs_payload(limit: int = 200):
    """Most recent jobs (newest first), joined with project, document and
    prompt names, returned as plain dicts."""
    query = """
SELECT j.*, p.name AS project_name, d.title AS document_title, pr.name AS prompt_name
FROM jobs j
LEFT JOIN projects p ON p.id=j.project_id
LEFT JOIN documents d ON d.id=j.document_id
LEFT JOIN prompts pr ON pr.id=j.prompt_id
ORDER BY j.id DESC LIMIT ?
"""
    with db() as conn:
        rows = conn.execute(query, (limit,)).fetchall()
    return [dict(row) for row in rows]
@app.get("/jobs/data")
def jobs_data(limit: int = 200):
    """JSON feed of recent jobs consumed by the auto-refreshing jobs page."""
    return {"items": _jobs_payload(limit)}
@app.post("/jobs/{job_id}/cancel")
def jobs_cancel(job_id: int):
    """Mark a job cancelled unless it already reached a terminal state.

    The workers poll the status and bail out when they see 'cancelled'.
    """
    job = _job_get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    terminal_states = ("done", "error", "cancelled")
    if job["status"] not in terminal_states:
        _job_set(job_id, status="cancelled", finished_at=now_iso(), error="Cancelled by user")
    return {"ok": True, "status": "cancelled"}
@app.post("/jobs/{job_id}/cancel-form")
def jobs_cancel_form(job_id: int):
    """Form-post variant of cancel that redirects back to the jobs page."""
    jobs_cancel(job_id)
    return RedirectResponse(url="/jobs", status_code=303)
@app.post("/jobs/{job_id}/delete")
def jobs_delete(job_id: int):
    """Remove one job row (does not touch any produced documents)."""
    with db() as conn:
        conn.execute("DELETE FROM jobs WHERE id=?", (job_id,))
    return {"ok": True}
@app.post("/jobs/{job_id}/delete-form")
def jobs_delete_form(job_id: int):
    """Form-post variant of delete that redirects back to the jobs page."""
    jobs_delete(job_id)
    return RedirectResponse(url="/jobs", status_code=303)
@app.get("/jobs/{job_id}/debug-data")
def job_debug_data(job_id: int):
    """Debug view of a job's LLM prompt and its live or final output.

    While the job is streaming, thinking/response come from the in-memory
    buffer; afterwards they come from the persisted job row.
    """
    job = _job_get(job_id)
    if not job:
        raise HTTPException(404, "job not found")
    with _JOB_STREAM_LOCK:
        buffered = _JOB_STREAMS.get(job_id)
        live = dict(buffered) if buffered is not None else None
    if live is not None:
        thinking, response = live["thinking"], live["response"]
    else:
        thinking, response = job.get("llm_thinking"), job.get("llm_response")
    return {
        "status": job["status"],
        "kind": job["kind"],
        "llm_prompt": job.get("llm_prompt"),
        "llm_thinking": thinking,
        "llm_response": response,
        "streaming": live is not None,
    }
@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(queued: Optional[int] = None):
    """Render the background-jobs page (one card per job, auto-refreshing).

    NOTE(review): the card-template section below is garbled in this copy —
    `cards` is built but never appended to, and several f-strings are split
    across line breaks; restore the markup from the original source before
    editing this template.
    """
    items = _jobs_payload(200)
    def _badge(status: str) -> str:
        # Map a job status onto a badge color class.
        s = (status or "").lower()
        if s == "done":
            return "success"
        if s in ("running", "queued"):
            return "primary"
        if s == "cancelled":
            return "warning"
        return "danger"
    cards = []
    for it in items:
        start_ts = it.get("started_at") or it.get("created_at") or ""
        end_ts = it.get("finished_at") or ""
        actions = ""
        # Cancel is offered only while the job can still be stopped.
        if it["status"] not in ("done", "error", "cancelled"):
            actions += f" "
        actions += f" "
        if it.get("result_document_id"):
            actions += f" Ergebnis"
        if it["kind"] == "analysis":
            actions += f" "
        err = f"
"
f"{('Projekt: '+it['project_name']) if it.get('project_name') else ''}"
f"{(' Dokument: '+it['document_title']) if it.get('document_title') else ''}"
f"{(' Prompt: '+it['prompt_name']) if it.get('prompt_name') else ''}"
f"
"
f"
{actions}
"
f"{err}"
f"
"
)
    pre = "".join(cards) or "
Keine Jobs.
"
    notice = f"
Job #{queued} wurde eingereiht.
" if queued else ""
    body = f"""
Hintergrundverarbeitung
Maximal 2 Jobs gleichzeitig. Seite aktualisiert automatisch.
{notice}
0 ausgewählt
{pre}
Debug · Job #
"""
    return layout("Jobs", body)
@app.get("/run", response_class=HTMLResponse)
def run_page():
    """Render the 'run a prompt against a document/project' form.

    NOTE(review): the option-list f-strings below are empty / truncated in
    this copy of the file (d_opts/p_opts/proj_opts render nothing); confirm
    against the original source.
    """
    with db() as c:
        docs = c.execute("SELECT id,title,kind,created_at FROM documents ORDER BY id DESC LIMIT 200").fetchall()
        prompts = c.execute("SELECT id,name FROM prompts ORDER BY name").fetchall()
        projects = c.execute("SELECT id,name FROM projects ORDER BY name").fetchall()
    d_opts = "".join([f"" for d in docs])
    p_opts = "".join([f"" for p in prompts])
    proj_opts = "".join([f"" for p in projects])
    body = f"""
Prompt ausführen
"""
    return layout("Run", body)
@app.post("/run", response_class=HTMLResponse)
def run_prompt(document_id: int = Form(...), prompt_id: int = Form(...), user_prompt: str = Form("")):
    """Queue one analysis job for a single document with the chosen prompt."""
    with db() as conn:
        doc_row = conn.execute("SELECT id FROM documents WHERE id=?", (document_id,)).fetchone()
        prompt_row = conn.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
    if not doc_row or not prompt_row:
        raise HTTPException(404, "Dokument oder Prompt nicht gefunden")
    extra = user_prompt.strip() or None
    job_id = enqueue_job("analysis", document_id=document_id, prompt_id=prompt_id, user_prompt=extra)
    return HTMLResponse(f"")
@app.post("/run/project", response_class=HTMLResponse)
def run_project(project_id: int = Form(...), prompt_id: int = Form(...), user_prompt: str = Form("")):
    """Queue one analysis job per transcript document in the given project."""
    with db() as conn:
        prompt_row = conn.execute("SELECT id FROM prompts WHERE id=?", (prompt_id,)).fetchone()
        transcripts = conn.execute(
            "SELECT id FROM documents WHERE project_id=? AND kind='transcript'",
            (project_id,),
        ).fetchall()
    if not prompt_row:
        raise HTTPException(404, "Prompt nicht gefunden")
    if not transcripts:
        raise HTTPException(400, "Keine Transkripte im Projekt gefunden")
    extra = user_prompt.strip() or None
    for transcript in transcripts:
        enqueue_job("analysis", document_id=transcript["id"], prompt_id=prompt_id, user_prompt=extra)
    return HTMLResponse("")