Spaces:
Runtime error
Runtime error
File size: 11,484 Bytes
c861002 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 | """Async job queue for the TESSERA inference API.
Runtime layout:
user submit (web tier)
v
submit_job(...) -> insert row in jobs table -> queue Future on
ThreadPoolExecutor
(max_workers=1)
v
worker thread runs run_inference + pack_outputs (defined in app.py),
then uploads the ZIP to storage and emails the user a download URL.
Storage backend, picked by ``TESSERA_STORAGE``:
'local' (default)
Local filesystem under ``TESSERA_LOCAL_RESULTS``
(default ``/tmp/tessera_results/<job_id>/result.zip``).
For development without AWS.
'aws'
S3 (``TESSERA_S3_BUCKET``), 24h pre-signed URL. Requires AWS
credentials in the environment.
Email backend is independent of storage. If ``GMAIL_USER`` and
``GMAIL_APP_PASSWORD`` are set we send via Gmail SMTP
(``smtp.gmail.com:465``); the App Password is the 16-char value
generated at https://myaccount.google.com/apppasswords on a
2-Step-Verification-enabled Google account. Sending from a real Gmail
account through Google's own SMTP keeps DKIM/SPF/DMARC alignment
intact. Mail sent through SES with a forged ``@gmail.com`` From header
failed that alignment and landed in spam. If the credentials are unset,
emails are logged to stdout instead (useful in local dev).
A SQLite job-state file (``TESSERA_JOBS_DB``, default
``/tmp/tessera_jobs.db``) holds the ``{id, email, status, n_samples,
created_at, finished_at, result_url, error}`` rows. At process startup,
any job left in 'running' state by a prior crash / Space sleep (and
created more than 60 s earlier) is marked failed, and its owner is
emailed on a best-effort basis.
"""
from __future__ import annotations
import os
import smtplib
import sqlite3
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from email.message import EmailMessage
from pathlib import Path
# ----------------------------------------------------------------------------
# Configuration: load .env.local if present, then read env vars.
# ----------------------------------------------------------------------------
def _load_dotenv(env_file: Path | str | None = None) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv file into ``os.environ``.

    Variables already present in the environment take precedence
    (``os.environ.setdefault``), so a stray local file never overrides real
    deployment settings.

    Parsing rules:
      * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
      * an optional leading ``export `` on the key is ignored;
      * keys and values are whitespace-stripped;
      * one pair of matching surrounding quotes (``"..."`` or ``'...'``) is
        removed from the value;
      * lines with an empty key are skipped. ``os.environ`` rejects an empty
        name with ValueError, which would otherwise abort the module import.

    Args:
        env_file: Dotenv file to read. ``None`` (default) means ``.env.local``
            next to this module. A missing file is silently ignored.
    """
    if env_file is None:
        env_file = Path(__file__).resolve().parent / ".env.local"
    env_file = Path(env_file)
    if not env_file.exists():
        return
    for raw in env_file.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            continue
        value = value.strip()
        # Strip one pair of matching quotes so KEY="a b" yields a b, not "a b".
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)


_load_dotenv()
# Storage / persistence settings.
STORAGE_BACKEND = os.environ.get("TESSERA_STORAGE", "local")
S3_BUCKET = os.environ.get("TESSERA_S3_BUCKET")

# Email settings. Spaces are removed from the App Password so a value pasted
# with spaces still authenticates.
GMAIL_USER = os.environ.get("GMAIL_USER", "").strip()
GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "").strip()
SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "465"))

# Local paths for the job-state DB and (local backend only) result ZIPs.
DB_PATH = Path(os.environ.get("TESSERA_JOBS_DB", "/tmp/tessera_jobs.db"))
LOCAL_RESULTS_DIR = Path(os.environ.get("TESSERA_LOCAL_RESULTS", "/tmp/tessera_results"))

# Download-link lifetime (24 h) and worker-pool size.
RESULT_TTL = 24 * 60 * 60
MAX_WORKERS = 1
RESULT_TTL_HOURS = RESULT_TTL // 3600

# Real email is only attempted when both credentials are non-empty.
EMAIL_ENABLED = bool(GMAIL_USER and GMAIL_APP_PASSWORD)

# Reject unknown backends up front, then do the backend-specific setup.
if STORAGE_BACKEND not in ("local", "aws"):
    raise RuntimeError(f"TESSERA_STORAGE must be 'local' or 'aws', got {STORAGE_BACKEND!r}")
if STORAGE_BACKEND == "aws":
    if not S3_BUCKET:
        raise RuntimeError("TESSERA_STORAGE=aws but TESSERA_S3_BUCKET is unset")
    import boto3  # noqa: E402  (only needed for the S3 backend)
    _s3 = boto3.client("s3")
else:
    LOCAL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# With MAX_WORKERS == 1, jobs execute one at a time.
_executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
_print_lock = threading.Lock()


def _log(msg: str) -> None:
    """Write one ``[jobs]``-prefixed line to stdout and flush it immediately.

    Printing happens under a module-wide lock so that lines emitted by the
    caller's thread and the worker thread do not interleave.
    """
    line = f"[jobs] {msg}"
    with _print_lock:
        print(line, flush=True)
# One-line summary of the effective configuration at import time. Only the
# sender address is logged, never the App Password.
_log(" ".join((
    f"backend={STORAGE_BACKEND}",
    f"db={DB_PATH}",
    f"bucket={S3_BUCKET or '-'}",
    f"email={'gmail-smtp' if EMAIL_ENABLED else 'stdout'}",
    f"from={GMAIL_USER or '-'}",
)))
# ----------------------------------------------------------------------------
# Tiny SQLite for job state
# ----------------------------------------------------------------------------
class _ClosingConnection(sqlite3.Connection):
    """``sqlite3.Connection`` whose ``with`` block also closes the connection.

    The stock connection context manager only commits (or rolls back on an
    exception); the connection itself stays open until garbage-collected.
    """

    def __exit__(self, exc_type, exc_value, tb):
        try:
            # Stock behaviour first: commit on success, roll back on error.
            return super().__exit__(exc_type, exc_value, tb)
        finally:
            self.close()


def _db(path: Path | str | None = None) -> sqlite3.Connection:
    """Open a new connection to the job-state DB, creating the table if missing.

    A fresh connection is opened on every call, so callers on different
    threads never share one (sqlite3 connections are bound to their creating
    thread by default). Intended usage is ``with _db() as c: ...``. Leaving the
    block commits (or rolls back), then closes the connection, so result rows
    must be fetched inside the block.

    Args:
        path: Database file to open. ``None`` (default) means ``DB_PATH``.
            Missing parent directories are created.

    Returns:
        An open connection (a ``sqlite3.Connection`` subclass).
    """
    db_path = Path(DB_PATH if path is None else path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(str(db_path), factory=_ClosingConnection)
    try:
        con.execute("""CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            email TEXT,
            status TEXT,
            n_samples INTEGER,
            created_at INTEGER,
            finished_at INTEGER,
            result_url TEXT,
            error TEXT
        )""")
    except Exception:
        # Don't leak the connection if schema creation fails.
        con.close()
        raise
    return con
def _set(job_id: str, **kw) -> None:
    """Update columns of one job row, e.g. ``_set(jid, status="done")``.

    Keyword names become column names in the SQL text, while values are bound
    as parameters. For that reason the names are checked against the ``jobs``
    schema before anything is executed. A call with no keywords is a no-op
    (it would otherwise produce invalid SQL).

    Raises:
        ValueError: if a keyword is not a column of the ``jobs`` table.
    """
    if not kw:
        return
    columns = {"id", "email", "status", "n_samples", "created_at",
               "finished_at", "result_url", "error"}
    unknown = set(kw) - columns
    if unknown:
        raise ValueError(f"_set: unknown jobs column(s): {sorted(unknown)}")
    assignments = ", ".join(f"{col}=?" for col in kw)
    with _db() as c:
        c.execute(f"UPDATE jobs SET {assignments} WHERE id=?", (*kw.values(), job_id))
def get_job(job_id: str) -> dict | None:
    """Look up one job by id.

    Returns:
        A dict mapping each ``jobs`` column name to its stored value, or
        ``None`` when no row has this id.
    """
    columns = ("id", "email", "status", "n_samples", "created_at",
               "finished_at", "result_url", "error")
    query = "SELECT " + ", ".join(columns) + " FROM jobs WHERE id=?"
    with _db() as c:
        record = c.execute(query, (job_id,)).fetchone()
    if record is None:
        return None
    return {col: val for col, val in zip(columns, record)}
# ----------------------------------------------------------------------------
# Storage backends
# ----------------------------------------------------------------------------
def _upload_result(job_id: str, local_zip: str) -> str:
    """Move the ZIP to durable storage and return a download URL valid ~24h.

    * ``aws``: upload to ``s3://S3_BUCKET/<job_id>/result.zip`` and return a
      pre-signed GET URL that expires after ``RESULT_TTL`` seconds. The local
      file is left where it is.
    * ``local``: move the file to ``LOCAL_RESULTS_DIR/<job_id>/result.zip``
      and return a ``file://`` URI for it. No expiry is enforced here.
    """
    if STORAGE_BACKEND == "aws":
        s3_key = f"{job_id}/result.zip"
        _s3.upload_file(
            local_zip, S3_BUCKET, s3_key,
            ExtraArgs={"ContentType": "application/zip"},
        )
        return _s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": s3_key},
            ExpiresIn=RESULT_TTL,
        )

    import shutil

    dst_dir = LOCAL_RESULTS_DIR / job_id
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst = dst_dir / "result.zip"
    # Unlike Path.rename, shutil.move falls back to copy+delete when the ZIP
    # lives on a different filesystem than LOCAL_RESULTS_DIR (EXDEV).
    shutil.move(str(local_zip), str(dst))
    # as_uri() needs an absolute path and percent-encodes special characters.
    # A relative LOCAL_RESULTS_DIR would otherwise give "file://rel/...", which
    # treats the first path segment as a host name.
    return dst.resolve().as_uri()
# ----------------------------------------------------------------------------
# Email
# ----------------------------------------------------------------------------
def _success_html(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Build the HTML body of the "job ready" email.

    Every interpolated value is HTML-escaped. The download URL may carry
    ``&``-separated query parameters (e.g. a pre-signed S3 URL), which must be
    written as ``&amp;`` inside an ``href``. ``model_variant`` comes from the
    inference result rather than from a constant.
    """
    from html import escape

    job_id_h = escape(str(job_id))
    url_h = escape(str(url))
    samples_h = escape(str(n_samples))
    variant_h = escape(str(model_variant))
    return (
        "<html><body style='font-family: sans-serif;'>"
        "<p>Your TESSERA inference job is ready.</p>"
        "<table style='border-collapse: collapse;'>"
        f"<tr><td><b>Job ID</b></td><td><code>{job_id_h}</code></td></tr>"
        f"<tr><td><b>Samples</b></td><td>{samples_h}</td></tr>"
        f"<tr><td><b>Model</b></td><td><code>{variant_h}</code></td></tr>"
        "</table>"
        f'<p><a href="{url_h}"><b>Download results (ZIP)</b></a><br>'
        f"Link expires in {RESULT_TTL_HOURS} hours.</p>"
        "</body></html>"
    )
