# lingo-agent / health_server.py
# Last change: "Remove local LLM import after OOM rollback" (commit 6bfb4b9).
"""
health_server.py — tiny FastAPI status surface for Hugging Face Spaces.
This lets the Space expose /health while the LiveKit worker runs in the same
container. It intentionally avoids exposing secrets or caller transcripts.
"""
from __future__ import annotations
import logging
import os
import re
import threading
import time
from pathlib import Path
from typing import Any, Dict
import httpx
import uvicorn
from fastapi import FastAPI
from config import Settings
import llm_engine
import memory
import tts_engine
# Module-level logger; configuration is inherited from the hosting process.
logger = logging.getLogger(__name__)
# Process start time (epoch seconds), used to report uptime in health payloads.
STARTED_AT = time.time()
# Cache for the /selftest/inference result: {"timestamp": epoch, "payload": dict|None}.
SELFTEST_CACHE: Dict[str, Any] = {"timestamp": 0.0, "payload": None}
# How long (seconds) a selftest result is served from cache before re-probing.
SELFTEST_TTL_SECONDS = 120
# Matches Hugging Face access tokens ("hf_..."); used by _sanitize to redact them.
TOKEN_PATTERN = re.compile(r"hf_[A-Za-z0-9_\-]{6,}")
# In-process worker/call counters, mutated via the record_* helpers below.
# Plain dict with no locking; NOTE(review): updates come from the worker while
# uvicorn serves reads from its own thread — confirm this is acceptable.
STATE: Dict[str, Any] = {
    "worker_started": False,
    "last_error": None,
    "active_calls": 0,
    "calls_started": 0,
    "calls_ended": 0,
    "escalations": 0,
}
# FastAPI application exposing the read-only status endpoints.
app = FastAPI(title="LINGO Runtime", version="0.2.0")
def health_payload() -> Dict[str, Any]:
    """Build the minimal liveness payload shared by /, /health, /ready, /readyz."""
    uptime = round(time.time() - STARTED_AT, 2)
    return {
        "ok": True,
        "service": "lingo-agent",
        "version": "0.2.0",
        "uptime_seconds": uptime,
        "worker_started": STATE["worker_started"],
    }
def _local_llm_info(settings: Settings) -> Dict[str, Any]:
return {
"enabled": settings.local_llm_enabled,
"repo_id": settings.local_llm_repo_id,
"filename": settings.local_llm_filename,
"disabled_reason": "CPU Basic build was OOMKilled with llama-cpp-python; using local rules fallback until higher hardware/prebuilt runtime is available.",
}
def diagnostics_payload() -> Dict[str, Any]:
    """Assemble the non-secret diagnostics snapshot for the /diagnostics routes.

    Reports which required secrets are missing (names only, never values),
    worker state, and a summary of the runtime configuration.
    """
    settings = Settings.from_env()
    missing = settings.missing_required()
    db_path = Path(settings.memory_db)
    parent_dir = db_path.parent
    parent_exists = parent_dir.exists()
    runtime = {
        "port": settings.health_port,
        "health_enabled": settings.health_enabled,
        "memory_backend": settings.memory_backend,
        "memory_db": settings.memory_db,
        "data_dir_exists": parent_exists,
        "data_dir_writable": os.access(parent_dir, os.W_OK) if parent_exists else False,
        "hf_chat_model": settings.hf_chat_model,
        "hf_chat_url": settings.hf_chat_url,
        "hf_tts_model": settings.hf_tts_model,
        "hf_tts_url": settings.hf_tts_url,
        "local_llm": _local_llm_info(settings),
        "business_name_set": bool(settings.business_name),
        "escalation_webhook_configured": bool(settings.escalation_webhook_url),
        "owner_alert_email_configured": bool(settings.owner_alert_email),
    }
    return {
        "ok": True,
        "service": "lingo-agent",
        "version": "0.2.0",
        "required_secrets_present": len(missing) == 0,
        "missing_required_secret_names": missing,
        "worker_started": STATE["worker_started"],
        "last_error": STATE["last_error"],
        "runtime": runtime,
        "warning": "No secret values are exposed by this endpoint.",
    }
