from fastapi import FastAPI, HTTPException, Query from fastapi.responses import HTMLResponse, RedirectResponse, FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import List, Optional import os import time import json import asyncio import urllib.request import urllib.parse from dotenv import load_dotenv import logging # Load environment variables from .env file load_dotenv() # Import Leaderboard Services (Redis primary, HF fallback) try: from redis_leaderboard import RedisLeaderboardService except ImportError: RedisLeaderboardService = None try: from redis_analytics import RedisAnalyticsService except ImportError: RedisAnalyticsService = None # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # Add CORS middleware for local development app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize Leaderboard Service (Redis primary, HF Space fallback) # REDIS_URL is auto-injected by Railway when Redis plugin is added leaderboard_service = None if RedisLeaderboardService: try: leaderboard_service = RedisLeaderboardService( redis_url=os.getenv("REDIS_URL"), hf_fallback_url="https://milwright-cloze-leaderboard.hf.space", hf_token=os.getenv("HF_TOKEN"), ) if leaderboard_service.is_redis_available(): logger.info("Leaderboard using Redis (primary) with HF Space (fallback)") else: logger.info("Leaderboard using HF Space (Redis unavailable)") except Exception as e: logger.warning(f"Could not initialize Leaderboard Service: {e}") logger.warning("Leaderboard will use localStorage fallback only") leaderboard_service = None else: logger.info("Leaderboard module not available - using localStorage only") # Initialize Analytics Service (Redis) analytics_service = None if RedisAnalyticsService: try: analytics_service = RedisAnalyticsService(redis_url=os.getenv("REDIS_URL")) if analytics_service.is_available(): logger.info("Analytics Service using Redis") else: logger.info("Analytics Service unavailable (Redis not connected)") except Exception as e: logger.warning(f"Could not initialize Analytics Service: {e}") analytics_service = None else: logger.info("Analytics module not available") # Pydantic models for API class LeaderboardEntry(BaseModel): initials: str level: int round: int passagesPassed: int date: str class LeaderboardResponse(BaseModel): success: bool leaderboard: List[LeaderboardEntry] message: Optional[str] = None # Pydantic models for Analytics API class WordAnalytics(BaseModel): word: str length: Optional[int] = None attemptsToCorrect: int = 1 # Avoid mutable default list hintsUsed: List[str] = Field(default_factory=list) finalCorrect: bool = False class PassageAnalytics(BaseModel): passageId: str sessionId: str bookTitle: str bookAuthor: str level: int round: int words: List[WordAnalytics] totalBlanks: int correctOnFirstTry: int totalHintsUsed: int passed: bool timestamp: Optional[str] = None # Mount static files app.mount("/src", StaticFiles(directory="src"), name="src") @app.get("/icon.png") async def get_icon(): """Serve the app icon locally if available, else fallback to GitHub.""" local_icon = "icon.png" if os.path.exists(local_icon): return FileResponse(local_icon, media_type="image/png") # Fallback to GitHub-hosted icon return RedirectResponse(url="https://media.githubusercontent.com/media/milwrite/cloze-reader/main/icon.png") @app.get("/favicon.png") async def get_favicon_png(): """Serve favicon as PNG by pointing to the canonical PNG icon.""" return await get_icon() @app.get("/favicon.ico") async def get_favicon_ico(): """Serve an ICO route that points to our PNG so browsers can find it.""" # Many browsers request /favicon.ico explicitly; return PNG is acceptable return await get_favicon_png() @app.get("/favicon.svg") async def get_favicon_svg(): """Serve SVG favicon for browsers that support it.""" # Prefer `icon.svg` if available for candidate in ["favicon.svg", "icon.svg"]: if os.path.exists(candidate): return FileResponse(candidate, media_type="image/svg+xml") # If missing, fall back to PNG icon return await get_favicon_png() @app.get("/icon.svg") async def get_icon_svg(): """Serve the SVG icon at /icon.svg if present, else fallback to PNG.""" candidate = "icon.svg" if os.path.exists(candidate): return FileResponse(candidate, media_type="image/svg+xml") return await get_icon() @app.get("/apple-touch-icon.png") async def get_apple_touch_icon(): """Serve Apple touch icon, fallback to main icon.""" candidate = "apple-touch-icon.png" if os.path.exists(candidate): return FileResponse(candidate, media_type="image/png") return await get_icon() @app.get("/site.webmanifest") async def site_manifest(): """Serve the web app manifest if present, else a minimal generated one.""" manifest_path = "site.webmanifest" if os.path.exists(manifest_path): return FileResponse(manifest_path, media_type="application/manifest+json") # Minimal default manifest content = { "name": "Cloze Reader", "short_name": "Cloze", "icons": [ {"src": "./icon-192.png", "type": "image/png", "sizes": "192x192"}, {"src": "./icon-512.png", "type": "image/png", "sizes": "512x512"} ], "start_url": "./", "display": "standalone", "background_color": "#ffffff", "theme_color": "#2c2826" } return JSONResponse(content=content, media_type="application/manifest+json") @app.get("/icon-192.png") async def get_icon_192(): path = "icon-192.png" if os.path.exists(path): return FileResponse(path, media_type="image/png") return await get_icon() @app.get("/icon-512.png") async def get_icon_512(): path = "icon-512.png" if os.path.exists(path): return FileResponse(path, media_type="image/png") return await get_icon() @app.get("/admin") async def admin_dashboard(): """Serve the analytics admin dashboard""" with open("admin.html", "r") as f: return HTMLResponse(content=f.read()) @app.get("/") async def read_root(): # Read the HTML file and inject environment variables with open("index.html", "r") as f: html_content = f.read() # Inject environment variables as a script openrouter_key = os.getenv("OPENROUTER_API_KEY", "") hf_key = os.getenv("HF_API_KEY", "") # Create a CSP-compliant way to inject the keys env_script = f""" """ # Insert the script before closing head tag html_content = html_content.replace("", env_script + "") return HTMLResponse(content=html_content) # ===== LEADERBOARD API ENDPOINTS ===== @app.get("/api/leaderboard", response_model=LeaderboardResponse) async def get_leaderboard(): """ Get leaderboard data from HF Space """ try: hf_leaderboard_url = "https://milwright-cloze-leaderboard.hf.space/api/leaderboard" data = await _fetch_json(hf_leaderboard_url, timeout=5.0) return data except Exception as e: logger.error(f"Error fetching leaderboard from HF: {e}") return { "success": True, "leaderboard": [], "message": "Leaderboard service unavailable (using localStorage)" } @app.post("/api/leaderboard/add") async def add_leaderboard_entry(entry: LeaderboardEntry): """ Add entry to leaderboard via HF Space proxy """ try: hf_leaderboard_url = "https://milwright-cloze-leaderboard.hf.space/api/leaderboard/add" # Use requests to POST to HF space import json req = urllib.request.Request( hf_leaderboard_url, data=json.dumps(entry.dict()).encode('utf-8'), headers={"Content-Type": "application/json"}, method="POST" ) with urllib.request.urlopen(req, timeout=5.0) as resp: response_data = json.loads(resp.read().decode('utf-8')) return response_data except Exception as e: logger.error(f"Error adding to leaderboard: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/leaderboard/update") async def update_leaderboard(entries: List[LeaderboardEntry]): """ Update leaderboard via HF Space proxy """ try: hf_leaderboard_url = "https://milwright-cloze-leaderboard.hf.space/api/leaderboard/update" import json req = urllib.request.Request( hf_leaderboard_url, data=json.dumps([e.dict() for e in entries]).encode('utf-8'), headers={"Content-Type": "application/json"}, method="POST" ) with urllib.request.urlopen(req, timeout=5.0) as resp: response_data = json.loads(resp.read().decode('utf-8')) return response_data except Exception as e: logger.error(f"Error updating leaderboard: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/leaderboard/clear") async def clear_leaderboard(): """ Clear leaderboard via HF Space proxy """ try: hf_leaderboard_url = "https://milwright-cloze-leaderboard.hf.space/api/leaderboard/clear" req = urllib.request.Request(hf_leaderboard_url, method="DELETE") with urllib.request.urlopen(req, timeout=5.0) as resp: import json response_data = json.loads(resp.read().decode('utf-8')) return response_data except Exception as e: logger.error(f"Error clearing leaderboard: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/leaderboard/seed-from-hf") async def seed_leaderboard_from_hf(): """ Fetch leaderboard from HF Space (deprecated - use /api/leaderboard instead) """ return await get_leaderboard() # ===== ANALYTICS API ENDPOINTS ===== @app.post("/api/analytics/passage") async def record_passage_analytics(data: PassageAnalytics): """ Record a completed passage attempt with analytics data. Called by frontend when a passage is completed (pass or fail). """ if not analytics_service or not analytics_service.is_available(): # Gracefully degrade - don't fail the game if analytics unavailable return { "success": False, "message": "Analytics service unavailable" } try: # Convert Pydantic models to dicts data_dict = data.dict() data_dict["words"] = [w.dict() for w in data.words] entry_id = analytics_service.record_passage(data_dict) if entry_id: return { "success": True, "entryId": entry_id, "message": "Passage analytics recorded" } else: return { "success": False, "message": "Failed to record analytics" } except Exception as e: logger.error(f"Error recording passage analytics: {e}") # Don't raise - analytics failure shouldn't break gameplay return { "success": False, "message": str(e) } @app.get("/api/analytics/summary") async def get_analytics_summary(): """ Get aggregate analytics statistics for admin dashboard. Returns totals, hardest/easiest words, and popular books. """ if not analytics_service: return { "success": True, "data": { "totalPassages": 0, "totalSessions": 0, "hardestWords": [], "easiestWords": [], "popularBooks": [] }, "message": "Analytics service unavailable" } try: summary = analytics_service.get_summary() return { "success": True, "data": summary, "message": f"Retrieved analytics summary" } except Exception as e: logger.error(f"Error getting analytics summary: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/analytics/recent") async def get_recent_analytics(count: int = 50): """ Get recent passage attempts for admin review. Args: count: Number of recent entries to retrieve (default: 50, max: 200) """ if not analytics_service: return { "success": True, "passages": [], "message": "Analytics service unavailable" } # Cap at 200 entries count = min(count, 200) try: passages = analytics_service.get_recent_passages(count) return { "success": True, "passages": passages, "count": len(passages), "message": f"Retrieved {len(passages)} recent passages" } except Exception as e: logger.error(f"Error getting recent analytics: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/analytics/export") async def export_all_analytics(): """ Export all analytics data as JSON (admin function). Use for backup or external analysis. """ if not analytics_service: return { "success": False, "passages": [], "message": "Analytics service unavailable" } try: all_data = analytics_service.export_all() return { "success": True, "passages": all_data, "count": len(all_data), "message": f"Exported {len(all_data)} passage records" } except Exception as e: logger.error(f"Error exporting analytics: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/analytics/word/{word}") async def get_word_statistics(word: str): """ Get statistics for a specific word. Shows how often the word was correct on first try vs needing retries. """ if not analytics_service: return { "success": True, "data": {"word": word, "firstTryCount": 0, "retryCount": 0}, "message": "Analytics service unavailable" } try: stats = analytics_service.get_word_stats(word) return { "success": True, "data": stats, "message": f"Retrieved stats for '{word}'" } except Exception as e: logger.error(f"Error getting word stats: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/analytics/clear") async def clear_all_analytics(): """ Clear all analytics data (admin function). WARNING: This permanently deletes all recorded analytics. """ if not analytics_service: raise HTTPException(status_code=503, detail="Analytics service unavailable") try: success = analytics_service.clear_analytics() if success: return { "success": True, "message": "All analytics data cleared" } else: raise HTTPException(status_code=500, detail="Failed to clear analytics") except Exception as e: logger.error(f"Error clearing analytics: {e}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860) # ================== HF DATASETS PROXY ENDPOINTS ================== HF_DATASETS_BASE = "https://datasets-server.huggingface.co" # very small in-memory cache suitable for single-process app _proxy_cache = { "splits": {}, # key -> {value, ts, ttl} "rows": {}, } def _cache_get(bucket: str, key: str): entry = _proxy_cache.get(bucket, {}).get(key) if not entry: return None if time.time() - entry["ts"] > entry["ttl"]: try: del _proxy_cache[bucket][key] except Exception: pass return None return entry["value"] def _cache_set(bucket: str, key: str, value, ttl: int): _proxy_cache.setdefault(bucket, {})[key] = { "value": value, "ts": time.time(), "ttl": ttl, } def _fetch_sync(url: str, timeout: float = 3.0): req = urllib.request.Request( url, headers={ "Accept": "application/json", "User-Agent": "cloze-reader/1.0 (+fastapi-proxy)", }, ) with urllib.request.urlopen(req, timeout=timeout) as resp: status = resp.getcode() body = resp.read() return status, body async def _fetch_json(url: str, timeout: float = 3.0): try: status, body = await asyncio.to_thread(_fetch_sync, url, timeout) if status != 200: raise HTTPException(status_code=status, detail=f"Upstream returned {status}") return json.loads(body.decode("utf-8")) except HTTPException: raise except Exception as e: raise HTTPException(status_code=502, detail=f"Upstream fetch failed: {e}") @app.get("/api/books/splits") async def proxy_hf_splits( dataset: str = Query(..., description="HF dataset repo id, e.g. manu/project_gutenberg"), cache_ttl: int = Query(300, description="Cache TTL seconds (default 300)"), ): """Proxy the HF datasets splits endpoint with caching and timeout. Example: /api/books/splits?dataset=manu/project_gutenberg """ dataset_q = urllib.parse.quote(dataset, safe="") url = f"{HF_DATASETS_BASE}/splits?dataset={dataset_q}" cached = _cache_get("splits", url) if cached is not None: return cached data = await _fetch_json(url, timeout=3.0) _cache_set("splits", url, data, ttl=max(1, cache_ttl)) return data @app.get("/api/books/rows") async def proxy_hf_rows( dataset: str = Query(...), config: str = Query("default"), split: str = Query("en"), offset: int = Query(0, ge=0, le=1000000), length: int = Query(1, ge=1, le=50), cache_ttl: int = Query(60, description="Cache TTL seconds for identical queries (default 60)"), ): """Proxy the HF datasets rows endpoint with short timeout and small cache. Example: /api/books/rows?dataset=manu/project_gutenberg&config=default&split=en&offset=0&length=2 """ params = { "dataset": dataset, "config": config, "split": split, "offset": str(offset), "length": str(length), } qs = urllib.parse.urlencode(params) url = f"{HF_DATASETS_BASE}/rows?{qs}" cached = _cache_get("rows", url) if cached is not None: return cached # Allow longer timeout for HF API which can be slow under load # 15s should handle most cases without client-side abort racing data = await _fetch_json(url, timeout=15.0) # Cache briefly to smooth bursts; rows vary by offset so cache is typically small _cache_set("rows", url, data, ttl=max(1, cache_ttl)) return data