"""Async job queue for the TESSERA inference API. Runtime layout: user submit (web tier) v submit_job(...) -> insert row in jobs table -> queue Future on ThreadPoolExecutor (max_workers=1) v worker thread runs run_inference + pack_outputs (defined in app.py), then uploads the ZIP to storage and emails the user a download URL. Storage backend, picked by ``TESSERA_STORAGE``: 'local' (default) Local filesystem under ``TESSERA_LOCAL_RESULTS`` (default ``/tmp/tessera_results//result.zip``). For development without AWS. 'aws' S3 (``TESSERA_S3_BUCKET``), 24h pre-signed URL. Requires AWS credentials in the environment. Email backend is independent of storage. If ``GMAIL_USER`` and ``GMAIL_APP_PASSWORD`` are set we send via Gmail SMTP (``smtp.gmail.com:465``); the App Password is the 16-char value generated at https://myaccount.google.com/apppasswords on a 2-Step-Verification-enabled Google account. Sending from a real Gmail account through Google's own SMTP keeps DKIM/SPF/DMARC alignment intact, which is what kept us out of the spam folder when SES tried to forge a ``@gmail.com`` From header. If the credentials are unset we log to stdout (useful in local dev). A SQLite job-state file (``TESSERA_JOBS_DB``, default ``/tmp/tessera_jobs.db``) holds the ``{id, email, status, n_samples, created_at, finished_at, result_url, error}`` rows. At process startup any job left in 'running' state from a prior crash / Space sleep is marked failed and (best-effort) emailed. """ from __future__ import annotations import os import smtplib import sqlite3 import threading import time import traceback import uuid from concurrent.futures import ThreadPoolExecutor from email.message import EmailMessage from pathlib import Path # ---------------------------------------------------------------------------- # Configuration: load .env.local if present, then read env vars. # ---------------------------------------------------------------------------- def _load_dotenv() -> None: here = Path(__file__).resolve().parent env_file = here / ".env.local" if not env_file.exists(): return for raw in env_file.read_text().splitlines(): line = raw.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, v = line.split("=", 1) os.environ.setdefault(k.strip(), v.strip()) _load_dotenv() STORAGE_BACKEND = os.environ.get("TESSERA_STORAGE", "local") S3_BUCKET = os.environ.get("TESSERA_S3_BUCKET") GMAIL_USER = os.environ.get("GMAIL_USER", "").strip() GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "").strip() SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com") SMTP_PORT = int(os.environ.get("SMTP_PORT", "465")) DB_PATH = Path(os.environ.get("TESSERA_JOBS_DB", "/tmp/tessera_jobs.db")) LOCAL_RESULTS_DIR = Path(os.environ.get("TESSERA_LOCAL_RESULTS", "/tmp/tessera_results")) RESULT_TTL = 24 * 3600 MAX_WORKERS = 1 RESULT_TTL_HOURS = RESULT_TTL // 3600 EMAIL_ENABLED = bool(GMAIL_USER and GMAIL_APP_PASSWORD) if STORAGE_BACKEND == "aws": if not S3_BUCKET: raise RuntimeError("TESSERA_STORAGE=aws but TESSERA_S3_BUCKET is unset") import boto3 # noqa: E402 _s3 = boto3.client("s3") elif STORAGE_BACKEND == "local": LOCAL_RESULTS_DIR.mkdir(parents=True, exist_ok=True) else: raise RuntimeError(f"TESSERA_STORAGE must be 'local' or 'aws', got {STORAGE_BACKEND!r}") _executor = ThreadPoolExecutor(max_workers=MAX_WORKERS) _print_lock = threading.Lock() def _log(msg: str) -> None: with _print_lock: print(f"[jobs] {msg}", flush=True) _log( f"backend={STORAGE_BACKEND} db={DB_PATH} bucket={S3_BUCKET or '-'} " f"email={'gmail-smtp' if EMAIL_ENABLED else 'stdout'} " f"from={GMAIL_USER or '-'}" ) # ---------------------------------------------------------------------------- # Tiny SQLite for job state # ---------------------------------------------------------------------------- def _db(): DB_PATH.parent.mkdir(parents=True, exist_ok=True) con = sqlite3.connect(str(DB_PATH)) con.execute("""CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, email TEXT, status TEXT, n_samples INTEGER, created_at INTEGER, finished_at INTEGER, result_url TEXT, error TEXT )""") return con def _set(job_id: str, **kw) -> None: fields = ", ".join(f"{k}=?" for k in kw) with _db() as c: c.execute(f"UPDATE jobs SET {fields} WHERE id=?", (*kw.values(), job_id)) def get_job(job_id: str) -> dict | None: keys = ("id", "email", "status", "n_samples", "created_at", "finished_at", "result_url", "error") with _db() as c: row = c.execute( f"SELECT {', '.join(keys)} FROM jobs WHERE id=?", (job_id,) ).fetchone() return dict(zip(keys, row)) if row else None # ---------------------------------------------------------------------------- # Storage backends # ---------------------------------------------------------------------------- def _upload_result(job_id: str, local_zip: str) -> str: """Move the ZIP to durable storage and return a download URL valid ~24h.""" if STORAGE_BACKEND == "aws": s3_key = f"{job_id}/result.zip" _s3.upload_file( local_zip, S3_BUCKET, s3_key, ExtraArgs={"ContentType": "application/zip"}, ) return _s3.generate_presigned_url( "get_object", Params={"Bucket": S3_BUCKET, "Key": s3_key}, ExpiresIn=RESULT_TTL, ) else: dst_dir = LOCAL_RESULTS_DIR / job_id dst_dir.mkdir(parents=True, exist_ok=True) dst = dst_dir / "result.zip" Path(local_zip).rename(dst) return f"file://{dst}" # ---------------------------------------------------------------------------- # Email # ---------------------------------------------------------------------------- def _success_html(job_id: str, url: str, n_samples: int, model_variant: str) -> str: return ( "" "