def landing_payload(path: str = "/") -> Dict[str, Any]:
    """Health payload plus a route directory; served at / and unknown paths."""
    routes = [
        "/",
        "/health",
        "/ready",
        "/readyz",
        "/status",
        "/metrics",
        "/diagnostics",
        "/api/diagnostics",
        "/selftest/inference",
    ]
    payload = health_payload()  # fresh dict each call; safe to mutate
    payload["path"] = path
    payload["routes"] = routes
    payload["note"] = "LINGO beta phone-agent backend. No secret values are exposed."
    return payload
def _sanitize(value: Any, settings: Settings) -> Any:
    """Recursively redact HF tokens from a payload before exposing it.

    Dicts and lists are walked recursively. Non-string scalars (None, bool,
    int, float) are returned unchanged; strings (and any other leaf, via
    str()) have token-like substrings redacted and are truncated to 1000
    characters.
    """
    if isinstance(value, dict):
        return {k: _sanitize(v, settings) for k, v in value.items()}
    if isinstance(value, list):
        return [_sanitize(v, settings) for v in value]
    # Bug fix: the previous version stringified every scalar, so a boolean
    # False became the truthy string "False" and checks like
    # bool(result["chat"].get("ok")) in run_hf_selftest reported success on
    # failure. Tokens cannot appear in these types, so pass them through.
    if value is None or isinstance(value, (bool, int, float)):
        return value
    text = str(value)
    text = TOKEN_PATTERN.sub("[REDACTED_HF_TOKEN]", text)
    if settings.hf_token:
        # Also redact the exact configured token, in case it does not match
        # the generic pattern.
        text = text.replace(settings.hf_token, "[REDACTED_HF_TOKEN]")
    return text[:1000]
async def run_hf_selftest() -> Dict[str, Any]:
    """Run a tiny fixed chat + local TTS validation using Space secrets.

    Results are cached in SELFTEST_CACHE for SELFTEST_TTL_SECONDS so the
    endpoint cannot be used to hammer the inference providers. Chat output and
    TTS exceptions are passed through _sanitize() so HF tokens never leak.
    """
    now = time.time()
    cached = SELFTEST_CACHE.get("payload")
    # Serve the cached result while it is still fresh; flag it so callers can
    # tell it is not a live probe.
    if cached and now - float(SELFTEST_CACHE.get("timestamp", 0.0)) < SELFTEST_TTL_SECONDS:
        return {**cached, "cached": True}
    settings = Settings.from_env()
    missing = settings.missing_required()
    # Without the required secrets there is nothing to probe. Only secret
    # NAMES are reported, never values. This early return is not cached.
    if missing:
        return {
            "ok": False,
            "missing_required_secret_names": missing,
            "warning": "No secret values are exposed by this endpoint.",
        }
    # Pessimistic defaults; each sub-check flips its own "ok" on success.
    result: Dict[str, Any] = {
        "ok": False,
        "chat": {"ok": False},
        "tts": {"ok": False},
        "local_llm": _local_llm_info(settings),
        "warning": "No secret values are exposed by this endpoint.",
    }
    async with httpx.AsyncClient() as client:
        # Fixed, deterministic prompt keeps the probe cheap and comparable.
        chat_result = await llm_engine.chat_completion(
            messages=[{"role": "user", "content": "Say LINGO_OK in one word."}],
            http_client=client,
            settings=settings,
            max_tokens=16,
            temperature=0,
            allow_local_fallback=True,
        )
        # Redact any token-like strings before storing/exposing the payload.
        result["chat"] = _sanitize(chat_result, settings)
        try:
            audio = await tts_engine.synthesize_tts_wav("LINGO test.", client, settings)
            result["tts"] = {
                "ok": True,
                "engine": "local-espeak-ng-or-provider-fallback",
                "audio_bytes": len(audio),
            }
        except Exception as exc:  # noqa: BLE001
            # TTS failure is reported in-band rather than raised to the route.
            result["tts"] = {"ok": False, "exception": _sanitize(str(exc), settings)}
    result["ok"] = bool(result["chat"].get("ok")) and bool(result["tts"].get("ok"))
    # NOTE(review): failures are cached for the full TTL too — confirm that a
    # transient provider error staying reported for up to 2 min is intended.
    SELFTEST_CACHE["timestamp"] = now
    SELFTEST_CACHE["payload"] = result
    return result
