# tessera/jobs.py: "Add jobs.py" (commit c861002 by sidhomj)
"""Async job queue for the TESSERA inference API.

Runtime layout::

    user submit (web tier)
        |
        v
    submit_job(...) -> insert row in jobs table -> queue a Future on the
                       ThreadPoolExecutor (max_workers=1)
        |
        v
    worker thread runs run_inference + pack_outputs (defined in app.py),
    then uploads the ZIP to storage and emails the user a download URL.

Storage backend, selected by ``TESSERA_STORAGE``:

``'local'`` (default)
    Local filesystem: results are written to
    ``<TESSERA_LOCAL_RESULTS>/<job_id>/result.zip`` (default root
    ``/tmp/tessera_results``). Intended for development without AWS.
``'aws'``
    S3 bucket ``TESSERA_S3_BUCKET``; the user receives a 24-hour pre-signed
    download URL. Requires AWS credentials in the environment.

The email backend is configured independently of storage. If both
``GMAIL_USER`` and ``GMAIL_APP_PASSWORD`` are set, mail is sent through
Gmail SMTP (``smtp.gmail.com:465`` by default, overridable with
``SMTP_HOST`` / ``SMTP_PORT``). The App Password is the 16-character value
generated at https://myaccount.google.com/apppasswords for a Google account
with 2-Step Verification enabled. Sending from a real Gmail account through
Google's own SMTP server keeps DKIM/SPF/DMARC aligned. That alignment is what
fixed our spam-folder problem, which began when SES sent mail with a forged
``@gmail.com`` From header. If either credential is unset, emails are logged
to stdout instead (useful in local dev).

A SQLite job-state file (``TESSERA_JOBS_DB``, default
``/tmp/tessera_jobs.db``) holds one row per job with the columns
``{id, email, status, n_samples, created_at, finished_at, result_url,
error}``. At process startup, any job left in the 'running' state by a
prior crash or Space sleep (and created more than 60 s earlier) is marked
failed, and its owner is emailed on a best-effort basis.
"""
from __future__ import annotations

import html
import os
import shutil
import smtplib
import sqlite3
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from email.message import EmailMessage
from pathlib import Path
# ----------------------------------------------------------------------------
# Configuration: load .env.local if present, then read env vars.
# ----------------------------------------------------------------------------
def _load_dotenv(env_file: "Path | None" = None) -> None:
    """Populate ``os.environ`` from a simple ``KEY=VALUE`` dotenv file.

    Variables already present in the environment win: values are applied
    with ``os.environ.setdefault``, so real deployment settings are never
    overridden by the file.

    Accepted syntax (one assignment per line):
      * blank lines and lines starting with ``#`` are skipped;
      * lines without ``=`` are skipped;
      * an optional leading ``export `` before the key is ignored;
      * key and value are whitespace-stripped, and a value wrapped in a
        matching pair of single or double quotes is unquoted;
      * lines with an empty key are skipped (``os.environ`` would reject
        them with ``ValueError`` and abort module import).

    Args:
        env_file: Dotenv file to read. Defaults to ``.env.local`` next to
            this module. A missing file is a no-op.
    """
    if env_file is None:
        env_file = Path(__file__).resolve().parent / ".env.local"
    env_file = Path(env_file)
    if not env_file.exists():
        return
    for raw in env_file.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            continue
        value = value.strip()
        # Unquote "value" / 'value' so quoted secrets don't keep their quotes.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)


_load_dotenv()
# Storage selection: 'local' (filesystem) or 'aws' (S3).
STORAGE_BACKEND = os.environ.get("TESSERA_STORAGE", "local")
S3_BUCKET = os.environ.get("TESSERA_S3_BUCKET")

# Email credentials. Spaces are removed from the App Password, presumably
# because it is often pasted in its space-separated display form.
GMAIL_USER = os.environ.get("GMAIL_USER", "").strip()
GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "").strip()
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))
EMAIL_ENABLED = bool(GMAIL_USER and GMAIL_APP_PASSWORD)

