Spaces:
Runtime error
Runtime error
| """Async job queue for the TESSERA inference API. | |
| Runtime layout: | |
| user submit (web tier) | |
| v | |
| submit_job(...) -> insert row in jobs table -> queue Future on | |
| ThreadPoolExecutor | |
| (max_workers=1) | |
| v | |
| worker thread runs run_inference + pack_outputs (defined in app.py), | |
| then uploads the ZIP to storage and emails the user a download URL. | |
| Storage backend, picked by ``TESSERA_STORAGE``: | |
| 'local' (default) | |
| Local filesystem under ``TESSERA_LOCAL_RESULTS`` | |
| (default ``/tmp/tessera_results/<job_id>/result.zip``). | |
| For development without AWS. | |
| 'aws' | |
| S3 (``TESSERA_S3_BUCKET``), 24h pre-signed URL. Requires AWS | |
| credentials in the environment. | |
| Email backend is independent of storage. If ``GMAIL_USER`` and | |
| ``GMAIL_APP_PASSWORD`` are set we send via Gmail SMTP | |
| (``smtp.gmail.com:465``); the App Password is the 16-char value | |
| generated at https://myaccount.google.com/apppasswords on a | |
| 2-Step-Verification-enabled Google account. Sending from a real Gmail | |
| account through Google's own SMTP keeps DKIM/SPF/DMARC alignment | |
| intact, which is what kept us out of the spam folder when SES tried | |
| to forge a ``@gmail.com`` From header. If the credentials are unset we | |
| log to stdout (useful in local dev). | |
| A SQLite job-state file (``TESSERA_JOBS_DB``, default | |
| ``/tmp/tessera_jobs.db``) holds the ``{id, email, status, n_samples, | |
| created_at, finished_at, result_url, error}`` rows. At process startup | |
| any job left in 'running' state from a prior crash / Space sleep is | |
| marked failed and (best-effort) emailed. | |
| """ | |
from __future__ import annotations

import os
import shutil
import smtplib
import sqlite3
import ssl
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from email.message import EmailMessage
from pathlib import Path
| # ---------------------------------------------------------------------------- | |
| # Configuration: load .env.local if present, then read env vars. | |
| # ---------------------------------------------------------------------------- | |
def _load_dotenv() -> None:
    """Populate ``os.environ`` from a ``.env.local`` file next to this module.

    Each non-blank, non-``#`` line of the form ``KEY=VALUE`` is applied with
    ``os.environ.setdefault``, so variables already set in the real
    environment always win over the file. Lines without ``=`` or with an
    empty key are ignored. A value wrapped in one matching pair of single or
    double quotes is unwrapped (``KEY="a b"`` -> ``a b``), following the
    usual dotenv convention; otherwise the value is used verbatim after
    trimming surrounding whitespace. Does nothing if the file is absent.
    """
    env_file = Path(__file__).resolve().parent / ".env.local"
    if not env_file.exists():
        return
    # utf-8-sig transparently drops a BOM that some editors prepend, which
    # would otherwise become part of the first key.
    for raw in env_file.read_text(encoding="utf-8-sig").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key, value = key.strip(), value.strip()
        if not key:
            # os.environ rejects an empty name with ValueError; skip the line.
            continue
        # Unwrap one pair of matching quotes, e.g. an App Password pasted as
        # GMAIL_APP_PASSWORD="abcd efgh ijkl mnop".
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)
# Pull in .env.local before any configuration is read from the environment.
_load_dotenv()

# --- Result storage: 'local' filesystem or 'aws' S3 ---
STORAGE_BACKEND = os.environ.get("TESSERA_STORAGE", "local")
S3_BUCKET = os.environ.get("TESSERA_S3_BUCKET")

# --- Outbound mail ---
GMAIL_USER = os.environ.get("GMAIL_USER", "").strip()
# App Passwords are shown in groups of four; drop any spaces pasted with them.
GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "").strip()
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))

