| import topic_taxonomy |
| import micro_prompts |
| import validators |
| import asyncio |
| import json |
| import cost_tracker |
| import re |
| import logging |
| from utils.safe_json import safe_extract_json |
| import prompts |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class StrategyManager: |
| def __init__(self, llm_model): |
| self.llm = llm_model |
| self.max_retries = 2 |
| |
| async def solve_with_strategy(self, problem_text, data_anchor, grade="10", image_data=None, image_data_list=None, parent_category=None, ambiguity_warning=False, intent=None, intent_contract=None, proof_graph_steps_count=1, student_name="ื ืกืื", student_gender="M"): |
| """ |
| V3.1.2: Added support for adaptive failure (Soft Recovery). |
| V5.8.2: Dynamic Token Budget added via proof_graph_steps_count. |
| """ |
| self.grade = grade |
| topic_id = topic_taxonomy.detect_topic(problem_text, grade) |
| detected_category = topic_taxonomy.get_topic_category(topic_id) |
| category = detected_category if detected_category != "GENERAL" else (parent_category or "GENERAL") |
| |
| image_pages = [] |
| if image_data_list: |
| |
| for i_data in image_data_list: |
| image_pages.append({"mime_type": "image/jpeg", "data": i_data}) |
| elif image_data: |
| try: |
| from ocr_strip_engine import paginate_image |
| image_pages = paginate_image(image_data) |
| except Exception as e: |
| logger.exception("CRITICAL FLOW ERROR") |
| logger.error(f"Pagination failed: {e}") |
| |
| |
| try: |
| strategy_config = self._get_strategy(topic_id) |
| prompt = self._build_prompt(topic_id, data_anchor, strategy_config, problem_text, category, grade, student_name, student_gender) |
| |
| |
| if intent_contract and intent_contract.get("status") != "unconstrained": |
| lockdown_note = f""" |
| ๐ข [V4.2] PEDAGOGICAL LOCKDOWN: |
| - Intent: {intent} |
| - Max Variables: {intent_contract.get('max_variables', 'N/A')} |
| - Forbidden: {", ".join(intent_contract.get('forbidden_strategies', []))} |
| - REQUIRED: Use ONLY basic algebraic steps. DO NOT use advanced functions or calculus. |
| """ |
| prompt = lockdown_note + "\n" + prompt |
| |
| |
| |
| if data_anchor.get("function_equations") and not image_data and not image_data_list: |
| prompt = f"TARGET FUNCTION: {data_anchor['function_equations'][0]}\n\n" + prompt |
| |
| |
| if ambiguity_warning: |
| soft_recovery_note = """ |
| System Prompt Override: ืืืกืฃ ืืชืืืืช ืืืกืืจ ืฉืื ืืช ืืืขืจื ืืืื ืื ืืื ืืืืืืชืืช: 'ืืื! ืืฆืืืื ืืื ืงืฆืช ืื ืืจืืจ, ืืื ื ืจืื ืื ืฉืืชืืืื ืช ืืืืืื ืืื. ืื ืืชืืืื ืช ืืืฉืื ืืืจ, ืคืฉืื ืฆืื ืฉืื ืืงืจืื!' - ืืืฉื ืืืกืืืจ ืืช ืืฉืืืื ืืจืืื. |
| """ |
| prompt = soft_recovery_note + "\n" + prompt |
|
|
| prompt = (prompt or "") + f"\n\n๐ฏ MISSION: Solve ONLY part: {problem_text}" |
| llm_response = await self._call_llm(prompt, image_data, category, image_pages, proof_graph_steps_count) |
| |
| |
| if isinstance(llm_response, list): |
| llm_response = {"steps": llm_response, "is_valid": True, "confidence_score": 1.0} |
| |
| return { |
| "topic": topic_id, |
| "category": category, |
| "llm_response": llm_response, |
| "validated": llm_response.get("is_valid", True), |
| "confidence": llm_response.get("confidence_score", 1.0) |
| } |
| except asyncio.CancelledError: |
| logger.warning("V4.3 Single Pass Cancelled (Client Disconnected)") |
| return { |
| "topic": topic_id, |
| "category": category, |
| "llm_response": {"solution_markdown": "ืืืื ืขื ืืื ืืืฉืชืืฉ", "is_valid": False, "confidence_score": 0.0}, |
| "validated": False |
| } |
| except Exception as e: |
| logger.error(f"V4.3 Single Pass Failed: {e}") |
| return { |
| "topic": topic_id, |
| "category": category, |
| "llm_response": {"solution_markdown": "ืืืืจื ื ืชืงืื ืืงืืฉื ืืื ื.", "is_valid": False, "confidence_score": 0.0}, |
| "validated": False |
| } |
|
|
| def _get_strategy(self, topic_id): |
| return {"has_micro_prompt": topic_id in micro_prompts.MICRO_PROMPTS, "has_validator": topic_id in validators.VALIDATORS} |
|
|
| def _build_prompt(self, topic_id, data_anchor, strategy_config, problem_text, category, grade, student_name, student_gender): |
| |
| specialist = prompts.get_specialist_prompt( |
| category=category, |
| problem_text=problem_text, |
| solver_hint="Hybrid Navigation Mode", |
| grade=grade, |
| student_name=student_name, |
| student_gender=student_gender, |
| data_anchor=data_anchor |
| ) |
|
|
| if strategy_config["has_micro_prompt"]: |
| try: |
| focus = micro_prompts.get_micro_prompt(topic_id, data_anchor, grade=self.grade) |
| return f"{focus}\n\n{specialist}" |
| except: |
| pass |
| |
| return specialist |
| return micro_prompts.get_general_prompt(data_anchor) |
|
|
| async def _call_llm(self, prompt, image_data, category, image_pages, proof_graph_steps_count=1): |
| |
| v430_instruction = prompts.get_master_prompt_v430(category=category, problem_text=prompt) |
| prompt = (prompt or "") + (v430_instruction or "") |
| |
| from google.generativeai.types import GenerationConfig |
| import asyncio |
| |
| |
| |
| max_tokens = 8192 |
| logger.info(f"๐ช [BUDGET] Dynamic Token Budget allocated: {max_tokens} tokens for {proof_graph_steps_count} steps.") |
| |
| gen_config = GenerationConfig( |
| temperature=0.0, |
| top_p=0.1, |
| top_k=1, |
| max_output_tokens=max_tokens, |
| response_mime_type="application/json" |
| ) |
| |
| try: |
| if image_pages: |
| payload = [f"Images are sequential. Image 1 is Header.\n{prompt}"] + image_pages |
| logger.info(f"๐ง [TRACE] PROMPT SENT TO LLM: {payload[0]}") |
| |
| response = await asyncio.wait_for( |
| self.llm.generate_content_async(payload, generation_config=gen_config), |
| timeout=60.0 |
| ) |
| else: |
| logger.info(f"๐ง [TRACE] PROMPT SENT TO LLM: {prompt}") |
| response = await asyncio.wait_for( |
| self.llm.generate_content_async(prompt, generation_config=gen_config), |
| timeout=30.0 |
| ) |
| except asyncio.CancelledError: |
| |
| logger.warning("๐ [V8.5] LLM Call actively CANCELLED due to client disconnect.") |
| |
| raise |
| except Exception as e: |
| logger.error(f"Error in _call_llm generation: {e}") |
| raise |
|
|
| raw_text = response.text |
| logger.info(f"๐ฆ [TRACE] RAW RESPONSE RECEIVED: {raw_text}") |
| |
| |
| usage = getattr(response, 'usage_metadata', None) |
| usage_dict = {} |
| if usage: |
| usage_dict = { |
| "prompt_token_count": getattr(usage, 'prompt_token_count', 0), |
| "candidates_token_count": getattr(usage, 'candidates_token_count', 0), |
| "total_token_count": getattr(usage, 'total_token_count', 0) |
| } |
| |
| parsed = self._parse_v4_json(raw_text) |
| return {**parsed, "usage_metadata": usage_dict} |
|
|
| def _parse_v4_json(self, raw_text): |
| |
| if "{" not in raw_text and "[" not in raw_text: |
| return {"solution_markdown": "ืฉืืืืช ืืื ื", "is_valid": False, "confidence_score": 0.1} |
| result = safe_extract_json(raw_text, caller="STRATEGY_MANAGER") |
| if isinstance(result, dict) and result.get("logic_error"): |
| return {"solution_markdown": "ืืืืจื ื ืืฉืื ืืคืืจืืฉ ืืคืชืจืื", "is_valid": False, "confidence_score": 0.1} |
| return result |
|
|
| def _validate(self, topic_id, llm_response, data_anchor): return {"valid": True, "error": None} |
|
|
| async def generate_raw(self, system_prompt: str, user_prompt: str) -> str: |
| """ |
| V6.1: Direct raw LLM call for ProposalEngine. Unconstrained output. |
| """ |
| from google.generativeai.types import GenerationConfig |
| import asyncio |
| |
| full_prompt = f"{system_prompt}\n\nUser Request: {user_prompt}" |
| |
| gen_config = GenerationConfig( |
| temperature=0.2, |
| top_p=0.8, |
| ) |
| |
| logger.info(f"๐ง [PROPOSAL-GATEWAY] Sending PROMPT to LLM.") |
| try: |
| response = await asyncio.wait_for( |
| asyncio.to_thread(self.llm.generate_content, full_prompt, generation_config=gen_config), |
| timeout=45.0 |
| ) |
| return getattr(response, 'text', '') |
| except Exception as e: |
| logger.error(f"โ [PROPOSAL-GATEWAY] LLM Request failed: {e}") |
| raise e |
|
|