Asankhaya Sharma
Redesign UI, fix dark mode, generic evaluator, and reduce run time
9efc6b5
import gradio as gr
import os
import yaml
import json
import random
import re
from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names
from openai import OpenAI
from openevolve import run_evolution
from typing import Dict, List, Tuple, Optional
import tempfile
import shutil
import requests
import glob
# Model identifiers (OpenRouter "vendor/model" naming) known to this app.
# NOTE(review): only a single entry is listed; where this list is consumed
# is outside the visible part of the file - confirm before adding entries.
MODELS = [
"google/gemini-2.5-flash-lite",
]
def extract_answer(text: str) -> str:
    """Reduce a raw target value to its core answer string.

    The value is stringified and stripped first. Then:
    - GSM8K style ("reasoning...\\n#### 2,280"): everything after the LAST
      "####" marker is kept, stripped, and thousands separators (commas)
      are removed, giving "2280".
    - Anything else (numeric labels such as "0"/"1", plain text): the
      stripped string is returned unchanged.
    """
    cleaned = str(text).strip()
    # rpartition splits on the last marker; an empty separator slot means
    # the marker was not present at all.
    _, marker, tail = cleaned.rpartition("####")
    if not marker:
        return cleaned
    return tail.strip().replace(",", "")
def check_answer(prediction: str, target: str) -> bool:
    """Check if ``prediction`` matches ``target`` using flexible matching.

    The target is first reduced with ``extract_answer`` and lower-cased; the
    prediction is stringified, lower-cased and stripped. Matching rules are
    tried in this order:

    1. Boolean targets ("true"/"false"): only the first 200 characters of the
       prediction are inspected. "true"/"yes" count as affirmative and
       "false"/"no" as negative; the prediction matches only when exactly the
       expected polarity is present. This branch always returns.
    2. Direct containment of the target text in the prediction.
    3. Numeric targets: any number found in the prediction (commas ignored)
       that equals the target value.
    4. "0"/"1" labels: "negative"/"positive" keyword in the first 200
       characters, without the opposite keyword.

    Args:
        prediction: Model output text.
        target: Expected answer as stored in the dataset (stringified by the
            caller).

    Returns:
        True when one of the rules above accepts the prediction.
    """
    target_answer = extract_answer(target).lower().strip()
    pred_lower = str(prediction).lower().strip()

    # Handle boolean targets (e.g., BoolQ returns Python True/False)
    if target_answer in ("true", "false"):
        pred_start = pred_lower[:200]
        # Whole-word matching: a plain substring test would see "no" inside
        # "know"/"not"/"another" and "yes" inside "eyes", which wrongly
        # rejected answers such as "Yes, it is known that ...".
        has_yes = re.search(r"\b(?:true|yes)\b", pred_start) is not None
        has_no = re.search(r"\b(?:false|no)\b", pred_start) is not None
        if target_answer == "true":
            return has_yes and not has_no
        return has_no and not has_yes

    # Direct containment check
    if target_answer in pred_lower:
        return True

    # For numeric targets, look for the number in the prediction
    try:
        target_num = float(target_answer)
    except ValueError:
        target_num = None
    if target_num is not None:
        for candidate in re.findall(r'-?[\d,]+\.?\d*', pred_lower):
            try:
                if float(candidate.replace(",", "")) == target_num:
                    return True
            except ValueError:
                # e.g. a lone "," matched by the pattern
                continue

    # For IMDB-style labels (0/1), check for positive/negative keywords
    if target_answer in ("0", "1"):
        has_positive = "positive" in pred_lower[:200]
        has_negative = "negative" in pred_lower[:200]
        if target_answer == "1" and has_positive and not has_negative:
            return True
        if target_answer == "0" and has_negative and not has_positive:
            return True
    return False
def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]:
    """
    Validate that the dataset exists and has the required fields.

    Steps: reject an empty name, query the HuggingFace Hub API for the
    dataset (using HF_TOKEN from the environment when set), check the split
    name, then stream the first example and confirm both field names are
    present in it.

    Args:
        dataset_name: Hub dataset id (e.g. 'stanfordnlp/imdb' or 'gsm8k').
        split: Split to validate.
        input_field: Name of the field holding the model input.
        target_field: Name of the field holding the expected answer.

    Returns:
        Tuple of (is_valid, error_message). Never raises: every failure is
        converted into ``(False, message)``.
    """
    try:
        # Check if dataset name has correct format (should be org/name or just name)
        if not dataset_name or dataset_name.strip() == "":
            return False, "❌ Dataset name cannot be empty"
        dataset_name = dataset_name.strip()

        # Try to get dataset info from HuggingFace API
        hf_token = os.environ.get("HF_TOKEN", None)
        headers = {}
        if hf_token:
            headers["Authorization"] = f"Bearer {hf_token}"

        # Check if dataset exists on HuggingFace Hub
        api_url = f"https://huggingface.co/api/datasets/{dataset_name}"
        response = requests.get(api_url, headers=headers, timeout=10)
        if response.status_code == 404:
            return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')"
        elif response.status_code != 200:
            # Try to load anyway - might be a private dataset or API issue
            print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...")

        # Try to load a small sample to verify it works and check fields
        print(f"Loading dataset {dataset_name} with split {split}...")

        # First, check if the split exists (best effort: a failure to list
        # splits is logged and validation continues with the actual load)
        try:
            available_splits = get_dataset_split_names(dataset_name)
            if split not in available_splits:
                return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}"
        except Exception as e:
            print(f"Could not get split names: {e}. Will try to load anyway...")

        # Load a small sample to check fields
        # Try loading with just dataset name first
        try:
            dataset = load_dataset(dataset_name, split=split, streaming=True)
        except ValueError as e:
            # If it fails with config error, try common configs
            if "config" in str(e).lower():
                # Try common configs based on dataset name
                default_config = "sst2" if dataset_name.lower() == "glue" else "main"
                print(f"Dataset requires config, trying with '{default_config}' config...")
                try:
                    dataset = load_dataset(dataset_name, default_config, split=split, streaming=True)
                except Exception:
                    # If default config doesn't work, raise the original error.
                    # (Was a bare ``except:``, which also intercepted
                    # KeyboardInterrupt/SystemExit.)
                    raise e
            else:
                raise

        # Get first example to check fields. A default is supplied so an
        # empty split yields a clear message instead of a StopIteration with
        # an empty error text.
        first_example = next(iter(dataset), None)
        if first_example is None:
            return False, f"❌ Split '{split}' of dataset '{dataset_name}' contains no examples"
        available_fields = list(first_example.keys())

        # Check if input field exists
        if input_field not in available_fields:
            return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}"
        # Check if target field exists
        if target_field not in available_fields:
            return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}"

        # All validations passed
        return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found."
    except Exception as e:
        error_msg = str(e)
        if "404" in error_msg or "not found" in error_msg.lower():
            return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)"
        return False, f"❌ Error validating dataset: {error_msg}"
def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str,
                    initial_prompt: str) -> Tuple[bool, str]:
    """
    Run every pre-flight check before an optimization is started.

    Checks, in order: the OPENAI_API_KEY environment variable is set, the
    prompt contains the '{input}' placeholder, the dataset name is not
    blank, and finally the dataset/fields pass ``validate_dataset``.

    Returns:
        Tuple of (is_valid, message); on success the message is the one
        produced by ``validate_dataset``.
    """
    # Guard 1: API key must be available in the environment
    if not os.environ.get("OPENAI_API_KEY"):
        return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets."

    # Guard 2: the prompt needs a slot for the dataset input
    if "{input}" not in initial_prompt:
        return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs"

    # Guard 3: dataset name must be non-blank after trimming
    cleaned_name = dataset_name.strip()
    if not cleaned_name:
        return False, "❌ Dataset name cannot be empty"

    # Delegate the dataset and field checks; pass the outcome through
    ok, message = validate_dataset(cleaned_name, split, input_field, target_field)
    return (True, message) if ok else (False, message)
def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int,
                    model: str, input_field: str, target_field: str,
                    fixed_indices: Optional[List[int]] = None) -> Dict:
    """
    Evaluate a prompt on a dataset using the selected model.

    Each sample's input replaces the "{input}" placeholder in ``prompt``; the
    formatted prompt is sent to the model through the OpenRouter endpoint and
    the reply is scored with ``check_answer``.

    Args:
        prompt: Prompt template containing "{input}".
        dataset_name: Dataset id passed to ``load_dataset``.
        split: Dataset split to load.
        num_samples: Number of examples to evaluate when ``fixed_indices`` is
            not given.
        model: Model identifier sent to the API.
        input_field: Sample key holding the model input.
        target_field: Sample key holding the expected answer.
        fixed_indices: Optional list of dataset indices to use. If provided,
            ensures we evaluate on the SAME samples every time.

    Returns:
        Dict with "accuracy" (percentage, 0-100), "correct", "total",
        "results" (truncated per-sample records) and "indices". "errors" is
        added when any sample failed and "error" when no sample could be
        evaluated. Setup failures (missing API key, dataset load error) return
        a dict with "error" and zeroed counters instead of raising.
    """
    # Imported once here instead of on every loop iteration; kept local so
    # the function does not rely on a module-level import of ``time``.
    import time

    try:
        # Get API key from environment
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return {
                "error": "OPENAI_API_KEY not set in environment",
                "accuracy": 0,
                "correct": 0,
                "total": 0,
                "results": []
            }

        # Load dataset
        # Try loading with just dataset name first
        try:
            dataset = load_dataset(dataset_name, split=split, streaming=False)
        except ValueError as e:
            # If it fails with config error, try common configs
            if "config" in str(e).lower():
                default_config = "sst2" if dataset_name.lower() == "glue" else "main"
                dataset = load_dataset(dataset_name, default_config, split=split, streaming=False)
            else:
                raise

        # Sample examples - use fixed indices if provided to ensure consistency
        if fixed_indices is not None:
            # Use the provided indices (ensures same samples for initial/final eval)
            indices = fixed_indices
            samples = [dataset[i] for i in indices]
        elif len(dataset) > num_samples:
            # A private generator seeded with 42 draws exactly the same
            # indices as ``random.seed(42); random.sample(...)`` but leaves
            # the process-wide random state untouched.
            indices = random.Random(42).sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            indices = list(range(min(num_samples, len(dataset))))
            samples = list(dataset)[:num_samples]

        # Initialize OpenAI client with OpenRouter
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        results = []
        errors = []
        max_retries = 3

        for idx, sample in enumerate(samples):
            try:
                # Get input and target
                input_text = sample.get(input_field, "")
                if isinstance(input_text, dict):
                    input_text = str(input_text)
                target = sample.get(target_field, "")
                if isinstance(target, dict):
                    target = str(target)

                # Format the prompt with the input
                formatted_prompt = prompt.replace("{input}", str(input_text))

                # Call the model with retry logic for transient failures
                for retry in range(max_retries):
                    try:
                        response = client.chat.completions.create(
                            model=model,
                            messages=[
                                {"role": "system", "content": "You are a helpful assistant."},
                                {"role": "user", "content": formatted_prompt}
                            ],
                            temperature=0.0,
                            max_tokens=500,
                        )
                        break  # Success, exit retry loop
                    except Exception:
                        if retry < max_retries - 1:
                            # Linear backoff: 2s after the first failure, 4s after the second
                            wait_time = (retry + 1) * 2
                            print(f" API error on sample {idx+1}, retrying in {wait_time}s...")
                            time.sleep(wait_time)
                        else:
                            raise  # Final retry failed, propagate error

                prediction = response.choices[0].message.content.strip()

                # Small delay to avoid rate limiting
                time.sleep(0.1)

                # Generic answer matching: extract core answer from both target and prediction
                is_correct = check_answer(prediction, str(target))
                if is_correct:
                    correct += 1
                total += 1

                results.append({
                    "input": str(input_text)[:100] + "..." if len(str(input_text)) > 100 else str(input_text),
                    "target": str(target),
                    "prediction": prediction[:100] + "..." if len(prediction) > 100 else prediction,
                    "correct": is_correct
                })
            except Exception as e:
                error_msg = f"Sample {idx+1}: {str(e)}"
                print(f"Error evaluating sample {idx+1}: {e}")
                errors.append(error_msg)
                # Give up once more than half of the samples have failed
                if len(errors) > len(samples) // 2:
                    print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation")
                    break
                continue

        accuracy = (correct / total * 100) if total > 0 else 0
        result_dict = {
            "accuracy": accuracy,
            "correct": correct,
            "total": total,
            "results": results,
            "indices": indices  # Return indices so we can reuse them for final eval
        }

        # Add errors if any occurred
        if errors:
            result_dict["errors"] = errors
            if total == 0:
                # All samples failed - create a helpful error message
                result_dict["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3])
        return result_dict
    except Exception as e:
        return {
            "error": str(e),
            "accuracy": 0,
            "correct": 0,
            "total": 0,
            "results": []
        }
def collect_prompt_history(output_dir: str, initial_score: float = 0.0) -> List[Dict]:
    """
    Gather the prompts that became the new "best" at some point of a run.

    Program JSON files are read from
    ``output_dir/checkpoints/checkpoint_*/programs/*.json``, ordered by the
    iteration in which they were found, and then filtered: a program is kept
    only when its combined score is strictly above the best score seen so far
    (starting from ``initial_score``). Iteration 0 (the initial prompt) and
    prompts already kept (compared with whitespace normalized) are skipped.

    Args:
        output_dir: Directory containing checkpoint data
        initial_score: Score of the initial prompt (baseline to beat)

    Returns:
        List of dicts with keys prompt, id, file, iteration, metrics, score.
        An empty list is returned when there is no checkpoints directory or
        when anything unexpected goes wrong.
    """
    try:
        checkpoints_root = os.path.join(output_dir, "checkpoints")
        if not os.path.exists(checkpoints_root):
            return []

        # Pass 1: load every readable program record from every checkpoint
        records = []
        for ckpt_path in sorted(glob.glob(os.path.join(checkpoints_root, "checkpoint_*"))):
            programs_root = os.path.join(ckpt_path, "programs")
            if not os.path.exists(programs_root):
                continue
            for json_path in glob.glob(os.path.join(programs_root, "*.json")):
                try:
                    with open(json_path, 'r') as handle:
                        payload = json.load(handle)
                    # The "code" entry holds the prompt text
                    text = payload.get("code", "").strip()
                    fallback_id = os.path.basename(json_path).replace(".json", "")
                    ident = payload.get("id", fallback_id)
                    found_at = payload.get("iteration_found", 0)
                    metric_map = payload.get("metrics", {})
                    records.append({
                        "prompt": text,
                        "id": ident,
                        "file": json_path,
                        "iteration": found_at,
                        "metrics": metric_map,
                        "score": metric_map.get("combined_score", 0.0)
                    })
                except Exception as e:
                    print(f"Error reading program file {json_path}: {e}")
                    continue

        # Pass 2: chronological order (stable sort on the iteration number)
        records.sort(key=lambda rec: rec["iteration"])

        # Pass 3: keep only records that raise the running best score
        winners = []
        kept_fingerprints = set()
        best_so_far = initial_score
        for rec in records:
            found_at = rec["iteration"]
            if found_at == 0:
                # the initial prompt is reported separately by the caller
                continue
            # whitespace-insensitive fingerprint for duplicate detection
            fingerprint = " ".join(rec["prompt"].split())
            if fingerprint in kept_fingerprints:
                continue
            score = rec["score"]
            if not (score > best_so_far):
                continue
            kept_fingerprints.add(fingerprint)
            winners.append(rec)
            improvement = score - best_so_far
            print(f" ✓ Best program at iteration {found_at}: score={score:.2%} (improved by +{improvement:.2%})")
            best_so_far = score
        return winners
    except Exception as e:
        print(f"Error collecting prompt history: {e}")
        return []
def parse_evolution_history(output_dir: str) -> str:
    """
    Parse evolution history from OpenEvolve output directory.

    Builds a markdown report from whichever artifacts are present in
    ``output_dir``: ``generation_*.txt`` files (or, failing that,
    ``evolution.log``), ``scores.json`` and ``program_*.txt`` variants. When
    none of the first three exist, a static summary plus the file count of
    the directory is emitted instead.

    Reading is best effort: an unreadable or malformed artifact is logged and
    its section is skipped as a whole, so a partially written table never
    reaches the report.

    Args:
        output_dir: OpenEvolve output directory.

    Returns:
        A markdown string with visualization of the evolution process. Never
        raises; an unexpected failure yields a short fallback message.
    """
    try:
        evolution_viz = "## 🧬 Evolution Progress\n\n"

        # Look for generation files or logs
        generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt")))
        log_file = os.path.join(output_dir, "evolution.log")

        # Try to parse generation files if they exist
        if generation_files:
            evolution_viz += "### Generation-by-Generation Progress\n\n"
            for gen_file in generation_files:
                gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "")
                try:
                    with open(gen_file, 'r') as f:
                        content = f.read()
                except (OSError, UnicodeDecodeError) as e:
                    print(f"Warning: could not read {gen_file}: {e}")
                    continue
                evolution_viz += f"**Generation {gen_num}:**\n```\n{content[:200]}{'...' if len(content) > 200 else ''}\n```\n\n"
        # Try to parse log file
        elif os.path.exists(log_file):
            evolution_viz += "### Evolution Log\n\n"
            try:
                with open(log_file, 'r') as f:
                    log_content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                print(f"Warning: could not read {log_file}: {e}")
            else:
                # Only the tail of the log is shown
                evolution_viz += f"```\n{log_content[-1000:]}\n```\n\n"

        # Look for scores or history file
        scores_file = os.path.join(output_dir, "scores.json")
        if os.path.exists(scores_file):
            try:
                with open(scores_file, 'r') as f:
                    scores = json.load(f)
                # Format every row before touching the report so a bad entry
                # cannot leave a truncated table behind.
                rows = [
                    f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n"
                    for gen in scores
                ]
            except (OSError, ValueError, KeyError, TypeError) as e:
                # ValueError covers invalid JSON, undecodable bytes and
                # non-numeric values hitting the ".3f" format.
                print(f"Warning: could not parse {scores_file}: {e}")
            else:
                evolution_viz += "### Score Progression\n\n"
                evolution_viz += "| Generation | Best Score | Avg Score | Population |\n"
                evolution_viz += "|------------|-----------|-----------|------------|\n"
                evolution_viz += "".join(rows)
                evolution_viz += "\n"

        # Look for all program variants
        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
        if program_files:
            evolution_viz += "### Explored Variants\n\n"
            evolution_viz += f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n"
            # Show a few intermediate prompts
            if len(program_files) > 3:
                # NOTE(review): the third sample is the second-to-last file,
                # presumably to leave out the final variant - confirm intent.
                sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]]
                evolution_viz += "**Sample Intermediate Prompts:**\n\n"
                for idx, pfile in enumerate(sample_files, 1):
                    try:
                        with open(pfile, 'r') as f:
                            prompt_content = f.read()
                    except (OSError, UnicodeDecodeError) as e:
                        print(f"Warning: could not read {pfile}: {e}")
                        continue
                    evolution_viz += f"**Variant {idx}:**\n```\n{prompt_content[:150]}{'...' if len(prompt_content) > 150 else ''}\n```\n\n"

        # If no specific files found, show directory contents
        if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file):
            # NOTE: these figures are a fixed description, not values read
            # from the run's configuration.
            evolution_viz += "### Evolution Complete\n\n"
            evolution_viz += "OpenEvolve ran 5 iterations of evolutionary optimization using:\n"
            evolution_viz += "- **Variants**: 1 new prompt per iteration\n"
            evolution_viz += "- **Selection Strategy**: 40% elite, 10% explore, 50% exploit\n"
            evolution_viz += "- **Population**: 1 island, up to 15 programs retained\n"
            evolution_viz += "- **Evaluation**: 20 samples per prompt variant\n\n"
            # Count files in output directory
            all_files = os.listdir(output_dir)
            evolution_viz += f"Generated {len(all_files)} files during evolution process.\n\n"
        return evolution_viz
    except Exception as e:
        return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n"