# --- Job bookkeeping ---
DB_PATH = Path(os.environ.get("TESSERA_JOBS_DB", "/tmp/tessera_jobs.db"))
LOCAL_RESULTS_DIR = Path(os.environ.get("TESSERA_LOCAL_RESULTS", "/tmp/tessera_results"))
RESULT_TTL = 24 * 60 * 60  # seconds a download link stays valid
MAX_WORKERS = 1
RESULT_TTL_HOURS = RESULT_TTL // 3600
# Real SMTP delivery needs both halves of the credential; otherwise mail is logged.
EMAIL_ENABLED = bool(GMAIL_USER and GMAIL_APP_PASSWORD)

if STORAGE_BACKEND not in ("local", "aws"):
    raise RuntimeError(f"TESSERA_STORAGE must be 'local' or 'aws', got {STORAGE_BACKEND!r}")
if STORAGE_BACKEND == "local":
    LOCAL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
else:
    if not S3_BUCKET:
        raise RuntimeError("TESSERA_STORAGE=aws but TESSERA_S3_BUCKET is unset")
    # Imported only for the aws backend so local development needs no boto3.
    import boto3  # noqa: E402
    _s3 = boto3.client("s3")

# MAX_WORKERS == 1: jobs run one at a time on a single background thread.
_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# Serialises log lines written from the web tier and the worker thread.
_print_lock = threading.Lock()
def _log(msg: str) -> None:
    """Print ``msg`` to stdout prefixed with ``[jobs]``, flushing immediately.

    Holding ``_print_lock`` keeps lines from concurrent threads (web tier and
    worker) from interleaving.
    """
    with _print_lock:
        print("[jobs]", msg, flush=True)
# Announce the effective configuration once, at import time.
_log(" ".join((
    f"backend={STORAGE_BACKEND}",
    f"db={DB_PATH}",
    f"bucket={S3_BUCKET or '-'}",
    f"email={'gmail-smtp' if EMAIL_ENABLED else 'stdout'}",
    f"from={GMAIL_USER or '-'}",
)))
| # ---------------------------------------------------------------------------- | |
| # Tiny SQLite for job state | |
| # ---------------------------------------------------------------------------- | |
def _db():
    """Open and return a new connection to the job database at ``DB_PATH``.

    The parent directory and the ``jobs`` table are created when missing, so
    this is safe to call on a fresh filesystem. Each call yields a separate
    connection. Using it as ``with _db() as c:`` wraps a transaction
    (commit/rollback) but does not close the connection.
    """
    parent = DB_PATH.parent
    parent.mkdir(exist_ok=True, parents=True)
    connection = sqlite3.connect(str(DB_PATH))
    schema = """CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,
    email TEXT,
    status TEXT,
    n_samples INTEGER,
    created_at INTEGER,
    finished_at INTEGER,
    result_url TEXT,
    error TEXT
)"""
    connection.execute(schema)
    return connection
def _set(job_id: str, **kw) -> None:
    """Update columns of the ``jobs`` row whose id is ``job_id``.

    Keyword names are interpolated verbatim as column names and must
    therefore come from trusted code in this module; values are bound as SQL
    parameters. Calling with no keywords is a no-op, rather than issuing the
    malformed statement ``UPDATE jobs SET  WHERE id=?``.
    """
    if not kw:
        return
    assignments = ", ".join(f"{column}=?" for column in kw)
    # closing() releases the connection on exit; the inner ``with con``
    # commits (or rolls back on error) before it is closed.
    with closing(_db()) as con, con:
        con.execute(f"UPDATE jobs SET {assignments} WHERE id=?",
                    (*kw.values(), job_id))
def get_job(job_id: str) -> dict | None:
    """Return the ``jobs`` row for ``job_id`` as a dict, or ``None`` if unknown.

    Keys: id, email, status, n_samples, created_at, finished_at, result_url,
    error. The last three are ``None`` until the job finishes or fails.
    """
    columns = ("id", "email", "status", "n_samples", "created_at",
               "finished_at", "result_url", "error")
    # closing() so the connection is released instead of lingering until GC.
    with closing(_db()) as con, con:
        row = con.execute(
            f"SELECT {', '.join(columns)} FROM jobs WHERE id=?", (job_id,)
        ).fetchone()
    return dict(zip(columns, row)) if row else None