# Job-state database and root directory for locally stored results.
DB_PATH = Path(os.environ.get("TESSERA_JOBS_DB", "/tmp/tessera_jobs.db"))
LOCAL_RESULTS_DIR = Path(os.environ.get("TESSERA_LOCAL_RESULTS", "/tmp/tessera_results"))

RESULT_TTL = 24 * 3600  # lifetime of a download link, in seconds
RESULT_TTL_HOURS = RESULT_TTL // 3600
MAX_WORKERS = 1  # jobs are processed one at a time

# Validate the backend first, then do the backend-specific setup.
if STORAGE_BACKEND not in ("local", "aws"):
    raise RuntimeError(f"TESSERA_STORAGE must be 'local' or 'aws', got {STORAGE_BACKEND!r}")
if STORAGE_BACKEND == "aws":
    if not S3_BUCKET:
        raise RuntimeError("TESSERA_STORAGE=aws but TESSERA_S3_BUCKET is unset")
    import boto3  # noqa: E402
    _s3 = boto3.client("s3")
else:
    LOCAL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)

_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
_print_lock = threading.Lock()
def _log(msg: str) -> None:
    """Write ``msg`` to stdout as one ``[jobs]``-prefixed, flushed line.

    Holding ``_print_lock`` while printing keeps concurrent callers (for
    example the submitting thread and the worker thread) from interleaving
    their output.
    """
    _print_lock.acquire()
    try:
        print(f"[jobs] {msg}", flush=True)
    finally:
        _print_lock.release()
# Startup banner: a single line summarizing the effective configuration.
_log(
    "backend={} db={} bucket={} email={} from={}".format(
        STORAGE_BACKEND,
        DB_PATH,
        S3_BUCKET or "-",
        "gmail-smtp" if EMAIL_ENABLED else "stdout",
        GMAIL_USER or "-",
    )
)
# ----------------------------------------------------------------------------
# Tiny SQLite for job state
# ----------------------------------------------------------------------------
def _db():
    """Open a new SQLite connection to ``DB_PATH``, ensuring the schema exists.

    The parent directory of ``DB_PATH`` is created if missing, and the
    ``jobs`` table is created with ``CREATE TABLE IF NOT EXISTS`` on every
    call. Each call returns a separate connection object.
    """
    schema = """CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
email TEXT,
status TEXT,
n_samples INTEGER,
created_at INTEGER,
finished_at INTEGER,
result_url TEXT,
error TEXT
)"""
    target = DB_PATH
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(target))
    connection.execute(schema)
    return connection
def _set(job_id: str, **kw) -> None:
    """Update columns of a single job row, e.g. ``_set(jid, status="done")``.

    Keyword names become column names in the SQL text, so they are checked
    against the ``jobs`` schema; values are always bound as parameters.
    Calling with no keywords is a no-op, because the statement would
    otherwise be ``UPDATE jobs SET  WHERE ...`` (invalid SQL).

    Args:
        job_id: Primary key of the row to update.
        **kw: Column/value pairs to assign.

    Raises:
        ValueError: If a keyword is not an updatable ``jobs`` column.
    """
    if not kw:
        return
    allowed = {"email", "status", "n_samples", "created_at",
               "finished_at", "result_url", "error"}
    unknown = set(kw) - allowed
    if unknown:
        raise ValueError(f"unknown jobs column(s): {sorted(unknown)}")
    assignments = ", ".join(f"{col}=?" for col in kw)
    # The connection's own context manager only commits; closing() also
    # releases the handle instead of leaving it to garbage collection.
    with closing(_db()) as con, con:
        con.execute(f"UPDATE jobs SET {assignments} WHERE id=?",
                    (*kw.values(), job_id))
def get_job(job_id: str) -> dict | None:
    """Return the ``jobs`` row for ``job_id`` as a dict, or ``None`` if absent.

    The dict has the keys ``id, email, status, n_samples, created_at,
    finished_at, result_url, error``; columns never written are ``None``.
    """
    keys = ("id", "email", "status", "n_samples", "created_at",
            "finished_at", "result_url", "error")
    # Read-only query: no transaction to commit, but the connection must
    # still be closed explicitly.
    with closing(_db()) as con:
        row = con.execute(
            f"SELECT {', '.join(keys)} FROM jobs WHERE id=?", (job_id,)
        ).fetchone()
    return dict(zip(keys, row)) if row is not None else None
