import subprocess
import tempfile
import os
import json
import shutil
import time
import librosa
import torch
import argparse
import soundfile as sf
from pathlib import Path
import cn2an

# SenseVoice-related project modules
from model import SinusoidalPositionEncoder
from utils.ax_model_bin import AX_SenseVoiceSmall
from utils.ax_vad_bin import AX_Fsmn_vad
from utils.vad_utils import merge_vad
from funasr.tokenizer.sentencepiece_tokenizer import SentencepiecesTokenizer

# --- Configuration ---
# External translation (en -> zh) binary and model paths
TRANSLATE_EXECUTABLE = "libtranslate/test_translate"
TRANSLATE_MODEL = "libtranslate/opus-mt-en-zh.axmodel"
TRANSLATE_TOKENIZER_DIR = "libtranslate/opus-mt-en-zh/"

# External TTS (MeloTTS) binary and model files
TTS_EXECUTABLE = "libmelotts/install/melotts"
TTS_MODEL_DIR = "libmelotts/models"
TTS_MODEL_FILES = {
    "g": "g-zh_mix_en.bin",
    "encoder": "encoder-zh.onnx",
    "lexicon": "lexicon.txt",
    "tokens": "tokens.txt",
    "decoder": "decoder-zh.axmodel",
}


class SpeechTranslationPipeline:
    """End-to-end speech translation pipeline.

    Steps:
        1. Speech recognition (SenseVoice ASR with FSMN-VAD segmentation).
        2. English -> Chinese translation via an external executable.
        3. Chinese TTS synthesis via an external MeloTTS executable.
    """

    def __init__(self, translate_exec, translate_model, translate_tokenizer,
                 tts_exec, tts_model_dir, tts_model_files,
                 asr_model_dir="ax_model", seq_len=132):
        """Store paths, load the ASR models, and validate required files.

        Args:
            translate_exec: Path to the translation executable.
            translate_model: Path to the translation .axmodel file.
            translate_tokenizer: Path to the translation tokenizer directory.
            tts_exec: Path to the MeloTTS executable.
            tts_model_dir: Directory containing the TTS model files.
            tts_model_files: Mapping of TTS model roles to file names.
            asr_model_dir: Directory containing SenseVoice/VAD model files.
            seq_len: Sequence length used for the ASR positional encoding.

        Raises:
            FileNotFoundError: If any required executable or model file
                is missing (via _validate_files).
        """
        self.translate_exec = translate_exec
        self.translate_model = translate_model
        self.translate_tokenizer = translate_tokenizer
        self.tts_exec = tts_exec
        self.tts_model_dir = tts_model_dir
        self.tts_model_files = tts_model_files
        self.asr_model_dir = asr_model_dir
        self.seq_len = seq_len

        # Load ASR models up front so a missing model fails fast.
        self._init_asr_models()
        # Verify every external dependency before the first run.
        self._validate_files()

    def _init_asr_models(self):
        """Load the VAD model, ASR model, tokenizer, and positional encoding."""
        print("Initializing SenseVoice models...")

        # Voice-activity-detection model (FSMN-VAD)
        self.model_vad = AX_Fsmn_vad(self.asr_model_dir)

        # Precompute the sinusoidal positional encoding once; the random
        # tensor only supplies the (1, seq_len, 560) shape.
        self.embed = SinusoidalPositionEncoder()
        self.position_encoding = self.embed.get_position_encoding(
            torch.randn(1, self.seq_len, 560)).numpy()

        # SenseVoice ASR model
        self.model_bin = AX_SenseVoiceSmall(self.asr_model_dir, seq_len=self.seq_len)

        # SentencePiece tokenizer shared by all ASR calls
        tokenizer_path = os.path.join(
            self.asr_model_dir, "chn_jpn_yue_eng_ko_spectok.bpe.model")
        self.tokenizer = SentencepiecesTokenizer(bpemodel=tokenizer_path)

        print("SenseVoice models initialized successfully.")

    def _validate_files(self):
        """Verify all required executables and model files exist.

        Raises:
            FileNotFoundError: For the first missing path found.
        """
        # Translation dependencies
        if not os.path.exists(self.translate_exec):
            raise FileNotFoundError(f"翻译可执行文件不存在: {self.translate_exec}")
        if not os.path.exists(self.translate_model):
            raise FileNotFoundError(f"翻译模型不存在: {self.translate_model}")
        if not os.path.exists(self.translate_tokenizer):
            raise FileNotFoundError(f"翻译tokenizer目录不存在: {self.translate_tokenizer}")

        # TTS dependencies
        if not os.path.exists(self.tts_exec):
            raise FileNotFoundError(f"TTS可执行文件不存在: {self.tts_exec}")
        for key, filename in self.tts_model_files.items():
            filepath = os.path.join(self.tts_model_dir, filename)
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"TTS模型文件不存在: {filepath}")

    def speech_recognition(self, speech, fs):
        """Step 1: run VAD segmentation then ASR over each segment.

        Args:
            speech: 1-D audio samples (as loaded by librosa).
            fs: Sample rate of `speech` in Hz.

        Returns:
            The concatenated, stripped recognition text for all segments.
        """
        speech_lengths = len(speech)

        # --- VAD: split the audio into speech segments ---
        print("Running VAD...")
        vad_start_time = time.time()
        res_vad = self.model_vad(speech)[0]
        # Merge adjacent segments separated by less than 15 s (ms units).
        vad_segments = merge_vad(res_vad, 15 * 1000)
        vad_time_cost = time.time() - vad_start_time
        print(f"VAD processing time: {vad_time_cost:.2f} seconds")
        print(f"VAD segments detected: {len(vad_segments)}")

        # --- ASR: recognize each VAD segment ---
        print("Running ASR...")
        asr_start_time = time.time()
        all_results = ""

        for i, segment in enumerate(vad_segments):
            segment_start, segment_end = segment  # segment bounds in ms
            start_sample = int(segment_start / 1000 * fs)
            end_sample = min(int(segment_end / 1000 * fs), speech_lengths)
            segment_speech = speech[start_sample:end_sample]

            # The ASR model consumes a wav file, so write the segment out.
            segment_filename = f"temp_segment_{i}.wav"
            sf.write(segment_filename, segment_speech, fs)

            try:
                segment_res = self.model_bin(
                    segment_filename,
                    "auto",                  # automatic language detection
                    True,                    # withitn
                    self.position_encoding,
                    tokenizer=self.tokenizer,
                )
                all_results += segment_res
            except Exception as e:
                # Best-effort: skip a failing segment, keep the rest.
                print(f"Error processing segment {i}: {e}")
                continue
            finally:
                # fix: cleanup was duplicated in success and error paths;
                # `finally` guarantees the temp file is always removed.
                if os.path.exists(segment_filename):
                    os.remove(segment_filename)

        asr_time_cost = time.time() - asr_start_time
        print(f"ASR processing time: {asr_time_cost:.2f} seconds")
        print(f"ASR Result: {all_results}")

        return all_results.strip()

    def run_translation(self, english_text):
        """Step 2: translate English text to Chinese via the external binary.

        Args:
            english_text: Source English text.

        Returns:
            The translated Chinese text parsed from the binary's stdout.

        Raises:
            RuntimeError: If the binary fails or times out.
        """
        cmd = [
            self.translate_exec,
            "--model", self.translate_model,
            "--tokenizer_dir", self.translate_tokenizer,
            # fix: subprocess with a list does not go through a shell, so no
            # manual quoting is needed; the previous f'"{text}"' passed literal
            # quote characters into the translator input.
            "--text", english_text,
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,  # seconds
            )

            if result.returncode != 0:
                error_msg = f"翻译程序执行失败: {result.stderr}"
                raise RuntimeError(error_msg)

            # The binary prints extra logs; the translation sits between
            # "output: " and the "AX_ENGINE_Deinit" footer.
            chinese_text = result.stdout.strip()
            chinese_text = (
                chinese_text.split("output: ")[-1]
                .split("\nAX_ENGINE_Deinit")[0]
            )
            print(f"翻译结果: {chinese_text}")
            return chinese_text

        except subprocess.TimeoutExpired:
            raise RuntimeError("翻译程序执行超时")
        except Exception as e:
            raise e

    def run_tts(self, chinese_text, output_dir, output_wav=None):
        """Step 3: synthesize Chinese speech with the external MeloTTS binary.

        Args:
            chinese_text: Chinese text to synthesize.
            output_dir: Directory for the generated wav file.
            output_wav: Output wav file name; defaults to "output.wav".

        Returns:
            Path of the generated wav file.

        Raises:
            RuntimeError: If the binary fails or times out.
            FileNotFoundError: If no output file was produced.
        """
        if output_wav is None:
            # fix: the old code passed None straight into os.path.join,
            # which raises TypeError.
            output_wav = "output.wav"
        output_path = os.path.join(output_dir, output_wav)

        # Convert Arabic numerals to Chinese numerals for the TTS frontend.
        chinese_text = cn2an.transform(chinese_text, "an2cn")

        cmd = [
            self.tts_exec,
            "--g", os.path.join(self.tts_model_dir, self.tts_model_files["g"]),
            "-e", os.path.join(self.tts_model_dir, self.tts_model_files["encoder"]),
            "-l", os.path.join(self.tts_model_dir, self.tts_model_files["lexicon"]),
            "-t", os.path.join(self.tts_model_dir, self.tts_model_files["tokens"]),
            "-d", os.path.join(self.tts_model_dir, self.tts_model_files["decoder"]),
            "-w", output_path,
            # fix: same spurious-quoting bug as run_translation — pass the
            # text itself, not a quote-wrapped copy.
            "-s", chinese_text,
        ]

        try:
            result = subprocess.run(
                cmd,
                # fix: was capture_output=False, so the error path below read
                # result.stderr as None and the failure message was useless.
                capture_output=True,
                text=True,
                timeout=60,  # TTS may take longer than translation
            )

            if result.returncode != 0:
                error_msg = f"TTS程序执行失败: {result.stderr}"
                raise RuntimeError(error_msg)

            if not os.path.exists(output_path):
                raise FileNotFoundError(f"输出文件未生成: {output_path}")

            return output_path

        except subprocess.TimeoutExpired:
            raise RuntimeError("TTS程序执行超时")
        except Exception as e:
            # fix: the old cleanup rmtree'd the whole output directory,
            # destroying unrelated results; remove only the partial wav.
            if os.path.exists(output_path):
                os.remove(output_path)
            raise e

    def full_pipeline(self, speech, fs, output_dir=None, output_tts=None):
        """Run the complete pipeline: ASR -> translation -> TTS.

        Args:
            speech: 1-D audio samples.
            fs: Sample rate of `speech` in Hz.
            output_dir: Directory for the synthesized wav file.
            output_tts: Output wav file name.

        Returns:
            Dict with keys "original_text", "translated_text", "audio_path".
        """
        # Step 1: speech recognition
        print("\n----------------------VAD+ASR----------------------------\n")
        start_time = time.time()
        english_text = self.speech_recognition(speech, fs)
        asr_time = time.time() - start_time
        print(f"语音识别耗时: {asr_time:.2f} 秒")

        # Step 2: translation
        print("\n---------------------translate---------------------------\n")
        start_time = time.time()
        chinese_text = self.run_translation(english_text)
        translate_time = time.time() - start_time
        print(f"翻译耗时: {translate_time:.2f} 秒")

        # Step 3: TTS synthesis
        print("-------------------------TTS-------------------------------\n")
        start_time = time.time()
        output_path = self.run_tts(chinese_text, output_dir, output_tts)
        tts_time = time.time() - start_time
        print(f"TTS合成耗时: {tts_time:.2f} 秒")

        return {
            "original_text": english_text,
            "translated_text": chinese_text,
            "audio_path": output_path,
        }