| # ---------------------------------------------------------------------------- | |
| # Storage backends | |
| # ---------------------------------------------------------------------------- | |
def _upload_result(job_id: str, local_zip: str) -> str:
    """Move the ZIP to durable storage and return a download URL valid ~24h.

    aws:   upload to ``s3://S3_BUCKET/<job_id>/result.zip``, remove the local
           file, and return a pre-signed GET URL expiring after
           ``RESULT_TTL`` seconds.
    local: move the file to ``LOCAL_RESULTS_DIR/<job_id>/result.zip`` and
           return its ``file://`` URI (local files are not expired).
    """
    if STORAGE_BACKEND == "aws":
        s3_key = f"{job_id}/result.zip"
        _s3.upload_file(
            local_zip, S3_BUCKET, s3_key,
            ExtraArgs={"ContentType": "application/zip"},
        )
        url = _s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": s3_key},
            ExpiresIn=RESULT_TTL,
        )
        # The ZIP now lives in S3; delete the local copy so repeated jobs do
        # not accumulate on the worker's disk. Cleanup failure is logged but
        # must not fail an otherwise successful job.
        try:
            os.remove(local_zip)
        except OSError as exc:
            _log(f"job {job_id}: could not remove local zip {local_zip}: {exc}")
        return url

    dst_dir = LOCAL_RESULTS_DIR / job_id
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst = dst_dir / "result.zip"
    # shutil.move falls back to copy+delete when source and destination are
    # on different filesystems, where Path.rename raises OSError (EXDEV).
    shutil.move(str(local_zip), str(dst))
    # as_uri() needs an absolute path and percent-encodes it; a relative
    # TESSERA_LOCAL_RESULTS would otherwise yield "file://rel/..." with the
    # first path segment parsed as a host name.
    return dst.absolute().as_uri()
# ----------------------------------------------------------------------------
# Email: message bodies and delivery
# ----------------------------------------------------------------------------
def _success_html(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Return the HTML body of the "job ready" email.

    Every interpolated value is HTML-escaped. This matters most for ``url``:
    a pre-signed S3 URL typically carries several ``&``-separated query
    parameters, which belong in an ``href`` attribute as ``&amp;`` (mail
    clients decode them back when the link is followed).
    """
    # Imported locally so no module-level ``html`` name sits alongside
    # _send_email's ``html`` parameter.
    from html import escape

    safe_url = escape(str(url), quote=True)
    return (
        "<html><body style='font-family: sans-serif;'>"
        "<p>Your TESSERA inference job is ready.</p>"
        "<table style='border-collapse: collapse;'>"
        f"<tr><td><b>Job ID</b></td><td><code>{escape(str(job_id))}</code></td></tr>"
        f"<tr><td><b>Samples</b></td><td>{escape(str(n_samples))}</td></tr>"
        f"<tr><td><b>Model</b></td><td><code>{escape(str(model_variant))}</code></td></tr>"
        "</table>"
        f'<p><a href="{safe_url}"><b>Download results (ZIP)</b></a><br>'
        f"Link expires in {RESULT_TTL_HOURS} hours.</p>"
        "</body></html>"
    )
def _success_text(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Return the plain-text alternative of the "job ready" email.

    Lists the job id, sample count and model, then the download URL together
    with its validity in hours; the text ends with a newline.
    """
    body_lines = [
        "Your TESSERA inference job is ready.",
        "",
        f"Job ID: {job_id}",
        f"Samples: {n_samples}",
        f"Model: {model_variant}",
        "",
        f"Download (valid {RESULT_TTL_HOURS} hours):",
        f"{url}",
    ]
    return "\n".join(body_lines) + "\n"
def _send_email(to: str, subject: str,
                text: str, html: str | None = None,
                timeout: float = 30.0) -> None:
    """Send an email via SMTP over implicit TLS, or log it when mail is disabled.

    With credentials configured (``EMAIL_ENABLED``) the message is sent from
    ``GMAIL_USER`` through ``SMTP_SSL`` to ``SMTP_HOST:SMTP_PORT``. Otherwise
    it is only written to the log, which is convenient in local development.

    Args:
        to: recipient address.
        subject: subject line.
        text: plain-text body; always included.
        html: optional HTML alternative body.
        timeout: seconds before a blocking SMTP socket operation gives up.
            Without it an unresponsive server stalls the calling thread
            indefinitely; for the single job worker, that also stalls every
            job queued behind it.

    Raises:
        smtplib.SMTPException or OSError (including ssl.SSLError) on
        connection, TLS, authentication or send failure; callers decide
        whether that is fatal.
    """
    if not EMAIL_ENABLED:
        _log("--- EMAIL (stdout, no SMTP creds) ---")
        _log(f"To: {to}")
        _log(f"Subject: {subject}")
        _log(text)
        _log("--- END EMAIL ---")
        return
    msg = EmailMessage()
    msg["From"] = GMAIL_USER
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(text)
    if html is not None:
        msg.add_alternative(html, subtype="html")
    # Pass a verifying context explicitly: without one, SMTP_SSL uses
    # ssl._create_stdlib_context(), which skips certificate and hostname
    # checks. The App Password could then be handed to an impostor server.
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=timeout,
                          context=context) as smtp:
        smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        smtp.send_message(msg)
