Spaces:
Running
Running
| """ | |
| SupplyMind Environment | |
| High-level environment class that ties together the simulation engine, | |
| task registry, and graders. This is the main interface used by the | |
| FastAPI application -- all game logic lives here, not in the HTTP layer. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| from uuid import uuid4 | |
| from typing import Optional | |
| from models import SupplyMindAction, SupplyMindObservation, SupplyMindState | |
| from server.engine.simulation import SimulationEngine | |
| from server.tasks.registry import TaskRegistry, TaskDefinition | |
| from server.graders.grader import EpisodeGrader | |
| class SupplyMindEnvironment: | |
| """ | |
| OpenEnv-compliant environment for supply chain risk management. | |
| Wraps SimulationEngine with episode management, task selection, | |
| and grading. The FastAPI app.py delegates all logic to this class. | |
| Lifecycle: | |
| 1. __init__() -- registers tasks | |
| 2. reset(task_id) -- creates engine, returns initial observation | |
| 3. step(action) -- advances simulation, returns observation | |
| 4. grade() -- scores the completed episode | |
| 5. Repeat from 2 for next episode | |
| """ | |
| def __init__(self) -> None: | |
| """Initialize the environment and register all built-in tasks.""" | |
| TaskRegistry.register_all() | |
| self.engine: Optional[SimulationEngine] = None | |
| self.current_task: Optional[TaskDefinition] = None | |
| self._state: SupplyMindState = SupplyMindState() | |
| self._episode_history: list[tuple[SupplyMindAction, SupplyMindObservation]] = [] | |
| def reset( | |
| self, | |
| task_id: str = "easy_typhoon_response", | |
| seed: Optional[int] = None, | |
| ) -> SupplyMindObservation: | |
| """ | |
| Reset the environment for a new episode. | |
| Args: | |
| task_id: Which task to run. Must be one of the registered task IDs. | |
| seed: Optional episode seed. When provided, enables scenario jitter | |
| for episode variation (different seeds = different episodes). | |
| When None, uses deterministic seed from task_id for backward- | |
| compatible reproducible behavior. | |
| Returns: | |
| Initial observation of the supply chain state. | |
| Raises: | |
| ValueError: If task_id is not registered. | |
| """ | |
| task = TaskRegistry.get(task_id) | |
| self.current_task = task | |
| # Seed logic: | |
| # - No seed provided: derive deterministically from task_id (backward compat) | |
| # - Seed provided: use it directly AND enable scenario jitter | |
| episode_id = str(uuid4()) | |
| if seed is not None: | |
| episode_seed = seed % (2**31) | |
| jitter_enabled = True | |
| else: | |
| episode_seed = int(hashlib.sha256(task_id.encode()).hexdigest(), 16) % (2**31) | |
| jitter_enabled = False | |
| # Create a fresh simulation engine for this episode | |
| self.engine = SimulationEngine( | |
| graph_file=task.graph_file, | |
| disruption_file=task.disruption_file, | |
| budget=task.budget, | |
| max_steps=task.episode_length, | |
| min_episode_days=task.min_episode_days, | |
| seed=episode_seed, | |
| jitter_enabled=jitter_enabled, | |
| ) | |
| # Initialize episode state tracking | |
| self._state = SupplyMindState( | |
| episode_id=episode_id, | |
| step_count=0, | |
| task_id=task.task_id, | |
| task_name=task.name, | |
| task_difficulty=task.difficulty, | |
| total_steps=task.episode_length, | |
| is_done=False, | |
| cumulative_reward=0.0, | |
| ) | |
| # Clear history for the new episode | |
| self._episode_history = [] | |
| # Get the initial observation from the engine | |
| initial_obs = self.engine.get_initial_observation() | |
| return initial_obs | |
| def step(self, action: SupplyMindAction) -> SupplyMindObservation: | |
| """ | |
| Execute one step in the environment. | |
| Args: | |
| action: The action to take this step. | |
| Returns: | |
| Observation after the action is applied and the simulation advances. | |
| Raises: | |
| RuntimeError: If the engine has not been initialized (call reset first). | |
| RuntimeError: If the episode is already done. | |
| """ | |
| if self.engine is None: | |
| raise RuntimeError( | |
| "Environment not initialized. Call reset() before step()." | |
| ) | |
| if self._state.is_done: | |
| # Return the last observation with done=True instead of crashing. | |
| # This is graceful behavior: calling step() after done is a no-op. | |
| from models import SupplyMindObservation, FinancialSnapshot, ActionResult | |
| return SupplyMindObservation( | |
| current_day=self._state.step_count, | |
| days_remaining=0, | |
| financials=FinancialSnapshot( | |
| budget_remaining=self.engine.financial.budget_remaining, | |
| budget_total=self.engine.financial.budget_total, | |
| ), | |
| last_action_result=ActionResult( | |
| success=False, | |
| message="Episode is already done. Call reset() to start a new episode.", | |
| cost=0.0, | |
| ), | |
| reward=0.0, | |
| done=True, | |
| info={"post_done": True}, | |
| ) | |
| # Execute the step in the simulation engine | |
| obs = self.engine.step(action) | |
| # Update episode state | |
| self._state.step_count += 1 | |
| self._state.cumulative_reward += obs.reward | |
| self._state.is_done = obs.done | |
| # Record in history for grading | |
| self._episode_history.append((action, obs)) | |
| return obs | |
| def state(self) -> SupplyMindState: | |
| """Return the current episode state metadata.""" | |
| return self._state | |
| def grade(self) -> dict: | |
| """ | |
| Grade the completed (or in-progress) episode. | |
| Runs the task-specific grader over the full episode history and | |
| returns a detailed score breakdown. | |
| Returns: | |
| Dict with keys: task_id, task_name, difficulty, score, | |
| steps_taken, cumulative_reward, breakdown. | |
| Raises: | |
| RuntimeError: If no episode has been run. | |
| """ | |
| if self.engine is None: | |
| raise RuntimeError( | |
| "No episode to grade. Call reset() and run an episode first." | |
| ) | |
| grader = EpisodeGrader(self._state.task_id) | |
| score = grader.grade(self._episode_history, self.engine) | |
| return { | |
| "task_id": self._state.task_id, | |
| "task_name": self._state.task_name, | |
| "difficulty": self._state.task_difficulty, | |
| "score": score, | |
| "steps_taken": self._state.step_count, | |
| "total_steps": self._state.total_steps, | |
| "cumulative_reward": round(self._state.cumulative_reward, 4), | |
| "is_done": self._state.is_done, | |
| "breakdown": grader.get_breakdown(), | |
| } | |
| def episode_history(self) -> list[tuple[SupplyMindAction, SupplyMindObservation]]: | |
| """Return the episode history (read-only access for testing).""" | |
| return list(self._episode_history) | |