def main():
    """CLI entry point: load audio, run the pipeline, report results."""
    parser = argparse.ArgumentParser(
        description="Speech Recognition, Translation and TTS Pipeline")
    parser.add_argument("--audio_file", type=str, required=True,
                        help="Input audio file path")
    parser.add_argument("--output_dir", type=str, default="./output",
                        help="Output directory")
    # fix: help text previously said "Output directory" for this option too.
    parser.add_argument("--output_tts", type=str, default="output.wav",
                        help="Output wav file name")
    args = parser.parse_args()

    print("-------------------START------------------------\n")
    os.makedirs(args.output_dir, exist_ok=True)
    print(f"Processing audio file: {args.audio_file}")

    # Load the audio; the ASR models expect 16 kHz input.
    speech, fs = librosa.load(args.audio_file, sr=None)
    if fs != 16000:
        print(f"Resampling audio from {fs}Hz to 16000Hz")
        speech = librosa.resample(y=speech, orig_sr=fs, target_sr=16000)
        fs = 16000
    audio_duration = librosa.get_duration(y=speech, sr=fs)

    # Build the pipeline from the module-level configuration.
    pipeline = SpeechTranslationPipeline(
        translate_exec=TRANSLATE_EXECUTABLE,
        translate_model=TRANSLATE_MODEL,
        translate_tokenizer=TRANSLATE_TOKENIZER_DIR,
        tts_exec=TTS_EXECUTABLE,
        tts_model_dir=TTS_MODEL_DIR,
        tts_model_files=TTS_MODEL_FILES,
        asr_model_dir="ax_model",
        seq_len=132,
    )

    start_time = time.time()
    try:
        result = pipeline.full_pipeline(speech, fs, args.output_dir, args.output_tts)

        print("\n" + "=" * 50)
        print("speech translate 完成!")
        print("=" * 50 + "\n")
        print(f"原始音频: {args.audio_file}")
        print(f"原始文本: {result['original_text']}")
        print(f"翻译文本: {result['translated_text']}")
        print(f"生成音频: {result['audio_path']}")

        # Persist a summary of the run next to the generated audio.
        result_file = os.path.join(args.output_dir, "pipeline_result.txt")
        with open(result_file, 'w', encoding='utf-8') as f:
            f.write(f"原始音频: {args.audio_file}\n")
            f.write(f"识别文本: {result['original_text']}\n")
            f.write(f"翻译结果: {result['translated_text']}\n")
            f.write(f"合成音频: {result['audio_path']}\n")

        # Real-time factor: total wall time / audio duration.
        time_cost = time.time() - start_time
        rtf = time_cost / audio_duration
        print(f"Inference time for {args.audio_file}: {time_cost:.2f} seconds")
        print(f"Audio duration: {audio_duration:.2f} seconds")
        print(f"RTF: {rtf:.2f}\n")

    except Exception as e:
        # Top-level boundary: report the failure with a traceback, don't crash
        # silently.
        print(f"Pipeline执行失败: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()