def create_evaluator_file(dataset_name: str, split: str, model: str,
                          input_field: str, target_field: str, work_dir: str):
    """Create an evaluator.py file for OpenEvolve that uses same 20 samples as initial/final eval.

    The generated module defines ``evaluate(prompt)``, which reads the prompt
    from the given file path, draws 20 samples with seed 42, queries ``model``
    through the OpenRouter endpoint and returns a metrics dict whose
    "combined_score" is the accuracy as a fraction (0.0-1.0).

    The dataset name, split, field names and model id are embedded as Python
    string literals via ``repr()`` so that quotes or backslashes in these
    user-supplied values cannot break or alter the generated code.

    Args:
        dataset_name: Dataset id passed to ``load_dataset``.
        split: Dataset split to load.
        model: Model identifier used for the completions.
        input_field: Sample key holding the model input.
        target_field: Sample key holding the expected answer.
        work_dir: Existing directory in which ``evaluator.py`` is written.

    Returns:
        Path of the written ``evaluator.py``.
    """
    # NOTE: this is an f-string template. Literal braces of the generated
    # code are doubled, and backslashes meant for the generated file are
    # written as "\\".
    evaluator_code = f'''
import os
import random
import re
import time
from datasets import load_dataset
from openai import OpenAI


def evaluate(prompt: str) -> dict:
    """
    Evaluate a prompt using 20 fixed samples - SAME as initial and final evaluation.
    OpenEvolve passes a file path, so we need to read the prompt from the file.
    Using the same 20 samples ensures evolution optimizes for the exact test set.
    Includes early stopping and rate limit handling.
    """
    try:
        # CRITICAL: OpenEvolve passes a FILE PATH, not the prompt text!
        # Check if prompt is a file path and read it
        if os.path.exists(prompt):
            with open(prompt, 'r') as f:
                prompt_text = f.read()
            # Strip EVOLVE-BLOCK markers if present
            prompt_text = prompt_text.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip()
        else:
            # If not a file path, use as-is (for backward compatibility)
            prompt_text = prompt

        # IMPORTANT: Use fixed seed for consistent sampling across all evaluations
        random.seed(42)

        # Load dataset
        try:
            dataset = load_dataset({dataset_name!r}, split={split!r}, streaming=False)
        except ValueError as e:
            if "config" in str(e).lower() or "Config name is missing" in str(e):
                default_config = "main"
                if {dataset_name!r}.lower() == "glue":
                    default_config = "sst2"
                dataset = load_dataset({dataset_name!r}, default_config, split={split!r}, streaming=False)
            else:
                raise

        # Sample 20 samples with seed 42 - SAME as initial/final evaluation for consistency!
        num_samples = 20
        if len(dataset) > num_samples:
            # Use SAME sampling logic as initial/final eval
            indices = random.sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            indices = list(range(min(num_samples, len(dataset))))
            samples = list(dataset)[:num_samples]

        # Initialize OpenAI client
        api_key = os.environ.get("OPENAI_API_KEY")
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        errors = 0
        print(f"Evaluating on {{len(samples)}} samples...")

        for idx, sample in enumerate(samples):
            try:
                # Get input and target
                input_text = sample.get({input_field!r}, "")
                if isinstance(input_text, dict):
                    input_text = str(input_text)
                target = sample.get({target_field!r}, "")
                if isinstance(target, dict):
                    target = str(target)

                # Format the prompt (use prompt_text that we read from file)
                formatted_prompt = prompt_text.replace("{{input}}", str(input_text))

                # Call the model with retry logic for transient failures
                max_retries = 3
                for retry in range(max_retries):
                    try:
                        response = client.chat.completions.create(
                            model={model!r},
                            messages=[
                                {{"role": "system", "content": "You are a helpful assistant."}},
                                {{"role": "user", "content": formatted_prompt}}
                            ],
                            temperature=0.0,
                            max_tokens=500,
                        )
                        break  # Success, exit retry loop
                    except Exception as api_error:
                        if retry < max_retries - 1:
                            wait_time = (retry + 1) * 2  # Linear backoff: 2s, then 4s
                            print(f" API error on sample {{idx+1}}, retrying in {{wait_time}}s...")
                            time.sleep(wait_time)
                        else:
                            raise  # Final retry failed, propagate error

                prediction = response.choices[0].message.content.strip()

                # Generic answer matching
                target_str = str(target).strip()
                # Extract core answer (handles GSM8K "####" format, plain labels, etc.)
                if "####" in target_str:
                    target_answer = target_str.split("####")[-1].strip().replace(",", "")
                else:
                    target_answer = target_str
                pred_lower = prediction.lower().strip()
                target_lower = target_answer.lower().strip()
                is_correct = False

                if target_lower in ("true", "false"):
                    # Boolean targets (e.g. BoolQ): whole-word yes/no style
                    # answers within the first 200 characters, same rule as
                    # the initial/final evaluation
                    pred_start = pred_lower[:200]
                    has_yes = re.search(r"\\b(?:true|yes)\\b", pred_start) is not None
                    has_no = re.search(r"\\b(?:false|no)\\b", pred_start) is not None
                    if target_lower == "true":
                        is_correct = has_yes and not has_no
                    else:
                        is_correct = has_no and not has_yes
                else:
                    # Direct containment
                    if target_lower in pred_lower:
                        is_correct = True

                    # Numeric matching
                    if not is_correct:
                        try:
                            target_num = float(target_lower)
                            numbers = re.findall(r'-?[\\d,]+\\.?\\d*', pred_lower)
                            for n in numbers:
                                try:
                                    if float(n.replace(",", "")) == target_num:
                                        is_correct = True
                                        break
                                except ValueError:
                                    continue
                        except ValueError:
                            pass

                    # IMDB-style 0/1 labels
                    if not is_correct and target_lower in ("0", "1"):
                        has_positive = "positive" in pred_lower[:200]
                        has_negative = "negative" in pred_lower[:200]
                        if target_lower == "1" and has_positive and not has_negative:
                            is_correct = True
                        if target_lower == "0" and has_negative and not has_positive:
                            is_correct = True

                if is_correct:
                    correct += 1
                total += 1

                # Small delay to avoid rate limiting
                time.sleep(0.1)

                if (idx + 1) % 25 == 0:
                    print(f" Progress: {{idx + 1}}/{{len(samples)}} - Current accuracy: {{correct/total:.2%}}")
            except Exception as e:
                errors += 1
                print(f"Error evaluating sample {{idx+1}}: {{e}}")
                # Early stopping: if more than 40% of samples fail, abort
                if errors > len(samples) * 0.4:
                    print(f"Too many errors ({{errors}}/{{idx+1}}), stopping evaluation early")
                    break
                continue

        accuracy = (correct / total) if total > 0 else 0.0
        print(f"Final: {{correct}}/{{total}} = {{accuracy:.2%}}")

        # DEBUG: Log the prompt being evaluated and its score (use prompt_text, not file path)
        prompt_preview = prompt_text[:80].replace('\\n', ' ') if len(prompt_text) > 80 else prompt_text.replace('\\n', ' ')
        print(f"[EVAL DEBUG] Prompt: '{{prompt_preview}}...' → Score: {{accuracy:.2%}}")

        return {{
            "combined_score": accuracy,
            "accuracy": accuracy,
            "correct": correct,
            "total": total
        }}
    except Exception as e:
        print(f"Error in evaluation: {{e}}")
        return {{
            "combined_score": 0.0,
            "accuracy": 0.0,
            "correct": 0,
            "total": 0,
            "error": str(e)
        }}
'''
    evaluator_path = os.path.join(work_dir, "evaluator.py")
    # Explicit UTF-8: the template contains non-ASCII characters and Python
    # reads source files as UTF-8 by default.
    with open(evaluator_path, "w", encoding="utf-8") as f:
        f.write(evaluator_code)
    return evaluator_path
