File size: 11,484 Bytes
c861002
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""Async job queue for the TESSERA inference API.

Runtime layout:

    user submit (web tier)
            v
    submit_job(...) -> insert row in jobs table -> queue Future on
                                                    ThreadPoolExecutor
                                                    (max_workers=1)
            v
    worker thread runs run_inference + pack_outputs (defined in app.py),
    then uploads the ZIP to storage and emails the user a download URL.

Storage backend, picked by ``TESSERA_STORAGE``:

    'local' (default)
        Local filesystem: each result is written to
        ``<TESSERA_LOCAL_RESULTS>/<job_id>/result.zip``
        (``TESSERA_LOCAL_RESULTS`` defaults to ``/tmp/tessera_results``).
        For development without AWS.

    'aws'
        S3 (``TESSERA_S3_BUCKET``), 24h pre-signed URL. Requires AWS
        credentials in the environment.

Email backend is independent of storage. If ``GMAIL_USER`` and
``GMAIL_APP_PASSWORD`` are set we send via Gmail SMTP
(``smtp.gmail.com:465``); the App Password is the 16-char value
generated at https://myaccount.google.com/apppasswords on a
2-Step-Verification-enabled Google account. Sending from a real Gmail
account through Google's own SMTP keeps DKIM/SPF/DMARC alignment
intact, which is what kept us out of the spam folder when SES tried
to forge a ``@gmail.com`` From header. If the credentials are unset we
log to stdout (useful in local dev).

