import subprocess
import tempfile
import os
import json
import shutil
import time
import librosa
import torch
import argparse
import soundfile as sf
from pathlib import Path
import cn2an
# Import SenseVoice-related modules
from model import SinusoidalPositionEncoder
from utils.ax_model_bin import AX_SenseVoiceSmall
from utils.ax_vad_bin import AX_Fsmn_vad
from utils.vad_utils import merge_vad
from funasr.tokenizer.sentencepiece_tokenizer import SentencepiecesTokenizer
# 配置参数
# translate 参数
TRANSLATE_EXECUTABLE = "libtranslate/test_translate"
TRANSLATE_MODEL = "libtranslate/opus-mt-en-zh.axmodel"
TRANSLATE_TOKENIZER_DIR = "libtranslate/opus-mt-en-zh/"
# tts 参数
TTS_EXECUTABLE = "libmelotts/install/melotts"
TTS_MODEL_DIR = "libmelotts/models"
TTS_MODEL_FILES = {
"g": "g-zh_mix_en.bin",
"encoder": "encoder-zh.onnx",
"lexicon": "lexicon.txt",
"tokens": "tokens.txt",
"decoder": "decoder-zh.axmodel"
}
class SpeechTranslationPipeline:
    """Speech-to-speech translation pipeline: English ASR -> EN->ZH translation -> Chinese TTS.

    ASR (SenseVoice + FSMN-VAD) runs in-process; translation and TTS are
    delegated to external executables via ``subprocess``.
    """

    def __init__(self,
                 translate_exec, translate_model, translate_tokenizer,
                 tts_exec, tts_model_dir, tts_model_files,
                 asr_model_dir="ax_model", seq_len=132):
        """Store configuration, load the ASR models and validate all file paths.

        Args:
            translate_exec: path to the translation executable.
            translate_model: path to the translation .axmodel file.
            translate_tokenizer: directory holding the translation tokenizer.
            tts_exec: path to the TTS executable.
            tts_model_dir: directory holding the TTS model files.
            tts_model_files: mapping of TTS model roles to file names
                (keys: "g", "encoder", "lexicon", "tokens", "decoder").
            asr_model_dir: directory holding the SenseVoice / VAD models.
            seq_len: ASR encoder sequence length used for position encodings.

        Raises:
            FileNotFoundError: if any required executable or model file is missing.
        """
        self.translate_exec = translate_exec
        self.translate_model = translate_model
        self.translate_tokenizer = translate_tokenizer
        self.tts_exec = tts_exec
        self.tts_model_dir = tts_model_dir
        self.tts_model_files = tts_model_files
        self.asr_model_dir = asr_model_dir
        self.seq_len = seq_len
        # Load the in-process ASR models, then confirm external tools exist.
        self._init_asr_models()
        self._validate_files()

    def _init_asr_models(self):
        """Initialize the speech-recognition models (VAD, position encoding, ASR, tokenizer)."""
        print("Initializing SenseVoice models...")
        # Voice-activity-detection model
        self.model_vad = AX_Fsmn_vad(self.asr_model_dir)
        # Precompute sinusoidal position encodings for a fixed (1, seq_len, 560) input.
        self.embed = SinusoidalPositionEncoder()
        self.position_encoding = self.embed.get_position_encoding(
            torch.randn(1, self.seq_len, 560)).numpy()
        # SenseVoice ASR model
        self.model_bin = AX_SenseVoiceSmall(self.asr_model_dir, seq_len=self.seq_len)
        # SentencePiece tokenizer shared by every ASR call
        tokenizer_path = os.path.join(self.asr_model_dir, "chn_jpn_yue_eng_ko_spectok.bpe.model")
        self.tokenizer = SentencepiecesTokenizer(bpemodel=tokenizer_path)
        print("SenseVoice models initialized successfully.")

    def _validate_files(self):
        """Verify that all required external files exist.

        Raises:
            FileNotFoundError: if an executable or model file is missing.
        """
        # Translation executable, model, and tokenizer directory
        if not os.path.exists(self.translate_exec):
            raise FileNotFoundError(f"翻译可执行文件不存在: {self.translate_exec}")
        if not os.path.exists(self.translate_model):
            raise FileNotFoundError(f"翻译模型不存在: {self.translate_model}")
        if not os.path.exists(self.translate_tokenizer):
            raise FileNotFoundError(f"翻译tokenizer目录不存在: {self.translate_tokenizer}")
        # TTS executable and model assets
        if not os.path.exists(self.tts_exec):
            raise FileNotFoundError(f"TTS可执行文件不存在: {self.tts_exec}")
        for filename in self.tts_model_files.values():
            filepath = os.path.join(self.tts_model_dir, filename)
            if not os.path.exists(filepath):
                raise FileNotFoundError(f"TTS模型文件不存在: {filepath}")

    def speech_recognition(self, speech, fs):
        """Step 1: speech recognition (VAD segmentation + per-segment ASR).

        Args:
            speech: 1-D audio sample array.
            fs: sample rate in Hz.

        Returns:
            The concatenated, stripped recognition text of all VAD segments.
        """
        speech_lengths = len(speech)
        # --- VAD: split the audio into speech segments ---
        print("Running VAD...")
        vad_start_time = time.time()
        res_vad = self.model_vad(speech)[0]
        # Merge adjacent segments up to 15 s so each ASR call sees a longer chunk.
        vad_segments = merge_vad(res_vad, 15 * 1000)
        vad_time_cost = time.time() - vad_start_time
        print(f"VAD processing time: {vad_time_cost:.2f} seconds")
        print(f"VAD segments detected: {len(vad_segments)}")
        # --- ASR: recognize each segment and concatenate the text ---
        print("Running ASR...")
        asr_start_time = time.time()
        all_results = ""
        for i, (segment_start, segment_end) in enumerate(vad_segments):
            # Segment boundaries are in milliseconds; convert to sample indices
            # and clamp to the end of the audio.
            start_sample = int(segment_start / 1000 * fs)
            end_sample = min(int(segment_end / 1000 * fs), speech_lengths)
            segment_speech = speech[start_sample:end_sample]
            # The ASR model reads from a file, so dump the segment to a temp wav.
            segment_filename = f"temp_segment_{i}.wav"
            sf.write(segment_filename, segment_speech, fs)
            try:
                segment_res = self.model_bin(
                    segment_filename,
                    "auto",  # automatic language detection
                    True,    # with inverse text normalization
                    self.position_encoding,
                    tokenizer=self.tokenizer,
                )
                all_results += segment_res
            except Exception as e:
                # Best effort: skip a failing segment but keep the rest.
                print(f"Error processing segment {i}: {e}")
            finally:
                # Always remove the temp wav (original duplicated this cleanup
                # in both the success and error paths).
                if os.path.exists(segment_filename):
                    os.remove(segment_filename)
        asr_time_cost = time.time() - asr_start_time
        print(f"ASR processing time: {asr_time_cost:.2f} seconds")
        print(f"ASR Result: {all_results}")
        return all_results.strip()

    def run_translation(self, english_text):
        """Step 2: translate English text to Chinese via the external translator.

        Args:
            english_text: source English text.

        Returns:
            The Chinese translation extracted from the tool's stdout.

        Raises:
            RuntimeError: if the translator fails or times out.
        """
        # subprocess.run with a list runs without a shell, so every element is
        # passed verbatim as one argv entry. The original wrapped the text in
        # literal quotes (f'"{text}"'), which made the quotes part of the
        # translated input — pass the raw text instead.
        cmd = [
            self.translate_exec,
            "--model", self.translate_model,
            "--tokenizer_dir", self.translate_tokenizer,
            "--text", english_text,
        ]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,  # seconds
            )
        except subprocess.TimeoutExpired:
            raise RuntimeError("翻译程序执行超时")
        if result.returncode != 0:
            raise RuntimeError(f"翻译程序执行失败: {result.stderr}")
        chinese_text = result.stdout.strip()
        # The tool prints extra logging: the translation follows "output: "
        # and precedes the "AX_ENGINE_Deinit" shutdown line.
        chinese_text = chinese_text.split("output: ")[-1].split("\nAX_ENGINE_Deinit")[0]
        print(f"翻译结果: {chinese_text}")
        return chinese_text

    def run_tts(self, chinese_text, output_dir, output_wav=None):
        """Step 3: synthesize Chinese speech with the external TTS tool.

        Args:
            chinese_text: Chinese text to synthesize.
            output_dir: directory the wav file is written into.
            output_wav: output file name; defaults to "output.wav"
                (the original crashed on os.path.join(..., None)).

        Returns:
            Path of the generated wav file.

        Raises:
            RuntimeError: if the TTS tool fails or times out.
            FileNotFoundError: if the tool exits 0 but produced no file.
        """
        output_path = os.path.join(output_dir, output_wav or "output.wav")
        # Convert Arabic numerals to Chinese numerals so the TTS lexicon can voice them.
        chinese_text = cn2an.transform(chinese_text, "an2cn")
        cmd = [
            self.tts_exec,
            "--g", os.path.join(self.tts_model_dir, self.tts_model_files["g"]),
            "-e", os.path.join(self.tts_model_dir, self.tts_model_files["encoder"]),
            "-l", os.path.join(self.tts_model_dir, self.tts_model_files["lexicon"]),
            "-t", os.path.join(self.tts_model_dir, self.tts_model_files["tokens"]),
            "-d", os.path.join(self.tts_model_dir, self.tts_model_files["decoder"]),
            "-w", output_path,
            # No shell involved — pass the text without literal quotes.
            "-s", chinese_text,
        ]
        try:
            # Capture output so stderr is available for the error message
            # (the original used capture_output=False, leaving stderr None).
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60,  # TTS may take longer than translation
            )
        except subprocess.TimeoutExpired:
            self._remove_partial_output(output_path)
            raise RuntimeError("TTS程序执行超时")
        if result.returncode != 0:
            self._remove_partial_output(output_path)
            raise RuntimeError(f"TTS程序执行失败: {result.stderr}")
        if not os.path.exists(output_path):
            raise FileNotFoundError(f"输出文件未生成: {output_path}")
        return output_path

    @staticmethod
    def _remove_partial_output(output_path):
        """Delete a partially-written wav file, if any.

        The original error path did shutil.rmtree(dirname(output_path)),
        which wiped the caller's entire output directory — remove only the
        single output file instead.
        """
        if output_path and os.path.exists(output_path):
            os.remove(output_path)

    def full_pipeline(self, speech, fs, output_dir=None, output_tts=None):
        """Run the complete pipeline: ASR -> translation -> TTS synthesis.

        Args:
            speech: 1-D audio sample array.
            fs: sample rate in Hz.
            output_dir: directory for the synthesized wav.
            output_tts: output wav file name.

        Returns:
            dict with keys "original_text", "translated_text", "audio_path".
        """
        # Step 1: speech recognition
        print("\n----------------------VAD+ASR----------------------------\n")
        start_time = time.time()
        english_text = self.speech_recognition(speech, fs)
        asr_time = time.time() - start_time
        print(f"语音识别耗时: {asr_time:.2f} 秒")
        # Step 2: translation
        print("\n---------------------translate---------------------------\n")
        start_time = time.time()
        chinese_text = self.run_translation(english_text)
        translate_time = time.time() - start_time
        print(f"翻译耗时: {translate_time:.2f} 秒")
        # Step 3: TTS synthesis
        print("-------------------------TTS-------------------------------\n")
        start_time = time.time()
        output_path = self.run_tts(chinese_text, output_dir, output_tts)
        tts_time = time.time() - start_time
        print(f"TTS合成耗时: {tts_time:.2f} 秒")
        return {
            "original_text": english_text,
            "translated_text": chinese_text,
            "audio_path": output_path
        }
