Spaces:
Runtime error
Runtime error
| # lythron_full.py | |
import io
import os
import secrets
import sqlite3
import struct
import time
from typing import Any, Dict, List, Literal, Optional

from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, status, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, Field
| # ------------------------- | |
| # Config / Environment | |
| # ------------------------- | |
# Key used to sign and verify JWTs.
# NOTE(review): when LYTHRON_SECRET_KEY is unset, a fresh random key is
# generated on every import, so tokens stop validating after a restart and
# are not shared between worker processes.
SECRET_KEY = os.getenv("LYTHRON_SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_SECONDS = 60 * 60 * 24  # 24h
# Path of the SQLite database file.
DATABASE = os.getenv("LYTHRON_DB", "lythron_lythron.db")
# Comma-separated origin list. Entries are stripped and empty items dropped so
# that a value such as "https://a.example, https://b.example" yields two
# usable origins instead of one with a leading space.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.getenv("LYTHRON_ALLOWED_ORIGINS", "http://localhost:3000").split(",")
    if origin.strip()
]
| # ------------------------- | |
| # DB init (SQLite lightweight) | |
| # ------------------------- | |
def init_db():
    """Create the ``users`` and ``audit`` tables if they do not exist yet.

    Connects to the SQLite file named by ``DATABASE``, runs both
    ``CREATE TABLE IF NOT EXISTS`` statements and commits. The connection is
    closed even when a statement raises.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        cur = conn.cursor()
        cur.execute("""
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        email TEXT UNIQUE,
        hashed_password TEXT,
        plan TEXT DEFAULT 'LTR 2.1',
        prompts_used INTEGER DEFAULT 0,
        prompts_reset_ts INTEGER DEFAULT 0,
        is_admin INTEGER DEFAULT 0
        )""")
        cur.execute("""
        CREATE TABLE IF NOT EXISTS audit (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_email TEXT,
        action TEXT,
        details TEXT,
        ts INTEGER
        )""")
        conn.commit()
    finally:
        conn.close()


# Executed at import time so the tables exist before any handler touches them.
init_db()
| # ------------------------- | |
| # Auth utils (bcrypt + JWT) | |
| # ------------------------- | |
# Password hashing context: bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
# Bearer-token extractor used as a dependency; the token endpoint it
# advertises is "/token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def verify_password(plain: str, hashed: str) -> bool:
    """Return the result of checking ``plain`` against ``hashed`` with ``pwd_context``."""
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_access_token(data: dict, expires_delta: Optional[int] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    ``expires_delta`` is a lifetime in seconds; any falsy value (``None`` or
    ``0``) falls back to ``ACCESS_TOKEN_EXPIRE_SECONDS``. The caller's dict is
    left unmodified.
    """
    lifetime = expires_delta if expires_delta else ACCESS_TOKEN_EXPIRE_SECONDS
    claims = {**data, "exp": int(time.time()) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def get_user(email: str) -> Optional[Dict[str, Any]]:
    """Look up one row of ``users`` by e-mail.

    Returns a dict with the keys ``email``, ``hashed_password``, ``plan``,
    ``prompts_used``, ``prompts_reset_ts`` and ``is_admin`` (converted to
    ``bool``), or ``None`` when no row matches. The connection is closed even
    if the query raises.
    """
    columns = (
        "email",
        "hashed_password",
        "plan",
        "prompts_used",
        "prompts_reset_ts",
        "is_admin",
    )
    conn = sqlite3.connect(DATABASE)
    try:
        row = conn.execute(
            "SELECT email, hashed_password, plan, prompts_used, prompts_reset_ts, is_admin FROM users WHERE email=?",
            (email,),
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return None
    user = dict(zip(columns, row))
    # Stored as INTEGER 0/1; expose it as a real boolean.
    user["is_admin"] = bool(user["is_admin"])
    return user
def create_user(email: str, password: str, plan: str = "LTR 2.1", is_admin: bool = False):
    """Insert a new row into ``users`` with a hashed password.

    ``is_admin`` is now persisted (stored as 0/1); previously the argument was
    accepted but never written, so every account was created as non-admin.

    Raises:
        HTTPException: status 400 when the insert violates a constraint
            (the ``email`` column is declared UNIQUE).
    """
    hp = hash_password(password)
    conn = sqlite3.connect(DATABASE)
    try:
        conn.execute(
            "INSERT INTO users (email, hashed_password, plan, prompts_reset_ts, is_admin) VALUES (?, ?, ?, ?, ?)",
            (email, hp, plan, 0, int(bool(is_admin))),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        raise HTTPException(status_code=400, detail="User already exists")
    finally:
        # Runs on success, on the duplicate-user path and on any other error.
        conn.close()
| # ------------------------- | |
| # Audit helper | |
| # ------------------------- | |
def audit(user_email: str, action: str, details: str = ""):
    """Append one row to the ``audit`` table, timestamped with the current Unix time.

    The connection is closed even if the insert raises.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        conn.execute(
            "INSERT INTO audit (user_email, action, details, ts) VALUES (?, ?, ?, ?)",
            (user_email, action, details, int(time.time())),
        )
        conn.commit()
    finally:
        conn.close()
| # ------------------------- | |
| # Plan limits | |
| # ------------------------- | |
# Maximum prompts per reset window, keyed by plan name.
PLAN_LIMITS = {
    "LTR 2.1": 10,
    "LTR 2.5": 20,
    # Very large cap; presumably intended as "unlimited" -- confirm.
    "LTR 5.0": 9_999_999,
}
def check_and_consume_prompt(user_email: Optional[str]):
    """Consume one prompt from the user's quota, enforcing the plan limit.

    A falsy ``user_email`` is treated as anonymous and returns ``True``
    without touching the database. For a known user the counter is reset when
    more than 24 hours have passed since ``prompts_reset_ts``, then
    incremented only if it is still below the plan's limit (plans missing from
    ``PLAN_LIMITS`` get 10).

    Both steps are single conditional UPDATE statements on one connection, so
    the limit check and the increment cannot be separated by a concurrent
    request, and the connection is always closed.

    Returns:
        ``True`` when a prompt was consumed (or the caller is anonymous).

    Raises:
        HTTPException: 401 if the e-mail matches no user, 429 if the plan
            limit has been reached.
    """
    if not user_email:
        return True
    user = get_user(user_email)
    if not user:
        raise HTTPException(status_code=401, detail="Unknown user")
    limit = PLAN_LIMITS.get(user["plan"], 10)
    now = int(time.time())
    conn = sqlite3.connect(DATABASE)
    try:
        # Reset every 24h since the last reset; a NULL timestamp counts as 0.
        conn.execute(
            "UPDATE users SET prompts_used=0, prompts_reset_ts=? "
            "WHERE email=? AND ? > COALESCE(prompts_reset_ts, 0) + ?",
            (now, user["email"], now, 24 * 3600),
        )
        # The WHERE clause makes "is there quota left?" and "take one" atomic.
        consumed = conn.execute(
            "UPDATE users SET prompts_used = prompts_used + 1 "
            "WHERE email=? AND prompts_used < ?",
            (user["email"], limit),
        ).rowcount == 1
        conn.commit()
    finally:
        conn.close()
    if not consumed:
        raise HTTPException(status_code=429, detail="Prompts limit reached for your plan")
    return True
| # ------------------------- | |
| # Simple safe mock engine (non-destructive) | |
| # ------------------------- | |
class MockEngine:
    """Stand-in engine that returns canned results instead of calling a model."""

    def __init__(self):
        self.status = {"model": "mock-v1"}
        # One entry is appended per generate() call.
        # NOTE(review): the list is never trimmed.
        self.memory = []

    def generate(self, prompt: str, modality: str = "texto", language: Optional[str] = None) -> str:
        """Record the prompt and return a canned reply.

        With a truthy ``language`` the reply is a one-line code-comment stub
        containing at most 120 characters of the prompt; otherwise it echoes
        at most 400 characters prefixed by the model name.
        """
        # minimal sanitization
        text = prompt.replace("\x00", " ")
        self.memory.append({"prompt": text, "modality": modality, "ts": int(time.time())})
        if language:
            safe = text.replace('"', '\\"').replace("\n", " ")
            return f"// {language} stub code for: {safe[:120]}"
        return f"{self.status.get('model')} response: {text[:400]}"

    def analyze_file(self, name: str, data: bytes, content_type: Optional[str] = None) -> Dict[str, Any]:
        """Summarise an uploaded file.

        The result always has ``filename`` and ``size``. Depending on the
        extension or content type it also gets ``preview``/``metadata`` or an
        ``error`` string. A missing (``None``) filename is tolerated and
        treated as having no extension.
        """
        summary = {"filename": name, "size": len(data)}
        label = (name or "").lower()
        extension = label.rpartition(".")[2] if "." in label else ""
        mime = (content_type or "").lower()
        if extension == "png" or "png" in mime:
            if len(data) >= 24 and data.startswith(b"\x89PNG\r\n\x1a\n"):
                # Two big-endian uint32 values at offsets 16 and 20.
                width, height = struct.unpack_from("!II", data, 16)
                summary["preview"] = f"PNG {width}x{height}"
                # NOTE(review): "mode" is a fixed value, not read from the file.
                summary["metadata"] = {"width": width, "height": height, "mode": "RGBA"}
            else:
                summary["error"] = "Invalid PNG"
        elif extension == "pdf" or "pdf" in mime:
            try:
                # Imported lazily; a missing package is reported as a parse error.
                import PyPDF2
                reader = PyPDF2.PdfReader(io.BytesIO(data))
                pages = len(reader.pages)
                text_preview = "".join((p.extract_text() or "") for p in reader.pages[:2])
                summary["preview"] = text_preview[:2048]
                summary["metadata"] = {"pages": pages}
            except Exception as e:
                summary["error"] = f"PDF parse error: {e}"
        else:
            try:
                text = data.decode("utf-8", errors="ignore")
                summary["preview"] = text[:1024]
            except Exception:
                summary["error"] = "Unknown file content"
        return summary

    def remove_background(self, image_bytes: bytes) -> bytes:
        """Demo stub: return ``image_bytes`` unchanged."""
        return image_bytes

    def remove_watermark(self, image_bytes: bytes, user_email: str) -> bytes:
        """Demo stub: return ``image_bytes`` unchanged; performs no plan check itself."""
        return image_bytes


# Module-wide engine instance used by the request handlers.
engine = MockEngine()
| # ------------------------- | |
| # Pydantic schemas (your original ones merged) | |
| # ------------------------- | |
class TaskRequest(BaseModel):
    # Request body read by plan_actions().
    # `mode` accepts only the listed literal strings.
    mode: Literal["core", "connect", "mirror", "interface", "creative", "strategy", "task"]
    objective: str
    # Optional free-text context.
    context: Optional[str] = None
class SimulationRequest(BaseModel):
    # Request body read by simulate().
    scenario: Literal["proyectos", "finanzas", "estrategia", "diseño"]
    variables: List[str]
    # Constrained to 1..52; simulate() reports it as "horizon_weeks".
    horizon: int = Field(gt=0, le=52)
class CreativeFusionRequest(BaseModel):
    # Request body read by creative_fusion().
    title: str
    narrative: str
    # Both media flags default to enabled.
    include_audio: bool = True
    include_video: bool = True
class ProfileUpdate(BaseModel):
    # Request body read by update_profile().
    name: str
    # The allowed values are the same strings used as PLAN_LIMITS keys.
    subscription: Literal["LTR 2.1", "LTR 2.5", "LTR 5.0"]
    palette: Literal["Neón Futurista", "Aurora Boreal", "Minimal Zen", "Ciber Rojo"]
    mood: Literal["sereno", "enérgico", "analítico"]
    # Optional; update_profile() switches "mirror_mode" when it is truthy.
    emotional_state: Optional[str] = None
class PredictRequest(BaseModel):
    # Fields mirror the parameters of MockEngine.generate(); no handler using
    # this model is visible in this file.
    prompt: str
    modality: str = "texto"
    language: Optional[str] = None
class PredictResponse(BaseModel):
    # Response shape: generated text plus a free-form status mapping; no
    # handler using this model is visible in this file.
    output: str
    status: Dict[str, Any]
| # ------------------------- | |
| # FastAPI app + middleware + security headers | |
| # ------------------------- | |
# NOTE(review): no route or middleware decorators (`@app.get`, `@app.post`,
# `@app.middleware`) are visible anywhere in this copy of the file; they may
# have been lost in extraction. Confirm against the deployed source.
app = FastAPI(
    title="Lythron Mock + Secure MVP",
    version="EVO.3",
)

# CORS: only the configured origins are allowed; credentials, all methods and
# all headers are permitted for those origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # Security headers middleware | |
async def add_security_headers(request: Request, call_next):
    """Run the downstream handler, then stamp fixed security headers on its response.

    NOTE(review): no registration decorator is visible on this function here;
    presumably it is meant to be installed as HTTP middleware -- confirm.
    """
    security_headers = {
        # Basic headers
        "X-Frame-Options": "DENY",
        "X-Content-Type-Options": "nosniff",
        "Referrer-Policy": "no-referrer",
        "Permissions-Policy": "geolocation=(), microphone=()",
        "Strict-Transport-Security": "max-age=63072000; includeSubDomains; preload",
        # You can customize CSP to your frontend needs (start strict)
        "Content-Security-Policy": "default-src 'self'",
    }
    response: Response = await call_next(request)
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response
| # ------------------------- | |
| # Auth endpoints | |
| # ------------------------- | |
async def register(form: OAuth2PasswordRequestForm = Depends()):
    # Creates an account from the form's username (used as the e-mail) and
    # password, records a "register" audit row, and returns {"ok": True}.
    # create_user() raises HTTP 400 when the e-mail is already registered.
    email, password = form.username, form.password
    create_user(email, password)
    audit(email, "register", "user created")
    return {"ok": True}
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Verifies the submitted credentials and returns a bearer token whose
    # "sub" claim is the user's e-mail. Unknown user and wrong password both
    # produce the same HTTP 400.
    account = get_user(form_data.username)
    if not (account and verify_password(form_data.password, account["hashed_password"])):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    token = create_access_token(data={"sub": account["email"]})
    audit(account["email"], "login", "token issued")
    return {"access_token": token, "token_type": "bearer"}
| # Dependency: current user (strict) | |
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user dict, or fail with HTTP 401.

    Raises the same 401 (with a ``WWW-Authenticate: Bearer`` header) when the
    token cannot be decoded, carries no ``sub`` claim, or names a user that
    does not exist.
    """
    credentials_exception = HTTPException(
        # Literal 401 on purpose: the module-level name `status` is rebound by
        # the later `def status()` handler, so `status.HTTP_401_UNAUTHORIZED`
        # would raise AttributeError when this function runs.
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    email = payload.get("sub")
    if email is None:
        raise credentials_exception
    user = get_user(email)
    if user is None:
        raise credentials_exception
    return user
| # Optional user dependency (None if anonymous) | |
async def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]:
    """Return the user named by the request's Authorization header, else ``None``.

    Never raises: a missing header, an undecodable token, a missing ``sub``
    claim or any other error all yield ``None`` (anonymous).
    """
    header = request.headers.get("authorization")
    if not header:
        return None
    # Everything after the last "Bearer " marker; the whole header value when
    # the marker is absent.
    token = header.rpartition("Bearer ")[2]
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject: str = claims.get("sub")
        return get_user(subject) if subject else None
    except Exception:
        return None
| # ------------------------- | |
| # Your original endpoints (integrated with checks) | |
| # ------------------------- | |
def status():
    # Returns a static description of the service: persona, version labels,
    # module names and colour palettes. No input, no side effects.
    persona = {
        "arquetipo": "Mentor Sereno",
        "mantra": "Comprende. Mejora. Acompaña.",
        "lema": "Precision through understanding.",
    }
    versions = {
        "Lythron-EVO": "3",
        "Lythron-CORE": "experimental",
        "Lythron-SYN": "1",
    }
    module_suffixes = (
        "Core",
        "Connect",
        "Mirror",
        "Interface",
        "ImageCleanse",
        "WatermarkShield",
        "PlanFlow",
        "ContextSense",
        "SYN.MediaLab",
        "SimForge",
        "SYN.Fusion",
        "Sentinel",
    )
    palettes = {
        "Neón Futurista": ["#66CCFF", "#1E1E1E", "#FAFAFA"],
        "Aurora Boreal": ["#4DA1FF", "#1A2E40", "#E8F7FF"],
        "Minimal Zen": ["#FFFFFF", "#111111", "#88C0D0"],
        "Ciber Rojo": ["#FF3366", "#0B0B0D", "#F5F5F7"],
    }
    return {
        "persona": persona,
        "versiones": versions,
        "modulos": [f"Lythron.{suffix}" for suffix in module_suffixes],
        "paletas": palettes,
    }
def plan_actions(payload: TaskRequest, user=Depends(get_current_user_optional)):
    # Audits the first 200 characters of the objective, then returns a fixed
    # four-step plan echoing the request's mode, objective and context.
    if user:
        actor = user["email"]
    else:
        actor = "anonymous"
    audit(actor, "plan", payload.objective[:200])
    step_actions = (
        "Diagnóstico Lythron.Core",
        "Colaboración Lythron.Connect",
        "Ajuste emocional Lythron.Mirror",
        "Entrega narrativa Lythron.Interface",
    )
    steps = [
        {"step": number, "action": action, "explainable": True}
        for number, action in enumerate(step_actions, start=1)
    ]
    return {
        "engine": "Lythron.PlanFlow",
        "mode": payload.mode,
        "objective": payload.objective,
        "context": payload.context or "sin contexto adicional",
        "steps": steps,
        "telemetria": {"tracking": True, "alerts": ["Lythron.Sentinel"]},
    }
def simulate(payload: SimulationRequest, user=Depends(get_current_user_optional)):
    # Audits the scenario name, then returns canned prediction and
    # explainability text alongside the echoed request fields.
    if user:
        actor = user["email"]
    else:
        actor = "anonymous"
    audit(actor, "simulate", payload.scenario)
    prediction = {
        "baseline": "Escenario conservador",
        "optimistic": "Incremento del 18%",
        "sensitive": "Riesgo moderado, activar Lythron.Sentinel",
    }
    explainability = {
        "rationale": "Modelos causales con buffer de experiencia bajo control",
        "bias_audit": "En línea via Lythron.Sentinel",
    }
    return {
        "engine": "Lythron.SimForge",
        "scenario": payload.scenario,
        "variables": payload.variables,
        "horizon_weeks": payload.horizon,
        "prediction": prediction,
        "explainability": explainability,
    }
def creative_fusion(payload: CreativeFusionRequest, user=Depends(get_current_user_optional)):
    # Audits the title, then returns a canned multi-track summary. The text
    # preview is the first 120 characters of the narrative; audio and video
    # are marked "incluido" or "omitido" from the request flags.
    if user:
        actor = user["email"]
    else:
        actor = "anonymous"
    audit(actor, "creative_fusion", payload.title)

    def inclusion(flag):
        return "incluido" if flag else "omitido"

    tracks = {
        "texto": {"status": "borrador", "preview": payload.narrative[:120]},
        "imagen": {"status": "render", "style": "paleta seleccionada"},
        "audio": {"status": inclusion(payload.include_audio), "duration": "90s"},
        "video": {"status": inclusion(payload.include_video), "resolution": "1080p"},
    }
    return {
        "engine": "Lythron.SYN.Fusion",
        "title": payload.title,
        "tracks": tracks,
        "collab": {"board_url": "https://mock.lythron/collab/123", "iterations": 3},
    }
def update_profile(payload: ProfileUpdate, user=Depends(get_current_user_optional)):
    # Audits the submitted profile as JSON and returns it with derived UI
    # settings. Nothing is persisted to the users table here. The base plan
    # "LTR 2.1" gets only the timeline module and no gamification.
    if user:
        actor = user["email"]
    else:
        actor = "anonymous"
    profile = payload.dict()
    audit(actor, "update_profile", json_repr(profile))
    is_base_plan = payload.subscription == "LTR 2.1"
    if is_base_plan:
        modules = ["timeline"]
    else:
        modules = ["timeline", "creative-board", "sentinel-alerts"]
    if payload.emotional_state:
        mirror_mode = "Lythron.Mirror+"
    else:
        mirror_mode = "Lythron.Mirror"
    return {
        "profile": profile,
        "mirror_mode": mirror_mode,
        "interface_layout": {
            "primary": "modos adaptativos",
            "modules": modules,
        },
        "history_panel": {
            "enabled": True,
            "metrics": ["impacto", "tiempo ahorrado", "compliance"],
            "gamification": not is_base_plan,
        },
    }
def security_overview(user=Depends(get_current_user_optional)):
    # Audits the call, then returns a static compliance / supervision summary.
    if user:
        actor = user["email"]
    else:
        actor = "anonymous"
    audit(actor, "security_overview", "")
    compliance = {
        "auditoria": "continua",
        "traceability": "blockchain experimental",
        "permisos": "seguridad adaptativa",
    }
    sentinel = {
        "alerts": ["riesgo legal", "sesgo detectado", "uso no autorizado"],
        "mode": "autónomo asistido",
    }
    watermarkshield = {"requires_consent": True, "status": "enterprise only"}
    return {
        "compliance": compliance,
        "supervision": {
            "sentinel": sentinel,
            "watermarkshield": watermarkshield,
        },
    }
| # ------------------------- | |
| # File endpoints: analyze + remove BG + remove watermark | |
| # ------------------------- | |
async def analyze(file: UploadFile = File(...), user=Depends(get_current_user_optional)):
    # Reads the whole upload into memory, summarises it with
    # engine.analyze_file(), audits the filename, and returns the summary.
    payload_bytes = await file.read()
    summary = engine.analyze_file(file.filename, payload_bytes, file.content_type)
    actor = user["email"] if user else "anonymous"
    audit(actor, "analyze", file.filename)
    return summary
async def remove_background(file: UploadFile = File(...), user=Depends(get_current_user_optional)):
    # Allowed for all callers; quality or export limits are handled in
    # frontend/plan logic. Returns only metadata about the processed image,
    # not the image bytes themselves.
    image_bytes = await file.read()
    processed = engine.remove_background(image_bytes)
    actor = user["email"] if user else "anonymous"
    audit(actor, "remove_background", file.filename)
    return {
        "filename": file.filename,
        "size": len(processed),
        "note": "demo stub: background removed",
    }
async def remove_watermark(file: UploadFile = File(...), proof: UploadFile = File(...), consent: bool = True, user=Depends(get_current_user)):
    # Requires an authenticated user on plan "LTR 5.0" and a truthy consent
    # flag: HTTP 400 without consent, HTTP 403 on any other plan.
    # The proof upload should be validated in production (OCR, metadata);
    # here it is read but not inspected.
    if not consent:
        raise HTTPException(status_code=400, detail="Consent required")
    if user["plan"] != "LTR 5.0":
        raise HTTPException(status_code=403, detail="Watermark removal only for LTR 5.0")
    image_bytes = await file.read()
    # TODO: validate proof (stub)
    _proof_bytes = await proof.read()
    cleaned = engine.remove_watermark(image_bytes, user["email"])
    audit(user["email"], "remove_watermark", file.filename)
    return {
        "filename": file.filename,
        "size": len(cleaned),
        "note": "demo stub: watermark removed",
    }
| # ------------------------- | |
| # Admin endpoint | |
| # ------------------------- | |
def admin_audit(user=Depends(get_current_user)):
    # Returns the 500 most recent audit rows, newest first, as
    # {"audit": [{"user", "action", "details", "ts"}, ...]}.
    # Raises HTTP 403 unless the authenticated user has is_admin set.
    # The connection is closed even if the query raises.
    if not user["is_admin"]:
        raise HTTPException(status_code=403, detail="Admin only")
    conn = sqlite3.connect(DATABASE)
    try:
        rows = conn.execute(
            "SELECT user_email, action, details, ts FROM audit ORDER BY ts DESC LIMIT 500"
        ).fetchall()
    finally:
        conn.close()
    keys = ("user", "action", "details", "ts")
    return {"audit": [dict(zip(keys, row)) for row in rows]}
def get_config():
    # Returns static product metadata: name, plan versions and module names.
    versions = ["LTR 2.1", "LTR 2.5", "LTR 5.0"]
    modules = [
        "Core",
        "Connect",
        "Mirror",
        "Interface",
        "ImageCleanse",
        "WatermarkShield",
        "PlanFlow",
        "Sentinel",
    ]
    return {"name": "Lythron", "versions": versions, "modules": modules}
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def json_repr(obj: Any) -> str:
    """Return ``obj`` as a JSON string, or ``str(obj)`` if serialisation fails.

    Non-ASCII characters are kept as-is rather than escaped.
    """
    import json

    try:
        encoded = json.dumps(obj, ensure_ascii=False)
    except Exception:
        # Best effort: anything JSON cannot encode falls back to its str() form.
        encoded = str(obj)
    return encoded
| # ------------------------- | |
| # Run (uvicorn) | |
| # ------------------------- | |
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the file is run as a script.
    import uvicorn

    print("Starting Lythron full mock on http://0.0.0.0:8000")
    # The app is passed as an import string ("module:attribute"), so this file
    # must be importable as `lythron_full`. Binds on all interfaces with
    # auto-reload enabled.
    uvicorn.run(
        "lythron_full:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )