| import subprocess
|
| import tempfile
|
| import os
|
| import json
|
| import shutil
|
| import time
|
| import librosa
|
| import torch
|
| import argparse
|
| import soundfile as sf
|
| from pathlib import Path
|
| import cn2an
|
|
|
|
|
| from model import SinusoidalPositionEncoder
|
| from utils.ax_model_bin import AX_SenseVoiceSmall
|
| from utils.ax_vad_bin import AX_Fsmn_vad
|
| from utils.vad_utils import merge_vad
|
| from funasr.tokenizer.sentencepiece_tokenizer import SentencepiecesTokenizer
|
|
|
|
|
|
|
# Paths to the external English->Chinese translation binary and its assets.
TRANSLATE_EXECUTABLE = "libtranslate/test_translate"
TRANSLATE_MODEL = "libtranslate/opus-mt-en-zh.axmodel"
TRANSLATE_TOKENIZER_DIR = "libtranslate/opus-mt-en-zh/"

# Paths to the external MeloTTS synthesis binary and its model directory.
TTS_EXECUTABLE = "libmelotts/install/melotts"
TTS_MODEL_DIR = "libmelotts/models"
# Role -> file-name mapping for the TTS model assets inside TTS_MODEL_DIR;
# keys map to the melotts command-line flags (--g, -e, -l, -t, -d).
TTS_MODEL_FILES = {
    "g": "g-zh_mix_en.bin",
    "encoder": "encoder-zh.onnx",
    "lexicon": "lexicon.txt",
    "tokens": "tokens.txt",
    "decoder": "decoder-zh.axmodel"
}
|
|
|
class SpeechTranslationPipeline:
    """End-to-end speech translation pipeline.

    Stages:
      1. VAD + ASR with SenseVoice (in-process models) to obtain English text.
      2. Machine translation via an external translator binary (en -> zh).
      3. Speech synthesis via an external MeloTTS binary.
    """

    def __init__(self,
                 translate_exec, translate_model, translate_tokenizer,
                 tts_exec, tts_model_dir, tts_model_files,
                 asr_model_dir="ax_model", seq_len=132):
        """
        Args:
            translate_exec: path to the translation executable.
            translate_model: path to the translation model file.
            translate_tokenizer: path to the translation tokenizer directory.
            tts_exec: path to the TTS executable.
            tts_model_dir: directory containing the TTS model files.
            tts_model_files: dict mapping roles ("g", "encoder", "lexicon",
                "tokens", "decoder") to file names inside tts_model_dir.
            asr_model_dir: directory holding the VAD/ASR models and the
                SentencePiece tokenizer model.
            seq_len: input sequence length of the ASR model; also used to
                precompute the positional encoding.

        Raises:
            FileNotFoundError: if a required executable or model file is missing.
        """
        self.translate_exec = translate_exec
        self.translate_model = translate_model
        self.translate_tokenizer = translate_tokenizer
        self.tts_exec = tts_exec
        self.tts_model_dir = tts_model_dir
        self.tts_model_files = tts_model_files
        self.asr_model_dir = asr_model_dir
        self.seq_len = seq_len

        self._init_asr_models()
        self._validate_files()

    def _init_asr_models(self):
        """Initialize the speech-recognition models (VAD, SenseVoice, tokenizer)."""
        print("Initializing SenseVoice models...")

        self.model_vad = AX_Fsmn_vad(self.asr_model_dir)

        # Precompute the positional encoding once; the random tensor only
        # supplies the (1, seq_len, 560) shape — its values are irrelevant.
        self.embed = SinusoidalPositionEncoder()
        self.position_encoding = self.embed.get_position_encoding(
            torch.randn(1, self.seq_len, 560)).numpy()

        self.model_bin = AX_SenseVoiceSmall(self.asr_model_dir, seq_len=self.seq_len)

        tokenizer_path = os.path.join(self.asr_model_dir, "chn_jpn_yue_eng_ko_spectok.bpe.model")
        self.tokenizer = SentencepiecesTokenizer(bpemodel=tokenizer_path)

        print("SenseVoice models initialized successfully.")

    def _validate_files(self):
        """Verify that all required external files exist.

        Raises:
            FileNotFoundError: naming the first missing file found.
        """
        if not os.path.exists(self.translate_exec):
            raise FileNotFoundError(f"翻译可执行文件不存在: {self.translate_exec}")
        if not os.path.exists(self.translate_model):
            raise FileNotFoundError(f"翻译模型不存在: {self.translate_model}")
        if not os.path.exists(self.translate_tokenizer):
            raise FileNotFoundError(f"翻译tokenizer目录不存在: {self.translate_tokenizer}")

        if not os.path.exists(self.tts_exec):
            raise FileNotFoundError(f"TTS可执行文件不存在: {self.tts_exec}")

        for key, filename in self.tts_model_files.items():
            filepath = os.path.join(self.tts_model_dir, filename)
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"TTS模型文件不存在: {filepath}")

    def speech_recognition(self, speech, fs):
        """Step 1: run VAD then ASR over the waveform.

        Args:
            speech: 1-D array of audio samples.
            fs: sampling rate in Hz.

        Returns:
            Concatenated recognition text of all VAD segments, stripped.
        """
        speech_lengths = len(speech)

        print("Running VAD...")
        vad_start_time = time.time()
        res_vad = self.model_vad(speech)[0]
        # Merge neighboring VAD segments up to 15 s so each ASR call gets a
        # reasonably sized chunk.
        vad_segments = merge_vad(res_vad, 15 * 1000)
        vad_time_cost = time.time() - vad_start_time
        print(f"VAD processing time: {vad_time_cost:.2f} seconds")
        print(f"VAD segments detected: {len(vad_segments)}")

        print("Running ASR...")
        asr_start_time = time.time()
        all_results = ""

        for i, (segment_start, segment_end) in enumerate(vad_segments):
            # Segment boundaries are in milliseconds; convert to sample indices
            # and clamp the end to the available audio.
            start_sample = int(segment_start / 1000 * fs)
            end_sample = min(int(segment_end / 1000 * fs), speech_lengths)

            # The ASR wrapper reads from a wav file, so spill the segment to disk.
            segment_filename = f"temp_segment_{i}.wav"
            sf.write(segment_filename, speech[start_sample:end_sample], fs)

            try:
                segment_res = self.model_bin(
                    segment_filename,
                    "auto",
                    True,
                    self.position_encoding,
                    tokenizer=self.tokenizer,
                )
                all_results += segment_res
            except Exception as e:
                # Best effort: report the failure and continue with the rest.
                print(f"Error processing segment {i}: {e}")
                continue
            finally:
                # Clean up the temp wav on success and failure alike (the
                # original duplicated this removal in both branches).
                if os.path.exists(segment_filename):
                    os.remove(segment_filename)

        asr_time_cost = time.time() - asr_start_time
        print(f"ASR processing time: {asr_time_cost:.2f} seconds")
        print(f"ASR Result: {all_results}")

        return all_results.strip()

    def run_translation(self, english_text):
        """Step 2: translate English text to Chinese via the external binary.

        Args:
            english_text: source text (typically the ASR output).

        Returns:
            The translated Chinese text.

        Raises:
            RuntimeError: if the translator exits non-zero or times out.
        """
        cmd = [
            self.translate_exec,
            "--model", self.translate_model,
            "--tokenizer_dir", self.translate_tokenizer,
            # NOTE(review): with shell=False these literal double quotes are
            # passed through to the binary as part of the argument; kept as-is
            # because the binary appears to expect them — confirm.
            "--text", f'"{english_text}"'
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30
            )
        except subprocess.TimeoutExpired:
            raise RuntimeError("翻译程序执行超时")

        if result.returncode != 0:
            error_msg = f"翻译程序执行失败: {result.stderr}"
            raise RuntimeError(error_msg)

        chinese_text = self._parse_translation_output(result.stdout)

        print(f"翻译结果: {chinese_text}")
        return chinese_text

    @staticmethod
    def _parse_translation_output(stdout_text):
        """Extract the translated sentence from the translator's raw stdout.

        The binary prints engine logs around the result; the payload is the
        text after the last "output: " marker, truncated at the newline that
        starts the "AX_ENGINE_Deinit" shutdown log line.
        """
        text = stdout_text.strip()
        return text.split("output: ")[-1].split("\nAX_ENGINE_Deinit")[0]

    def run_tts(self, chinese_text, output_dir, output_wav=None):
        """Step 3: synthesize Chinese speech with the external TTS binary.

        Args:
            chinese_text: text to synthesize; Arabic numerals are converted to
                Chinese numerals first via cn2an.
            output_dir: directory for the generated wav file.
            output_wav: wav file name; defaults to "output.wav".

        Returns:
            Path of the generated wav file.

        Raises:
            RuntimeError: if the TTS binary exits non-zero or times out.
            FileNotFoundError: if the binary exits 0 but produced no file.
        """
        if output_wav is None:
            # The original default of None crashed in os.path.join(output_dir, None).
            output_wav = "output.wav"
        output_path = os.path.join(output_dir, output_wav)

        chinese_text = cn2an.transform(chinese_text, "an2cn")

        def model_file(key):
            # Resolve a TTS model role to its full path.
            return os.path.join(self.tts_model_dir, self.tts_model_files[key])

        cmd = [
            self.tts_exec,
            "--g", model_file("g"),
            "-e", model_file("encoder"),
            "-l", model_file("lexicon"),
            "-t", model_file("tokens"),
            "-d", model_file("decoder"),
            "-w", output_path,
            # NOTE(review): literal quotes passed through (shell=False) — see
            # run_translation.
            "-s", f'"{chinese_text}"'
        ]

        try:
            # Let stdout stream to the console, but capture stderr: the
            # original used capture_output=False and then reported
            # result.stderr, which was always None.
            result = subprocess.run(
                cmd,
                stderr=subprocess.PIPE,
                text=True,
                timeout=60
            )

            if result.returncode != 0:
                error_msg = f"TTS程序执行失败: {result.stderr}"
                raise RuntimeError(error_msg)

            if not os.path.exists(output_path):
                raise FileNotFoundError(f"输出文件未生成: {output_path}")

            return output_path

        except subprocess.TimeoutExpired:
            raise RuntimeError("TTS程序执行超时")
        except Exception:
            # Remove only the (possibly partial) output file.  The original
            # deleted the entire output directory with shutil.rmtree, which
            # could destroy unrelated files (e.g. earlier results).
            if os.path.exists(output_path):
                os.remove(output_path)
            raise

    def full_pipeline(self, speech, fs, output_dir=None, output_tts=None):
        """Run the full pipeline: ASR -> translation -> TTS.

        Args:
            speech: 1-D array of audio samples.
            fs: sampling rate in Hz.
            output_dir: directory for the synthesized wav (defaults to cwd).
            output_tts: output wav file name (default handled by run_tts).

        Returns:
            Dict with keys "original_text", "translated_text", "audio_path".
        """
        if output_dir is None:
            # run_tts joins output_dir with the file name; None would crash.
            output_dir = "."

        print("\n----------------------VAD+ASR----------------------------\n")
        start_time = time.time()
        english_text = self.speech_recognition(speech, fs)
        asr_time = time.time() - start_time
        print(f"语音识别耗时: {asr_time:.2f} 秒")

        print("\n---------------------translate---------------------------\n")
        start_time = time.time()
        chinese_text = self.run_translation(english_text)
        translate_time = time.time() - start_time
        print(f"翻译耗时: {translate_time:.2f} 秒")

        print("-------------------------TTS-------------------------------\n")
        start_time = time.time()
        output_path = self.run_tts(chinese_text, output_dir, output_tts)
        tts_time = time.time() - start_time
        print(f"TTS合成耗时: {tts_time:.2f} 秒")

        return {
            "original_text": english_text,
            "translated_text": chinese_text,
            "audio_path": output_path
        }
