| import gradio as gr |
| import os |
| import yaml |
| import json |
| import random |
| import re |
| from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names |
| from openai import OpenAI |
| from openevolve import run_evolution |
| from typing import Dict, List, Tuple, Optional |
| import tempfile |
| import shutil |
| import requests |
| import glob |
|
|
| |
# Model identifiers in "provider/model" form.
# NOTE(review): presumably the choices offered for the evaluation model and
# sent to the OpenRouter endpoint used elsewhere in this file; the consumer of
# this list is outside the visible chunk, so confirm before relying on that.
MODELS = [
    "google/gemini-2.5-flash-lite",
]
|
|
|
|
def extract_answer(text: str) -> str:
    """Reduce a raw answer/target value to the part that should be compared.

    The value is converted with ``str()`` and stripped. Then:

    - If it contains a GSM8K-style ``####`` marker, the final segment obtained
      by splitting on that marker is returned, stripped and with commas
      removed, e.g. ``"reasoning... #### 2,280"`` -> ``"2280"``.
    - Otherwise the stripped text is returned unchanged, which covers plain
      labels such as ``"0"`` / ``"1"`` and free-text answers.
    """
    cleaned = str(text).strip()
    if "####" not in cleaned:
        return cleaned

    # Only the last segment matters; everything before it is reasoning text.
    *_, final_segment = cleaned.split("####")
    return final_segment.strip().replace(",", "")
|
|
|
|
def check_answer(prediction: str, target: str) -> bool:
    """Decide whether a model prediction should count as matching the target.

    The target is first reduced with ``extract_answer`` and lower-cased; the
    prediction is lower-cased and stripped. Matching then depends on what kind
    of target it is:

    - Empty target: never matches (there is nothing to compare against).
    - ``"true"`` / ``"false"``: the first 200 characters of the prediction are
      split into words; the prediction must contain an affirmative word
      (``true``/``yes``) or a negative word (``false``/``no``) - but not both -
      agreeing with the target. Whole words are used so that e.g. "know" or
      "not" are not mistaken for "no".
    - Finite numeric target: every number found in the prediction is compared
      by value (``"2,280"`` and ``"2280.0"`` both equal ``2280``). Substring
      matching is deliberately NOT used here, otherwise ``"18"`` would match
      ``"180"`` and the label ``"0"`` would match ``"10/10"``. For the labels
      ``"0"`` / ``"1"`` a sentiment fallback applies: ``"negative"`` /
      ``"positive"`` (exactly one of them) within the first 200 characters.
    - Any other target: case-insensitive substring containment.

    Args:
        prediction: Raw model output.
        target: Raw dataset target (may include GSM8K ``####`` reasoning).

    Returns:
        True if the prediction is judged correct, False otherwise.
    """
    import math  # function-scope: ``math`` is not imported at module level

    target_answer = extract_answer(target).lower().strip()
    pred_lower = str(prediction).lower().strip()

    # "" is a substring of everything; treat a missing target as unmatchable.
    if not target_answer:
        return False

    if target_answer in ("true", "false"):
        words = set(re.findall(r"[a-z]+", pred_lower[:200]))
        has_yes = bool(words & {"true", "yes"})
        has_no = bool(words & {"false", "no"})
        if target_answer == "true":
            return has_yes and not has_no
        return has_no and not has_yes

    try:
        target_num = float(target_answer)
    except ValueError:
        target_num = None

    # Non-numeric (or nan/inf, which never compare equal) -> textual match.
    if target_num is None or not math.isfinite(target_num):
        return target_answer in pred_lower

    # A number is: optional sign (only when the "-" is not glued to a word or
    # digit, so "3-5" yields 3 and 5), then either properly grouped thousands
    # ("2,280") or plain digits, then an optional decimal part. Commas that are
    # not thousands separators ("1,2,3") therefore split into separate numbers.
    number_pattern = r"(?:(?<!\w)-)?(?<!\d)(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?(?!\d)"
    for token in re.findall(number_pattern, pred_lower):
        if float(token.replace(",", "")) == target_num:
            return True

    # IMDB-style 0/1 labels answered in words.
    if target_answer in ("0", "1"):
        head = pred_lower[:200]
        has_positive = "positive" in head
        has_negative = "negative" in head
        if target_answer == "1":
            return has_positive and not has_negative
        return has_negative and not has_positive

    return False
|
|
|
|
def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]:
    """
    Validate that the dataset exists and has the required fields.

    Steps, in order:
    1. Reject an empty / whitespace-only dataset name.
    2. Probe the HuggingFace Hub API (sending ``HF_TOKEN`` as a bearer token
       when that environment variable is set). A 404 fails validation; any
       other non-200 status, or a failure to reach the API at all, only
       prints a warning and validation continues by trying to load.
    3. Check the split name against ``get_dataset_split_names`` (best effort:
       if the lookup itself fails, a message is printed and loading is tried
       anyway).
    4. Load the split in streaming mode. If loading fails with a ValueError
       that mentions a config, retry once with a default config (``"sst2"``
       for ``glue``, otherwise ``"main"``); if the retry fails too, the
       original error is reported.
    5. Read the first example and check that both fields are present.

    Args:
        dataset_name: Hub dataset id, e.g. ``"stanfordnlp/imdb"``.
        split: Split to load, e.g. ``"train"``.
        input_field: Name of the field holding the model input.
        target_field: Name of the field holding the expected answer.

    Returns:
        Tuple of (is_valid, message). The message is user-facing in both the
        success and the failure case. This function does not raise; every
        exception is converted into a failure message.
    """
    try:
        if not dataset_name or dataset_name.strip() == "":
            return False, "❌ Dataset name cannot be empty"

        dataset_name = dataset_name.strip()

        hf_token = os.environ.get("HF_TOKEN")
        headers = {"Authorization": f"Bearer {hf_token}"} if hf_token else {}

        api_url = f"https://huggingface.co/api/datasets/{dataset_name}"
        try:
            response = requests.get(api_url, headers=headers, timeout=10)
        except requests.RequestException as api_error:
            # The API probe is only an early, cheap check; an unreachable API
            # must not block validation when the dataset itself can be loaded.
            print(f"Warning: Could not reach HuggingFace API ({api_error}), attempting to load...")
        else:
            if response.status_code == 404:
                return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')"
            if response.status_code != 200:
                print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...")

        print(f"Loading dataset {dataset_name} with split {split}...")

        try:
            available_splits = get_dataset_split_names(dataset_name)
        except Exception as e:
            print(f"Could not get split names: {e}. Will try to load anyway...")
        else:
            if split not in available_splits:
                return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}"

        try:
            dataset = load_dataset(dataset_name, split=split, streaming=True)
        except ValueError as e:
            if "config" not in str(e).lower():
                raise

            default_config = "sst2" if dataset_name.lower() == "glue" else "main"
            print(f"Dataset requires config, trying with '{default_config}' config...")
            try:
                dataset = load_dataset(dataset_name, default_config, split=split, streaming=True)
            except Exception:
                # The guessed config did not help; report the original error,
                # which names the configs that actually exist.
                raise e

        # Streaming datasets have no len(); peek at the first record instead.
        first_example = next(iter(dataset), None)
        if first_example is None:
            return False, f"❌ Split '{split}' of dataset '{dataset_name}' contains no examples"

        available_fields = list(first_example.keys())

        if input_field not in available_fields:
            return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}"

        if target_field not in available_fields:
            return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}"

        return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found."

    except Exception as e:
        error_msg = str(e)
        if "404" in error_msg or "not found" in error_msg.lower():
            return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)"
        return False, f"❌ Error validating dataset: {error_msg}"
|
|
|
|
def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str,
                    initial_prompt: str) -> Tuple[bool, str]:
    """
    Run every pre-flight check needed before an optimization can start.

    Checks are performed in this order and the first failure is returned:
    1. ``OPENAI_API_KEY`` must be set in the environment.
    2. The prompt must contain the literal ``{input}`` placeholder.
    3. The dataset name must be non-empty after stripping whitespace.
    4. The dataset itself must pass ``validate_dataset`` (called with the
       stripped name).

    Returns:
        Tuple of (is_valid, message). On success the message is the one
        produced by ``validate_dataset``.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets."

    if "{input}" not in initial_prompt:
        return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs"

    cleaned_name = dataset_name.strip()
    if not cleaned_name:
        return False, "❌ Dataset name cannot be empty"

    # The dataset check is the only step that touches the network, so it is last.
    dataset_ok, dataset_message = validate_dataset(cleaned_name, split, input_field, target_field)
    return bool(dataset_ok), dataset_message
|
|
|
|
def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int,
                    model: str, input_field: str, target_field: str,
                    fixed_indices: Optional[List[int]] = None) -> Dict:
    """
    Evaluate a prompt on a dataset using the selected model.

    Each selected sample is inserted into ``prompt`` in place of ``{input}``,
    sent to ``model`` through the OpenRouter endpoint (temperature 0, at most
    500 tokens) and the reply is scored with ``check_answer``.

    Sample selection:
    - ``fixed_indices`` given: exactly those dataset rows are used.
    - otherwise, if the split has more than ``num_samples`` rows: a seeded
      (seed 42) random sample, so repeated calls pick the same rows. A private
      ``random.Random`` instance is used so the global RNG is not reseeded;
      it yields the same indices as ``random.seed(42); random.sample(...)``.
    - otherwise: the whole split.

    Failure handling:
    - Each API call is tried up to 3 times, waiting 2s then 4s between tries.
    - A sample that still fails is recorded under ``"errors"`` and skipped.
    - Evaluation stops early once more than half of the samples have failed.

    Args:
        prompt: Prompt template containing ``{input}``.
        dataset_name: Hub dataset id.
        split: Split to load.
        num_samples: Number of samples to draw when ``fixed_indices`` is None.
        model: Model identifier passed to the chat completions API.
        input_field: Sample field used as model input.
        target_field: Sample field holding the expected answer.
        fixed_indices: Optional list of dataset indices to use. If provided,
            ensures we evaluate on the SAME samples every time.

    Returns:
        Dict with ``accuracy`` (percentage, 0-100), ``correct``, ``total``,
        ``results`` (per-sample dicts with truncated input/prediction) and
        ``indices`` (the rows used). ``errors`` is added when any sample
        failed, and ``error`` when nothing could be evaluated or when setup
        failed (in the latter case ``indices`` is absent). Never raises.
    """
    import time  # function-scope: ``time`` is not imported at module level

    def _shorten(text: str, limit: int = 100) -> str:
        """Truncate long text for display, marking the cut with an ellipsis."""
        return text[:limit] + "..." if len(text) > limit else text

    try:
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return {
                "error": "OPENAI_API_KEY not set in environment",
                "accuracy": 0,
                "correct": 0,
                "total": 0,
                "results": []
            }

        try:
            dataset = load_dataset(dataset_name, split=split, streaming=False)
        except ValueError as e:
            # Multi-config datasets refuse to load without a config name;
            # retry with the conventional default.
            if "config" not in str(e).lower():
                raise
            default_config = "sst2" if dataset_name.lower() == "glue" else "main"
            dataset = load_dataset(dataset_name, default_config, split=split, streaming=False)

        if fixed_indices is not None:
            indices = list(fixed_indices)
            samples = [dataset[i] for i in indices]
        elif len(dataset) > num_samples:
            indices = random.Random(42).sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            indices = list(range(len(dataset)))
            samples = list(dataset)

        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        results = []
        errors = []
        max_retries = 3

        for idx, sample in enumerate(samples):
            try:
                input_text = str(sample.get(input_field, ""))
                target = str(sample.get(target_field, ""))

                formatted_prompt = prompt.replace("{input}", input_text)

                for retry in range(max_retries):
                    try:
                        response = client.chat.completions.create(
                            model=model,
                            messages=[
                                {"role": "system", "content": "You are a helpful assistant."},
                                {"role": "user", "content": formatted_prompt}
                            ],
                            temperature=0.0,
                            max_tokens=500,
                        )
                        break
                    except Exception:
                        if retry < max_retries - 1:
                            wait_time = (retry + 1) * 2
                            print(f" API error on sample {idx+1}, retrying in {wait_time}s...")
                            time.sleep(wait_time)
                        else:
                            raise

                # The API may return no text at all (content is None); score
                # that as an empty answer rather than as an evaluation error.
                prediction = (response.choices[0].message.content or "").strip()

                # Small pause between calls to stay under provider rate limits.
                time.sleep(0.1)

                is_correct = check_answer(prediction, target)

                if is_correct:
                    correct += 1
                total += 1

                results.append({
                    "input": _shorten(input_text),
                    "target": target,
                    "prediction": _shorten(prediction),
                    "correct": is_correct
                })

            except Exception as e:
                error_msg = f"Sample {idx+1}: {str(e)}"
                print(f"Error evaluating sample {idx+1}: {e}")
                errors.append(error_msg)

                if len(errors) > len(samples) // 2:
                    print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation")
                    break
                continue

        accuracy = (correct / total * 100) if total > 0 else 0

        result_dict = {
            "accuracy": accuracy,
            "correct": correct,
            "total": total,
            "results": results,
            "indices": indices
        }

        if errors:
            result_dict["errors"] = errors
            if total == 0:
                result_dict["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3])

        return result_dict

    except Exception as e:
        return {
            "error": str(e),
            "accuracy": 0,
            "correct": 0,
            "total": 0,
            "results": []
        }
|
|
|
|
def collect_prompt_history(output_dir: str, initial_score: float = 0.0) -> List[Dict]:
    """
    Build the sequence of prompts that each set a new best score.

    Every ``*.json`` program file under
    ``<output_dir>/checkpoints/checkpoint_*/programs/`` is read, the programs
    are ordered by the iteration in which they were found, and a program is
    kept only if its ``combined_score`` is strictly higher than the best score
    seen so far (starting from ``initial_score``). Programs from iteration 0
    are skipped, and a prompt whose whitespace-normalized text was already
    kept is not kept again.

    Args:
        output_dir: Directory containing checkpoint data
        initial_score: Score of the initial prompt (baseline to beat)

    Returns:
        A list of dicts with keys ``prompt``, ``id``, ``file``, ``iteration``,
        ``metrics`` and ``score``, in order of discovery. An empty list is
        returned when there is no checkpoints directory or when anything goes
        wrong while collecting (the error is printed). Unreadable program
        files are reported and skipped individually.
    """
    try:
        checkpoints_root = os.path.join(output_dir, "checkpoints")
        if not os.path.exists(checkpoints_root):
            return []

        candidates = []
        for checkpoint in sorted(glob.glob(os.path.join(checkpoints_root, "checkpoint_*"))):
            programs_root = os.path.join(checkpoint, "programs")
            if not os.path.exists(programs_root):
                continue

            for json_path in glob.glob(os.path.join(programs_root, "*.json")):
                try:
                    with open(json_path, 'r') as handle:
                        record = json.load(handle)

                    candidates.append({
                        "prompt": record.get("code", "").strip(),
                        # Fall back to the file name when the record has no id.
                        "id": record.get("id", os.path.basename(json_path).replace(".json", "")),
                        "file": json_path,
                        "iteration": record.get("iteration_found", 0),
                        "metrics": record.get("metrics", {}),
                        "score": record.get("metrics", {}).get("combined_score", 0.0),
                    })
                except Exception as e:
                    print(f"Error reading program file {json_path}: {e}")
                    continue

        timeline = sorted(candidates, key=lambda entry: entry["iteration"])

        winners = []
        kept_texts = set()
        best_so_far = initial_score

        for entry in timeline:
            found_at = entry["iteration"]
            entry_score = entry["score"]

            # Iteration 0 is the initial prompt itself, not an evolved one.
            if found_at == 0:
                continue

            fingerprint = " ".join(entry["prompt"].split())
            if fingerprint in kept_texts:
                continue

            if not entry_score > best_so_far:
                continue

            kept_texts.add(fingerprint)
            winners.append(entry)
            gain = entry_score - best_so_far
            print(f" ✓ Best program at iteration {found_at}: score={entry_score:.2%} (improved by +{gain:.2%})")
            best_so_far = entry_score

        return winners

    except Exception as e:
        print(f"Error collecting prompt history: {e}")
        return []
|
|
|
|
def parse_evolution_history(output_dir: str) -> str:
    """
    Parse evolution history from OpenEvolve output directory.

    The report is assembled from whatever artifacts exist in ``output_dir``:

    - ``generation_*.txt`` files: first 200 characters of each; if there are
      none, the last 1000 characters of ``evolution.log`` are shown instead.
    - ``scores.json``: rendered as a score table. The table is added only if
      every row can be rendered, so a malformed file never leaves a partial
      table in the report.
    - ``program_*.txt`` files: a count, plus three sample variants when more
      than three files exist.
    - If none of generation files, log or scores exist, a static description
      of the run configuration is shown.
    - Finally, the number of entries in ``output_dir``.

    Each section is best effort: a file that cannot be read or parsed is
    reported with ``print`` and skipped.

    Returns:
        A markdown string with visualization of the evolution process. If the
        directory itself cannot be processed, a short fallback message that
        includes the error text is returned instead. Never raises.
    """
    try:
        parts = ["## 🧬 Evolution Progress\n\n"]

        generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt")))
        log_file = os.path.join(output_dir, "evolution.log")

        if generation_files:
            parts.append("### Generation-by-Generation Progress\n\n")
            for gen_file in generation_files:
                gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "")
                try:
                    with open(gen_file, 'r') as f:
                        content = f.read()
                except (OSError, UnicodeError) as read_error:
                    print(f"Could not read generation file {gen_file}: {read_error}")
                    continue
                parts.append(f"**Generation {gen_num}:**\n```\n{content[:200]}{'...' if len(content) > 200 else ''}\n```\n\n")

        elif os.path.exists(log_file):
            parts.append("### Evolution Log\n\n")
            try:
                with open(log_file, 'r') as f:
                    log_content = f.read()
            except (OSError, UnicodeError) as read_error:
                print(f"Could not read evolution log {log_file}: {read_error}")
            else:
                parts.append(f"```\n{log_content[-1000:]}\n```\n\n")

        scores_file = os.path.join(output_dir, "scores.json")
        if os.path.exists(scores_file):
            try:
                with open(scores_file, 'r') as f:
                    scores = json.load(f)

                # Build the whole table first; it is appended only if every
                # row formatted successfully.
                table = [
                    "### Score Progression\n\n",
                    "| Generation | Best Score | Avg Score | Population |\n",
                    "|------------|-----------|-----------|------------|\n",
                ]
                for gen in scores:
                    table.append(f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n")
                table.append("\n")
            except (OSError, ValueError, KeyError, TypeError) as score_error:
                print(f"Could not parse scores file {scores_file}: {score_error}")
            else:
                parts.extend(table)

        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
        if program_files:
            parts.append("### Explored Variants\n\n")
            parts.append(f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n")

            if len(program_files) > 3:
                # NOTE(review): the third sample is the second-to-last file,
                # not the last one - kept as in the original; confirm whether
                # skipping the final variant is intentional.
                sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]]
                parts.append("**Sample Intermediate Prompts:**\n\n")
                for idx, pfile in enumerate(sample_files, 1):
                    try:
                        with open(pfile, 'r') as f:
                            prompt_content = f.read()
                    except (OSError, UnicodeError) as read_error:
                        print(f"Could not read program file {pfile}: {read_error}")
                        continue
                    parts.append(f"**Variant {idx}:**\n```\n{prompt_content[:150]}{'...' if len(prompt_content) > 150 else ''}\n```\n\n")

        if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file):
            parts.append("### Evolution Complete\n\n")
            parts.append("OpenEvolve ran 5 iterations of evolutionary optimization using:\n")
            parts.append("- **Variants**: 1 new prompt per iteration\n")
            parts.append("- **Selection Strategy**: 40% elite, 10% explore, 50% exploit\n")
            parts.append("- **Population**: 1 island, up to 15 programs retained\n")
            parts.append("- **Evaluation**: 20 samples per prompt variant\n\n")

        all_files = os.listdir(output_dir)
        parts.append(f"Generated {len(all_files)} files during evolution process.\n\n")

        return "".join(parts)

    except Exception as e:
        return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n"
|
|
|
|
def create_evaluator_file(dataset_name: str, split: str, model: str,
                          input_field: str, target_field: str, work_dir: str):
    """Create an evaluator.py file for OpenEvolve that uses same 20 samples as initial/final eval.

    The generated module has three parts:

    1. imports,
    2. a settings section holding the arguments of this function as Python
       literals (``DATASET_NAME``, ``DATASET_SPLIT``, ``MODEL_NAME``,
       ``INPUT_FIELD``, ``TARGET_FIELD``),
    3. a fixed body defining ``evaluate(prompt)``, which OpenEvolve calls with
       the path of a candidate prompt file.

    The settings are written with ``repr()`` rather than pasted between quote
    characters: these values come from user input, and a quote or backslash
    in any of them would otherwise produce a broken - or attacker-controlled -
    evaluator module.

    Args:
        dataset_name: Hub dataset id to evaluate on.
        split: Dataset split to load.
        model: Model identifier used to answer the prompts.
        input_field: Sample field inserted in place of ``{input}``.
        target_field: Sample field holding the expected answer.
        work_dir: Existing directory in which ``evaluator.py`` is written.

    Returns:
        Path of the written ``evaluator.py``.
    """
    header = '''# Evaluator generated for OpenEvolve prompt optimization. Do not edit by hand.
import math
import os
import random
import re
import time

from datasets import load_dataset
from openai import OpenAI

'''

    settings = (
        f"DATASET_NAME = {dataset_name!r}\n"
        f"DATASET_SPLIT = {split!r}\n"
        f"MODEL_NAME = {model!r}\n"
        f"INPUT_FIELD = {input_field!r}\n"
        f"TARGET_FIELD = {target_field!r}\n"
    )

    # Raw, non-f string: braces and backslashes below are written verbatim.
    body = r'''
# Number of samples and RNG seed; with the same seed and count the sample set
# is the same one a seeded random.sample() picks for the initial/final eval.
NUM_SAMPLES = 20
SAMPLING_SEED = 42

# Optional sign (only when "-" is not glued to a word/digit), then grouped
# thousands ("2,280") or plain digits, then an optional decimal part.
NUMBER_PATTERN = r"(?:(?<!\w)-)?(?<!\d)(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?(?!\d)"


def _answers_match(prediction, target):
    """Return True when the model output is judged to contain the target answer."""
    target_text = str(target).strip()
    if "####" in target_text:
        # GSM8K-style target: keep only the final answer after the marker.
        target_text = target_text.split("####")[-1].strip().replace(",", "")
    target_lower = target_text.lower().strip()
    pred_lower = str(prediction).lower().strip()

    # "" is a substring of everything; a missing target cannot be matched.
    if not target_lower:
        return False

    # Boolean targets: whole-word yes/no detection near the start of the reply.
    if target_lower in ("true", "false"):
        words = set(re.findall(r"[a-z]+", pred_lower[:200]))
        has_yes = bool(words & {"true", "yes"})
        has_no = bool(words & {"false", "no"})
        if target_lower == "true":
            return has_yes and not has_no
        return has_no and not has_yes

    try:
        target_num = float(target_lower)
    except ValueError:
        target_num = None

    # Textual targets: substring containment.
    if target_num is None or not math.isfinite(target_num):
        return target_lower in pred_lower

    # Numeric targets: compare by value, never by substring ("18" != "180").
    for token in re.findall(NUMBER_PATTERN, pred_lower):
        if float(token.replace(",", "")) == target_num:
            return True

    # IMDB-style 0/1 labels answered in words.
    if target_lower in ("0", "1"):
        head = pred_lower[:200]
        has_positive = "positive" in head
        has_negative = "negative" in head
        if target_lower == "1":
            return has_positive and not has_negative
        return has_negative and not has_positive

    return False


def evaluate(prompt: str) -> dict:
    """
    Evaluate a prompt using NUM_SAMPLES fixed samples.

    OpenEvolve passes a file path, so the prompt is read from that file
    (EVOLVE-BLOCK markers are stripped). Sampling is seeded, so every call
    scores against the same samples. Includes retries with a growing delay
    and early stopping when too many samples fail.

    Returns a dict with combined_score and accuracy (fractions in [0, 1]),
    correct and total; on a fatal error all are zero and "error" is set.
    """
    try:
        # OpenEvolve passes a FILE PATH, not the prompt text.
        if os.path.exists(prompt):
            with open(prompt, 'r') as f:
                prompt_text = f.read()
            prompt_text = prompt_text.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip()
        else:
            # Not a file path: use as-is (for backward compatibility).
            prompt_text = prompt

        try:
            dataset = load_dataset(DATASET_NAME, split=DATASET_SPLIT, streaming=False)
        except ValueError as e:
            if "config" not in str(e).lower():
                raise
            default_config = "sst2" if DATASET_NAME.lower() == "glue" else "main"
            dataset = load_dataset(DATASET_NAME, default_config, split=DATASET_SPLIT, streaming=False)

        if len(dataset) > NUM_SAMPLES:
            # Private RNG: same indices as a seeded global sample, without
            # reseeding the RNG of the hosting process.
            indices = random.Random(SAMPLING_SEED).sample(range(len(dataset)), NUM_SAMPLES)
            samples = [dataset[i] for i in indices]
        else:
            samples = list(dataset)

        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=os.environ.get("OPENAI_API_KEY"),
        )

        correct = 0
        total = 0
        errors = 0
        max_retries = 3

        print(f"Evaluating on {len(samples)} samples...")

        for idx, sample in enumerate(samples):
            try:
                input_text = str(sample.get(INPUT_FIELD, ""))
                target = str(sample.get(TARGET_FIELD, ""))

                formatted_prompt = prompt_text.replace("{input}", input_text)

                # Retry transient API failures, waiting 2s then 4s.
                for retry in range(max_retries):
                    try:
                        response = client.chat.completions.create(
                            model=MODEL_NAME,
                            messages=[
                                {"role": "system", "content": "You are a helpful assistant."},
                                {"role": "user", "content": formatted_prompt}
                            ],
                            temperature=0.0,
                            max_tokens=500,
                        )
                        break
                    except Exception:
                        if retry < max_retries - 1:
                            wait_time = (retry + 1) * 2
                            print(f" API error on sample {idx+1}, retrying in {wait_time}s...")
                            time.sleep(wait_time)
                        else:
                            raise

                # content can be None; score that as an empty answer.
                prediction = (response.choices[0].message.content or "").strip()

                if _answers_match(prediction, target):
                    correct += 1
                total += 1

                # Small delay to avoid rate limiting
                time.sleep(0.1)

                if (idx + 1) % 5 == 0:
                    print(f" Progress: {idx + 1}/{len(samples)} - Current accuracy: {correct/total:.2%}")

            except Exception as e:
                errors += 1
                print(f"Error evaluating sample {idx+1}: {e}")

                # Early stopping: if more than 40% of samples fail, abort
                if errors > len(samples) * 0.4:
                    print(f"Too many errors ({errors}/{idx+1}), stopping evaluation early")
                    break

                continue

        accuracy = (correct / total) if total > 0 else 0.0

        print(f"Final: {correct}/{total} = {accuracy:.2%}")

        prompt_preview = prompt_text[:80].replace("\n", " ")
        print(f"[EVAL DEBUG] Prompt: '{prompt_preview}...' → Score: {accuracy:.2%}")

        return {
            "combined_score": accuracy,
            "accuracy": accuracy,
            "correct": correct,
            "total": total
        }

    except Exception as e:
        print(f"Error in evaluation: {e}")
        return {
            "combined_score": 0.0,
            "accuracy": 0.0,
            "correct": 0,
            "total": 0,
            "error": str(e)
        }
'''

    evaluator_path = os.path.join(work_dir, "evaluator.py")
    # Python source files are UTF-8 by default, independent of the locale.
    with open(evaluator_path, "w", encoding="utf-8") as f:
        f.write(header + settings + body)

    return evaluator_path
|
|
|
|
def create_config_file(model: str, work_dir: str):
    """Write the OpenEvolve ``config.yaml`` and its prompt templates.

    Two template files are created in ``<work_dir>/templates``
    (``system_message.txt`` and ``full_rewrite_user.txt``), then
    ``<work_dir>/config.yaml`` is written pointing at that directory.

    Args:
        model: Accepted for interface compatibility but not used here; the
            model that rewrites prompts is fixed in the config below.
        work_dir: Directory that receives the templates and the config.

    Returns:
        Path of the written ``config.yaml``.

    NOTE(review): the user template describes a sentiment-classification task
    and evaluation criteria ("sentiment" within the first 150 characters) that
    do not obviously correspond to other code in this file - confirm whether
    this wording is intended for non-sentiment datasets.
    """
    template_root = os.path.join(work_dir, "templates")
    os.makedirs(template_root, exist_ok=True)

    system_message = """You are an expert prompt engineer tasked with iteratively improving prompts for language models.
Your job is to analyze the current prompt and suggest improvements based on performance feedback.

CRITICAL RULES:
1. Keep prompts BRIEF and DIRECT - shorter is usually better
2. Preserve the EXACT output format that the evaluation expects
3. Do NOT make prompts conversational or verbose
4. Do NOT ask for explanations - just ask for the answer
5. Maintain all placeholder variables like {input}, {text}, etc.
6. Focus on clarity and directness, not linguistic elegance
7. Avoid prompts that might cause the model to discuss multiple possibilities

For classification tasks:
- Ask for direct classification (e.g., "The sentiment is positive")
- Avoid asking "what", "why", or "explain" - just ask for the label
- Ensure the response will include the label word (positive/negative/neutral)
- Keep prompts short enough that responses stay focused
- IMPORTANT: The prompt should naturally cause the model to echo the task type in its response
(e.g., if classifying sentiment, the response should include the word "sentiment")

Good patterns for classification prompts:
- "[Action] [task_type] [delimiter] {input}" - e.g., "Classify sentiment: {input}"
- "[Task_type] of [delimiter] {input}" - e.g., "Sentiment of: {input}"
- "[Action] the [task_type]: {input}" - e.g., "Determine the sentiment: {input}"

Bad patterns to avoid:
- Questions ("Is this X or Y?", "What is the X?") - too conversational
- No task type mentioned - response won't include the keyword
- Verbose explanations - pushes keywords past evaluation window
- Multiple questions - confuses the model
"""

    rewrite_request = """# Current Prompt Performance
- Current metrics: {metrics}
- Areas for improvement: {improvement_areas}

{artifacts}

# Prompt Evolution History
{evolution_history}

# Current Prompt
```text
{current_program}
```

# Task
Rewrite the prompt to MAXIMIZE accuracy on sentiment classification.

CRITICAL REQUIREMENTS:
1. The model's response MUST include the word "sentiment"
2. The model's response MUST include either "positive" or "negative"
3. You MUST keep the {{input}} placeholder exactly as {{input}}

EVALUATION CRITERIA:
- Responses are evaluated by checking if they contain "sentiment" AND ("positive" OR "negative") in the first 150 characters
- The response must match the true label (positive=1, negative=0)

Be creative! Try different approaches:
- Direct instructions vs detailed explanations
- Short prompts vs longer contextual prompts
- Imperative commands vs questions
- System-style vs user-style prompts
- With or without examples/formatting instructions

The goal is to maximize the model's accuracy. Experiment freely!

Output ONLY the new prompt between ```text markers:

```text
Your improved prompt here
```
"""

    # Same files, same order as before: system message first, then the
    # full-rewrite user message.
    for file_name, text in (
        ("system_message.txt", system_message),
        ("full_rewrite_user.txt", rewrite_request),
    ):
        with open(os.path.join(template_root, file_name), "w") as out:
            out.write(text)

    settings = {}
    settings["llm"] = {
        "primary_model": "google/gemini-2.5-flash",
        "api_base": "https://openrouter.ai/api/v1",
        "temperature": 1.2,
    }
    settings["max_iterations"] = 5
    settings["checkpoint_interval"] = 1
    settings["diff_based_evolution"] = False
    settings["language"] = "text"
    settings["max_code_length"] = 40000
    settings["num_islands"] = 1
    settings["prompt"] = {"template_dir": template_root}
    settings["evolution"] = {
        "population_size": 15,
        "num_islands": 1,
        "elite_ratio": 0.4,
        "explore_ratio": 0.1,
        "exploit_ratio": 0.5,
    }
    settings["database"] = {
        "log_prompts": True,
        "num_islands": 1,
    }
    settings["evaluator"] = {
        "timeout": 3600,
        "cascade_evaluation": False,
        "parallel_evaluations": 1,
        "distributed": False,
    }

    target_path = os.path.join(work_dir, "config.yaml")
    with open(target_path, "w") as out:
        yaml.dump(settings, out)

    return target_path
|
|
|
|
def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str,
                    model: str, input_field: str, target_field: str,
                    progress=gr.Progress()) -> Tuple[str, str, str]:
    """Run OpenEvolve to optimize the prompt.

    Workflow:
    1. Validate the inputs (``validate_inputs``).
    2. In a fresh temporary work directory, write the initial prompt (wrapped
       in EVOLVE-BLOCK markers), the evaluator module and the config.
    3. Evaluate the initial prompt on 20 samples and remember which dataset
       rows were used.
    4. Run ``run_evolution``.
    5. Load the best prompt from ``output/best/best_program.txt`` (or
       ``output/best_program.txt``), falling back to the initial prompt.
    6. Evaluate it on the same rows. If that evaluation failed or scored
       lower than the initial prompt, the initial prompt and its results are
       reported instead.

    Args:
        initial_prompt: Prompt template containing ``{input}``.
        dataset_name: Hub dataset id.
        dataset_split: Split to evaluate on.
        model: Model identifier used to answer the prompts.
        input_field: Sample field used as model input.
        target_field: Sample field holding the expected answer.
        progress: Gradio progress tracker.

    Returns:
        Tuple of three markdown strings: (summary, initial results, final
        results). On failure the first element describes the error and the
        remaining elements hold whatever was produced before the failure.

    The work directory is deleted when the function returns; set the
    environment variable ``OPENEVOLVE_KEEP_WORK_DIR`` to any non-empty value
    to keep it for debugging.
    """
    import signal
    import threading

    def _format_samples(results: List[Dict]) -> str:
        """Render up to five per-sample results as a numbered list."""
        lines = []
        for i, result in enumerate(results[:5], 1):
            lines.append(f"\n{i}. Input: {result['input']}\n")
            lines.append(f" Target: {result['target']}\n")
            lines.append(f" Prediction: {result['prediction']}\n")
            lines.append(" ✓ Correct\n" if result['correct'] else " ✗ Incorrect\n")
        return "".join(lines)

    progress(0, desc="Validating inputs...")

    is_valid, validation_message = validate_inputs(
        dataset_name, dataset_split, input_field, target_field, initial_prompt
    )

    if not is_valid:
        return f"## Validation Failed\n\n{validation_message}", "", ""

    # Validation ran on the stripped name; load with exactly that name too.
    dataset_name = dataset_name.strip()

    progress(0.05, desc=f"Validation passed: {validation_message}")

    work_dir = tempfile.mkdtemp(prefix="openevolve_")

    try:
        initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
        with open(initial_prompt_path, "w") as f:
            f.write("# EVOLVE-BLOCK-START\n")
            f.write(initial_prompt)
            f.write("\n# EVOLVE-BLOCK-END\n")

        progress(0.1, desc="Creating evaluator...")
        evaluator_path = create_evaluator_file(dataset_name, dataset_split, model,
                                               input_field, target_field, work_dir)

        progress(0.15, desc="Creating configuration...")
        config_path = create_config_file(model, work_dir)

        progress(0.2, desc="Running initial evaluation on 20 samples...")
        initial_eval = evaluate_prompt(
            initial_prompt, dataset_name, dataset_split, 20,
            model, input_field, target_field
        )

        if "error" in initial_eval:
            return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", ""

        if initial_eval["total"] == 0:
            return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", ""

        # Reuse the exact rows for the final evaluation. A missing/empty list
        # becomes None so evaluate_prompt falls back to its seeded sampling
        # instead of evaluating zero samples.
        eval_indices = initial_eval.get("indices") or None

        initial_results = f"""
### Initial Prompt Evaluation

**Prompt:**
```
{initial_prompt}
```

**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}

**Sample Results:**
"""
        initial_results += _format_samples(initial_eval['results'])

        progress(0.3, desc="Starting evolution: 5 iterations...")

        output_dir = os.path.join(work_dir, "output")
        os.makedirs(output_dir, exist_ok=True)

        try:
            os.environ['OPENEVOLVE_NO_PARALLEL'] = '1'

            # signal.signal() raises when called outside the main thread, and
            # this function presumably runs in a worker thread. While the
            # evolution runs, make handler registration a no-op off the main
            # thread.
            original_signal = signal.signal

            def safe_signal(signum, handler):
                """Only set signal handlers in main thread"""
                if threading.current_thread() is threading.main_thread():
                    return original_signal(signum, handler)
                return signal.SIG_DFL

            signal.signal = safe_signal
            try:
                run_evolution(
                    initial_program=initial_prompt_path,
                    evaluator=evaluator_path,
                    config=config_path,
                    output_dir=output_dir
                )
            finally:
                # Always undo the process-wide patch, even if evolution fails.
                signal.signal = original_signal

            progress(0.85, desc="Evaluating best evolved prompt...")

            best_prompt = initial_prompt
            candidate_paths = (
                os.path.join(output_dir, "best", "best_program.txt"),
                os.path.join(output_dir, "best_program.txt"),
            )
            for candidate in candidate_paths:
                if not os.path.exists(candidate):
                    continue
                with open(candidate, "r") as f:
                    best_prompt_raw = f.read()
                best_prompt = best_prompt_raw.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip()
                print(f"\n[SELECTION] OpenEvolve selected best prompt from: {candidate}")
                print(f"[SELECTION] Raw prompt length: {len(best_prompt_raw)} chars")
                print(f"[SELECTION] Best prompt: '{best_prompt[:100].replace(chr(10), ' ')}...'")
                break
            else:
                print(f"\n[SELECTION] WARNING: No best_program.txt found, using initial prompt")

            progress(0.85, desc="Evaluating best prompt on 20 samples (same as initial)...")
            final_eval = evaluate_prompt(
                best_prompt, dataset_name, dataset_split, 20,
                model, input_field, target_field,
                fixed_indices=eval_indices
            )

            # A failed final evaluation must never be presented as the best
            # result (with initial accuracy 0 its accuracy would tie).
            regression = "error" in final_eval or final_eval['accuracy'] < initial_eval['accuracy']
            if regression:
                best_prompt = initial_prompt
                final_eval = initial_eval

            progress(0.95, desc=f"Evaluation complete: {final_eval['correct']}/{final_eval['total']} = {final_eval['accuracy']:.1f}%")

            improvement = final_eval['accuracy'] - initial_eval['accuracy']

            final_results = f"""
### Best Prompt

**Prompt:**
```
{best_prompt}
```

**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
"""
            if regression:
                final_results += "\n**Note:** Evolution did not improve on the initial prompt. Keeping the original.\n"
            else:
                final_results += f"\n- Improvement: {improvement:+.2f}%\n"

            final_results += "\n**Sample Results:**\n"
            final_results += _format_samples(final_eval['results'])

            if regression:
                summary_title = "## Optimization Complete (No Improvement)"
                summary_note = "\n**Evolution did not find a better prompt.** The initial prompt is already strong for this task.\n"
            else:
                summary_title = "## Optimization Complete!"
                summary_note = ""

            summary = f"""
{summary_title}

### Summary
- **Dataset**: {dataset_name} ({dataset_split} split)
- **Evaluation Model**: {model}
- **Evolution Model**: google/gemini-2.5-flash
- **Samples**: 20 (same for initial, evolution, and final eval)
- **Iterations**: 5
{summary_note}
### Results
- **Initial Accuracy**: {initial_eval['accuracy']:.2f}% ({initial_eval['correct']}/{initial_eval['total']})
- **Final Accuracy**: {final_eval['accuracy']:.2f}% ({final_eval['correct']}/{final_eval['total']})
- **Improvement**: {improvement:+.2f}%

{validation_message}
"""

            progress(1.0, desc="Complete!")

            return summary, initial_results, final_results

        except Exception as e:
            return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, ""

    finally:
        # Everything the caller needs is returned as strings, so the work
        # directory can go; otherwise each run leaves a temp tree behind.
        if not os.environ.get("OPENEVOLVE_KEEP_WORK_DIR"):
            shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
| |
# Stylesheet for the app; it is handed to Gradio via demo.launch(css=custom_css)
# in the __main__ guard. It caps the container width and styles the "run-btn"
# class, which the optimize button receives through elem_classes.
custom_css = """
/* Minimal CSS — only style what Gradio can't handle natively.
All text-bearing elements use gr.Markdown (inherits theme colors).
Only the run button gets custom styling. */

.gradio-container { max-width: 1200px !important; margin: auto; }

/* Primary action button — always purple with white text */
.run-btn button, .run-btn > button, button.run-btn, .run-btn {
background: linear-gradient(135deg, #7c3aed 0%, #6d28d9 100%) !important;
color: #fff !important;
border: none !important;
border-radius: 12px !important;
font-size: 1.05rem !important;
font-weight: 600 !important;
padding: 14px 28px !important;
transition: transform 0.1s, box-shadow 0.2s !important;
}
.run-btn:hover, .run-btn button:hover {
transform: translateY(-1px) !important;
box-shadow: 0 8px 24px rgba(124,58,237,0.35) !important;
color: #fff !important;
}
"""
|
|
| |
# Quick-start presets selectable from the UI. Each entry provides the dataset
# id, split, input/target column names and a deliberately weak starting prompt.
PRESETS: Dict[str, Dict[str, str]] = {
    "imdb": {
        "dataset": "stanfordnlp/imdb",
        "split": "test",
        "input": "text",
        "target": "label",
        "prompt": "What do you think about this? {input}",
    },
}

# Order in which load_preset() returns a preset's fields. It has to line up
# with the `preset_outputs` component list that the preset button writes to.
_PRESET_FIELDS = ("dataset", "split", "input", "target", "prompt")


def load_preset(name: str) -> Tuple[str, str, str, str, str]:
    """Return the form values stored for the preset called *name*.

    Args:
        name: Key of the preset in ``PRESETS`` (for example ``"imdb"``).

    Returns:
        A ``(dataset, split, input, target, prompt)`` tuple.

    Raises:
        KeyError: If *name* is not a known preset; the message lists the
            available preset names.
    """
    try:
        preset = PRESETS[name]
    except KeyError:
        available = ", ".join(sorted(PRESETS)) or "<none>"
        # Still a KeyError (as before), but now it says what would have worked.
        raise KeyError(
            f"Unknown preset {name!r}; available presets: {available}"
        ) from None
    return tuple(preset[field] for field in _PRESET_FIELDS)
|
|
|
|
| |
# Build the single-page Gradio UI. All components are created inside the Blocks
# context so the event listeners registered at the bottom can reference them.
with gr.Blocks(title="OpenEvolve Prompt Optimizer") as demo:
    # Seed the form from the IMDB preset instead of repeating its literals, so
    # the initial form state and the "IMDB Sentiment" button always agree.
    (
        default_dataset,
        default_split,
        default_input,
        default_target,
        default_prompt,
    ) = load_preset("imdb")

    # Header banner (static HTML).
    gr.HTML("""
<div style="background:linear-gradient(135deg,#0f0c29 0%,#302b63 50%,#24243e 100%);
border-radius:16px;padding:32px 40px;margin-bottom:8px;text-align:center;">
<h1 style="color:#fff;font-size:2rem;font-weight:700;margin:0 0 8px 0;letter-spacing:-0.02em;">
OpenEvolve Prompt Optimizer
</h1>
<p style="color:#c4b5fd;font-size:0.95rem;margin:0;">
Evolve better prompts automatically using
<a href="https://github.com/codelion/openevolve" target="_blank" style="color:#c4b5fd;text-decoration:underline;">OpenEvolve</a>.
Powered by <code style="background:rgba(255,255,255,0.12);color:#e0d4ff;padding:2px 6px;border-radius:4px;">gemini-2.5-flash</code> via
<a href="https://openrouter.ai/" target="_blank" style="color:#c4b5fd;text-decoration:underline;">OpenRouter</a>.
</p>
<p style="color:#94a3b8;font-size:0.82rem;margin:12px 0 0 0;">
<strong style="color:#a78bfa;">1.</strong> Pick a dataset & prompt →
<strong style="color:#a78bfa;">2.</strong> Evolve 5 iterations →
<strong style="color:#a78bfa;">3.</strong> Compare results side-by-side
</p>
</div>
""")

    # --- Dataset selection -------------------------------------------------
    gr.Markdown("#### Dataset")

    gr.Markdown("Quick preset:")
    preset_imdb = gr.Button("IMDB Sentiment", size="sm")

    dataset_name = gr.Textbox(
        label="HuggingFace Dataset",
        value=default_dataset,
        placeholder="org/dataset-name",
    )
    with gr.Row():
        dataset_split = gr.Textbox(label="Split", value=default_split, scale=1)
        input_field = gr.Textbox(label="Input Field", value=default_input, scale=1)
        target_field = gr.Textbox(label="Target Field", value=default_target, scale=1)

    # --- Prompt to optimize ------------------------------------------------
    gr.Markdown("#### Prompt")
    initial_prompt = gr.TextArea(
        label="Initial Prompt",
        value=default_prompt,
        lines=5,
        info="Must contain {input} placeholder. Start with a weak prompt -- evolution will improve it!",
    )
    gr.Markdown(
        "*Eval model:* `gemini-2.5-flash-lite` (20 samples) | *Evolution model:* `gemini-2.5-flash` (5 iterations) \n"
        "**Note:** Optimization can take up to 10 minutes to complete."
    )

    # --- Primary action; the "run-btn" class is styled by custom_css --------
    optimize_btn = gr.Button(
        "Optimize Prompt",
        variant="primary",
        size="lg",
        elem_classes="run-btn",
    )

    # --- Results: summary on top, initial vs. evolved side by side ----------
    gr.Markdown("---")
    summary = gr.Markdown("")

    with gr.Row(equal_height=True):
        with gr.Column():
            initial_results = gr.Markdown("**Initial Prompt**\n\nResults will appear here after optimization...")
        with gr.Column():
            final_results = gr.Markdown("**Evolved Prompt**\n\nResults will appear here after optimization...")

    def optimize_with_fixed_model(initial_prompt, dataset_name, dataset_split,
                                  input_field, target_field, progress=gr.Progress()):
        """Run ``optimize_prompt`` with the evaluation model pinned to ``MODELS[0]``.

        The form has no model selector, so this adapter inserts the model
        argument between the dataset split and the field names and forwards
        everything else unchanged. ``progress`` is declared with a
        ``gr.Progress()`` default so Gradio can supply a progress tracker.

        Returns:
            Whatever ``optimize_prompt`` returns; it is wired to the
            ``summary``, ``initial_results`` and ``final_results`` components.
        """
        return optimize_prompt(
            initial_prompt, dataset_name, dataset_split,
            MODELS[0],
            input_field, target_field, progress
        )

    # Event wiring: run the optimization when the main button is clicked.
    optimize_btn.click(
        fn=optimize_with_fixed_model,
        inputs=[initial_prompt, dataset_name, dataset_split,
                input_field, target_field],
        outputs=[summary, initial_results, final_results],
    )

    # The preset button overwrites the five form fields; this list must stay in
    # the same order as the tuple returned by load_preset().
    preset_outputs = [dataset_name, dataset_split, input_field, target_field, initial_prompt]
    preset_imdb.click(fn=lambda: load_preset("imdb"), outputs=preset_outputs)
|
|
# Start the Gradio server only when this file is executed as a script, so that
# importing the module builds `demo` without launching it.
# NOTE(review): custom_css is passed to launch() rather than to gr.Blocks();
# confirm the installed Gradio version accepts `css` on launch().
if __name__ == "__main__":
    demo.launch(css=custom_css)
|
|