def create_config_file(model: str, work_dir: str):
    """Create a config.yaml file for OpenEvolve.

    Besides the config itself, two prompt templates are written to
    ``work_dir/templates`` (``system_message.txt`` and
    ``full_rewrite_user.txt``) and referenced from the config through
    ``prompt.template_dir``.

    The user template describes the evaluation the way the generated
    evaluator actually scores responses (answer containment, numeric match,
    0/1 sentiment labels); it is not tied to sentiment classification.

    Args:
        model: Currently unused - the model that rewrites prompts is fixed in
            the config below. Kept for interface compatibility.
        work_dir: Existing directory that receives ``templates/`` and
            ``config.yaml``.

    Returns:
        Path of the written ``config.yaml``.
    """
    # Create custom templates directory for prompt optimization
    templates_dir = os.path.join(work_dir, "templates")
    os.makedirs(templates_dir, exist_ok=True)

    # Create custom system template for PROMPT optimization (not code)
    system_template = """You are an expert prompt engineer tasked with iteratively improving prompts for language models.
Your job is to analyze the current prompt and suggest improvements based on performance feedback.
CRITICAL RULES:
1. Keep prompts BRIEF and DIRECT - shorter is usually better
2. Preserve the EXACT output format that the evaluation expects
3. Do NOT make prompts conversational or verbose
4. Do NOT ask for explanations - just ask for the answer
5. Maintain all placeholder variables like {input}, {text}, etc.
6. Focus on clarity and directness, not linguistic elegance
7. Avoid prompts that might cause the model to discuss multiple possibilities
For classification tasks:
- Ask for direct classification (e.g., "The sentiment is positive")
- Avoid asking "what", "why", or "explain" - just ask for the label
- Ensure the response will include the label word (positive/negative/neutral)
- Keep prompts short enough that responses stay focused
- IMPORTANT: The prompt should naturally cause the model to echo the task type in its response
(e.g., if classifying sentiment, the response should include the word "sentiment")
Good patterns for classification prompts:
- "[Action] [task_type] [delimiter] {input}" - e.g., "Classify sentiment: {input}"
- "[Task_type] of [delimiter] {input}" - e.g., "Sentiment of: {input}"
- "[Action] the [task_type]: {input}" - e.g., "Determine the sentiment: {input}"
Bad patterns to avoid:
- Questions ("Is this X or Y?", "What is the X?") - too conversational
- No task type mentioned - response won't include the keyword
- Verbose explanations - pushes keywords past evaluation window
- Multiple questions - confuses the model
"""
    with open(os.path.join(templates_dir, "system_message.txt"), "w", encoding="utf-8") as f:
        f.write(system_template)

    # Create custom user template for prompt rewriting.
    # {{input}} is written with doubled braces so it survives the template
    # substitution of the single-brace fields as a literal {input}.
    user_template = """# Current Prompt Performance
- Current metrics: {metrics}
- Areas for improvement: {improvement_areas}
{artifacts}
# Prompt Evolution History
{evolution_history}
# Current Prompt
```text
{current_program}
```
# Task
Rewrite the prompt to MAXIMIZE accuracy on the evaluation dataset.
CRITICAL REQUIREMENTS:
1. The model's response MUST state the final answer explicitly, as early as possible
2. You MUST keep the {{input}} placeholder exactly as {{input}}
EVALUATION CRITERIA:
- A response is correct when it contains the expected answer (case-insensitive)
- Numeric answers also match by value (e.g. "1,234" matches 1234)
- For 0/1 sentiment labels, "positive" (1) or "negative" (0) must appear in the first 200 characters, without the opposite word
Be creative! Try different approaches:
- Direct instructions vs detailed explanations
- Short prompts vs longer contextual prompts
- Imperative commands vs questions
- System-style vs user-style prompts
- With or without examples/formatting instructions
The goal is to maximize the model's accuracy. Experiment freely!
Output ONLY the new prompt between ```text markers:
```text
Your improved prompt here
```
"""
    with open(os.path.join(templates_dir, "full_rewrite_user.txt"), "w", encoding="utf-8") as f:
        f.write(user_template)

    config = {
        "llm": {
            "primary_model": "google/gemini-2.5-flash",  # Use STRONGER model for prompt generation
            "api_base": "https://openrouter.ai/api/v1",  # Use OpenRouter endpoint
            "temperature": 1.2,  # Even higher temperature for more creative variations
        },
        "max_iterations": 5,  # Fewer iterations to fit within time limits
        "checkpoint_interval": 1,  # Save checkpoints every iteration to preserve prompt history
        "diff_based_evolution": False,  # Use full rewrite mode for prompts (not diff/patch mode)
        "language": "text",  # CRITICAL: Optimize text/prompts, not Python code!
        "max_code_length": 40000,  # Allow long prompts (default 10000 is too short)
        "num_islands": 1,  # IMPORTANT: Use only 1 island (not 5) for simpler evolution
        "prompt": {
            "template_dir": templates_dir,  # Use our custom prompt engineering templates
        },
        "evolution": {
            "population_size": 15,  # Larger population = more variants per generation
            "num_islands": 1,  # Single island for simpler evolution
            "elite_ratio": 0.4,  # Keep top 40% (6 best prompts)
            "explore_ratio": 0.1,  # Minimal random exploration (only 1-2 prompts)
            "exploit_ratio": 0.5,  # 50% exploitation of best prompts
        },
        "database": {
            "log_prompts": True,  # Save prompts used to generate each program
            "num_islands": 1,  # CRITICAL: This is where island count is actually read from!
        },
        "evaluator": {
            "timeout": 3600,  # 1 hour timeout (effectively disabled, but prevents NoneType arithmetic errors)
            "cascade_evaluation": False,  # Disable cascade to prevent signal errors
            "parallel_evaluations": 1,  # Single worker to avoid multiprocessing complexity
            "distributed": False,  # No distributed processing
        }
    }
    config_path = os.path.join(work_dir, "config.yaml")
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    return config_path
def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str,
model: str, input_field: str, target_field: str,
progress=gr.Progress()) -> Tuple[str, str, str]:
"""Run OpenEvolve to optimize the prompt."""
progress(0, desc="Validating inputs...")
# Validate all inputs
is_valid, validation_message = validate_inputs(
dataset_name, dataset_split, input_field, target_field, initial_prompt
)
if not is_valid:
return f"## Validation Failed\n\n{validation_message}", "", ""
progress(0.05, desc=f"Validation passed: {validation_message}")
# Create temporary working directory
work_dir = tempfile.mkdtemp(prefix="openevolve_")
try:
# Save initial prompt with EVOLVE-BLOCK markers for OpenEvolve
# These markers tell OpenEvolve which part to optimize
initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
with open(initial_prompt_path, "w") as f:
# Wrap prompt in evolve markers so OpenEvolve knows what to optimize
f.write("# EVOLVE-BLOCK-START\n")
f.write(initial_prompt)
f.write("\n# EVOLVE-BLOCK-END\n")
# Create evaluator
progress(0.1, desc="Creating evaluator...")
evaluator_path = create_evaluator_file(dataset_name, dataset_split, model,
input_field, target_field, work_dir)
# Create config
progress(0.15, desc="Creating configuration...")
config_path = create_config_file(model, work_dir)
# Run initial evaluation with 20 samples
# IMPORTANT: We save the indices to ensure final eval uses THE SAME samples
progress(0.2, desc="Running initial evaluation on 20 samples...")
initial_eval = evaluate_prompt(
initial_prompt, dataset_name, dataset_split, 20,
model, input_field, target_field
)
if "error" in initial_eval:
return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", ""
if initial_eval["total"] == 0:
return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", ""
# Save the indices for final evaluation (ensures fair comparison)
eval_indices = initial_eval.get("indices", [])
initial_results = f"""
### Initial Prompt Evaluation
**Prompt:**
```
{initial_prompt}
```
**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}
**Sample Results:**
"""
for i, result in enumerate(initial_eval['results'][:5], 1):
initial_results += f"\n{i}. Input: {result['input']}\n"
initial_results += f" Target: {result['target']}\n"
initial_results += f" Prediction: {result['prediction']}\n"
initial_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n"
# Run OpenEvolve
progress(0.3, desc="Starting evolution: 5 iterations...")
output_dir = os.path.join(work_dir, "output")
os.makedirs(output_dir, exist_ok=True)
try:
# Comprehensive fix for "signal only works in main thread" in Gradio
# We need to prevent OpenEvolve from using signal handlers entirely
# Step 1: Set environment variable to disable process pool
import os as os_env
os_env.environ['OPENEVOLVE_NO_PARALLEL'] = '1'
# Step 2: Monkey-patch signal module to ignore signal calls in threads
import signal
import threading
original_signal = signal.signal
def safe_signal(signum, handler):
"""Only set signal handlers in main thread"""
if threading.current_thread() is threading.main_thread():
return original_signal(signum, handler)
else:
# Return a dummy handler in non-main threads
return signal.SIG_DFL
signal.signal = safe_signal
# Run evolution with signal patch in place
result = run_evolution(
initial_program=initial_prompt_path,
evaluator=evaluator_path,
config=config_path,
output_dir=output_dir
)
# Restore signal handler
signal.signal = original_signal
progress(0.80, desc="Parsing evolution history...")
# Parse evolution history for visualization
evolution_viz = parse_evolution_history(output_dir)
progress(0.85, desc="Evaluating best evolved prompt...")
# Get the best prompt (OpenEvolve saves to output_dir/best/best_program.txt)
best_prompt_path = os.path.join(output_dir, "best", "best_program.txt")
if os.path.exists(best_prompt_path):
with open(best_prompt_path, "r") as f:
best_prompt_raw = f.read()
# Strip EVOLVE-BLOCK markers that we added
best_prompt = best_prompt_raw.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip()
print(f"\n[SELECTION] OpenEvolve selected best prompt from: {best_prompt_path}")
print(f"[SELECTION] Raw prompt length: {len(best_prompt_raw)} chars")
print(f"[SELECTION] Best prompt: '{best_prompt[:100].replace(chr(10), ' ')}...'")
else:
# Fallback: try without the "best" subdirectory
best_prompt_path_alt = os.path.join(output_dir, "best_program.txt")
if os.path.exists(best_prompt_path_alt):
with open(best_prompt_path_alt, "r") as f:
best_prompt_raw = f.read()
# Strip EVOLVE-BLOCK markers
best_prompt = best_prompt_raw.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip()
print(f"\n[SELECTION] OpenEvolve selected best prompt from: {best_prompt_path_alt}")
print(f"[SELECTION] Raw prompt length: {len(best_prompt_raw)} chars")
print(f"[SELECTION] Best prompt: '{best_prompt[:100].replace(chr(10), ' ')}...'")
else:
best_prompt = initial_prompt
print(f"\n[SELECTION] WARNING: No best_program.txt found, using initial prompt")
# Final evaluation: Use same 20 samples as initial eval for fair comparison
progress(0.85, desc="Evaluating best prompt on 20 samples (same as initial)...")
final_eval = evaluate_prompt(
best_prompt, dataset_name, dataset_split, 20,
model, input_field, target_field,
fixed_indices=eval_indices # Use same 20 samples as initial eval!
)
# If evolution regressed, fall back to the initial prompt
if final_eval['accuracy'] < initial_eval['accuracy']:
best_prompt = initial_prompt
final_eval = initial_eval
regression = True
else:
regression = False
progress(0.95, desc=f"Evaluation complete: {final_eval['correct']}/{final_eval['total']} = {final_eval['accuracy']:.1f}%")
improvement = final_eval['accuracy'] - initial_eval['accuracy']
final_results = f"""
### Best Prompt
**Prompt:**
```
{best_prompt}
```
**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
"""
if regression:
final_results += "\n**Note:** Evolution did not improve on the initial prompt. Keeping the original.\n"
else:
final_results += f"\n- Improvement: {improvement:+.2f}%\n"
final_results += "\n**Sample Results:**\n"
for i, result in enumerate(final_eval['results'][:5], 1):
final_results += f"\n{i}. Input: {result['input']}\n"
final_results += f" Target: {result['target']}\n"
final_results += f" Prediction: {result['prediction']}\n"
final_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n"
if regression:
summary_title = "## Optimization Complete (No Improvement)"
summary_note = "\n**Evolution did not find a better prompt.** The initial prompt is already strong for this task.\n"
else:
summary_title = "## Optimization Complete!"
summary_note = ""
summary = f"""
{summary_title}
### Summary
- **Dataset**: {dataset_name} ({dataset_split} split)
- **Evaluation Model**: {model}
- **Evolution Model**: google/gemini-2.5-flash
- **Samples**: 20 (same for initial, evolution, and final eval)
- **Iterations**: 5
{summary_note}
### Results
- **Initial Accuracy**: {initial_eval['accuracy']:.2f}% ({initial_eval['correct']}/{initial_eval['total']})
- **Final Accuracy**: {final_eval['accuracy']:.2f}% ({final_eval['correct']}/{final_eval['total']})
- **Improvement**: {improvement:+.2f}%
{validation_message}
"""
progress(1.0, desc="Complete!")
return summary, initial_results, final_results
except Exception as e:
return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, ""
finally:
# Don't clean up - keep prompts for browsing
# User can manually clean /tmp if needed
pass
# Custom CSS for a polished, branded look.
# This stylesheet is handed to demo.launch(css=...) at the bottom of the file.
# It contains three rules: a max-width/centering rule for .gradio-container,
# the base look of anything carrying the "run-btn" class (the elem_classes
# value given to the "Optimize Prompt" button), and that button's hover state.
# NOTE: the text between the triple quotes is runtime data, not Python
# comments -- edit it as CSS.
custom_css = """
/* Minimal CSS — only style what Gradio can't handle natively.
All text-bearing elements use gr.Markdown (inherits theme colors).
Only the run button gets custom styling. */
.gradio-container { max-width: 1200px !important; margin: auto; }
/* Primary action button — always purple with white text */
.run-btn button, .run-btn > button, button.run-btn, .run-btn {
background: linear-gradient(135deg, #7c3aed 0%, #6d28d9 100%) !important;
color: #fff !important;
border: none !important;
border-radius: 12px !important;
font-size: 1.05rem !important;
font-weight: 600 !important;
padding: 14px 28px !important;
transition: transform 0.1s, box-shadow 0.2s !important;
}
.run-btn:hover, .run-btn button:hover {
transform: translateY(-1px) !important;
box-shadow: 0 8px 24px rgba(124,58,237,0.35) !important;
color: #fff !important;
}
"""
# Form values for the IMDB sentiment preset: which dataset/split to load,
# which columns hold the model input and the expected label, and the
# deliberately weak starting prompt containing the {input} placeholder.
_IMDB_PRESET = {
    "dataset": "stanfordnlp/imdb",
    "split": "test",
    "input": "text",
    "target": "label",
    "prompt": "What do you think about this? {input}",
}