A SQLite job-state file (``TESSERA_JOBS_DB``, default
``/tmp/tessera_jobs.db``) holds the ``{id, email, status, n_samples,
created_at, finished_at, result_url, error}`` rows. At process startup
any job left in 'running' state from a prior crash / Space sleep is
marked failed and (best-effort) emailed.
"""
from __future__ import annotations

import os
import shutil
import smtplib
import sqlite3
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from email.message import EmailMessage
from html import escape
from pathlib import Path


# ----------------------------------------------------------------------------
# Configuration: load .env.local if present, then read env vars.
# ----------------------------------------------------------------------------

def _load_dotenv() -> None:
    """Seed ``os.environ`` from a ``.env.local`` file next to this module.

    Every non-blank, non-comment line of the form ``KEY=VALUE`` is applied
    with ``os.environ.setdefault``, so a variable that is already present
    in the real environment always wins over the file.

    To stay compatible with files that are also sourced by a shell, a
    leading ``export `` is dropped and one pair of matching quotes around
    the value is removed (``KEY="a b"`` yields ``a b``). Lines without
    ``=`` or with an empty key are skipped. Does nothing when the file
    does not exist.
    """
    env_file = Path(__file__).resolve().parent / ".env.local"
    if not env_file.is_file():
        return
    # Decode explicitly instead of relying on the platform default encoding.
    for raw in env_file.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        key, sep, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if not sep or not key:
            # No assignment, or "=value": an empty name is rejected by os.environ.
            continue
        # KEY="value" / KEY='value': drop the quotes, keep the content verbatim.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)

# Populate os.environ from .env.local (if present) before any setting below is read.
_load_dotenv()

# Result storage: 'local' or 'aws' (validated further down); the bucket is only
# required for 'aws'.
STORAGE_BACKEND    = os.environ.get("TESSERA_STORAGE", "local")
S3_BUCKET          = os.environ.get("TESSERA_S3_BUCKET")
# SMTP settings. Spaces are removed from the app password before it is used.
GMAIL_USER         = os.environ.get("GMAIL_USER", "").strip()
GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "").strip()
SMTP_HOST          = os.environ.get("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT          = int(os.environ.get("SMTP_PORT", "465"))
# Locations of the SQLite job-state file and of locally stored results.
DB_PATH            = Path(os.environ.get("TESSERA_JOBS_DB", "/tmp/tessera_jobs.db"))
LOCAL_RESULTS_DIR  = Path(os.environ.get("TESSERA_LOCAL_RESULTS", "/tmp/tessera_results"))
# Download-link lifetime in seconds (24 h); RESULT_TTL_HOURS is the same value
# in hours, used in the email wording.
RESULT_TTL         = 24 * 3600
# Size of the worker pool created below; with 1, jobs run one at a time.
MAX_WORKERS        = 1
RESULT_TTL_HOURS   = RESULT_TTL // 3600
# Real email is sent only when both Gmail credentials are set; otherwise
# messages are logged to stdout.
EMAIL_ENABLED      = bool(GMAIL_USER and GMAIL_APP_PASSWORD)

# Validate the backend choice at import time so a misconfiguration fails fast.
if STORAGE_BACKEND == "aws":
    if not S3_BUCKET:
        raise RuntimeError("TESSERA_STORAGE=aws but TESSERA_S3_BUCKET is unset")
    # boto3 is imported only on this branch, so 'local' mode never needs it.
    import boto3                                              # noqa: E402
    _s3  = boto3.client("s3")
elif STORAGE_BACKEND == "local":
    LOCAL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
else:
    raise RuntimeError(f"TESSERA_STORAGE must be 'local' or 'aws', got {STORAGE_BACKEND!r}")

# Pool that runs submitted jobs, and the lock that serializes _log() output.
_executor   = ThreadPoolExecutor(max_workers=MAX_WORKERS)
_print_lock = threading.Lock()


def _log(msg: str) -> None:
    """Print *msg* to stdout with a ``[jobs]`` prefix, flushing immediately.

    ``_print_lock`` is held while printing so that lines written from
    different threads are not interleaved.
    """
    line = f"[jobs] {msg}"
    _print_lock.acquire()
    try:
        print(line, flush=True)
    finally:
        _print_lock.release()


# Log the effective configuration once at import time. The sender address is
# printed; the app password is not.
_log(
    f"backend={STORAGE_BACKEND}  db={DB_PATH}  bucket={S3_BUCKET or '-'}  "
    f"email={'gmail-smtp' if EMAIL_ENABLED else 'stdout'}  "
    f"from={GMAIL_USER or '-'}"
)


# ----------------------------------------------------------------------------
# Tiny SQLite for job state
# ----------------------------------------------------------------------------

class _AutoClosingConnection(sqlite3.Connection):
    """``sqlite3.Connection`` whose ``with`` block also closes the connection.

    The stock context manager only commits (or rolls back on an exception)
    and leaves the connection open. Every caller in this module opens a
    fresh connection per ``with _db() as c:`` block, so without the extra
    ``close()`` each call would leave an open handle behind until garbage
    collection.
    """

    def __exit__(self, exc_type, exc_value, tb):
        try:
            # Commit / rollback exactly as the base class does.
            return super().__exit__(exc_type, exc_value, tb)
        finally:
            self.close()


def _db():
    """Open a connection to the job database, creating it on first use.

    Creates the parent directory of ``DB_PATH`` and the ``jobs`` table if
    they do not exist yet.

    Returns:
        A ``sqlite3.Connection`` (subclass). Used as ``with _db() as c:`` it
        commits on success, rolls back on an exception, and is closed when
        the block exits, so it must not be reused after the ``with``.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(str(DB_PATH), factory=_AutoClosingConnection)
    try:
        con.execute("""CREATE TABLE IF NOT EXISTS jobs (
        id           TEXT PRIMARY KEY,
        email        TEXT,
        status       TEXT,
        n_samples    INTEGER,
        created_at   INTEGER,
        finished_at  INTEGER,
        result_url   TEXT,
        error        TEXT
    )""")
    except Exception:
        # Do not leak the handle when the schema cannot be created.
        con.close()
        raise
    return con


def _set(job_id: str, **kw) -> None:
    """Update columns of the ``jobs`` row whose id is *job_id*.

    Each keyword is a column name and its value the new content, e.g.
    ``_set(jid, status="done", result_url=url)``. Values are bound as SQL
    parameters. Column names are spliced into the statement text, so they
    must only ever come from code in this module, never from user input.

    Calling it without keywords is a no-op. A *job_id* that matches no row
    updates nothing and raises no error.
    """
    if not kw:
        # "UPDATE jobs SET  WHERE id=?" would be a syntax error.
        return
    fields = ", ".join(f"{k}=?" for k in kw)
    with _db() as c:
        c.execute(f"UPDATE jobs SET {fields} WHERE id=?", (*kw.values(), job_id))


