# BuddyMath / memory_service.py
# dotandru's picture
# Fix: Clean production deployment with sse-starlette
# 9d29c62
# raw
# history blame
# 5.78 kB
# memory_service.py
"""
Buddy Math - Memory Service (Pinecone Edition)
==============================================
Version v2.1: improved the similarity-detection mechanism.
Instead of checking word order (which breaks under OCR), check word overlap (Jaccard).
"""
import os
import json
import logging
import uuid
import re
from typing import Optional, Dict
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone
logger = logging.getLogger("MemoryService")


class MemoryService:
    """Cache of solved problems stored in a Pinecone index.

    Problem texts are embedded with the ``all-MiniLM-L6-v2`` sentence
    transformer and looked up by vector similarity.  A vector hit is only
    accepted when its stored text also shares enough words with the query
    (Jaccard overlap), which is insensitive to word order.

    When ``PINECONE_API_KEY`` is not set, or initialisation fails, ``index``
    is ``None`` and both public methods do nothing.
    """

    # Leading characters of the problem text that are stored in metadata.
    _STORED_TEXT_CHARS = 1000
    # Problems shorter than this are never stored.
    _MIN_PROBLEM_CHARS = 10
    # Minimum Jaccard word overlap required to trust a vector hit (30%).
    _MIN_TEXT_OVERLAP = 0.3
    # Budget for the serialised solution alone (leaves headroom below 40KB).
    _MAX_SOLUTION_JSON_BYTES = 38000
    # Pinecone limits metadata to 40KB per the original guard comment.
    # NOTE(review): how Pinecone counts that limit is not visible here; the
    # check below sums the raw UTF-8 bytes of keys and values - confirm.
    _MAX_METADATA_BYTES = 40 * 1024
    # Everything that is neither a word character nor whitespace.
    _PUNCTUATION = re.compile(r'[^\w\s]')

    def __init__(self):
        """Connect to Pinecone and load the embedding model.

        Never raises: on a missing API key or any initialisation error the
        service is left disabled (``index`` and ``embedder`` are ``None``).
        """
        # Define every attribute up front so a disabled or partially
        # initialised instance still exposes a consistent set of attributes.
        self.pc = None
        self.index_name = "buddy-math"
        self.index = None
        self.embedder = None

        self.api_key = os.environ.get("PINECONE_API_KEY")
        if not self.api_key:
            logger.warning("โš ๏ธ PINECONE_API_KEY not found! Memory will be disabled.")
            return

        try:
            self.pc = Pinecone(api_key=self.api_key)
            self.index = self.pc.Index(self.index_name)
            logger.info("โณ Loading embedding model...")
            self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
            logger.info("โœ… Brain initialized (Pinecone + MiniLM)")
        except Exception as e:
            logger.error(f"โŒ Failed to init Pinecone: {e}")
            # Half an initialisation is treated as none at all.
            self.index = None
            self.embedder = None

    def find_similar_solution(
        self,
        problem_text: str,
        vector_threshold: float = 0.85,
    ) -> Optional[Dict]:
        """Look up a previously stored solution for ``problem_text``.

        Performs a double check: vector similarity (meaning) followed by
        word overlap (content).

        Args:
            problem_text: The problem statement to look up.
            vector_threshold: Minimum score of the best vector match.

        Returns:
            The decoded ``solution_json`` of the best match, or ``None`` when
            the service is disabled, nothing matches well enough, the match
            carries no solution, or the lookup fails.
        """
        if self.index is None or self.embedder is None:
            return None

        try:
            # Normalise the same way learn_solution() does before storing.
            query_text = problem_text.strip()
            if not query_text:
                # Blank text has no tokens and could never pass the overlap
                # check, so skip the embedding and the remote query.
                return None

            # 1. Vector search (fast).
            vector = self.embedder.encode(query_text).tolist()
            results = self.index.query(
                vector=vector,
                top_k=1,
                include_metadata=True,
            )
            if not results['matches']:
                return None

            match = results['matches'][0]
            vector_score = match['score']
            # Vector threshold check.
            if vector_score < vector_threshold:
                return None

            # Original text as stored in memory (a match without metadata is
            # treated as having no text and no solution).
            metadata = match['metadata'] or {}
            cached_text = metadata.get('text', '')

            # 2. Word-overlap check, independent of word order.  Only the
            # prefix that learn_solution() stores is compared; otherwise a
            # long problem would be measured against a truncated copy of
            # itself and fall below the overlap threshold.
            text_similarity = self._calculate_jaccard_similarity(
                query_text[:self._STORED_TEXT_CHARS],
                cached_text,
            )
            logger.info(f"๐Ÿง  Brain Check: Vector={vector_score:.3f}, Jaccard={text_similarity:.3f}")

            # 30% overlap is the bar for recognising the same question in
            # garbled OCR output.
            if text_similarity < self._MIN_TEXT_OVERLAP:
                logger.warning("โš ๏ธ High vector score but low text overlap. Ignoring.")
                return None

            # Decode the stored JSON.
            solution_json = metadata.get('solution_json')
            if solution_json:
                logger.info("๐Ÿง  Brain HIT! Verified match found.")
                return json.loads(solution_json)
            return None
        except Exception as e:
            # Best effort: a failing cache lookup must not break the caller.
            logger.error(f"Memory search failed: {e}")
            return None

    def learn_solution(self, problem_text: str, solution_data: dict):
        """Store a new solution under the embedding of ``problem_text``.

        Does nothing when the service is disabled, when the stripped text is
        shorter than 10 characters, or when the metadata would be too large.
        Errors are logged, never raised.

        Args:
            problem_text: The problem statement the solution belongs to.
            solution_data: JSON-serialisable solution; ``meta.topic`` is
                copied into the metadata when present.
        """
        if self.index is None or self.embedder is None:
            return

        try:
            clean_text = problem_text.strip()
            if len(clean_text) < self._MIN_PROBLEM_CHARS:
                return

            json_str = json.dumps(solution_data, ensure_ascii=False)

            # "meta" may be absent, None or not a dict; fall back to
            # "unknown" instead of failing and silently skipping the save.
            meta = solution_data.get("meta")
            topic = meta.get("topic") if isinstance(meta, dict) else None
            if topic is None:
                topic = "unknown"

            metadata = {
                "text": clean_text[:self._STORED_TEXT_CHARS],
                "topic": topic,
                "solution_json": json_str,
            }

            # Guard the metadata size: the solution on its own, and the whole
            # payload (stored text and topic count too, and multi-byte text
            # can use up the remaining headroom).
            solution_bytes = len(json_str.encode('utf-8'))
            payload_bytes = sum(
                len(key.encode('utf-8')) + len(str(value).encode('utf-8'))
                for key, value in metadata.items()
            )
            if (solution_bytes > self._MAX_SOLUTION_JSON_BYTES
                    or payload_bytes > self._MAX_METADATA_BYTES):
                logger.warning("โš ๏ธ Solution too big for memory. Skipping save.")
                return

            # Embed only once we know the record will actually be written.
            vector = self.embedder.encode(clean_text).tolist()
            self.index.upsert(vectors=[{
                "id": str(uuid.uuid4()),
                "values": vector,
                "metadata": metadata,
            }])
            logger.info("๐Ÿง  Brain LEARNED and saved to Cloud!")
        except Exception as e:
            # Best effort: failing to cache must not break the caller.
            logger.error(f"Memory learn failed: {e}")

    def _calculate_jaccard_similarity(self, text1: str, text2: str) -> float:
        """Return the Jaccard word overlap of two texts, between 0.0 and 1.0.

        Word order is ignored, which suits Hebrew/English OCR output whose
        word order gets flipped.  Returns 0.0 when either text has no tokens.
        """
        tokens1 = self._tokenize(text1)
        tokens2 = self._tokenize(text2)
        if not tokens1 or not tokens2:
            return 0.0
        # Shared words divided by all distinct words.
        return len(tokens1 & tokens2) / len(tokens1 | tokens2)

    def _tokenize(self, text: str) -> set:
        """Split ``text`` into a set of lower-cased words.

        Punctuation is deleted (not replaced by a space), the remainder is
        split on whitespace, and single-character tokens are dropped.
        ``None`` or empty text yields an empty set.
        """
        if not text:
            return set()
        # Keep only word characters and whitespace.
        clean = self._PUNCTUATION.sub('', text)
        # NOTE(review): the original comment gave two-letter Hebrew words as
        # examples of what this filter removes, but `> 1` keeps two-letter
        # words and only drops one-character tokens - confirm the intended
        # cut-off before changing it.
        return {w for w in clean.lower().split() if len(w) > 1}