# ----------------------------------------------------------------------------
# Storage backends
# ----------------------------------------------------------------------------
def _upload_result(job_id: str, local_zip: str) -> str:
    """Move the result ZIP to durable storage and return a download URL.

    Backends:
      * ``aws``: upload to ``<S3_BUCKET>/<job_id>/result.zip`` and return a
        pre-signed GET URL that expires after ``RESULT_TTL`` seconds. The
        local file is then removed (best effort), which gives "move"
        semantics matching the local backend.
      * anything else (``local``): move the file to
        ``LOCAL_RESULTS_DIR/<job_id>/result.zip`` and return its ``file://``
        URI. Nothing in this function expires local results.

    Args:
        job_id: Job identifier, used as the object prefix / directory name.
        local_zip: Path of the ZIP produced by the packer.

    Returns:
        A URL from which the ZIP can be downloaded.
    """
    if STORAGE_BACKEND == "aws":
        s3_key = f"{job_id}/result.zip"
        _s3.upload_file(
            local_zip, S3_BUCKET, s3_key,
            ExtraArgs={"ContentType": "application/zip"},
        )
        url = _s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": s3_key},
            ExpiresIn=RESULT_TTL,
        )
        try:
            os.remove(local_zip)
        except OSError as exc:
            # Cleanup is best effort; the upload itself already succeeded.
            _log(f"job {job_id}: could not remove local ZIP {local_zip}: {exc}")
        return url

    dst_dir = LOCAL_RESULTS_DIR / job_id
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst = dst_dir / "result.zip"
    # Path.rename raises OSError (EXDEV) when the source lives on another
    # filesystem; shutil.move falls back to copy + delete in that case.
    shutil.move(str(local_zip), str(dst))
    # as_uri() requires an absolute path and percent-encodes special
    # characters, unlike f"file://{dst}" (broken for relative roots).
    return dst.resolve().as_uri()
# ----------------------------------------------------------------------------
# Email
# ----------------------------------------------------------------------------
def _success_html(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Build the HTML body of the "job ready" email.

    Every interpolated value is HTML-escaped. A pre-signed S3 URL contains
    ``&`` in its query string, which must be written as ``&amp;`` inside an
    ``href`` attribute, and ``model_variant`` comes from the inference code
    rather than being a fixed literal.

    Args:
        job_id: Job identifier shown in the summary table.
        url: Download link for the result ZIP.
        n_samples: Number of samples processed.
        model_variant: Model name shown in the summary table.

    Returns:
        A complete ``<html>...</html>`` document as a string.
    """
    esc = html.escape  # quote=True by default: safe inside href="..."
    return (
        "<html><body style='font-family: sans-serif;'>"
        "<p>Your TESSERA inference job is ready.</p>"
        "<table style='border-collapse: collapse;'>"
        f"<tr><td><b>Job ID</b></td><td><code>{esc(str(job_id))}</code></td></tr>"
        f"<tr><td><b>Samples</b></td><td>{esc(str(n_samples))}</td></tr>"
        f"<tr><td><b>Model</b></td><td><code>{esc(str(model_variant))}</code></td></tr>"
        "</table>"
        f'<p><a href="{esc(url)}"><b>Download results (ZIP)</b></a><br>'
        f"Link expires in {RESULT_TTL_HOURS} hours.</p>"
        "</body></html>"
    )
def _success_text(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Build the plain-text body of the "job ready" email.

    Lists the job id, sample count and model, then the download URL with
    its validity in hours (``RESULT_TTL_HOURS``). Ends with a newline.
    """
    lines = [
        "Your TESSERA inference job is ready.",
        "",
        f"Job ID: {job_id}",
        f"Samples: {n_samples}",
        f"Model: {model_variant}",
        "",
        f"Download (valid {RESULT_TTL_HOURS} hours):",
        f"{url}",
    ]
    return "\n".join(lines) + "\n"
def _send_email(to: str, subject: str,
                text: str, html: str | None = None,
                *, timeout: float = 30.0) -> None:
    """Send an email via SMTP over SSL, or log it to stdout when disabled.

    When ``EMAIL_ENABLED`` is false (Gmail credentials not configured), the
    recipient, subject and text body are written with ``_log`` and nothing
    is sent.

    Args:
        to: Recipient address.
        subject: Subject line.
        text: Plain-text body (always included).
        html: Optional HTML alternative part.
        timeout: Seconds allowed for each blocking SMTP socket operation.

    Raises:
        OSError / smtplib.SMTPException: On connect, login or send failure,
            including a timeout.
    """
    if not EMAIL_ENABLED:
        _log("--- EMAIL (stdout, no SMTP creds) ---")
        _log(f"To: {to}")
        _log(f"Subject: {subject}")
        _log(text)
        _log("--- END EMAIL ---")
        return
    msg = EmailMessage()
    msg["From"] = GMAIL_USER
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(text)
    if html is not None:
        msg.add_alternative(html, subtype="html")
    # Without a timeout, a stalled SMTP server blocks the caller forever.
    # Here the caller is the single job worker (MAX_WORKERS = 1), so one hang
    # would stall every queued job.
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=timeout) as smtp:
        smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        smtp.send_message(msg)