def _send_success(to: str, job_id: str, url: str,
                  n_samples: int, model_variant: str) -> None:
    """Email ``to`` the download link for a finished job (text + HTML parts).

    The subject states the sample count, pluralised ("1 sample", "3 samples").
    """
    plural = "" if n_samples == 1 else "s"
    body_args = (job_id, url, n_samples, model_variant)
    _send_email(
        to,
        f"TESSERA inference ready ({n_samples} sample{plural})",
        text=_success_text(*body_args),
        html=_success_html(*body_args),
    )
def _send_failure(to: str, job_id: str, error: str) -> None:
    """Email ``to`` a plain-text notice that ``job_id`` failed, quoting ``error``.

    The subject carries only the first 8 characters of the job id.
    """
    short_id = job_id[:8]
    _send_email(
        to,
        f"TESSERA inference failed ({short_id})",
        text=f"Your TESSERA inference job {job_id} failed:\n\n{error}\n",
    )
| # ---------------------------------------------------------------------------- | |
| # Worker | |
| # ---------------------------------------------------------------------------- | |
def _record_failure(job_id, email, exc):
    """Mark ``job_id`` failed, log the traceback and email ``email``; never raises."""
    # Some exceptions (e.g. a bare ValueError()) stringify to "", which would
    # leave the user with an empty error message.
    reason = str(exc) or repr(exc)
    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    _log(f"job {job_id}: failed: {reason}\n{tb}")
    try:
        _set(job_id, status="failed", finished_at=int(time.time()),
             error=reason)
    except Exception as db_exc:
        _log(f"job {job_id}: could not record failure: {db_exc}")
    try:
        _send_failure(email, job_id, reason)
    except Exception as mail_exc:
        _log(f"job {job_id}: failure-email also failed: {mail_exc}")


