"""
BeatHeritage V1 vs Mapperatorinator V30 Benchmark Script
Compares performance, quality, and generation characteristics
"""

import time
import json
import argparse
import subprocess
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class BenchmarkRunner:
    """Run benchmarks comparing BeatHeritage V1 with Mapperatorinator V30."""

    def __init__(self, output_dir: str = "./benchmark_results"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.results = []

    def run_inference(self, model_config: str, audio_path: str,
                      gamemode: int, difficulty: float) -> Dict:
        """Run inference with the specified model and parameters."""
        output_path = self.output_dir / f"{model_config}_{Path(audio_path).stem}"
        output_path.mkdir(parents=True, exist_ok=True)

        cmd = [
            'python', 'inference.py',
            '-cn', model_config,
            f'audio_path={audio_path}',
            f'output_path={str(output_path)}',
            f'gamemode={gamemode}',
            f'difficulty={difficulty}',
        ]

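        # Model-specific sampling settings and feature flags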
        if model_config == 'beatheritage_v1':
            cmd.extend([
                'temperature=0.85',
                'top_p=0.92',
                'quality_control.enable_auto_correction=true',
                'quality_control.enable_flow_optimization=true',
                'advanced_features.enable_pattern_variety=true',
            ])
        else:
            cmd.extend([
                'temperature=0.9',
                'top_p=0.9',
            ])

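        # Time the run and sample GPU memory before and after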
        start_time = time.time()
        memory_before = self._get_memory_usage()

        try:
            subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True
            )

            end_time = time.time()
            memory_after = self._get_memory_usage()

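            # Collect the generated .osu files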
            output_files = list(output_path.glob('*.osu'))

            metrics = {
                'model': model_config,
                'audio': Path(audio_path).name,
                'gamemode': gamemode,
                'difficulty': difficulty,
                'generation_time': end_time - start_time,
                'memory_usage': memory_after - memory_before,
                'success': True,
                'output_files': len(output_files),
                'quality_metrics': self._analyze_quality(output_files[0] if output_files else None)
            }

        except subprocess.CalledProcessError as e:
            logger.error(f"Error running {model_config}: {e}\n{e.stderr}")
            metrics = {
                'model': model_config,
                'audio': Path(audio_path).name,
                'gamemode': gamemode,
                'difficulty': difficulty,
                'generation_time': -1,
                'memory_usage': -1,
                'success': False,
                'error': str(e),
                'output_files': 0,
                'quality_metrics': {}
            }

        return metrics

    def _get_memory_usage(self) -> float:
        """Get device-wide GPU memory usage in MB via nvidia-smi."""
        # torch.cuda.memory_allocated() only sees this process; the inference
        # subprocess's allocations are invisible to it, so ask the driver.
        try:
            out = subprocess.run(
                ['nvidia-smi', '--query-gpu=memory.used', '--format=csv,noheader,nounits'],
                capture_output=True, text=True, check=True
            ).stdout
            return float(out.strip().splitlines()[0])
        except (OSError, subprocess.CalledProcessError, ValueError):
            # Fallback: this process's own allocation (0.0 without CUDA)
            return torch.cuda.memory_allocated() / 1024**2 if torch.cuda.is_available() else 0.0

    def _analyze_quality(self, osu_file: Optional[Path]) -> Dict:
        """Analyze quality metrics of a generated beatmap."""
        if not osu_file or not osu_file.exists():
            return {}

        metrics = {
            'object_count': 0,
            'avg_spacing': 0,
            'spacing_variance': 0,
            'pattern_diversity': 0,
            'flow_score': 0,
            'difficulty_consistency': 0
        }

        try:
            with open(osu_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()

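            # Parse the [HitObjects] section; the first two comma-separated
            # fields of each line are the object's x,y position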
            hit_objects = []
            in_hit_objects = False

            for line in lines:
                if '[HitObjects]' in line:
                    in_hit_objects = True
                    continue

                if in_hit_objects and line.strip():
                    parts = line.strip().split(',')
                    if len(parts) >= 2:
                        try:
                            x, y = int(parts[0]), int(parts[1])
                            hit_objects.append((x, y))
                        except ValueError:
                            pass

            metrics['object_count'] = len(hit_objects)

            if len(hit_objects) > 1:
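                # Spacing: Euclidean distance between consecutive objects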
                distances = []
                for i in range(1, len(hit_objects)):
                    dist = np.sqrt(
                        (hit_objects[i][0] - hit_objects[i-1][0])**2 +
                        (hit_objects[i][1] - hit_objects[i-1][1])**2
                    )
                    distances.append(dist)

                metrics['avg_spacing'] = np.mean(distances)
                metrics['spacing_variance'] = np.var(distances)

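                # Pattern diversity: entropy of the spacing histogram
                # (0 = every gap in one bin, ln(10) ~ 2.30 = perfectly uniform)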
                hist, _ = np.histogram(distances, bins=10)
                hist = hist / hist.sum()
                entropy = -np.sum(hist * np.log(hist + 1e-10))
                metrics['pattern_diversity'] = entropy

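                # Flow: turning angle between successive movement vectors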
                if len(hit_objects) > 2:
                    angles = []
                    for i in range(2, len(hit_objects)):
                        angle = self._calculate_angle(
                            hit_objects[i-2],
                            hit_objects[i-1],
                            hit_objects[i]
                        )
                        angles.append(angle)

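                    # Lower angle variance -> smoother, more predictable flow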
                    metrics['flow_score'] = 1.0 / (1.0 + np.var(angles) / 100)

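                # Difficulty consistency: spread of spacing variance across
                # ~10 equal chunks of the map (lower spread = more consistent)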
                chunk_size = max(10, len(distances) // 10)
                chunk_variances = []
                for i in range(0, len(distances), chunk_size):
                    chunk = distances[i:i+chunk_size]
                    if chunk:
                        chunk_variances.append(np.var(chunk))

                if chunk_variances:
                    metrics['difficulty_consistency'] = 1.0 / (1.0 + np.var(chunk_variances))

        except Exception as e:
            logger.error(f"Error analyzing quality: {e}")

        return metrics

    def _calculate_angle(self, p1: Tuple, p2: Tuple, p3: Tuple) -> float:
        """Return the absolute turning angle at p2, in radians within [0, pi].

        For example, (0, 0) -> (1, 0) -> (1, 1) is a 90-degree turn: pi / 2.
        """
        v1 = (p2[0] - p1[0], p2[1] - p1[1])
        v2 = (p3[0] - p2[0], p3[1] - p2[1])

        angle1 = np.arctan2(v1[1], v1[0])
        angle2 = np.arctan2(v2[1], v2[0])

        # Normalize the signed difference into [-pi, pi] before taking abs()
        angle_diff = angle2 - angle1

        while angle_diff > np.pi:
            angle_diff -= 2 * np.pi
        while angle_diff < -np.pi:
            angle_diff += 2 * np.pi

        return abs(angle_diff)

    def run_benchmark_suite(self, test_audio_files: List[str]):
        """Run the complete benchmark suite over all parameter combinations."""
        models = ['beatheritage_v1', 'v30']
        gamemodes = [0, 1, 2, 3]  # osu!, taiko, catch, mania
        difficulties = [3.0, 5.5, 7.5]

        total_tests = len(test_audio_files) * len(models) * len(gamemodes) * len(difficulties)

        with tqdm(total=total_tests, desc="Running benchmarks") as pbar:
            for audio_file in test_audio_files:
                for gamemode in gamemodes:
                    for difficulty in difficulties:
                        for model in models:
                            logger.info(f"Testing {model} on {audio_file} "
                                        f"(GM:{gamemode}, Diff:{difficulty})")

                            result = self.run_inference(
                                model, audio_file, gamemode, difficulty
                            )
                            self.results.append(result)
                            pbar.update(1)

        self._save_results()

    def _save_results(self):
        """Save benchmark results to JSON and CSV."""
        json_path = self.output_dir / f"benchmark_results_{self.timestamp}.json"
        with open(json_path, 'w') as f:
            json.dump(self.results, f, indent=2)

        df = pd.DataFrame(self.results)
        csv_path = self.output_dir / f"benchmark_results_{self.timestamp}.csv"
        df.to_csv(csv_path, index=False)

        logger.info(f"Results saved to {json_path} and {csv_path}")

    def generate_report(self):
        """Generate a comprehensive benchmark report with visualizations."""
        if not self.results:
            logger.error("No results to generate a report from")
            return

        df = pd.DataFrame(self.results)
        successful_df = df[df['success']]

        plt.figure(figsize=(20, 12))

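        # Generation time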
        ax1 = plt.subplot(2, 3, 1)
        if not successful_df.empty:
            sns.boxplot(data=successful_df, x='model', y='generation_time', ax=ax1)
        ax1.set_title('Generation Time Comparison')
        ax1.set_ylabel('Time (seconds)')

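        # Memory usage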
        ax2 = plt.subplot(2, 3, 2)
        if not successful_df.empty:
            sns.boxplot(data=successful_df, x='model', y='memory_usage', ax=ax2)
        ax2.set_title('Memory Usage Comparison')
        ax2.set_ylabel('Memory (MB)')

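        # Success rate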
        ax3 = plt.subplot(2, 3, 3)
        success_rates = df.groupby('model')['success'].mean() * 100
        success_rates.plot(kind='bar', ax=ax3)
        ax3.set_title('Success Rate (%)')
        ax3.set_ylabel('Success Rate')
        ax3.set_ylim(0, 105)

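        # Flatten per-run quality metrics into a frame for plotting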
        if not successful_df.empty and 'quality_metrics' in successful_df.columns:
            quality_data = []
            for _, row in successful_df.iterrows():
                if row['quality_metrics']:
                    quality_data.append({
                        'model': row['model'],
                        'pattern_diversity': row['quality_metrics'].get('pattern_diversity', 0),
                        'flow_score': row['quality_metrics'].get('flow_score', 0),
                        'difficulty_consistency': row['quality_metrics'].get('difficulty_consistency', 0)
                    })

            if quality_data:
                quality_df = pd.DataFrame(quality_data)

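                # Pattern diversity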
                ax4 = plt.subplot(2, 3, 4)
                if 'pattern_diversity' in quality_df.columns:
                    sns.boxplot(data=quality_df, x='model', y='pattern_diversity', ax=ax4)
                ax4.set_title('Pattern Diversity Score')

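                # Flow quality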
                ax5 = plt.subplot(2, 3, 5)
                if 'flow_score' in quality_df.columns:
                    sns.boxplot(data=quality_df, x='model', y='flow_score', ax=ax5)
                ax5.set_title('Flow Quality Score')

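                # Difficulty consistency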
                ax6 = plt.subplot(2, 3, 6)
                if 'difficulty_consistency' in quality_df.columns:
                    sns.boxplot(data=quality_df, x='model', y='difficulty_consistency', ax=ax6)
                ax6.set_title('Difficulty Consistency Score')

        plt.suptitle('BeatHeritage V1 vs Mapperatorinator V30 Benchmark Report', fontsize=16)
        plt.tight_layout()

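        # Save the figure to disk, then display it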
        plot_path = self.output_dir / f"benchmark_report_{self.timestamp}.png"
        plt.savefig(plot_path, dpi=150, bbox_inches='tight')
        plt.show()

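        # Write a plain-text summary alongside the plot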
        summary = self._generate_text_summary(df)
        summary_path = self.output_dir / f"benchmark_summary_{self.timestamp}.txt"
        with open(summary_path, 'w') as f:
            f.write(summary)

        logger.info(f"Report generated: {plot_path} and {summary_path}")

    def _generate_text_summary(self, df: pd.DataFrame) -> str:
        """Generate a text summary of the benchmark results."""
        summary = []
        summary.append("=" * 80)
        summary.append("BEATHERITAGE V1 VS MAPPERATORINATOR V30 BENCHMARK SUMMARY")
        summary.append("=" * 80)
        summary.append(f"Timestamp: {self.timestamp}")
        summary.append(f"Total Tests: {len(df)}")
        summary.append("")

        for model in df['model'].unique():
            model_df = df[df['model'] == model]
            successful_df = model_df[model_df['success']]

            summary.append(f"\n{model.upper()}")
            summary.append("-" * 40)
            summary.append(f"Success Rate: {model_df['success'].mean()*100:.1f}%")

            if not successful_df.empty:
                summary.append(f"Avg Generation Time: {successful_df['generation_time'].mean():.2f}s")
                summary.append(f"Avg Memory Usage: {successful_df['memory_usage'].mean():.1f}MB")

                quality_metrics = [
                    row['quality_metrics']
                    for _, row in successful_df.iterrows()
                    if row['quality_metrics']
                ]

                if quality_metrics:
                    avg_diversity = np.mean([m.get('pattern_diversity', 0) for m in quality_metrics])
                    avg_flow = np.mean([m.get('flow_score', 0) for m in quality_metrics])
                    avg_consistency = np.mean([m.get('difficulty_consistency', 0) for m in quality_metrics])

                    summary.append(f"Avg Pattern Diversity: {avg_diversity:.3f}")
                    summary.append(f"Avg Flow Score: {avg_flow:.3f}")
                    summary.append(f"Avg Difficulty Consistency: {avg_consistency:.3f}")

        summary.append("\n" + "=" * 80)
        summary.append("WINNER ANALYSIS")
        summary.append("=" * 80)

        if len(df['model'].unique()) == 2:
            model1, model2 = df['model'].unique()

            # Compare only successful runs; failed runs carry -1 sentinel
            # values that would skew the averages. Both metrics are
            # lower-is-better.
            ok_df = df[df['success']]
            metrics_comparison = []

            for metric in ['generation_time', 'memory_usage']:
                m1_avg = ok_df[ok_df['model'] == model1][metric].mean()
                m2_avg = ok_df[ok_df['model'] == model2][metric].mean()

                if np.isnan(m1_avg) or np.isnan(m2_avg):
                    continue  # one model has no successful runs to compare

                if m1_avg < m2_avg:
                    winner = model1
                    improvement = ((m2_avg - m1_avg) / m2_avg) * 100
                else:
                    winner = model2
                    improvement = ((m1_avg - m2_avg) / m1_avg) * 100

                metrics_comparison.append(
                    f"{metric}: {winner} ({improvement:.1f}% better)"
                )

            for comp in metrics_comparison:
                summary.append(comp)

        return "\n".join(summary)


def main():
    parser = argparse.ArgumentParser(description='Benchmark BeatHeritage V1 vs Mapperatorinator V30')
    parser.add_argument(
        '--audio-dir',
        type=str,
        default='./test_audio',
        help='Directory containing test audio files'
    )
    parser.add_argument(
        '--output-dir',
        type=str,
        default='./benchmark_results',
        help='Directory to save benchmark results'
    )
    parser.add_argument(
        '--quick-test',
        action='store_true',
        help='Run a quick test with a single audio file'
    )

    args = parser.parse_args()

    # Discover test audio files; fall back to a demo file if none are found
    audio_files = []
    audio_dir = Path(args.audio_dir)
    if audio_dir.exists():
        audio_files = sorted(audio_dir.glob('*.mp3')) + sorted(audio_dir.glob('*.ogg'))

    if not audio_files:
        logger.warning(f"No audio files found in {audio_dir}, using demo files")
        audio_files = ['demo.mp3']

    if args.quick_test:
        audio_files = audio_files[:1]
        logger.info("Running quick test with 1 audio file")

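    # Run the full suite and generate the report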
    runner = BenchmarkRunner(args.output_dir)
    runner.run_benchmark_suite([str(f) for f in audio_files])
    runner.generate_report()

    logger.info("Benchmark complete!")


if __name__ == "__main__":
    main()