def main():
    """CLI entry point: load audio, run the full pipeline, report and save results."""
    parser = argparse.ArgumentParser(description="Speech Recognition, Translation and TTS Pipeline")
    parser.add_argument("--audio_file", type=str, required=True, help="Input audio file path")
    parser.add_argument("--output_dir", type=str, default="./output", help="Output directory")
    # Help text fixed: this is the output wav file name, not a directory
    # (the original help string was a copy-paste of --output_dir's).
    parser.add_argument("--output_tts", type=str, default="output.wav", help="Output TTS wav file name")
    args = parser.parse_args()
    print("-------------------START------------------------\n")
    os.makedirs(args.output_dir, exist_ok=True)
    print(f"Processing audio file: {args.audio_file}")
    # Load the audio and resample to the 16 kHz the ASR models expect.
    speech, fs = librosa.load(args.audio_file, sr=None)
    if fs != 16000:
        print(f"Resampling audio from {fs}Hz to 16000Hz")
        speech = librosa.resample(y=speech, orig_sr=fs, target_sr=16000)
        fs = 16000
    audio_duration = librosa.get_duration(y=speech, sr=fs)
    # Build the pipeline from the module-level configuration.
    pipeline = SpeechTranslationPipeline(
        translate_exec=TRANSLATE_EXECUTABLE,
        translate_model=TRANSLATE_MODEL,
        translate_tokenizer=TRANSLATE_TOKENIZER_DIR,
        tts_exec=TTS_EXECUTABLE,
        tts_model_dir=TTS_MODEL_DIR,
        tts_model_files=TTS_MODEL_FILES,
        asr_model_dir="ax_model",
        seq_len=132
    )
    start_time = time.time()
    try:
        result = pipeline.full_pipeline(speech, fs, args.output_dir, args.output_tts)
        print("\n" + "=" * 50)
        print("speech translate 完成!")
        print("=" * 50 + "\n")
        print(f"原始音频: {args.audio_file}")
        print(f"原始文本: {result['original_text']}")
        print(f"翻译文本: {result['translated_text']}")
        print(f"生成音频: {result['audio_path']}")
        # Persist a human-readable summary next to the synthesized audio.
        result_file = os.path.join(args.output_dir, "pipeline_result.txt")
        with open(result_file, 'w', encoding='utf-8') as f:
            f.write(f"原始音频: {args.audio_file}\n")
            f.write(f"识别文本: {result['original_text']}\n")
            f.write(f"翻译结果: {result['translated_text']}\n")
            f.write(f"合成音频: {result['audio_path']}\n")
        time_cost = time.time() - start_time
        # RTF (real-time factor): processing time divided by audio duration.
        rtf = time_cost / audio_duration
        print(f"Inference time for {args.audio_file}: {time_cost:.2f} seconds")
        print(f"Audio duration: {audio_duration:.2f} seconds")
        print(f"RTF: {rtf:.2f}\n")
    except Exception as e:
        # Top-level boundary: report the failure with a traceback instead of crashing.
        print(f"Pipeline执行失败: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()