@app.get("/")
def root() -> Dict[str, Any]:
    """Landing page: health info plus the route directory."""
    return landing_payload(path="/")
@app.get("/health")
def health() -> Dict[str, Any]:
    """Primary liveness probe."""
    payload = health_payload()
    return payload
@app.get("/ready")
def ready() -> Dict[str, Any]:
    """Readiness probe (same payload as /health)."""
    payload = health_payload()
    return payload
@app.get("/readyz")
def readyz() -> Dict[str, Any]:
    """Kubernetes-style readiness alias for /ready."""
    payload = health_payload()
    return payload
@app.get("/status")
def status() -> Dict[str, Any]:
    """Operational snapshot: counters, known-caller count, and diagnostics."""
    known_callers = len(memory.all_callers())
    return {
        "ok": True,
        "state": STATE,
        "callers_known": known_callers,
        "diagnostics": diagnostics_payload(),
    }
@app.get("/metrics")
def metrics() -> Dict[str, Any]:
    """Call counters plus the most recent memory events."""
    payload: Dict[str, Any] = {
        "ok": True,
        "calls_started": STATE["calls_started"],
        "calls_ended": STATE["calls_ended"],
        "active_calls": STATE["active_calls"],
        "escalations": STATE["escalations"],
    }
    payload["recent_events"] = memory.recent_events(limit=20)
    return payload
@app.get("/diagnostics")
def diagnostics() -> Dict[str, Any]:
    """Non-secret configuration and environment diagnostics."""
    payload = diagnostics_payload()
    return payload
@app.get("/diagnostics/")
def diagnostics_slash() -> Dict[str, Any]:
    """Trailing-slash alias for /diagnostics."""
    payload = diagnostics_payload()
    return payload
@app.get("/api/diagnostics")
def api_diagnostics() -> Dict[str, Any]:
    """API-prefixed alias for /diagnostics."""
    payload = diagnostics_payload()
    return payload
@app.get("/api/health")
def api_health() -> Dict[str, Any]:
    """API-prefixed alias for /health."""
    payload = health_payload()
    return payload
@app.get("/selftest/inference")
async def selftest_inference() -> Dict[str, Any]:
    """Run (or serve the cached) chat + TTS selftest."""
    payload = await run_hf_selftest()
    return payload
@app.get("/{full_path:path}")
def catch_all(full_path: str) -> Dict[str, Any]:
    """Fallback for unknown GET paths: return the landing payload, not a 404."""
    return landing_payload(f"/{full_path}")
def mark_worker_started() -> None:
    """Record that the LiveKit worker finished starting up."""
    STATE["worker_started"] = True
def set_last_error(error: str | None) -> None:
    """Store the most recent worker error string, or clear it with None."""
    STATE["last_error"] = error
def record_call_start() -> None:
    """Count a new call: bump the lifetime total and the active-call gauge."""
    STATE["calls_started"] = STATE["calls_started"] + 1
    STATE["active_calls"] = STATE["active_calls"] + 1
def record_call_end() -> None:
    """Count a finished call; the active-call gauge never drops below zero."""
    STATE["calls_ended"] += 1
    remaining = STATE["active_calls"] - 1
    STATE["active_calls"] = remaining if remaining > 0 else 0
def record_escalation() -> None:
    """Count an escalation away from the agent."""
    STATE["escalations"] = STATE["escalations"] + 1
def start_health_server(host: str = "0.0.0.0", port: int = 7860) -> threading.Thread:
    """Start the HTTP health server in a daemon thread.

    Returns the already-started thread so the caller can keep a reference;
    the daemon flag means it will not block process shutdown.
    """

    def _serve() -> None:
        # uvicorn.run blocks this thread for the lifetime of the server.
        logger.info("Starting health server on %s:%s", host, port)
        uvicorn.run(app, host=host, port=port, log_level="info")

    server_thread = threading.Thread(target=_serve, name="lingo-health-server", daemon=True)
    server_thread.start()
    return server_thread