Your TESSERA inference job is ready.

" "" f"" f"" f"" "
Job ID{job_id}
Samples{n_samples}
Model{model_variant}
" f'

Download results (ZIP)
' f"Link expires in {RESULT_TTL_HOURS} hours.

" "" ) def _success_text(job_id: str, url: str, n_samples: int, model_variant: str) -> str: return ( "Your TESSERA inference job is ready.\n\n" f"Job ID: {job_id}\n" f"Samples: {n_samples}\n" f"Model: {model_variant}\n\n" f"Download (valid {RESULT_TTL_HOURS} hours):\n{url}\n" ) def _send_email(to: str, subject: str, text: str, html: str | None = None) -> None: """Send via Gmail SMTP if creds are present, else log to stdout.""" if not EMAIL_ENABLED: _log("--- EMAIL (stdout, no SMTP creds) ---") _log(f"To: {to}") _log(f"Subject: {subject}") _log(text) _log("--- END EMAIL ---") return msg = EmailMessage() msg["From"] = GMAIL_USER msg["To"] = to msg["Subject"] = subject msg.set_content(text) if html is not None: msg.add_alternative(html, subtype="html") with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT) as smtp: smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD) smtp.send_message(msg) def _send_success(to: str, job_id: str, url: str, n_samples: int, model_variant: str) -> None: subject = f"TESSERA inference ready ({n_samples} sample{'s' if n_samples != 1 else ''})" _send_email( to, subject, text=_success_text(job_id, url, n_samples, model_variant), html=_success_html(job_id, url, n_samples, model_variant), ) def _send_failure(to: str, job_id: str, error: str) -> None: subject = f"TESSERA inference failed ({job_id[:8]})" body = f"Your TESSERA inference job {job_id} failed:\n\n{error}\n" _send_email(to, subject, text=body) # ---------------------------------------------------------------------------- # Worker # ---------------------------------------------------------------------------- def _run(job_id, snv_df, cna_df, apply_qn, email): _set(job_id, status="running") _log(f"job {job_id}: running") try: # Lazy import: avoids loading TF in jobs.py if app.py imports jobs first from app import run_inference, pack_outputs # type: ignore result = run_inference(snv_df, cna_df, apply_qn=apply_qn) zip_path = pack_outputs(result) url = _upload_result(job_id, zip_path) _send_success(email, job_id, url, result["n_samples"], result["model_variant"]) _set(job_id, status="done", finished_at=int(time.time()), result_url=url) _log(f"job {job_id}: done -> {email}") except Exception as e: tb = traceback.format_exc() _log(f"job {job_id}: failed: {e}\n{tb}") _set(job_id, status="failed", finished_at=int(time.time()), error=str(e)) try: _send_failure(email, job_id, str(e)) except Exception as e2: _log(f"job {job_id}: failure-email also failed: {e2}") # ---------------------------------------------------------------------------- # Public API # ---------------------------------------------------------------------------- def submit_job(snv_df, cna_df, apply_qn: bool, email: str, n_samples: int) -> str: job_id = str(uuid.uuid4()) with _db() as c: c.execute( "INSERT INTO jobs (id, email, status, n_samples, created_at) " "VALUES (?, ?, 'queued', ?, ?)", (job_id, email, n_samples, int(time.time())), ) _executor.submit(_run, job_id, snv_df, cna_df, apply_qn, email) _log(f"job {job_id}: queued ({n_samples} samples) for {email}") return job_id # ---------------------------------------------------------------------------- # Startup: fail any 'running' jobs left over from a prior crash / sleep. # ---------------------------------------------------------------------------- def _resurrect_running_jobs() -> None: cutoff = int(time.time()) - 60 with _db() as c: rows = c.execute( "SELECT id, email FROM jobs " "WHERE status='running' AND created_at < ?", (cutoff,), ).fetchall() for jid, email in rows: _set(jid, status="failed", finished_at=int(time.time()), error="process restart killed worker") _log(f"orphan job {jid}: marked failed (restart)") if email: try: _send_failure(email, jid, "process restart killed the worker") except Exception: pass _resurrect_running_jobs()