#!/usr/bin/env python3
"""
BeatHeritage V1 vs Mapperatorinator V30 Benchmark Script
Compares performance, quality, and generation characteristics
"""
import os
import sys
import time
import json
import argparse
import subprocess
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class BenchmarkRunner:
"""Run benchmarks comparing BeatHeritage V1 with Mapperatorinator V30"""
def __init__(self, output_dir: str = "./benchmark_results"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.results = []
def run_inference(self, model_config: str, audio_path: str,
gamemode: int, difficulty: float) -> Dict:
"""Run inference with specified model and parameters"""
output_path = self.output_dir / f"{model_config}_{Path(audio_path).stem}"
output_path.mkdir(parents=True, exist_ok=True)
cmd = [
'python', 'inference.py',
'-cn', model_config,
f'audio_path={audio_path}',
f'output_path={str(output_path)}',
f'gamemode={gamemode}',
f'difficulty={difficulty}',
]
# Add model-specific parameters
if model_config == 'beatheritage_v1':
cmd.extend([
'temperature=0.85',
'top_p=0.92',
'quality_control.enable_auto_correction=true',
'quality_control.enable_flow_optimization=true',
'advanced_features.enable_pattern_variety=true',
])
else: # v30
cmd.extend([
'temperature=0.9',
'top_p=0.9',
])
# Measure performance
start_time = time.time()
memory_before = self._get_memory_usage()
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
end_time = time.time()
memory_after = self._get_memory_usage()
# Parse output for quality metrics
output_files = list(output_path.glob('*.osu'))
metrics = {
'model': model_config,
'audio': Path(audio_path).name,
'gamemode': gamemode,
'difficulty': difficulty,
'generation_time': end_time - start_time,
'memory_usage': memory_after - memory_before,
'success': True,
'output_files': len(output_files),
'quality_metrics': self._analyze_quality(output_files[0] if output_files else None)
}
except subprocess.CalledProcessError as e:
logger.error(f"Error running {model_config}: {e}")
metrics = {
'model': model_config,
'audio': Path(audio_path).name,
'gamemode': gamemode,
'difficulty': difficulty,
'generation_time': -1,
'memory_usage': -1,
'success': False,
'error': str(e),
'output_files': 0,
'quality_metrics': {}
}
return metrics
def _get_memory_usage(self) -> float:
"""Get current GPU memory usage in MB"""
if torch.cuda.is_available():
return torch.cuda.memory_allocated() / 1024**2
return 0
def _analyze_quality(self, osu_file: Optional[Path]) -> Dict:
"""Analyze quality metrics of generated beatmap"""
if not osu_file or not osu_file.exists():
return {}
metrics = {
'object_count': 0,
'avg_spacing': 0,
'spacing_variance': 0,
'pattern_diversity': 0,
'flow_score': 0,
'difficulty_consistency': 0
}
try:
with open(osu_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Parse hit objects
hit_objects = []
in_hit_objects = False
for line in lines:
if '[HitObjects]' in line:
in_hit_objects = True
continue
if in_hit_objects and line.strip():
parts = line.strip().split(',')
if len(parts) >= 2:
try:
x, y = int(parts[0]), int(parts[1])
hit_objects.append((x, y))
                        except ValueError:
                            # Skip malformed coordinate values
                            pass
metrics['object_count'] = len(hit_objects)
if len(hit_objects) > 1:
# Calculate spacing metrics
distances = []
for i in range(1, len(hit_objects)):
dist = np.sqrt(
(hit_objects[i][0] - hit_objects[i-1][0])**2 +
(hit_objects[i][1] - hit_objects[i-1][1])**2
)
distances.append(dist)
metrics['avg_spacing'] = np.mean(distances)
metrics['spacing_variance'] = np.var(distances)
# Pattern diversity (entropy of distance distribution)
hist, _ = np.histogram(distances, bins=10)
hist = hist / hist.sum()
entropy = -np.sum(hist * np.log(hist + 1e-10))
metrics['pattern_diversity'] = entropy
# Flow score (based on angle changes)
if len(hit_objects) > 2:
angles = []
for i in range(2, len(hit_objects)):
angle = self._calculate_angle(
hit_objects[i-2],
hit_objects[i-1],
hit_objects[i]
)
angles.append(angle)
# Lower angle variance = better flow
metrics['flow_score'] = 1.0 / (1.0 + np.var(angles) / 100)
# Difficulty consistency
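                # Split spacings into chunks and score how similar the per-chunk
                # variance is; lower variation across chunks -> score closer to 1.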
chunk_size = max(10, len(distances) // 10)
chunk_variances = []
for i in range(0, len(distances), chunk_size):
chunk = distances[i:i+chunk_size]
if chunk:
chunk_variances.append(np.var(chunk))
if chunk_variances:
metrics['difficulty_consistency'] = 1.0 / (1.0 + np.var(chunk_variances))
except Exception as e:
logger.error(f"Error analyzing quality: {e}")
return metrics
def _calculate_angle(self, p1: Tuple, p2: Tuple, p3: Tuple) -> float:
"""Calculate angle between three points"""
v1 = (p2[0] - p1[0], p2[1] - p1[1])
v2 = (p3[0] - p2[0], p3[1] - p2[1])
angle1 = np.arctan2(v1[1], v1[0])
angle2 = np.arctan2(v2[1], v2[0])
angle_diff = angle2 - angle1
# Normalize to [-pi, pi]
while angle_diff > np.pi:
angle_diff -= 2 * np.pi
while angle_diff < -np.pi:
angle_diff += 2 * np.pi
return abs(angle_diff)
def run_benchmark_suite(self, test_audio_files: List[str]):
"""Run complete benchmark suite"""
models = ['beatheritage_v1', 'v30']
        gamemodes = [0, 1, 2, 3]  # osu!standard, taiko, catch the beat, mania
        difficulties = [3.0, 5.5, 7.5]  # Target star ratings: low, mid, high
total_tests = len(test_audio_files) * len(models) * len(gamemodes) * len(difficulties)
with tqdm(total=total_tests, desc="Running benchmarks") as pbar:
for audio_file in test_audio_files:
for gamemode in gamemodes:
for difficulty in difficulties:
for model in models:
logger.info(f"Testing {model} on {audio_file} "
f"(GM:{gamemode}, Diff:{difficulty})")
result = self.run_inference(
model, audio_file, gamemode, difficulty
)
self.results.append(result)
pbar.update(1)
# Save intermediate results
self._save_results()
def _save_results(self):
"""Save benchmark results to JSON and CSV"""
# Save as JSON
json_path = self.output_dir / f"benchmark_results_{self.timestamp}.json"
with open(json_path, 'w') as f:
json.dump(self.results, f, indent=2)
# Save as CSV for analysis
df = pd.DataFrame(self.results)
csv_path = self.output_dir / f"benchmark_results_{self.timestamp}.csv"
df.to_csv(csv_path, index=False)
logger.info(f"Results saved to {json_path} and {csv_path}")
def generate_report(self):
"""Generate comprehensive benchmark report with visualizations"""
if not self.results:
logger.error("No results to generate report")
return
df = pd.DataFrame(self.results)
# Create visualizations
fig = plt.figure(figsize=(20, 12))
# 1. Generation Time Comparison
ax1 = plt.subplot(2, 3, 1)
successful_df = df[df['success'] == True]
if not successful_df.empty:
sns.boxplot(data=successful_df, x='model', y='generation_time', ax=ax1)
ax1.set_title('Generation Time Comparison')
ax1.set_ylabel('Time (seconds)')
# 2. Memory Usage Comparison
ax2 = plt.subplot(2, 3, 2)
if not successful_df.empty:
sns.boxplot(data=successful_df, x='model', y='memory_usage', ax=ax2)
ax2.set_title('Memory Usage Comparison')
ax2.set_ylabel('Memory (MB)')
# 3. Success Rate
ax3 = plt.subplot(2, 3, 3)
success_rates = df.groupby('model')['success'].mean() * 100
success_rates.plot(kind='bar', ax=ax3)
ax3.set_title('Success Rate (%)')
ax3.set_ylabel('Success Rate')
ax3.set_ylim(0, 105)
# 4. Quality Metrics Comparison
if not successful_df.empty and 'quality_metrics' in successful_df.columns:
# Extract quality metrics
quality_data = []
for _, row in successful_df.iterrows():
if row['quality_metrics']:
quality_data.append({
'model': row['model'],
'pattern_diversity': row['quality_metrics'].get('pattern_diversity', 0),
'flow_score': row['quality_metrics'].get('flow_score', 0),
'difficulty_consistency': row['quality_metrics'].get('difficulty_consistency', 0)
})
if quality_data:
quality_df = pd.DataFrame(quality_data)
# Pattern Diversity
ax4 = plt.subplot(2, 3, 4)
if 'pattern_diversity' in quality_df.columns:
sns.boxplot(data=quality_df, x='model', y='pattern_diversity', ax=ax4)
ax4.set_title('Pattern Diversity Score')
# Flow Score
ax5 = plt.subplot(2, 3, 5)
if 'flow_score' in quality_df.columns:
sns.boxplot(data=quality_df, x='model', y='flow_score', ax=ax5)
ax5.set_title('Flow Quality Score')
# Difficulty Consistency
ax6 = plt.subplot(2, 3, 6)
if 'difficulty_consistency' in quality_df.columns:
sns.boxplot(data=quality_df, x='model', y='difficulty_consistency', ax=ax6)
ax6.set_title('Difficulty Consistency Score')
plt.suptitle('BeatHeritage V1 vs Mapperatorinator V30 Benchmark Report', fontsize=16)
plt.tight_layout()
# Save plot
plot_path = self.output_dir / f"benchmark_report_{self.timestamp}.png"
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
plt.show()
# Generate text summary
summary = self._generate_text_summary(df)
summary_path = self.output_dir / f"benchmark_summary_{self.timestamp}.txt"
with open(summary_path, 'w') as f:
f.write(summary)
logger.info(f"Report generated: {plot_path} and {summary_path}")
def _generate_text_summary(self, df: pd.DataFrame) -> str:
"""Generate text summary of benchmark results"""
summary = []
summary.append("=" * 80)
summary.append("BEATHERITAGE V1 VS MAPPERATORINATOR V30 BENCHMARK SUMMARY")
summary.append("=" * 80)
summary.append(f"Timestamp: {self.timestamp}")
summary.append(f"Total Tests: {len(df)}")
summary.append("")
for model in df['model'].unique():
model_df = df[df['model'] == model]
successful_df = model_df[model_df['success'] == True]
summary.append(f"\n{model.upper()}")
summary.append("-" * 40)
summary.append(f"Success Rate: {model_df['success'].mean()*100:.1f}%")
if not successful_df.empty:
summary.append(f"Avg Generation Time: {successful_df['generation_time'].mean():.2f}s")
summary.append(f"Avg Memory Usage: {successful_df['memory_usage'].mean():.1f}MB")
# Quality metrics
quality_metrics = []
for _, row in successful_df.iterrows():
if row['quality_metrics']:
quality_metrics.append(row['quality_metrics'])
if quality_metrics:
avg_diversity = np.mean([m.get('pattern_diversity', 0) for m in quality_metrics])
avg_flow = np.mean([m.get('flow_score', 0) for m in quality_metrics])
avg_consistency = np.mean([m.get('difficulty_consistency', 0) for m in quality_metrics])
summary.append(f"Avg Pattern Diversity: {avg_diversity:.3f}")
summary.append(f"Avg Flow Score: {avg_flow:.3f}")
summary.append(f"Avg Difficulty Consistency: {avg_consistency:.3f}")
# Winner determination
summary.append("\n" + "=" * 80)
summary.append("WINNER ANALYSIS")
summary.append("=" * 80)
if len(df['model'].unique()) == 2:
model1, model2 = df['model'].unique()
            # Compare metrics on successful runs only (lower is better for both)
            successful = df[df['success'] == True]
            metrics_comparison = []
            for metric in ['generation_time', 'memory_usage']:
                m1_avg = successful[successful['model'] == model1][metric].mean()
                m2_avg = successful[successful['model'] == model2][metric].mean()
                # Skip the comparison if either model has no successful runs
                if np.isnan(m1_avg) or np.isnan(m2_avg) or max(m1_avg, m2_avg) <= 0:
                    continue
                if m1_avg < m2_avg:
                    winner = model1
                    improvement = ((m2_avg - m1_avg) / m2_avg) * 100
                else:
                    winner = model2
                    improvement = ((m1_avg - m2_avg) / m1_avg) * 100
metrics_comparison.append(
f"{metric}: {winner} ({improvement:.1f}% better)"
)
for comp in metrics_comparison:
summary.append(comp)
return "\n".join(summary)
def main():
parser = argparse.ArgumentParser(description='Benchmark BeatHeritage V1 vs V30')
parser.add_argument(
'--audio-dir',
type=str,
default='./test_audio',
help='Directory containing test audio files'
)
parser.add_argument(
'--output-dir',
type=str,
default='./benchmark_results',
help='Directory to save benchmark results'
)
parser.add_argument(
'--quick-test',
action='store_true',
help='Run quick test with limited parameters'
)
args = parser.parse_args()
# Get test audio files
audio_dir = Path(args.audio_dir)
if audio_dir.exists():
audio_files = list(audio_dir.glob('*.mp3')) + list(audio_dir.glob('*.ogg'))
else:
# Use demo files
logger.warning(f"Audio directory {audio_dir} not found, using demo files")
audio_files = ['demo.mp3'] # Fallback to demo
if args.quick_test:
# Quick test with limited parameters
audio_files = audio_files[:1]
logger.info("Running quick test with 1 audio file")
# Run benchmarks
runner = BenchmarkRunner(args.output_dir)
runner.run_benchmark_suite([str(f) for f in audio_files])
runner.generate_report()
logger.info("Benchmark complete!")
if __name__ == "__main__":
main()