| import excepthook |
| import functools |
| import os |
| import platform |
| import socket |
| import subprocess |
| import sys |
| import threading |
| import time |
| import datetime |
| from typing import Callable, Any, Tuple, Dict |
|
|
| import webview |
| import werkzeug.serving |
| from flask import Flask, render_template, request, Response, jsonify |
|
|
| from config import InferenceConfig |
| from inference import autofill_paths |
|
|
# Resource folders are resolved relative to this file's own location.
script_dir = os.path.dirname(os.path.abspath(__file__))
template_folder, static_folder = (
    os.path.join(script_dir, folder_name) for folder_name in ('template', 'static')
)

# A missing static folder only produces a warning; startup continues.
if not os.path.isdir(static_folder):
    print(f"Warning: Static folder not found at {static_folder}. Ensure it exists and contains your CSS/images.")
|
|
|
|
| |
| |
|
|
| |
| def _ansi_style_supressor(func: Callable[..., Any]) -> Callable[..., Any]: |
| @functools.wraps(func) |
| def wrapper(*args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> Any: |
| |
| if args: |
| first_arg = args[0] |
| if isinstance(first_arg, str) and first_arg.startswith('WARNING: This is a development server.'): |
| return '' |
| |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| |
| |
# Flask application serving the UI from the folders resolved above.
# The secret key is freshly generated from os.urandom on every start.
app = Flask(
    __name__,
    static_folder=static_folder,
    template_folder=template_folder,
)
app.secret_key = os.urandom(24)

# Replace werkzeug's private ``_ansi_style`` helper with the filtering wrapper
# so the development-server warning text is turned into an empty string.
werkzeug.serving._ansi_style = _ansi_style_supressor(werkzeug.serving._ansi_style)
|
|
|
|
| |
class Api:
    """Native file-dialog helpers; an instance is passed to pywebview as ``js_api``."""

    def save_file(self, filename):
        """Show a save dialog pre-filled with *filename*.

        Returns whatever the dialog call returns, or ``None`` when no
        pywebview window exists.
        """
        if not webview.windows:
            print("Error: No pywebview window found.")
            return None

        selection = webview.windows[0].create_file_dialog(
            webview.SAVE_DIALOG,
            save_filename=filename,
        )
        print(f"File dialog result: {selection}")
        return selection

    def browse_file(self, file_types=None):
        """Show an open-file dialog and return the first selected path.

        *file_types* may be a list (converted to a tuple before use). If the
        filtered dialog raises, the dialog is retried without a filter.
        Returns ``None`` when nothing was selected or no window exists.
        """
        if not webview.windows:
            print("Error: No pywebview window found.")
            return None

        window = webview.windows[0]
        try:
            if file_types and isinstance(file_types, list):
                file_types = tuple(file_types)
            chosen = window.create_file_dialog(webview.OPEN_DIALOG, file_types=file_types)
        except Exception:
            # Best-effort fallback: open the dialog without any type filter.
            chosen = window.create_file_dialog(webview.OPEN_DIALOG)

        if not chosen:
            return None
        return chosen[0]

    def browse_folder(self):
        """Show a folder dialog and return the first selected folder.

        Returns ``None`` when nothing was selected or no window exists.
        """
        if not webview.windows:
            print("Error: No pywebview window found.")
            return None

        chosen = webview.windows[0].create_file_dialog(webview.FOLDER_DIALOG)
        print(f"Folder dialog result: {chosen}")
        if not chosen:
            return None
        return chosen[0]
|
|
|
|
| |
| current_process: subprocess.Popen | None = None |
| process_lock = threading.Lock() |
|
|
|
|
| |
def dq_quote(s):
    """Wrap *s* in double quotes, backslash-escaping inner double quotes.

    A string that is already enclosed in a pair of double quotes is returned
    unchanged. Non-string values are converted with ``str`` first.

    Args:
        s: The value to quote.

    Returns:
        The double-quoted string.
    """
    # Require at least two characters: a lone '"' starts and ends with a
    # quote but is not a quoted string, and returning it as-is would leave
    # an unbalanced quote.
    if isinstance(s, str) and len(s) >= 2 and s.startswith('"') and s.endswith('"'):
        return s
    return '"' + str(s).replace('"', '\\"') + '"'
|
|
|
|
| |
def dsq_quote(s):
    r"""Quote a path for a Hydra command-line override.

    The value is converted to ``str``, every single quote is escaped as
    ``\'``, the result is wrapped in single quotes, and that in turn is
    wrapped in double quotes.

    Example: ``C:/My's Folder`` becomes ``"'C:/My\'s Folder'"``.
    """
    escaped = "\\'".join(str(s).split("'"))
    return '"\'{}\'"'.format(escaped)
|
|
|
|
def format_list_arg(items):
    r"""Format *items* as a bracketed, comma-separated list of quoted strings.

    Each item is converted with ``str`` and wrapped in single quotes. Embedded
    single quotes are escaped as ``\'`` so an item containing a quote cannot
    terminate its own quoting early.

    Args:
        items: Iterable of values to include.

    Returns:
        A string such as ``['a','b']``; ``[]`` for an empty iterable.
    """
    quoted = ("'" + str(item).replace("'", "\\'") + "'" for item in items)
    return "[" + ",".join(quoted) + "]"
|
|
|
|
| |
|
|
@app.route('/')
def index():
    """Serve the application's main page from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
|
@app.route('/start_inference', methods=['POST'])
def start_inference():
    """Launch ``inference.py`` as a subprocess configured from the posted form.

    The command is built as an argument list (``shell=False``): the ``model``
    field is passed after ``-cn``, and every other field becomes a
    ``key=value`` override. Path values and list items are single-quoted with
    embedded single quotes escaped.

    Returns:
        202 when the process was started, 409 when one is already running,
        400 when no model was supplied, 500 when the process failed to start.
    """
    global current_process
    form = request.form

    def hydra_quote(value):
        """Single-quote *value*, backslash-escaping embedded single quotes."""
        return "'" + str(value).replace("'", r"\'") + "'"

    # The running-check and the spawn happen under one lock acquisition so two
    # concurrent requests cannot both start a process.
    with process_lock:
        if current_process and current_process.poll() is None:
            return jsonify({"status": "error", "message": "Process already running"}), 409

        # Without this guard a missing model would put None into argv and only
        # surface later as an opaque 500 from Popen.
        model_name = form.get('model')
        if not model_name:
            return jsonify({"status": "error", "message": "No model specified"}), 400

        cmd = [sys.executable, "inference.py", "-cn", model_name]

        # Keys whose values are filesystem paths and therefore get quoted.
        path_keys = {"audio_path", "output_path", "beatmap_path"}

        def add_arg(key, value):
            """Append ``key=value`` unless the value is missing or empty."""
            if value is None or value == '':
                return
            rendered = hydra_quote(value) if key in path_keys else value
            cmd.append(f"{key}={rendered}")

        def add_list_arg(key, items):
            """Append ``key=['a','b',...]`` unless *items* is empty."""
            if items:
                cmd.append(f"{key}=[{','.join(hydra_quote(item) for item in items)}]")

        add_arg("audio_path", form.get('audio_path'))
        add_arg("output_path", form.get('output_path'))
        beatmap_path = form.get('beatmap_path')
        add_arg("beatmap_path", beatmap_path)

        # gamemode falls back to 0 only when the field is absent from the form.
        add_arg("gamemode", form.get('gamemode', 0))

        for param in (
            'difficulty', 'year',
            'hp_drain_rate', 'circle_size', 'overall_difficulty', 'approach_rate',
            'slider_multiplier', 'slider_tick_rate', 'keycount', 'hold_note_ratio',
            'scroll_speed_ratio', 'cfg_scale', 'temperature', 'top_p', 'seed',
            'mapper_id',
            'start_time', 'end_time',
        ):
            add_arg(param, form.get(param))

        # Checkbox flags: presence of the field means true, absence means false.
        for flag in ('export_osz', 'add_to_beatmap', 'hitsounded', 'super_timing'):
            cmd.append(f"{flag}={'true' if flag in form else 'false'}")

        add_list_arg("descriptors", form.getlist('descriptors'))
        add_list_arg("negative_descriptors", form.getlist('negative_descriptors'))

        # in_context is only passed when a beatmap path was supplied.
        in_context_options = form.getlist('in_context_options')
        if in_context_options and beatmap_path:
            add_list_arg("in_context", in_context_options)

        print("Executing Command List (shell=False):", cmd)

        try:
            current_process = subprocess.Popen(
                cmd,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into the streamed output
                bufsize=1,                 # line-buffered text pipe
                universal_newlines=True,
                encoding='utf-8',
                errors='replace',
            )
            print(f"Started process with PID: {current_process.pid}")
            return jsonify({"status": "success", "message": "Inference started"}), 202
        except Exception as e:
            print(f"Error starting subprocess: {e}")
            current_process = None
            return jsonify({"status": "error", "message": f"Failed to start process: {e}"}), 500
|
|
|
|
def _write_error_log(pid, output_lines):
    """Save captured output to ``logs/error_<pid>_<timestamp>.log``.

    Returns the log file path, or ``None`` when the log could not be written.
    """
    try:
        log_dir = os.path.join(script_dir, 'logs')
        os.makedirs(log_dir, exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        log_filepath = os.path.join(log_dir, f"error_{pid}_{timestamp}.log")
        with open(log_filepath, 'w', encoding='utf-8') as f:
            f.write("".join(output_lines))
    except Exception as log_e:
        # Best effort: a failed log write must not break the stream.
        print(f"FATAL: Could not write error log for PID {pid}: {log_e}")
        return None
    print(f"Error log saved for PID {pid} to: {log_filepath}")
    return log_filepath


@app.route('/stream_output')
def stream_output():
    """Stream the running inference process's output as Server-Sent Events.

    Each output line is sent as a ``data:`` message. If the process exits with
    a non-zero code or streaming raises, the captured output is written to a
    log file and its path is sent as an ``error_log`` event. The stream always
    finishes with an ``end`` event.
    """

    def generate():
        global current_process

        # Snapshot the process under the lock, but never yield while holding
        # it: a suspended generator would keep every other route blocked.
        with process_lock:
            is_active = bool(current_process) and current_process.poll() is None
            process_to_stream = current_process if is_active else None

        if process_to_stream is None:
            print("Stream requested but no active process found or process already finished.")
            yield "event: end\ndata: No active process or process already finished\n\n"
            return

        pid = process_to_stream.pid
        print(f"Streaming output for PID: {pid}")
        full_output_lines = []
        error_occurred = False

        try:
            try:
                # readline returns "" at EOF, which terminates the iterator.
                for line in iter(process_to_stream.stdout.readline, ""):
                    full_output_lines.append(line)
                    yield f"data: {line.rstrip()}\n\n"

                process_to_stream.stdout.close()
                return_code = process_to_stream.wait()
                print(f"Process {pid} finished streaming with exit code: {return_code}")

                if return_code != 0:
                    error_occurred = True
                    print(f"Non-zero exit code ({return_code}) detected for PID {pid}. Marking as error.")
            except Exception as e:
                print(f"Error during streaming for PID {pid}: {e}")
                error_occurred = True
                full_output_lines.append(f"\n--- STREAMING ERROR ---\n{e}\n")

            if error_occurred:
                log_filepath = _write_error_log(pid, full_output_lines)
                if log_filepath:
                    yield f"event: error_log\ndata: {log_filepath.replace(os.sep, '/')}\n\n"

            completion_message = "Process completed"
            if error_occurred:
                completion_message += " with errors"
            yield f"event: end\ndata: {completion_message}\n\n"
            print(f"Finished streaming for PID: {pid}. Sent 'end' event.")
        finally:
            # Also runs when the client disconnects mid-stream. The reference
            # is only dropped once the process has exited, so a process that is
            # still running stays visible to /cancel_inference.
            with process_lock:
                if current_process is not process_to_stream:
                    print(f"Stale process {pid} finished streaming, global reference was already updated/cleared.")
                elif process_to_stream.poll() is not None:
                    current_process = None
                    print("Cleared global current_process reference.")

    return Response(generate(), mimetype='text/event-stream')
|
|
|
|
@app.route('/cancel_inference', methods=['POST'])
def cancel_inference():
    """Ask the running inference process to terminate.

    Responds 200 when ``terminate()`` was issued (whether or not the process
    exited within one second), 409 when the tracked process has already
    finished, 404 when no process is tracked, and 500 if termination raised.
    """
    with process_lock:
        proc = current_process
        if not proc:
            status_code = 404
            message = "No process is currently running."
        elif proc.poll() is not None:
            status_code = 409
            message = "Process already finished."
        else:
            try:
                pid = proc.pid
                print(f"Attempting to terminate process PID: {pid}...")
                proc.terminate()
                try:
                    # Give the process a short grace period to exit.
                    proc.wait(timeout=1)
                    print(f"Process PID: {pid} terminated successfully after request.")
                    message = "Cancel request sent, process terminated."
                except subprocess.TimeoutExpired:
                    print(f"Process PID: {pid} did not terminate immediately after SIGTERM.")
                    message = "Cancel request sent. Process termination might take a moment."
                status_code = 200
            except Exception as e:
                print(f"Error terminating process: {e}")
                message = f"Error occurred during cancellation: {e}"
                status_code = 500

    # Only the 200 path counts as success.
    outcome = "success" if status_code == 200 else "error"
    return jsonify({"status": outcome, "message": message}), status_code
|
|
|
|
@app.route('/open_folder', methods=['GET'])
def open_folder():
    """Open the folder named by the ``folder`` query parameter in the OS file browser.

    Returns:
        200 when the open request was issued, 400 when the parameter is
        missing or does not name an existing directory, 500 when launching
        the file browser raised.
    """
    folder_path = request.args.get('folder')
    print(f"Request received to open folder: {folder_path}")
    if not folder_path:
        return jsonify({"status": "error", "message": "No folder path specified"}), 400

    abs_folder_path = os.path.abspath(folder_path)

    if not os.path.isdir(abs_folder_path):
        print(f"Invalid folder path provided or folder does not exist: {abs_folder_path}")
        return jsonify({"status": "error", "message": "Invalid or non-existent folder path specified"}), 400

    try:
        # Each platform has its own "open with the default handler" mechanism.
        system = platform.system()
        if system == 'Windows':
            os.startfile(os.path.normpath(abs_folder_path))
        elif system == 'Darwin':
            subprocess.Popen(['open', abs_folder_path])
        else:
            subprocess.Popen(['xdg-open', abs_folder_path])
        print(f"Successfully requested to open folder: {abs_folder_path}")
        return jsonify({"status": "success", "message": "Folder open request sent."}), 200
    except Exception as e:
        print(f"Error opening folder '{abs_folder_path}': {e}")
        return jsonify({"status": "error", "message": f"Could not open folder: {e}"}), 500
|
|
|
|
@app.route('/open_log_file', methods=['GET'])
def open_log_file():
    """Open the log file named by the ``path`` query parameter with the OS default handler.

    Only files located inside the ``logs`` directory next to this script are
    allowed.

    Returns:
        200 when the open request was issued, 400 when the parameter is
        missing, 403 when the path lies outside the logs directory, 404 when
        the file does not exist, 500 when launching the handler raised.
    """
    log_path = request.args.get('path')
    print(f"Request received to open log file: {log_path}")
    if not log_path:
        return jsonify({"status": "error", "message": "No log file path specified"}), 400

    # Resolve symlinks on both sides so a link inside logs/ cannot point the
    # check at a file elsewhere.
    log_dir = os.path.realpath(os.path.join(script_dir, 'logs'))
    abs_log_path = os.path.realpath(os.path.normpath(log_path))

    # Compare case-normalised paths by component rather than by raw string
    # prefix, so Windows paths that differ only in case are treated as equal.
    norm_dir = os.path.normcase(log_dir)
    norm_path = os.path.normcase(abs_log_path)
    try:
        inside_logs = norm_path != norm_dir and os.path.commonpath([norm_dir, norm_path]) == norm_dir
    except ValueError:
        # commonpath raises for paths on different drives.
        inside_logs = False

    if not inside_logs:
        print(f"Security Alert: Attempt to open file outside of logs directory: {abs_log_path} (Log dir: {log_dir})")
        return jsonify({"status": "error", "message": "Access denied: File is outside the designated logs directory."}), 403

    if not os.path.isfile(abs_log_path):
        print(f"Log file not found at: {abs_log_path}")
        return jsonify({"status": "error", "message": "Log file not found."}), 404

    try:
        system = platform.system()
        if system == 'Windows':
            os.startfile(abs_log_path)
        elif system == 'Darwin':
            subprocess.Popen(['open', abs_log_path])
        else:
            subprocess.Popen(['xdg-open', abs_log_path])
        print(f"Successfully requested to open log file: {abs_log_path}")
        return jsonify({"status": "success", "message": "Log file open request sent."}), 200
    except Exception as e:
        print(f"Error opening log file '{abs_log_path}': {e}")
        return jsonify({"status": "error", "message": f"Could not open log file: {e}"}), 500
|
|
|
|
@app.route('/save_config', methods=['POST'])
def save_config():
    """Write the posted ``config_data`` text to the posted ``file_path``.

    Always responds with a JSON object carrying a ``success`` flag; failures
    are reported through the ``error`` field rather than an HTTP error status.
    """
    try:
        destination = request.form.get('file_path')
        contents = request.form.get('config_data')

        # Both fields are required and must be non-empty.
        if not (destination and contents):
            return jsonify({'success': False, 'error': 'Missing required parameters'})

        with open(destination, 'w', encoding='utf-8') as config_file:
            config_file.write(contents)

        payload = {
            'success': True,
            'file_path': destination,
            'message': 'Configuration saved successfully',
        }
        return jsonify(payload)
    except Exception as e:
        failure = {
            'success': False,
            'error': f'Failed to save configuration: {e!s}',
        }
        return jsonify(failure)
|
|
|
|
@app.route('/validate_paths', methods=['POST'])
def validate_paths():
    """Run ``autofill_paths`` on the posted paths and report the outcome.

    The stripped ``audio_path``, ``beatmap_path`` and ``output_path`` form
    values are copied onto a fresh ``InferenceConfig``. The response carries
    the config's audio and output paths after ``autofill_paths`` ran, together
    with its ``success`` flag and ``errors``. Any exception yields a 500 with
    the error message in ``errors``.
    """
    try:
        submitted = {
            field: request.form.get(field, '').strip()
            for field in ('audio_path', 'beatmap_path', 'output_path')
        }

        inference_args = InferenceConfig()
        for field, value in submitted.items():
            setattr(inference_args, field, value)

        outcome = autofill_paths(inference_args)

        return jsonify({
            'success': outcome['success'],
            'autofilled_audio_path': inference_args.audio_path,
            'autofilled_output_path': inference_args.output_path,
            'errors': outcome['errors'],
        }), 200
    except Exception as e:
        error_msg = f"Error during path validation: {e!s}"
        print(f"Path validation error: {error_msg}")
        failure = {
            'success': False,
            'errors': [error_msg],
            'autofilled_audio_path': None,
            'autofilled_output_path': None,
        }
        return jsonify(failure), 500
|
|
|
|
| |
def run_flask(port):
    """Serve the Flask app on 127.0.0.1 at *port*; blocks while the server runs.

    An ``OSError`` raised by the server start is reported on stdout instead of
    propagating.
    """
    server_options = {
        'host': '127.0.0.1',
        'port': port,
        'threaded': True,
        'debug': False,
    }
    print(f"Starting Flask server on http://127.0.0.1:{port}")
    try:
        app.run(**server_options)
    except OSError as e:
        print(f"Flask server could not start on port {port}: {e}")
| |
|
|
|
|
| |
def find_available_port(start_port=5000, max_tries=100):
    """Return the first port at or after *start_port* that can be bound on 127.0.0.1.

    At most *max_tries* consecutive ports are probed.

    Raises:
        IOError: If none of the probed ports could be bound.
    """
    candidate = start_port
    stop = start_port + max_tries
    while candidate < stop:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(('127.0.0.1', candidate))
            print(f"Found available port: {candidate}")
            return candidate
        except OSError:
            # Port is taken (or otherwise unusable); try the next one.
            candidate += 1
        finally:
            probe.close()
    raise IOError("Could not find an available port.")
|
|
|
|
| |
if __name__ == '__main__':
    # Pick a free local port and run Flask on a daemon thread so it ends when
    # the main (GUI) thread does.
    flask_port = find_available_port()
    flask_thread = threading.Thread(target=run_flask, args=(flask_port,), daemon=True)
    flask_thread.start()

    # Wait until the server actually accepts connections instead of sleeping
    # for a fixed time: a fixed delay is too short on a slow start and wasted
    # time on a fast one. Give up after 10 seconds or if the thread has died.
    startup_deadline = time.monotonic() + 10.0
    while flask_thread.is_alive() and time.monotonic() < startup_deadline:
        try:
            with socket.create_connection(('127.0.0.1', flask_port), timeout=0.25):
                break
        except OSError:
            time.sleep(0.05)
    else:
        print(f"Warning: Flask server is not accepting connections on port {flask_port}; opening window anyway.")

    # Size the window relative to the primary screen, with fixed fallbacks.
    try:
        primary_screen = webview.screens[0]
        screen_width = primary_screen.width
        screen_height = primary_screen.height
        window_width = int(screen_width * 0.45)
        window_height = int(screen_height * 0.95)
        print(f"Screen: {screen_width}x{screen_height}, Window: {window_width}x{window_height}")
    except Exception as e:
        print(f"Could not get screen dimensions, using default: {e}")
        window_width = 900
        window_height = 1000

    window_title = 'Mapperatorinator'
    flask_url = f'http://127.0.0.1:{flask_port}/'
    print(f"Creating pywebview window loading URL: {flask_url}")

    # The Api instance is handed to pywebview as the window's js_api.
    api = Api()
    window = webview.create_window(
        window_title,
        url=flask_url,
        width=window_width,
        height=window_height,
        resizable=True,
        js_api=api,
    )

    # The "window closed" message below is printed once start() returns.
    webview.start()

    print("Pywebview window closed. Exiting application.")
| |
|
|