""" health_server.py — tiny FastAPI status surface for Hugging Face Spaces. This lets the Space expose /health while the LiveKit worker runs in the same container. It intentionally avoids exposing secrets or caller transcripts. """ from __future__ import annotations import logging import os import re import threading import time from pathlib import Path from typing import Any, Dict import httpx import uvicorn from fastapi import FastAPI from config import Settings import llm_engine import memory import tts_engine logger = logging.getLogger(__name__) STARTED_AT = time.time() SELFTEST_CACHE: Dict[str, Any] = {"timestamp": 0.0, "payload": None} SELFTEST_TTL_SECONDS = 120 TOKEN_PATTERN = re.compile(r"hf_[A-Za-z0-9_\-]{6,}") STATE: Dict[str, Any] = { "worker_started": False, "last_error": None, "active_calls": 0, "calls_started": 0, "calls_ended": 0, "escalations": 0, } app = FastAPI(title="LINGO Runtime", version="0.2.0") def health_payload() -> Dict[str, Any]: return { "ok": True, "service": "lingo-agent", "version": "0.2.0", "uptime_seconds": round(time.time() - STARTED_AT, 2), "worker_started": STATE["worker_started"], } def _local_llm_info(settings: Settings) -> Dict[str, Any]: return { "enabled": settings.local_llm_enabled, "repo_id": settings.local_llm_repo_id, "filename": settings.local_llm_filename, "disabled_reason": "CPU Basic build was OOMKilled with llama-cpp-python; using local rules fallback until higher hardware/prebuilt runtime is available.", } def diagnostics_payload() -> Dict[str, Any]: settings = Settings.from_env() missing = settings.missing_required() memory_db = Path(settings.memory_db) data_dir = memory_db.parent return { "ok": True, "service": "lingo-agent", "version": "0.2.0", "required_secrets_present": len(missing) == 0, "missing_required_secret_names": missing, "worker_started": STATE["worker_started"], "last_error": STATE["last_error"], "runtime": { "port": settings.health_port, "health_enabled": settings.health_enabled, "memory_backend": settings.memory_backend, "memory_db": settings.memory_db, "data_dir_exists": data_dir.exists(), "data_dir_writable": os.access(data_dir, os.W_OK) if data_dir.exists() else False, "hf_chat_model": settings.hf_chat_model, "hf_chat_url": settings.hf_chat_url, "hf_tts_model": settings.hf_tts_model, "hf_tts_url": settings.hf_tts_url, "local_llm": _local_llm_info(settings), "business_name_set": bool(settings.business_name), "escalation_webhook_configured": bool(settings.escalation_webhook_url), "owner_alert_email_configured": bool(settings.owner_alert_email), }, "warning": "No secret values are exposed by this endpoint.", } def landing_payload(path: str = "/") -> Dict[str, Any]: return { **health_payload(), "path": path, "routes": [ "/", "/health", "/ready", "/readyz", "/status", "/metrics", "/diagnostics", "/api/diagnostics", "/selftest/inference", ], "note": "LINGO beta phone-agent backend. No secret values are exposed.", } def _sanitize(value: Any, settings: Settings) -> Any: if isinstance(value, dict): return {k: _sanitize(v, settings) for k, v in value.items()} if isinstance(value, list): return [_sanitize(v, settings) for v in value] text = str(value) text = TOKEN_PATTERN.sub("[REDACTED_HF_TOKEN]", text) if settings.hf_token: text = text.replace(settings.hf_token, "[REDACTED_HF_TOKEN]") return text[:1000] async def run_hf_selftest() -> Dict[str, Any]: """Run a tiny fixed chat + local TTS validation using Space secrets.""" now = time.time() cached = SELFTEST_CACHE.get("payload") if cached and now - float(SELFTEST_CACHE.get("timestamp", 0.0)) < SELFTEST_TTL_SECONDS: return {**cached, "cached": True} settings = Settings.from_env() missing = settings.missing_required() if missing: return { "ok": False, "missing_required_secret_names": missing, "warning": "No secret values are exposed by this endpoint.", } result: Dict[str, Any] = { "ok": False, "chat": {"ok": False}, "tts": {"ok": False}, "local_llm": _local_llm_info(settings), "warning": "No secret values are exposed by this endpoint.", } async with httpx.AsyncClient() as client: chat_result = await llm_engine.chat_completion( messages=[{"role": "user", "content": "Say LINGO_OK in one word."}], http_client=client, settings=settings, max_tokens=16, temperature=0, allow_local_fallback=True, ) result["chat"] = _sanitize(chat_result, settings) try: audio = await tts_engine.synthesize_tts_wav("LINGO test.", client, settings) result["tts"] = { "ok": True, "engine": "local-espeak-ng-or-provider-fallback", "audio_bytes": len(audio), } except Exception as exc: # noqa: BLE001 result["tts"] = {"ok": False, "exception": _sanitize(str(exc), settings)} result["ok"] = bool(result["chat"].get("ok")) and bool(result["tts"].get("ok")) SELFTEST_CACHE["timestamp"] = now SELFTEST_CACHE["payload"] = result return result @app.get("/") def root() -> Dict[str, Any]: return landing_payload("/") @app.get("/health") def health() -> Dict[str, Any]: return health_payload() @app.get("/ready") def ready() -> Dict[str, Any]: return health_payload() @app.get("/readyz") def readyz() -> Dict[str, Any]: return health_payload() @app.get("/status") def status() -> Dict[str, Any]: return { "ok": True, "state": STATE, "callers_known": len(memory.all_callers()), "diagnostics": diagnostics_payload(), } @app.get("/metrics") def metrics() -> Dict[str, Any]: return { "ok": True, "calls_started": STATE["calls_started"], "calls_ended": STATE["calls_ended"], "active_calls": STATE["active_calls"], "escalations": STATE["escalations"], "recent_events": memory.recent_events(limit=20), } @app.get("/diagnostics") def diagnostics() -> Dict[str, Any]: return diagnostics_payload() @app.get("/diagnostics/") def diagnostics_slash() -> Dict[str, Any]: return diagnostics_payload() @app.get("/api/diagnostics") def api_diagnostics() -> Dict[str, Any]: return diagnostics_payload() @app.get("/api/health") def api_health() -> Dict[str, Any]: return health_payload() @app.get("/selftest/inference") async def selftest_inference() -> Dict[str, Any]: return await run_hf_selftest() @app.get("/{full_path:path}") def catch_all(full_path: str) -> Dict[str, Any]: return landing_payload("/" + full_path) def mark_worker_started() -> None: STATE["worker_started"] = True def set_last_error(error: str | None) -> None: STATE["last_error"] = error def record_call_start() -> None: STATE["calls_started"] += 1 STATE["active_calls"] += 1 def record_call_end() -> None: STATE["calls_ended"] += 1 STATE["active_calls"] = max(0, STATE["active_calls"] - 1) def record_escalation() -> None: STATE["escalations"] += 1 def start_health_server(host: str = "0.0.0.0", port: int = 7860) -> threading.Thread: """Start the HTTP health server in a daemon thread.""" def _run() -> None: logger.info("Starting health server on %s:%s", host, port) uvicorn.run(app, host=host, port=port, log_level="info") thread = threading.Thread(target=_run, name="lingo-health-server", daemon=True) thread.start() return thread