def _send_success(to: str, job_id: str, url: str,
                  n_samples: int, model_variant: str) -> None:
    """Email ``to`` the download link of a finished job (text + HTML parts).

    The subject states the sample count, pluralized ("1 sample", "2 samples").
    """
    plural = "" if n_samples == 1 else "s"
    bodies = dict(
        text=_success_text(job_id, url, n_samples, model_variant),
        html=_success_html(job_id, url, n_samples, model_variant),
    )
    _send_email(to, f"TESSERA inference ready ({n_samples} sample{plural})", **bodies)
def _send_failure(to: str, job_id: str, error: str) -> None:
    """Email ``to`` a plain-text notice that job ``job_id`` failed with ``error``.

    The subject carries only the first 8 characters of the job id.
    """
    short_id = job_id[:8]
    message_body = f"Your TESSERA inference job {job_id} failed:\n\n{error}\n"
    _send_email(to, f"TESSERA inference failed ({short_id})", text=message_body)
# ----------------------------------------------------------------------------
# Worker
# ----------------------------------------------------------------------------
def _fail_job(job_id: str, email: str, exc: Exception) -> None:
    """Log ``exc``, mark ``job_id`` failed and notify ``email``; never raises."""
    # Some exceptions (e.g. a bare AssertionError) have an empty str().
    error = str(exc) or type(exc).__name__
    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    _log(f"job {job_id}: failed: {error}\n{tb}")
    try:
        _set(job_id, status="failed", finished_at=int(time.time()), error=error)
    except Exception as db_exc:
        _log(f"job {job_id}: could not record failure: {db_exc}")
    try:
        _send_failure(email, job_id, error)
    except Exception as e2:
        _log(f"job {job_id}: failure-email also failed: {e2}")


def _run(job_id, snv_df, cna_df, apply_qn, email):
    """Worker body for one job: infer, package, upload, record, then notify.

    Executed on the module's executor thread (see ``submit_job``). Ordinary
    exceptions are not allowed to escape. An escaping exception would only
    be stored on a Future that nobody inspects, so every failure is logged,
    recorded on the job row and (best effort) emailed instead.

    The result is persisted (``status='done'`` plus ``result_url``) *before*
    the success email is sent. A mail problem therefore cannot discard an
    uploaded result or flip a finished job to 'failed'.
    """
    try:
        _set(job_id, status="running")
        _log(f"job {job_id}: running")
        # Lazy import: avoids loading TF in jobs.py if app.py imports jobs first
        from app import run_inference, pack_outputs  # type: ignore
        result = run_inference(snv_df, cna_df, apply_qn=apply_qn)
        n_samples = result["n_samples"]
        model_variant = result["model_variant"]
        zip_path = pack_outputs(result)
        url = _upload_result(job_id, zip_path)
        _set(job_id, status="done", finished_at=int(time.time()),
             result_url=url)
    except Exception as e:
        _fail_job(job_id, email, e)
        return
    _log(f"job {job_id}: done -> {email}")
    try:
        _send_success(email, job_id, url, n_samples, model_variant)
    except Exception as e:
        # The result stays available via get_job(); only the notice failed.
        _log(f"job {job_id}: success email failed (result kept): {e}")