def get_job(job_id: str) -> dict | None:
    """Look up a single job by its id.

    Returns:
        A dict with the keys ``id``, ``email``, ``status``, ``n_samples``,
        ``created_at``, ``finished_at``, ``result_url`` and ``error``, or
        ``None`` when no row has that id.
    """
    columns = ("id", "email", "status", "n_samples", "created_at",
               "finished_at", "result_url", "error")
    query = f"SELECT {', '.join(columns)} FROM jobs WHERE id=?"
    with _db() as con:
        record = con.execute(query, (job_id,)).fetchone()
    if not record:
        return None
    return dict(zip(columns, record))


# ----------------------------------------------------------------------------
# Storage backends
# ----------------------------------------------------------------------------

def _upload_result(job_id: str, local_zip: str) -> str:
    """Put the result ZIP into the configured storage and return its URL.

    Args:
        job_id: Job identifier; used as the S3 key prefix or as the name of
            the per-job directory.
        local_zip: Path of the ZIP file to store.

    Returns:
        ``aws``: a pre-signed S3 GET URL that expires after ``RESULT_TTL``
        seconds. ``local``: a ``file://`` URI of
        ``LOCAL_RESULTS_DIR/<job_id>/result.zip``.
    """
    if STORAGE_BACKEND == "aws":
        s3_key = f"{job_id}/result.zip"
        _s3.upload_file(
            local_zip, S3_BUCKET, s3_key,
            ExtraArgs={"ContentType": "application/zip"},
        )
        # NOTE(review): the local ZIP stays on disk after the upload; confirm
        # whether the caller is expected to clean it up.
        return _s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": S3_BUCKET, "Key": s3_key},
            ExpiresIn=RESULT_TTL,
        )

    dst_dir = LOCAL_RESULTS_DIR / job_id
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst = dst_dir / "result.zip"
    # Path.rename() fails with OSError (EXDEV) when source and destination are
    # on different filesystems; shutil.move falls back to copy + delete.
    shutil.move(str(local_zip), str(dst))
    # as_uri() needs an absolute path and percent-encodes special characters,
    # so the link stays valid for relative result dirs and paths with spaces.
    return dst.absolute().as_uri()


# ----------------------------------------------------------------------------
# Email
# ----------------------------------------------------------------------------