def _run(job_id, snv_df, cna_df, apply_qn, email):
    """Execute one queued job on the worker thread and record its outcome.

    Steps:
      1. Mark the row 'running'.
      2. Run ``run_inference`` and ``pack_outputs`` from app.py.
      3. Store the ZIP via ``_upload_result``.
      4. Mark the row 'done' with the download URL.
      5. Email the user.

    A failure in steps 1-4 marks the row 'failed' and (best-effort) emails
    the error. A failure in step 5 is only logged: the results are already
    stored and the row already says 'done'.

    Nothing calls ``.result()`` on the future returned by ``_executor.submit``,
    so every exception must be handled here. Anything that escaped would be
    dropped silently and leave the row stuck.
    """
    try:
        _set(job_id, status="running")
        _log(f"job {job_id}: running")
        # Lazy import: avoids loading TF in jobs.py if app.py imports jobs first
        # NOTE(review): if app.py is running as the __main__ script, ``import
        # app`` may execute it a second time under the name "app" — confirm
        # its top level is safe to re-run (model loading, server launch).
        from app import run_inference, pack_outputs  # type: ignore
        result = run_inference(snv_df, cna_df, apply_qn=apply_qn)
        n_samples = result["n_samples"]
        model_variant = result["model_variant"]
        zip_path = pack_outputs(result)
        url = _upload_result(job_id, zip_path)
        # Persist success *before* emailing, so an SMTP problem cannot turn a
        # job whose results are already stored into a 'failed' one.
        _set(job_id, status="done", finished_at=int(time.time()),
             result_url=url)
    except Exception as exc:
        _record_failure(job_id, email, exc)
        return
    try:
        _send_success(email, job_id, url, n_samples, model_variant)
    except Exception as exc:
        _log(f"job {job_id}: done, but success email failed: {exc}\n"
             f"{traceback.format_exc()}")
        return
    _log(f"job {job_id}: done -> {email}")
| # ---------------------------------------------------------------------------- | |
| # Public API | |
| # ---------------------------------------------------------------------------- | |
def submit_job(snv_df, cna_df, apply_qn: bool, email: str, n_samples: int) -> str:
    """Record a new 'queued' job and hand it to the background worker.

    Returns the new job id (a random UUID4 string), which ``get_job`` accepts.
    The row is committed before the work is submitted, so the worker's first
    status update always finds it.
    """
    job_id = str(uuid.uuid4())
    # closing() releases the connection; the inner ``with con`` commits.
    with closing(_db()) as con, con:
        con.execute(
            "INSERT INTO jobs (id, email, status, n_samples, created_at) "
            "VALUES (?, ?, 'queued', ?, ?)",
            (job_id, email, n_samples, int(time.time())),
        )
    _executor.submit(_run, job_id, snv_df, cna_df, apply_qn, email)
    _log(f"job {job_id}: queued ({n_samples} samples) for {email}")
    return job_id
| # ---------------------------------------------------------------------------- | |
| # Startup: fail any 'running' jobs left over from a prior crash / sleep. | |
| # ---------------------------------------------------------------------------- | |
def _resurrect_running_jobs() -> None:
    """Fail job rows orphaned by an earlier process and notify their owners.

    Work is queued only in this process's in-memory executor, and a job's
    inputs (snv_df/cna_df) are never written to the DB. So after a crash or
    Space sleep, nothing will ever pick up a row still marked 'running' or
    'queued'. Each such row created more than 60 s ago is marked 'failed',
    and its owner, if an email is on record, is notified on a best-effort
    basis.
    """
    # NOTE(review): the 60 s created_at cutoff is kept as-is; its purpose is
    # not visible here (perhaps protection for another live process sharing
    # the DB) — confirm.
    cutoff = int(time.time()) - 60
    with closing(_db()) as con, con:
        orphans = con.execute(
            "SELECT id, email, status FROM jobs "
            "WHERE status IN ('running', 'queued') AND created_at < ?",
            (cutoff,),
        ).fetchall()
    # The read connection is closed before the per-row writes and SMTP below.
    for jid, email, status in orphans:
        if status == "running":
            error = "process restart killed worker"
            notice = "process restart killed the worker"
        else:
            error = "process restart dropped queued job"
            notice = "process restart dropped the job before it started"
        _set(jid, status="failed",
             finished_at=int(time.time()),
             error=error)
        _log(f"orphan job {jid}: marked failed (restart, was {status})")
        if not email:
            continue
        try:
            _send_failure(email, jid, notice)
        except Exception as exc:
            # Best-effort: one bad address/SMTP hiccup must not abort startup.
            _log(f"orphan job {jid}: failure-email failed: {exc}")
# Resolve job rows a previous process left in flight. This runs once, at
# import, before this module has submitted any work to ``_executor``, so such
# rows are presumed to belong to a process that no longer exists.
_resurrect_running_jobs()