Manglify_Backend / auth_supabase.py
Ying Jiang
check point 1
5028126
raw
history blame
4.78 kB
"""Verify Supabase user access_token: locally (JWT secret) or via Auth REST (no legacy secret)."""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Any, Optional
from uuid import UUID
import httpx
import jwt
from fastapi import Depends, Header, HTTPException
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parent / ".env")
except ImportError:
pass
@dataclass(frozen=True)
class AuthContext:
user_id: UUID
email: Optional[str]
def _auth_context_from_jwt_local(token: str, secret: str) -> AuthContext:
try:
payload = jwt.decode(
token,
secret,
algorithms=["HS256"],
audience="authenticated",
)
except jwt.InvalidAudienceError:
payload = jwt.decode(
token,
secret,
algorithms=["HS256"],
options={"verify_aud": False},
)
except jwt.PyJWTError:
raise HTTPException(status_code=401, detail="Invalid or expired token")
sub = payload.get("sub")
if not sub:
raise HTTPException(status_code=401, detail="Token missing sub")
try:
user_id = UUID(str(sub))
except ValueError:
raise HTTPException(status_code=401, detail="Invalid user id in token")
raw_email = payload.get("email")
email: Optional[str] = (
str(raw_email) if isinstance(raw_email, str) and raw_email.strip() else None
)
return AuthContext(user_id=user_id, email=email)
def _auth_context_from_supabase_remote(token: str, base_url: str, anon_key: str) -> AuthContext:
url = f"{base_url.rstrip('/')}/auth/v1/user"
try:
with httpx.Client(timeout=10.0) as client:
r = client.get(
url,
headers={
"Authorization": f"Bearer {token}",
"apikey": anon_key,
},
)
except httpx.RequestError as e:
raise HTTPException(
status_code=502,
detail=f"Could not reach Supabase Auth: {e}",
) from e
if r.status_code in (401, 403):
raise HTTPException(status_code=401, detail="Invalid or expired token")
if r.status_code != 200:
raise HTTPException(
status_code=502,
detail=f"Supabase Auth returned {r.status_code}",
)
try:
data: dict[str, Any] = r.json()
except ValueError as e:
raise HTTPException(status_code=502, detail="Invalid response from Supabase Auth") from e
uid = data.get("id")
if not uid:
raise HTTPException(status_code=502, detail="Supabase Auth response missing user id")
try:
user_id = UUID(str(uid))
except ValueError:
raise HTTPException(status_code=502, detail="Invalid user id from Supabase Auth")
raw_email = data.get("email")
email: Optional[str] = (
str(raw_email) if isinstance(raw_email, str) and raw_email.strip() else None
)
return AuthContext(user_id=user_id, email=email)
def auth_context_from_authorization(authorization: str | None) -> AuthContext:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(
status_code=401,
detail="Missing or invalid Authorization header",
)
token = authorization[7:].strip()
secret = (os.environ.get("SUPABASE_JWT_SECRET") or "").strip()
if secret:
return _auth_context_from_jwt_local(token, secret)
base_url = (os.environ.get("SUPABASE_URL") or "").strip()
anon_key = (
os.environ.get("SUPABASE_ANON_KEY")
or os.environ.get("SUPABASE_PUBLISHABLE_KEY")
or ""
).strip()
if base_url and anon_key:
return _auth_context_from_supabase_remote(token, base_url, anon_key)
raise HTTPException(
status_code=503,
detail=(
"Server is not configured for Supabase auth: set SUPABASE_JWT_SECRET, "
"or set SUPABASE_URL + SUPABASE_ANON_KEY (publishable/anon key) to verify tokens via Auth API."
),
)
def user_id_from_authorization(authorization: str | None) -> UUID:
return auth_context_from_authorization(authorization).user_id
def get_current_user_id(
authorization: Annotated[str | None, Header(alias="Authorization")] = None,
) -> UUID:
return user_id_from_authorization(authorization)
def get_auth_context(
authorization: Annotated[str | None, Header(alias="Authorization")] = None,
) -> AuthContext:
return auth_context_from_authorization(authorization)
CurrentUserId = Annotated[UUID, Depends(get_current_user_id)]
CurrentAuthContext = Annotated[AuthContext, Depends(get_auth_context)]