Lythron / AI poweful /backend /lythron_full.py
LythronAI's picture
Upload 10 files
8f95b98 verified
raw
history blame
20.1 kB
# lythron_full.py
import os
import io
import struct
import time
import sqlite3
import secrets
from typing import List, Optional, Dict, Any
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, status, Request, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from passlib.context import CryptContext
from jose import JWTError, jwt
# -------------------------
# Config / Environment
# -------------------------
SECRET_KEY = os.getenv("LYTHRON_SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_SECONDS = 60 * 60 * 24 # 24h
DATABASE = os.getenv("LYTHRON_DB", "lythron_lythron.db")
ALLOWED_ORIGINS = os.getenv("LYTHRON_ALLOWED_ORIGINS", "http://localhost:3000").split(",")
# -------------------------
# DB init (SQLite lightweight)
# -------------------------
def init_db():
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE,
hashed_password TEXT,
plan TEXT DEFAULT 'LTR 2.1',
prompts_used INTEGER DEFAULT 0,
prompts_reset_ts INTEGER DEFAULT 0,
is_admin INTEGER DEFAULT 0
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_email TEXT,
action TEXT,
details TEXT,
ts INTEGER
)""")
conn.commit()
conn.close()
init_db()
# -------------------------
# Auth utils (bcrypt + JWT)
# -------------------------
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict, expires_delta: Optional[int] = None):
to_encode = data.copy()
expire = int(time.time()) + (expires_delta or ACCESS_TOKEN_EXPIRE_SECONDS)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_user(email: str) -> Optional[Dict[str, Any]]:
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("SELECT email, hashed_password, plan, prompts_used, prompts_reset_ts, is_admin FROM users WHERE email=?", (email,))
row = cur.fetchone()
conn.close()
if not row:
return None
return {
"email": row[0],
"hashed_password": row[1],
"plan": row[2],
"prompts_used": row[3],
"prompts_reset_ts": row[4],
"is_admin": bool(row[5])
}
def create_user(email: str, password: str, plan: str = "LTR 2.1", is_admin: bool = False):
hp = hash_password(password)
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
try:
cur.execute("INSERT INTO users (email, hashed_password, plan, prompts_reset_ts) VALUES (?, ?, ?, ?)",
(email, hp, plan, 0))
conn.commit()
except sqlite3.IntegrityError:
conn.close()
raise HTTPException(status_code=400, detail="User already exists")
conn.close()
# -------------------------
# Audit helper
# -------------------------
def audit(user_email: str, action: str, details: str = ""):
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("INSERT INTO audit (user_email, action, details, ts) VALUES (?, ?, ?, ?)",
(user_email, action, details, int(time.time())))
conn.commit()
conn.close()
# -------------------------
# Plan limits
# -------------------------
PLAN_LIMITS = {
"LTR 2.1": 10,
"LTR 2.5": 20,
"LTR 5.0": 9999999
}
def check_and_consume_prompt(user_email: Optional[str]):
if not user_email:
return True
user = get_user(user_email)
if not user:
raise HTTPException(status_code=401, detail="Unknown user")
limit = PLAN_LIMITS.get(user["plan"], 10)
now = int(time.time())
reset_ts = user["prompts_reset_ts"] or 0
# reset every 24h since last reset
if now > (reset_ts + 24*3600):
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("UPDATE users SET prompts_used=0, prompts_reset_ts=? WHERE email=?", (now, user["email"]))
conn.commit()
conn.close()
user["prompts_used"] = 0
if user["prompts_used"] >= limit:
raise HTTPException(status_code=429, detail="Prompts limit reached for your plan")
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("UPDATE users SET prompts_used = prompts_used + 1 WHERE email=?", (user["email"],))
conn.commit()
conn.close()
return True
# -------------------------
# Simple safe mock engine (non-destructive)
# -------------------------
class MockEngine:
def __init__(self):
self.status = {"model": "mock-v1"}
self.memory = []
def generate(self, prompt: str, modality: str = "texto", language: Optional[str] = None) -> str:
# minimal sanitization
text = prompt.replace("\x00", " ")
self.memory.append({"prompt": text, "modality": modality, "ts": int(time.time())})
if language:
safe = text.replace('"', '\\"').replace("\n"," ")
return f"// {language} stub code for: {safe[:120]}"
return f"{self.status.get('model')} response: {text[:400]}"
def analyze_file(self, name: str, data: bytes, content_type: Optional[str] = None) -> Dict[str, Any]:
summary = {"filename": name, "size": len(data)}
extension = name.lower().split(".")[-1] if "." in name else ""
if extension == "png" or (content_type and "png" in (content_type or "").lower()):
if len(data) >= 24 and data.startswith(b"\x89PNG\r\n\x1a\n"):
width = struct.unpack("!I", data[16:20])[0]
height = struct.unpack("!I", data[20:24])[0]
summary["preview"] = f"PNG {width}x{height}"
summary["metadata"] = {"width": width, "height": height, "mode": "RGBA"}
else:
summary["error"] = "Invalid PNG"
elif extension == "pdf" or (content_type and "pdf" in (content_type or "").lower()):
try:
import PyPDF2
reader = PyPDF2.PdfReader(io.BytesIO(data))
pages = len(reader.pages)
text_preview = ""
for p in reader.pages[:2]:
text_preview += (p.extract_text() or "")
summary["preview"] = text_preview[:2048]
summary["metadata"] = {"pages": pages}
except Exception as e:
summary["error"] = f"PDF parse error: {e}"
else:
try:
text = data.decode("utf-8", errors="ignore")
summary["preview"] = text[:1024]
except Exception:
summary["error"] = "Unknown file content"
return summary
def remove_background(self, image_bytes: bytes) -> bytes:
# demo stub: return original bytes. Replace with real model in production.
return image_bytes
def remove_watermark(self, image_bytes: bytes, user_email: str) -> bytes:
# require plan check outside this function
return image_bytes
engine = MockEngine()
# -------------------------
# Pydantic schemas (your original ones merged)
# -------------------------
class TaskRequest(BaseModel):
mode: Literal["core", "connect", "mirror", "interface", "creative", "strategy", "task"]
objective: str
context: Optional[str] = None
class SimulationRequest(BaseModel):
scenario: Literal["proyectos", "finanzas", "estrategia", "diseño"]
variables: List[str]
horizon: int = Field(gt=0, le=52)
class CreativeFusionRequest(BaseModel):
title: str
narrative: str
include_audio: bool = True
include_video: bool = True
class ProfileUpdate(BaseModel):
name: str
subscription: Literal["LTR 2.1", "LTR 2.5", "LTR 5.0"]
palette: Literal["Neón Futurista", "Aurora Boreal", "Minimal Zen", "Ciber Rojo"]
mood: Literal["sereno", "enérgico", "analítico"]
emotional_state: Optional[str] = None
class PredictRequest(BaseModel):
prompt: str
modality: str = "texto"
language: Optional[str] = None
class PredictResponse(BaseModel):
output: str
status: Dict[str, Any]
# -------------------------
# FastAPI app + middleware + security headers
# -------------------------
app = FastAPI(title="Lythron Mock + Secure MVP", version="EVO.3")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response: Response = await call_next(request)
# Basic headers
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["Referrer-Policy"] = "no-referrer"
response.headers["Permissions-Policy"] = "geolocation=(), microphone=()"
response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload"
# You can customize CSP to your frontend needs (start strict)
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
# -------------------------
# Auth endpoints
# -------------------------
@app.post("/register")
async def register(form: OAuth2PasswordRequestForm = Depends()):
email = form.username
password = form.password
create_user(email, password)
audit(email, "register", "user created")
return {"ok": True}
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token = create_access_token(data={"sub": user["email"]})
audit(user["email"], "login", "token issued")
return {"access_token": access_token, "token_type": "bearer"}
# Dependency: current user (strict)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(email)
if user is None:
raise credentials_exception
return user
# Optional user dependency (None if anonymous)
async def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]:
auth = request.headers.get("authorization")
if not auth:
return None
token = auth.split("Bearer ")[-1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if not email:
return None
return get_user(email)
except Exception:
return None
# -------------------------
# Your original endpoints (integrated with checks)
# -------------------------
@app.get("/status")
def status():
return {
"persona": {
"arquetipo": "Mentor Sereno",
"mantra": "Comprende. Mejora. Acompaña.",
"lema": "Precision through understanding."
},
"versiones": {
"Lythron-EVO": "3",
"Lythron-CORE": "experimental",
"Lythron-SYN": "1"
},
"modulos": [
"Lythron.Core",
"Lythron.Connect",
"Lythron.Mirror",
"Lythron.Interface",
"Lythron.ImageCleanse",
"Lythron.WatermarkShield",
"Lythron.PlanFlow",
"Lythron.ContextSense",
"Lythron.SYN.MediaLab",
"Lythron.SimForge",
"Lythron.SYN.Fusion",
"Lythron.Sentinel"
],
"paletas": {
"Neón Futurista": ["#66CCFF", "#1E1E1E", "#FAFAFA"],
"Aurora Boreal": ["#4DA1FF", "#1A2E40", "#E8F7FF"],
"Minimal Zen": ["#FFFFFF", "#111111", "#88C0D0"],
"Ciber Rojo": ["#FF3366", "#0B0B0D", "#F5F5F7"]
}
}
@app.post("/plan")
def plan_actions(payload: TaskRequest, user=Depends(get_current_user_optional)):
user_email = user["email"] if user else "anonymous"
audit(user_email, "plan", payload.objective[:200])
return {
"engine": "Lythron.PlanFlow",
"mode": payload.mode,
"objective": payload.objective,
"context": payload.context or "sin contexto adicional",
"steps": [
{"step": 1, "action": "Diagnóstico Lythron.Core", "explainable": True},
{"step": 2, "action": "Colaboración Lythron.Connect", "explainable": True},
{"step": 3, "action": "Ajuste emocional Lythron.Mirror", "explainable": True},
{"step": 4, "action": "Entrega narrativa Lythron.Interface", "explainable": True}
],
"telemetria": {"tracking": True, "alerts": ["Lythron.Sentinel"]}
}
@app.post("/simulate")
def simulate(payload: SimulationRequest, user=Depends(get_current_user_optional)):
user_email = user["email"] if user else "anonymous"
audit(user_email, "simulate", payload.scenario)
return {
"engine": "Lythron.SimForge",
"scenario": payload.scenario,
"variables": payload.variables,
"horizon_weeks": payload.horizon,
"prediction": {
"baseline": "Escenario conservador",
"optimistic": "Incremento del 18%",
"sensitive": "Riesgo moderado, activar Lythron.Sentinel"
},
"explainability": {
"rationale": "Modelos causales con buffer de experiencia bajo control",
"bias_audit": "En línea via Lythron.Sentinel"
}
}
@app.post("/creative/fusion")
def creative_fusion(payload: CreativeFusionRequest, user=Depends(get_current_user_optional)):
user_email = user["email"] if user else "anonymous"
audit(user_email, "creative_fusion", payload.title)
return {
"engine": "Lythron.SYN.Fusion",
"title": payload.title,
"tracks": {
"texto": {"status": "borrador", "preview": payload.narrative[:120]},
"imagen": {"status": "render", "style": "paleta seleccionada"},
"audio": {"status": "incluido" if payload.include_audio else "omitido", "duration": "90s"},
"video": {"status": "incluido" if payload.include_video else "omitido", "resolution": "1080p"}
},
"collab": {"board_url": "https://mock.lythron/collab/123", "iterations": 3}
}
@app.post("/profile")
def update_profile(payload: ProfileUpdate, user=Depends(get_current_user_optional)):
user_email = user["email"] if user else "anonymous"
audit(user_email, "update_profile", json_repr(payload.dict()))
modules = ["timeline"] if payload.subscription == "LTR 2.1" else ["timeline", "creative-board", "sentinel-alerts"]
return {
"profile": payload.dict(),
"mirror_mode": "Lythron.Mirror+" if payload.emotional_state else "Lythron.Mirror",
"interface_layout": {
"primary": "modos adaptativos",
"modules": modules
},
"history_panel": {
"enabled": True,
"metrics": ["impacto", "tiempo ahorrado", "compliance"],
"gamification": payload.subscription != "LTR 2.1"
}
}
@app.get("/security")
def security_overview(user=Depends(get_current_user_optional)):
user_email = user["email"] if user else "anonymous"
audit(user_email, "security_overview", "")
return {
"compliance": {
"auditoria": "continua",
"traceability": "blockchain experimental",
"permisos": "seguridad adaptativa"
},
"supervision": {
"sentinel": {
"alerts": ["riesgo legal", "sesgo detectado", "uso no autorizado"],
"mode": "autónomo asistido"
},
"watermarkshield": {"requires_consent": True, "status": "enterprise only"}
}
}
# -------------------------
# File endpoints: analyze + remove BG + remove watermark
# -------------------------
@app.post("/analyze")
async def analyze(file: UploadFile = File(...), user=Depends(get_current_user_optional)):
data = await file.read()
res = engine.analyze_file(file.filename, data, file.content_type)
audit((user["email"] if user else "anonymous"), "analyze", file.filename)
return res
@app.post("/remove-background")
async def remove_background(file: UploadFile = File(...), user=Depends(get_current_user_optional)):
data = await file.read()
# allowed for all; quality or export limits handled in frontend/plan logic
out = engine.remove_background(data)
audit((user["email"] if user else "anonymous"), "remove_background", file.filename)
return {"filename": file.filename, "size": len(out), "note": "demo stub: background removed"}
@app.post("/remove-watermark")
async def remove_watermark(file: UploadFile = File(...), proof: UploadFile = File(...), consent: bool = True, user=Depends(get_current_user)):
# proof should be validated in production (OCR, metadata). consent must be True and user plan LTR 5.0
if not consent:
raise HTTPException(status_code=400, detail="Consent required")
if user["plan"] != "LTR 5.0":
raise HTTPException(status_code=403, detail="Watermark removal only for LTR 5.0")
data = await file.read()
proofdata = await proof.read()
# TODO: validate proof (stub)
out = engine.remove_watermark(data, user["email"])
audit(user["email"], "remove_watermark", file.filename)
return {"filename": file.filename, "size": len(out), "note": "demo stub: watermark removed"}
# -------------------------
# Admin endpoint
# -------------------------
@app.get("/admin/audit")
def admin_audit(user=Depends(get_current_user)):
if not user["is_admin"]:
raise HTTPException(status_code=403, detail="Admin only")
conn = sqlite3.connect(DATABASE)
cur = conn.cursor()
cur.execute("SELECT user_email, action, details, ts FROM audit ORDER BY ts DESC LIMIT 500")
rows = cur.fetchall()
conn.close()
out = [{"user": r[0], "action": r[1], "details": r[2], "ts": r[3]} for r in rows]
return {"audit": out}
@app.get("/config")
def get_config():
return {
"name": "Lythron",
"versions": ["LTR 2.1", "LTR 2.5", "LTR 5.0"],
"modules": ["Core","Connect","Mirror","Interface","ImageCleanse","WatermarkShield","PlanFlow","Sentinel"]
}
# -------------------------
# Helpers
# -------------------------
def json_repr(obj: Any) -> str:
try:
import json
return json.dumps(obj, ensure_ascii=False)
except Exception:
return str(obj)
# -------------------------
# Run (uvicorn)
# -------------------------
if __name__ == "__main__":
import uvicorn
print("Starting Lythron full mock on http://0.0.0.0:8000")
uvicorn.run("lythron_full:app", host="0.0.0.0", port=8000, reload=True)