|
|
|
def main():
    """CLI entry point: load an audio file, run ASR -> MT -> TTS, save results.

    Parses command-line arguments, resamples the input to 16 kHz if needed,
    runs the full pipeline, prints a summary and writes it to
    <output_dir>/pipeline_result.txt.
    """
    parser = argparse.ArgumentParser(description="Speech Recognition, Translation and TTS Pipeline")
    parser.add_argument("--audio_file", type=str, required=True, help="Input audio file path")
    parser.add_argument("--output_dir", type=str, default="./output", help="Output directory")
    # BUG FIX: help text previously said "Output directory" (copy-paste error)
    # even though this argument is the output wav file name.
    parser.add_argument("--output_tts", type=str, default="output.wav", help="Output TTS wav file name")

    args = parser.parse_args()
    print("-------------------START------------------------\n")
    os.makedirs(args.output_dir, exist_ok=True)

    print(f"Processing audio file: {args.audio_file}")

    # The ASR/VAD models work on 16 kHz audio; resample anything else.
    speech, fs = librosa.load(args.audio_file, sr=None)
    if fs != 16000:
        print(f"Resampling audio from {fs}Hz to 16000Hz")
        speech = librosa.resample(y=speech, orig_sr=fs, target_sr=16000)
        fs = 16000
    audio_duration = librosa.get_duration(y=speech, sr=fs)

    pipeline = SpeechTranslationPipeline(
        translate_exec=TRANSLATE_EXECUTABLE,
        translate_model=TRANSLATE_MODEL,
        translate_tokenizer=TRANSLATE_TOKENIZER_DIR,
        tts_exec=TTS_EXECUTABLE,
        tts_model_dir=TTS_MODEL_DIR,
        tts_model_files=TTS_MODEL_FILES,
        asr_model_dir="ax_model",
        seq_len=132
    )

    start_time = time.time()
    try:
        result = pipeline.full_pipeline(speech, fs, args.output_dir, args.output_tts)

        print("\n" + "="*50)
        print("speech translate 完成!")
        print("="*50 + "\n")
        print(f"原始音频: {args.audio_file}")
        print(f"原始文本: {result['original_text']}")
        print(f"翻译文本: {result['translated_text']}")
        print(f"生成音频: {result['audio_path']}")

        # Persist a plain-text summary next to the synthesized audio.
        result_file = os.path.join(args.output_dir, "pipeline_result.txt")
        with open(result_file, 'w', encoding='utf-8') as f:
            f.write(f"原始音频: {args.audio_file}\n")
            f.write(f"识别文本: {result['original_text']}\n")
            f.write(f"翻译结果: {result['translated_text']}\n")
            f.write(f"合成音频: {result['audio_path']}\n")

        # Real-time factor: wall-clock processing time over audio duration.
        time_cost = time.time() - start_time
        rtf = time_cost / audio_duration
        print(f"Inference time for {args.audio_file}: {time_cost:.2f} seconds")
        print(f"Audio duration: {audio_duration:.2f} seconds")
        print(f"RTF: {rtf:.2f}\n")
    except Exception as e:
        print(f"Pipeline执行失败: {e}")
        import traceback
        traceback.print_exc()
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|
| |