# lythron_full.py import os import io import struct import time import sqlite3 import secrets from typing import List, Optional, Dict, Any from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, status, Request, Response from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from passlib.context import CryptContext from jose import JWTError, jwt # ------------------------- # Config / Environment # ------------------------- SECRET_KEY = os.getenv("LYTHRON_SECRET_KEY", secrets.token_urlsafe(32)) ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_SECONDS = 60 * 60 * 24 # 24h DATABASE = os.getenv("LYTHRON_DB", "lythron_lythron.db") ALLOWED_ORIGINS = os.getenv("LYTHRON_ALLOWED_ORIGINS", "http://localhost:3000").split(",") # ------------------------- # DB init (SQLite lightweight) # ------------------------- def init_db(): conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT UNIQUE, hashed_password TEXT, plan TEXT DEFAULT 'LTR 2.1', prompts_used INTEGER DEFAULT 0, prompts_reset_ts INTEGER DEFAULT 0, is_admin INTEGER DEFAULT 0 )""") cur.execute(""" CREATE TABLE IF NOT EXISTS audit ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_email TEXT, action TEXT, details TEXT, ts INTEGER )""") conn.commit() conn.close() init_db() # ------------------------- # Auth utils (bcrypt + JWT) # ------------------------- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token(data: dict, expires_delta: Optional[int] = None): to_encode = data.copy() expire = int(time.time()) + (expires_delta or ACCESS_TOKEN_EXPIRE_SECONDS) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_user(email: str) -> Optional[Dict[str, Any]]: conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute("SELECT email, hashed_password, plan, prompts_used, prompts_reset_ts, is_admin FROM users WHERE email=?", (email,)) row = cur.fetchone() conn.close() if not row: return None return { "email": row[0], "hashed_password": row[1], "plan": row[2], "prompts_used": row[3], "prompts_reset_ts": row[4], "is_admin": bool(row[5]) } def create_user(email: str, password: str, plan: str = "LTR 2.1", is_admin: bool = False): hp = hash_password(password) conn = sqlite3.connect(DATABASE) cur = conn.cursor() try: cur.execute("INSERT INTO users (email, hashed_password, plan, prompts_reset_ts) VALUES (?, ?, ?, ?)", (email, hp, plan, 0)) conn.commit() except sqlite3.IntegrityError: conn.close() raise HTTPException(status_code=400, detail="User already exists") conn.close() # ------------------------- # Audit helper # ------------------------- def audit(user_email: str, action: str, details: str = ""): conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute("INSERT INTO audit (user_email, action, details, ts) VALUES (?, ?, ?, ?)", (user_email, action, details, int(time.time()))) conn.commit() conn.close() # ------------------------- # Plan limits # ------------------------- PLAN_LIMITS = { "LTR 2.1": 10, "LTR 2.5": 20, "LTR 5.0": 9999999 } def check_and_consume_prompt(user_email: Optional[str]): if not user_email: return True user = get_user(user_email) if not user: raise HTTPException(status_code=401, detail="Unknown user") limit = PLAN_LIMITS.get(user["plan"], 10) now = int(time.time()) reset_ts = user["prompts_reset_ts"] or 0 # reset every 24h since last reset if now > (reset_ts + 24*3600): conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute("UPDATE users SET prompts_used=0, prompts_reset_ts=? WHERE email=?", (now, user["email"])) conn.commit() conn.close() user["prompts_used"] = 0 if user["prompts_used"] >= limit: raise HTTPException(status_code=429, detail="Prompts limit reached for your plan") conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute("UPDATE users SET prompts_used = prompts_used + 1 WHERE email=?", (user["email"],)) conn.commit() conn.close() return True # ------------------------- # Simple safe mock engine (non-destructive) # ------------------------- class MockEngine: def __init__(self): self.status = {"model": "mock-v1"} self.memory = [] def generate(self, prompt: str, modality: str = "texto", language: Optional[str] = None) -> str: # minimal sanitization text = prompt.replace("\x00", " ") self.memory.append({"prompt": text, "modality": modality, "ts": int(time.time())}) if language: safe = text.replace('"', '\\"').replace("\n"," ") return f"// {language} stub code for: {safe[:120]}" return f"{self.status.get('model')} response: {text[:400]}" def analyze_file(self, name: str, data: bytes, content_type: Optional[str] = None) -> Dict[str, Any]: summary = {"filename": name, "size": len(data)} extension = name.lower().split(".")[-1] if "." in name else "" if extension == "png" or (content_type and "png" in (content_type or "").lower()): if len(data) >= 24 and data.startswith(b"\x89PNG\r\n\x1a\n"): width = struct.unpack("!I", data[16:20])[0] height = struct.unpack("!I", data[20:24])[0] summary["preview"] = f"PNG {width}x{height}" summary["metadata"] = {"width": width, "height": height, "mode": "RGBA"} else: summary["error"] = "Invalid PNG" elif extension == "pdf" or (content_type and "pdf" in (content_type or "").lower()): try: import PyPDF2 reader = PyPDF2.PdfReader(io.BytesIO(data)) pages = len(reader.pages) text_preview = "" for p in reader.pages[:2]: text_preview += (p.extract_text() or "") summary["preview"] = text_preview[:2048] summary["metadata"] = {"pages": pages} except Exception as e: summary["error"] = f"PDF parse error: {e}" else: try: text = data.decode("utf-8", errors="ignore") summary["preview"] = text[:1024] except Exception: summary["error"] = "Unknown file content" return summary def remove_background(self, image_bytes: bytes) -> bytes: # demo stub: return original bytes. Replace with real model in production. return image_bytes def remove_watermark(self, image_bytes: bytes, user_email: str) -> bytes: # require plan check outside this function return image_bytes engine = MockEngine() # ------------------------- # Pydantic schemas (your original ones merged) # ------------------------- class TaskRequest(BaseModel): mode: Literal["core", "connect", "mirror", "interface", "creative", "strategy", "task"] objective: str context: Optional[str] = None class SimulationRequest(BaseModel): scenario: Literal["proyectos", "finanzas", "estrategia", "diseño"] variables: List[str] horizon: int = Field(gt=0, le=52) class CreativeFusionRequest(BaseModel): title: str narrative: str include_audio: bool = True include_video: bool = True class ProfileUpdate(BaseModel): name: str subscription: Literal["LTR 2.1", "LTR 2.5", "LTR 5.0"] palette: Literal["Neón Futurista", "Aurora Boreal", "Minimal Zen", "Ciber Rojo"] mood: Literal["sereno", "enérgico", "analítico"] emotional_state: Optional[str] = None class PredictRequest(BaseModel): prompt: str modality: str = "texto" language: Optional[str] = None class PredictResponse(BaseModel): output: str status: Dict[str, Any] # ------------------------- # FastAPI app + middleware + security headers # ------------------------- app = FastAPI(title="Lythron Mock + Secure MVP", version="EVO.3") # CORS app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Security headers middleware @app.middleware("http") async def add_security_headers(request: Request, call_next): response: Response = await call_next(request) # Basic headers response.headers["X-Frame-Options"] = "DENY" response.headers["X-Content-Type-Options"] = "nosniff" response.headers["Referrer-Policy"] = "no-referrer" response.headers["Permissions-Policy"] = "geolocation=(), microphone=()" response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload" # You can customize CSP to your frontend needs (start strict) response.headers["Content-Security-Policy"] = "default-src 'self'" return response # ------------------------- # Auth endpoints # ------------------------- @app.post("/register") async def register(form: OAuth2PasswordRequestForm = Depends()): email = form.username password = form.password create_user(email, password) audit(email, "register", "user created") return {"ok": True} @app.post("/token") async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = get_user(form_data.username) if not user or not verify_password(form_data.password, user["hashed_password"]): raise HTTPException(status_code=400, detail="Incorrect username or password") access_token = create_access_token(data={"sub": user["email"]}) audit(user["email"], "login", "token issued") return {"access_token": access_token, "token_type": "bearer"} # Dependency: current user (strict) async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) email: str = payload.get("sub") if email is None: raise credentials_exception except JWTError: raise credentials_exception user = get_user(email) if user is None: raise credentials_exception return user # Optional user dependency (None if anonymous) async def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]: auth = request.headers.get("authorization") if not auth: return None token = auth.split("Bearer ")[-1] try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) email: str = payload.get("sub") if not email: return None return get_user(email) except Exception: return None # ------------------------- # Your original endpoints (integrated with checks) # ------------------------- @app.get("/status") def status(): return { "persona": { "arquetipo": "Mentor Sereno", "mantra": "Comprende. Mejora. Acompaña.", "lema": "Precision through understanding." }, "versiones": { "Lythron-EVO": "3", "Lythron-CORE": "experimental", "Lythron-SYN": "1" }, "modulos": [ "Lythron.Core", "Lythron.Connect", "Lythron.Mirror", "Lythron.Interface", "Lythron.ImageCleanse", "Lythron.WatermarkShield", "Lythron.PlanFlow", "Lythron.ContextSense", "Lythron.SYN.MediaLab", "Lythron.SimForge", "Lythron.SYN.Fusion", "Lythron.Sentinel" ], "paletas": { "Neón Futurista": ["#66CCFF", "#1E1E1E", "#FAFAFA"], "Aurora Boreal": ["#4DA1FF", "#1A2E40", "#E8F7FF"], "Minimal Zen": ["#FFFFFF", "#111111", "#88C0D0"], "Ciber Rojo": ["#FF3366", "#0B0B0D", "#F5F5F7"] } } @app.post("/plan") def plan_actions(payload: TaskRequest, user=Depends(get_current_user_optional)): user_email = user["email"] if user else "anonymous" audit(user_email, "plan", payload.objective[:200]) return { "engine": "Lythron.PlanFlow", "mode": payload.mode, "objective": payload.objective, "context": payload.context or "sin contexto adicional", "steps": [ {"step": 1, "action": "Diagnóstico Lythron.Core", "explainable": True}, {"step": 2, "action": "Colaboración Lythron.Connect", "explainable": True}, {"step": 3, "action": "Ajuste emocional Lythron.Mirror", "explainable": True}, {"step": 4, "action": "Entrega narrativa Lythron.Interface", "explainable": True} ], "telemetria": {"tracking": True, "alerts": ["Lythron.Sentinel"]} } @app.post("/simulate") def simulate(payload: SimulationRequest, user=Depends(get_current_user_optional)): user_email = user["email"] if user else "anonymous" audit(user_email, "simulate", payload.scenario) return { "engine": "Lythron.SimForge", "scenario": payload.scenario, "variables": payload.variables, "horizon_weeks": payload.horizon, "prediction": { "baseline": "Escenario conservador", "optimistic": "Incremento del 18%", "sensitive": "Riesgo moderado, activar Lythron.Sentinel" }, "explainability": { "rationale": "Modelos causales con buffer de experiencia bajo control", "bias_audit": "En línea via Lythron.Sentinel" } } @app.post("/creative/fusion") def creative_fusion(payload: CreativeFusionRequest, user=Depends(get_current_user_optional)): user_email = user["email"] if user else "anonymous" audit(user_email, "creative_fusion", payload.title) return { "engine": "Lythron.SYN.Fusion", "title": payload.title, "tracks": { "texto": {"status": "borrador", "preview": payload.narrative[:120]}, "imagen": {"status": "render", "style": "paleta seleccionada"}, "audio": {"status": "incluido" if payload.include_audio else "omitido", "duration": "90s"}, "video": {"status": "incluido" if payload.include_video else "omitido", "resolution": "1080p"} }, "collab": {"board_url": "https://mock.lythron/collab/123", "iterations": 3} } @app.post("/profile") def update_profile(payload: ProfileUpdate, user=Depends(get_current_user_optional)): user_email = user["email"] if user else "anonymous" audit(user_email, "update_profile", json_repr(payload.dict())) modules = ["timeline"] if payload.subscription == "LTR 2.1" else ["timeline", "creative-board", "sentinel-alerts"] return { "profile": payload.dict(), "mirror_mode": "Lythron.Mirror+" if payload.emotional_state else "Lythron.Mirror", "interface_layout": { "primary": "modos adaptativos", "modules": modules }, "history_panel": { "enabled": True, "metrics": ["impacto", "tiempo ahorrado", "compliance"], "gamification": payload.subscription != "LTR 2.1" } } @app.get("/security") def security_overview(user=Depends(get_current_user_optional)): user_email = user["email"] if user else "anonymous" audit(user_email, "security_overview", "") return { "compliance": { "auditoria": "continua", "traceability": "blockchain experimental", "permisos": "seguridad adaptativa" }, "supervision": { "sentinel": { "alerts": ["riesgo legal", "sesgo detectado", "uso no autorizado"], "mode": "autónomo asistido" }, "watermarkshield": {"requires_consent": True, "status": "enterprise only"} } } # ------------------------- # File endpoints: analyze + remove BG + remove watermark # ------------------------- @app.post("/analyze") async def analyze(file: UploadFile = File(...), user=Depends(get_current_user_optional)): data = await file.read() res = engine.analyze_file(file.filename, data, file.content_type) audit((user["email"] if user else "anonymous"), "analyze", file.filename) return res @app.post("/remove-background") async def remove_background(file: UploadFile = File(...), user=Depends(get_current_user_optional)): data = await file.read() # allowed for all; quality or export limits handled in frontend/plan logic out = engine.remove_background(data) audit((user["email"] if user else "anonymous"), "remove_background", file.filename) return {"filename": file.filename, "size": len(out), "note": "demo stub: background removed"} @app.post("/remove-watermark") async def remove_watermark(file: UploadFile = File(...), proof: UploadFile = File(...), consent: bool = True, user=Depends(get_current_user)): # proof should be validated in production (OCR, metadata). consent must be True and user plan LTR 5.0 if not consent: raise HTTPException(status_code=400, detail="Consent required") if user["plan"] != "LTR 5.0": raise HTTPException(status_code=403, detail="Watermark removal only for LTR 5.0") data = await file.read() proofdata = await proof.read() # TODO: validate proof (stub) out = engine.remove_watermark(data, user["email"]) audit(user["email"], "remove_watermark", file.filename) return {"filename": file.filename, "size": len(out), "note": "demo stub: watermark removed"} # ------------------------- # Admin endpoint # ------------------------- @app.get("/admin/audit") def admin_audit(user=Depends(get_current_user)): if not user["is_admin"]: raise HTTPException(status_code=403, detail="Admin only") conn = sqlite3.connect(DATABASE) cur = conn.cursor() cur.execute("SELECT user_email, action, details, ts FROM audit ORDER BY ts DESC LIMIT 500") rows = cur.fetchall() conn.close() out = [{"user": r[0], "action": r[1], "details": r[2], "ts": r[3]} for r in rows] return {"audit": out} @app.get("/config") def get_config(): return { "name": "Lythron", "versions": ["LTR 2.1", "LTR 2.5", "LTR 5.0"], "modules": ["Core","Connect","Mirror","Interface","ImageCleanse","WatermarkShield","PlanFlow","Sentinel"] } # ------------------------- # Helpers # ------------------------- def json_repr(obj: Any) -> str: try: import json return json.dumps(obj, ensure_ascii=False) except Exception: return str(obj) # ------------------------- # Run (uvicorn) # ------------------------- if __name__ == "__main__": import uvicorn print("Starting Lythron full mock on http://0.0.0.0:8000") uvicorn.run("lythron_full:app", host="0.0.0.0", port=8000, reload=True)