def _success_text(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Build the plain-text body of the "job ready" email (newline-terminated)."""
    lines = [
        "Your TESSERA inference job is ready.",
        "",
        f"Job ID: {job_id}",
        f"Samples: {n_samples}",
        f"Model: {model_variant}",
        "",
        f"Download (valid {RESULT_TTL_HOURS} hours):",
        f"{url}",
        "",
    ]
    return "\n".join(lines)
def _send_email(to: str, subject: str,
                text: str, html: str | None = None) -> None:
    """Send an email, or log it to stdout when SMTP credentials are missing.

    With credentials (``EMAIL_ENABLED``), the message is sent from
    ``GMAIL_USER`` via ``SMTP_HOST:SMTP_PORT``. It contains a plain-text part
    and, if *html* is given, an HTML alternative.

    Transport details:
      * The server certificate and hostname are verified via
        ``ssl.create_default_context()``. smtplib does not verify them unless
        given a context, which would let a man-in-the-middle capture the App
        Password.
      * Port 587 (submission) uses STARTTLS, since implicit TLS can never work
        there. Any other port keeps the implicit-TLS ``SMTP_SSL`` behaviour.
      * A socket timeout bounds how long an unresponsive server can block the
        calling thread.

    Raises:
        smtplib.SMTPException or OSError on connection/delivery failure.
    """
    if not EMAIL_ENABLED:
        _log("--- EMAIL (stdout, no SMTP creds) ---")
        _log(f"To: {to}")
        _log(f"Subject: {subject}")
        _log(text)
        _log("--- END EMAIL ---")
        return

    import ssl

    msg = EmailMessage()
    msg["From"] = GMAIL_USER
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(text)
    if html is not None:
        msg.add_alternative(html, subtype="html")

    timeout_s = 30
    context = ssl.create_default_context()
    if SMTP_PORT == 587:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=timeout_s) as smtp:
            smtp.starttls(context=context)
            smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD)
            smtp.send_message(msg)
    else:
        with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=timeout_s,
                              context=context) as smtp:
            smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD)
            smtp.send_message(msg)
def _send_success(to: str, job_id: str, url: str,
                  n_samples: int, model_variant: str) -> None:
    """Email *to* the download link for a finished job (text + HTML parts)."""
    plural = "" if n_samples == 1 else "s"
    body_args = (job_id, url, n_samples, model_variant)
    _send_email(
        to,
        f"TESSERA inference ready ({n_samples} sample{plural})",
        text=_success_text(*body_args),
        html=_success_html(*body_args),
    )
def _send_failure(to: str, job_id: str, error: str) -> None:
    """Email *to* a plain-text notice that *job_id* failed, quoting *error*."""
    short_id = job_id[:8]
    _send_email(
        to,
        f"TESSERA inference failed ({short_id})",
        text=f"Your TESSERA inference job {job_id} failed:\n\n{error}\n",
    )
# ----------------------------------------------------------------------------
# Worker
# ----------------------------------------------------------------------------
def _run(job_id, snv_df, cna_df, apply_qn, email):
    """Worker body for one job: infer, pack, upload, email, record status.

    Everything, including marking the row 'running', happens inside the try.
    As a result, no exception escapes into the executor Future, where it would
    go unnoticed unless someone inspected it.

    On failure the row is marked 'failed' (best effort). A failure email is
    attempted even if that DB update itself fails.

    Note: the success email is sent before the row is marked 'done', so an
    email error also lands the job in the failure branch.
    """
    try:
        _set(job_id, status="running")
        _log(f"job {job_id}: running")
        # Lazy import: avoids loading TF in jobs.py if app.py imports jobs first
        from app import run_inference, pack_outputs  # type: ignore
        result = run_inference(snv_df, cna_df, apply_qn=apply_qn)
        zip_path = pack_outputs(result)
        url = _upload_result(job_id, zip_path)
        _send_success(email, job_id, url,
                      result["n_samples"], result["model_variant"])
        _set(job_id, status="done", finished_at=int(time.time()),
             result_url=url)
        _log(f"job {job_id}: done -> {email}")
    except Exception as e:
        # str(e) is "" for e.g. a bare AssertionError; fall back to the type name.
        error = str(e) or type(e).__name__
        tb = traceback.format_exc()
        _log(f"job {job_id}: failed: {error}\n{tb}")
        try:
            _set(job_id, status="failed", finished_at=int(time.time()),
                 error=error)
        except Exception as db_err:
            _log(f"job {job_id}: could not record failure in DB: {db_err}")
        try:
            _send_failure(email, job_id, error)
        except Exception as e2:
            _log(f"job {job_id}: failure-email also failed: {e2}")
# ----------------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------------
def submit_job(snv_df, cna_df, apply_qn: bool, email: str, n_samples: int) -> str:
    """Record a new job as 'queued' and schedule it on the background worker.

    Returns the new job's UUID string immediately. Status can be read back
    with ``get_job(job_id)``. The input frames are held only by the pending
    executor task (in memory), so queued work does not survive a restart.

    Raises:
        RuntimeError: if the executor no longer accepts work (e.g. during
            interpreter shutdown). The row is marked 'failed' first so it does
            not stay 'queued' forever.
    """
    job_id = str(uuid.uuid4())
    with _db() as c:
        c.execute(
            "INSERT INTO jobs (id, email, status, n_samples, created_at) "
            "VALUES (?, ?, 'queued', ?, ?)",
            (job_id, email, n_samples, int(time.time())),
        )
    try:
        future = _executor.submit(_run, job_id, snv_df, cna_df, apply_qn, email)
    except RuntimeError as exc:
        _set(job_id, status="failed", finished_at=int(time.time()),
             error=f"could not schedule job: {exc}")
        raise

    def _report_escaped_error(fut) -> None:
        # Anything escaping _run would otherwise sit unseen on this Future.
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            _log(f"job {job_id}: worker raised outside its handler: {exc!r}")

    future.add_done_callback(_report_escaped_error)
    _log(f"job {job_id}: queued ({n_samples} samples) for {email}")
    return job_id
# ----------------------------------------------------------------------------
# Startup: fail any 'running' jobs left over from a prior crash / sleep.
# ----------------------------------------------------------------------------
def _resurrect_running_jobs() -> None:
    """Fail jobs orphaned by a previous process (crash, Space sleep, redeploy).

    Both 'running' and 'queued' rows are handled. The executor queue and the
    job inputs live only in process memory, so neither kind can finish once
    the process that accepted them is gone.

    Only rows created more than 60 s ago are touched.
    NOTE(review): the purpose of this grace window is not visible here
    (perhaps another process sharing the DB file) — confirm it is still needed.

    Each owner with an email address is notified on a best-effort basis.
    Email failures are logged, not raised.
    """
    cutoff = int(time.time()) - 60
    with _db() as c:
        rows = c.execute(
            "SELECT id, email, status FROM jobs "
            "WHERE status IN ('running', 'queued') AND created_at < ?",
            (cutoff,),
        ).fetchall()
    for jid, email, status in rows:
        if status == "running":
            db_error = "process restart killed worker"
            mail_error = "process restart killed the worker"
        else:
            db_error = mail_error = "process restart dropped the queued job"
        _set(jid, status="failed",
             finished_at=int(time.time()),
             error=db_error)
        _log(f"orphan job {jid}: marked failed (restart, was {status})")
        if not email:
            continue
        try:
            _send_failure(email, jid, mail_error)
        except Exception as exc:
            _log(f"orphan job {jid}: failure email failed: {exc}")


_resurrect_running_jobs()
|