def _success_html(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Render the HTML body of the "job ready" email.

    Every interpolated value is HTML-escaped. This matters most for *url*:
    a pre-signed URL carries ``&``-separated query parameters, which must
    appear as ``&amp;`` inside the ``href`` attribute.

    Args:
        job_id: Job identifier shown in the summary table.
        url: Download link for the result ZIP.
        n_samples: Number of samples in the job.
        model_variant: Model name shown in the summary table.

    Returns:
        A complete ``<html>`` document as a string.
    """
    # escape() also converts quotes by default, so the values are safe both as
    # element text and inside the double-quoted href attribute.
    safe_job = escape(str(job_id))
    safe_url = escape(str(url))
    safe_samples = escape(str(n_samples))
    safe_model = escape(str(model_variant))
    return (
        "<html><body style='font-family: sans-serif;'>"
        "<p>Your TESSERA inference job is ready.</p>"
        "<table style='border-collapse: collapse;'>"
        f"<tr><td><b>Job ID</b></td><td><code>{safe_job}</code></td></tr>"
        f"<tr><td><b>Samples</b></td><td>{safe_samples}</td></tr>"
        f"<tr><td><b>Model</b></td><td><code>{safe_model}</code></td></tr>"
        "</table>"
        f'<p><a href="{safe_url}"><b>Download results (ZIP)</b></a><br>'
        f"Link expires in {RESULT_TTL_HOURS} hours.</p>"
        "</body></html>"
    )


def _success_text(job_id: str, url: str, n_samples: int, model_variant: str) -> str:
    """Render the plain-text body of the "job ready" email.

    Lists the job id, sample count and model, followed by the download URL
    and its validity in hours (``RESULT_TTL_HOURS``).
    """
    lines = [
        "Your TESSERA inference job is ready.",
        "",
        f"Job ID:  {job_id}",
        f"Samples: {n_samples}",
        f"Model:   {model_variant}",
        "",
        f"Download (valid {RESULT_TTL_HOURS} hours):",
        f"{url}",
    ]
    return "\n".join(lines) + "\n"


def _send_email(to: str, subject: str,
                text: str, html: str | None = None) -> None:
    """Send one email, or log it to stdout when SMTP is not configured.

    Args:
        to: Recipient address.
        subject: Subject line.
        text: Plain-text body (always included).
        html: Optional HTML alternative. Only used when mail is really
            sent; the stdout fallback logs the plain-text body only.

    Raises:
        smtplib.SMTPException, OSError: when connecting, logging in or
            sending fails, including a connection that times out.
    """
    if not EMAIL_ENABLED:
        _log("--- EMAIL (stdout, no SMTP creds) ---")
        _log(f"To:      {to}")
        _log(f"Subject: {subject}")
        _log(text)
        _log("--- END EMAIL ---")
        return

    msg = EmailMessage()
    msg["From"]    = GMAIL_USER
    msg["To"]      = to
    msg["Subject"] = subject
    msg.set_content(text)
    if html is not None:
        msg.add_alternative(html, subtype="html")

    # Without a timeout a stalled SMTP server would block the calling thread
    # forever, and jobs are processed by a single worker thread.
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
        smtp.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        smtp.send_message(msg)


def _send_success(to: str, job_id: str, url: str,
                  n_samples: int, model_variant: str) -> None:
    """Email the "job ready" notification with text and HTML bodies.

    The subject carries the sample count, pluralized unless it is exactly 1.
    """
    plural = "" if n_samples == 1 else "s"
    details = (job_id, url, n_samples, model_variant)
    _send_email(
        to,
        f"TESSERA inference ready ({n_samples} sample{plural})",
        text=_success_text(*details),
        html=_success_html(*details),
    )


def _send_failure(to: str, job_id: str, error: str) -> None:
    """Email a plain-text failure notice for *job_id* containing *error*.

    The subject shows only the first 8 characters of the job id; the body
    shows the full id.
    """
    short_id = job_id[:8]
    _send_email(
        to,
        f"TESSERA inference failed ({short_id})",
        text=f"Your TESSERA inference job {job_id} failed:\n\n{error}\n",
    )


# ----------------------------------------------------------------------------
# Worker
# ----------------------------------------------------------------------------

def _run(job_id, snv_df, cna_df, apply_qn, email):
    """Worker entry point: run one job and notify the submitter.

    Steps: mark the row ``running``, run inference and pack the outputs
    (both imported lazily from ``app``), store the ZIP, record the row as
    ``done`` with its ``result_url``, then email the download link.

    The result is recorded *before* the email is sent. A mail failure
    therefore leaves the job ``done`` with its URL intact (the delivery
    problem is logged and noted in the ``error`` column) instead of
    discarding a finished result.

    Any exception before that point marks the job ``failed`` and triggers a
    best-effort failure email. Recording the failure and sending the email
    are guarded independently, so a database error does not suppress the
    notification. This function does not raise for ``Exception`` subclasses.

    Args:
        job_id: Id of the already-inserted ``jobs`` row.
        snv_df, cna_df: Inputs passed through to ``run_inference``.
        apply_qn: Passed to ``run_inference`` as ``apply_qn``.
        email: Address to notify.
    """
    try:
        # Inside the try so that a DB error here is reported like any other
        # failure instead of vanishing into the discarded Future.
        _set(job_id, status="running")
        _log(f"job {job_id}: running")
        # Lazy import: avoids loading TF in jobs.py if app.py imports jobs first
        from app import run_inference, pack_outputs                # type: ignore
        result = run_inference(snv_df, cna_df, apply_qn=apply_qn)
        zip_path = pack_outputs(result)
        url = _upload_result(job_id, zip_path)
        n_samples, model_variant = result["n_samples"], result["model_variant"]
        _set(job_id, status="done", finished_at=int(time.time()),
             result_url=url)
    except Exception as e:
        tb = traceback.format_exc()
        _log(f"job {job_id}: failed: {e}\n{tb}")
        # Some exceptions have an empty message; fall back to the type name.
        reason = str(e) or type(e).__name__
        try:
            _set(job_id, status="failed", finished_at=int(time.time()),
                 error=reason)
        except Exception as e_db:
            _log(f"job {job_id}: could not record failure: {e_db}")
        try:
            _send_failure(email, job_id, reason)
        except Exception as e2:
            _log(f"job {job_id}: failure-email also failed: {e2}")
        return

    try:
        _send_success(email, job_id, url, n_samples, model_variant)
    except Exception as e:
        _log(f"job {job_id}: done, but success email failed: {e}")
        try:
            _set(job_id, error=f"result ready but email delivery failed: {e}")
        except Exception as e_db:
            _log(f"job {job_id}: could not record email failure: {e_db}")
    else:
        _log(f"job {job_id}: done -> {email}")


# ----------------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------------

def submit_job(snv_df, cna_df, apply_qn: bool, email: str, n_samples: int) -> str:
    """Register a new job and hand it to the worker pool.

    Inserts a ``queued`` row for the job, submits ``_run`` to the executor
    with the given inputs, and logs the submission.

    Args:
        snv_df, cna_df: Inputs forwarded unchanged to the worker.
        apply_qn: Forwarded unchanged to the worker.
        email: Address stored with the job and notified on completion.
        n_samples: Sample count stored with the job.

    Returns:
        The new job id (a random UUID4 string).
    """
    job_id = str(uuid.uuid4())
    insert_sql = (
        "INSERT INTO jobs (id, email, status, n_samples, created_at) "
        "VALUES (?, ?, 'queued', ?, ?)"
    )
    with _db() as con:
        row = (job_id, email, n_samples, int(time.time()))
        con.execute(insert_sql, row)
    worker_args = (job_id, snv_df, cna_df, apply_qn, email)
    _executor.submit(_run, *worker_args)
    _log(f"job {job_id}: queued ({n_samples} samples) for {email}")
    return job_id


# ----------------------------------------------------------------------------
# Startup: fail any 'running' jobs left over from a prior crash / sleep.
# ----------------------------------------------------------------------------

def _resurrect_running_jobs() -> None:
    """Fail the jobs that a previous process left unfinished.

    Despite the name, nothing is restarted. The work queue lives only in
    the memory of the process that accepted the job, so after a restart
    neither ``running`` nor ``queued`` rows can ever complete. Every such
    row created more than 60 seconds ago is marked ``failed`` and, if it
    has an email address, the submitter is notified on a best-effort basis:
    a mail error is logged and does not stop the remaining rows from being
    processed.
    """
    # NOTE(review): the 60 s grace period presumably protects rows created
    # moments ago by a still-live process; confirm the intent.
    cutoff = int(time.time()) - 60
    with _db() as c:
        rows = c.execute(
            "SELECT id, email FROM jobs "
            "WHERE status IN ('queued', 'running') AND created_at < ?", (cutoff,),
        ).fetchall()
    for jid, email in rows:
        _set(jid, status="failed",
             finished_at=int(time.time()),
             error="process restart killed worker")
        _log(f"orphan job {jid}: marked failed (restart)")
        if not email:
            continue
        try:
            _send_failure(email, jid, "process restart killed the worker")
        except Exception as e:
            # Best-effort notification: keep going, but leave a trace.
            _log(f"orphan job {jid}: failure-email failed: {e}")

# Runs once, at import time.
_resurrect_running_jobs()