# Preset configurations, keyed by preset name.
PRESETS = {"imdb": _IMDB_PRESET}
def load_preset(name):
    """Return the form values stored under preset *name*.

    The result is a 5-tuple ordered as (dataset, split, input field,
    target field, prompt), which is the order of the components listed in
    ``preset_outputs`` where the preset button is wired up.

    Raises:
        KeyError: if *name* is not a key of ``PRESETS`` or the preset lacks
            one of the five fields.
    """
    preset = PRESETS[name]
    field_order = ("dataset", "split", "input", "target", "prompt")
    return tuple(preset[field] for field in field_order)
# Create Gradio interface
# Everything inside this `with` block builds the single-page UI bound to the
# module-level name `demo`: a static header, the dataset/prompt form, the run
# button, three result panes, and the event wiring between them.
# NOTE(review): components are presumably laid out in the order they are
# created, so treat the statement order below as significant -- confirm
# against the Gradio Blocks documentation before reordering.
with gr.Blocks(title="OpenEvolve Prompt Optimizer") as demo:
    # --- Hero header (self-contained dark bg, always light text) ---
    # Raw HTML with inline styles; the text is static and takes no user input.
    gr.HTML("""
<div style="background:linear-gradient(135deg,#0f0c29 0%,#302b63 50%,#24243e 100%);
border-radius:16px;padding:32px 40px;margin-bottom:8px;text-align:center;">
<h1 style="color:#fff;font-size:2rem;font-weight:700;margin:0 0 8px 0;letter-spacing:-0.02em;">
OpenEvolve Prompt Optimizer
</h1>
<p style="color:#c4b5fd;font-size:0.95rem;margin:0;">
Evolve better prompts automatically using
<a href="https://github.com/codelion/openevolve" target="_blank" style="color:#c4b5fd;text-decoration:underline;">OpenEvolve</a>.
Powered by <code style="background:rgba(255,255,255,0.12);color:#e0d4ff;padding:2px 6px;border-radius:4px;">gemini-2.5-flash</code> via
<a href="https://openrouter.ai/" target="_blank" style="color:#c4b5fd;text-decoration:underline;">OpenRouter</a>.
</p>
<p style="color:#94a3b8;font-size:0.82rem;margin:12px 0 0 0;">
<strong style="color:#a78bfa;">1.</strong> Pick a dataset &amp; prompt &rarr;
<strong style="color:#a78bfa;">2.</strong> Evolve 5 iterations &rarr;
<strong style="color:#a78bfa;">3.</strong> Compare results side-by-side
</p>
</div>
""")
    # --- Setup ---
    # The default values of the fields below are the same values stored in
    # PRESETS["imdb"], so the form starts out in the IMDB configuration.
    gr.Markdown("#### Dataset")
    gr.Markdown("Quick preset:")
    preset_imdb = gr.Button("IMDB Sentiment", size="sm")
    dataset_name = gr.Textbox(
        label="HuggingFace Dataset",
        value="stanfordnlp/imdb",
        placeholder="org/dataset-name",
    )
    # Split and the two column names share one row with equal scale.
    with gr.Row():
        dataset_split = gr.Textbox(label="Split", value="test", scale=1)
        input_field = gr.Textbox(label="Input Field", value="text", scale=1)
        target_field = gr.Textbox(label="Target Field", value="label", scale=1)
    gr.Markdown("#### Prompt")
    initial_prompt = gr.TextArea(
        label="Initial Prompt",
        value="What do you think about this? {input}",
        lines=5,
        info="Must contain {input} placeholder. Start with a weak prompt -- evolution will improve it!",
    )
    # Static note describing the models, sample count and iteration count.
    gr.Markdown(
        "*Eval model:* `gemini-2.5-flash-lite` (20 samples) | *Evolution model:* `gemini-2.5-flash` (5 iterations) \n"
        "**Note:** Optimization can take up to 10 minutes to complete."
    )
    # Run button
    # elem_classes="run-btn" is the hook the rules in custom_css select on.
    optimize_btn = gr.Button(
        "Optimize Prompt",
        variant="primary",
        size="lg",
        elem_classes="run-btn",
    )
    # --- Results ---
    # `summary` starts empty; the two columns hold placeholder text until the
    # click handler below returns replacement Markdown for all three outputs.
    gr.Markdown("---")
    summary = gr.Markdown("")
    with gr.Row(equal_height=True):
        with gr.Column():
            initial_results = gr.Markdown("**Initial Prompt**\n\nResults will appear here after optimization...")
        with gr.Column():
            final_results = gr.Markdown("**Evolved Prompt**\n\nResults will appear here after optimization...")

    # --- Wiring ---
    def optimize_with_fixed_model(initial_prompt, dataset_name, dataset_split,
                                  input_field, target_field, progress=gr.Progress()):
        """Click handler that runs optimize_prompt with the model pinned to MODELS[0].

        The five form values and the progress tracker are forwarded unchanged;
        the only thing added is the model argument, inserted in the position
        optimize_prompt is called with here. Returns whatever optimize_prompt
        returns.
        """
        return optimize_prompt(
            initial_prompt, dataset_name, dataset_split,
            MODELS[0],
            input_field, target_field, progress
        )

    # `progress` is not listed in `inputs`; the handler relies on its
    # gr.Progress() default. The three outputs map to the summary pane and the
    # two result columns, in that order.
    optimize_btn.click(
        fn=optimize_with_fixed_model,
        inputs=[initial_prompt, dataset_name, dataset_split,
                input_field, target_field],
        outputs=[summary, initial_results, final_results],
    )
    # The preset button takes no inputs: the lambda calls load_preset("imdb"),
    # whose five return values are in the same order as preset_outputs.
    preset_outputs = [dataset_name, dataset_split, input_field, target_field, initial_prompt]
    preset_imdb.click(fn=lambda: load_preset("imdb"), outputs=preset_outputs)

# Start the app only when this file is executed as a script; the custom
# stylesheet defined above is supplied at launch time.
if __name__ == "__main__":
    demo.launch(css=custom_css)