# ----------------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------------
def submit_job(snv_df, cna_df, apply_qn: bool, email: str, n_samples: int) -> str:
    """Register a new inference job and queue it for the background worker.

    Inserts a row with status 'queued', then schedules ``_run`` on the
    module executor and returns immediately. Progress can be polled with
    ``get_job``.

    Args:
        snv_df, cna_df: Inputs forwarded unchanged to ``run_inference``
            (presumably pandas DataFrames; confirm against app.py).
        apply_qn: Forwarded as ``run_inference(..., apply_qn=apply_qn)``.
        email: Recipient of the success / failure notification.
        n_samples: Stored on the job row and shown in log messages.

    Returns:
        The new job id (a UUID4 string).
    """
    job_id = str(uuid.uuid4())
    # Inner ``con`` context commits the insert; closing() releases the handle.
    with closing(_db()) as con, con:
        con.execute(
            "INSERT INTO jobs (id, email, status, n_samples, created_at) "
            "VALUES (?, ?, 'queued', ?, ?)",
            (job_id, email, n_samples, int(time.time())),
        )
    # Log before submitting so "queued" can never print after the worker's
    # "running" line.
    _log(f"job {job_id}: queued ({n_samples} samples) for {email}")
    future = _executor.submit(_run, job_id, snv_df, cna_df, apply_qn, email)

    def _report_unexpected(fut) -> None:
        # _run handles its own errors; anything that still escapes would
        # otherwise sit unobserved on this discarded Future.
        if fut.cancelled():
            _log(f"job {job_id}: cancelled before running")
            return
        exc = fut.exception()
        if exc is not None:
            _log(f"job {job_id}: worker raised {exc!r}")

    future.add_done_callback(_report_unexpected)
    return job_id
# ----------------------------------------------------------------------------
# Startup: fail any 'running' jobs left over from a prior crash / sleep.
# ----------------------------------------------------------------------------
def _resurrect_running_jobs() -> None:
    """Fail jobs orphaned by a previous process (crash, restart, Space sleep).

    Work is scheduled only on this process's in-memory executor. A row still
    'running' from a previous process can therefore never finish, and a row
    still 'queued' would never be picked up. Such rows created more than
    60 s ago are marked 'failed', and their owners are emailed on a
    best-effort basis. Mail errors are logged rather than raised, so startup
    is never aborted.

    NOTE(review): the 60 s ``created_at`` cutoff is kept from the original.
    It presumably spares jobs submitted moments earlier by another live
    process sharing the DB; confirm this is still intended.
    """
    cutoff = int(time.time()) - 60
    with closing(_db()) as con:
        rows = con.execute(
            "SELECT id, email, status FROM jobs "
            "WHERE status IN ('running', 'queued') AND created_at < ?",
            (cutoff,),
        ).fetchall()
    for jid, email, status in rows:
        if status == "running":
            reason = "process restart killed worker"
            notice = "process restart killed the worker"
        else:
            reason = "process restart dropped queued job"
            notice = "process restart dropped the job before it started"
        _set(jid, status="failed",
             finished_at=int(time.time()),
             error=reason)
        _log(f"orphan job {jid}: marked failed (restart, was {status})")
        if email:
            try:
                _send_failure(email, jid, notice)
            except Exception as exc:
                _log(f"orphan job {jid}: failure email failed: {exc}")


_resurrect_running_jobs()