File size: 11,478 Bytes
83ac874 db88be5 83ac874 db88be5 83ac874 42a6235 83ac874 d9cad98 4a1e151 83ac874 af0842c 8150db4 83ac874 af0842c e4cf6e1 8150db4 af0842c e4cf6e1 af0842c 83ac874 1ec58e2 42a6235 e4cf6e1 83ac874 e4cf6e1 1ddb4e8 42a6235 db88be5 e4cf6e1 42a6235 e4cf6e1 42a6235 4e27563 83ac874 3eb70c5 83ac874 42a6235 cf1843b 83ac874 cf1843b 42a6235 cf1843b 42a6235 3eb70c5 42a6235 83ac874 e4cf6e1 42a6235 83ac874 e4cf6e1 83ac874 e4cf6e1 83ac874 42a6235 83ac874 42a6235 83ac874 42a6235 e4cf6e1 42a6235 cf1843b 83ac874 cf1843b db88be5 83ac874 af0842c 83ac874 42a6235 83ac874 42a6235 83ac874 8150db4 83ac874 8150db4 83ac874 e2d623f 83ac874 73c0690 83ac874 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 | # app.py - نسخه کامل و نهایی برای تمام اسپیسهای Hugging Face
import os
import sys
import traceback
import re
import struct
import time
import uuid
import shutil
import logging
import mimetypes
import threading
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google import genai
from google.genai import types
# اضافه کردن uvicorn برای اجرا از داخل اسکریپت
import uvicorn
try:
from pydub import AudioSegment
PYDUB_AVAILABLE = True
except ImportError:
PYDUB_AVAILABLE = False
# --- پیکربندی لاگینگ ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# --- START: تعریف تمام توابع کمکی ---
# --- منطق مدیریت API Key ---
ALL_API_KEYS: list[str] = []
NEXT_KEY_INDEX: int = 0
KEY_LOCK: threading.Lock = threading.Lock()
def _init_api_keys():
global ALL_API_KEYS
all_keys_string = os.environ.get("ALL_GEMINI_API_KEYS")
if all_keys_string:
ALL_API_KEYS = [key.strip() for key in all_keys_string.split(',') if key.strip()]
logging.info(f"✅ تعداد {len(ALL_API_KEYS)} کلید API جیمینای بارگذاری شد.")
if not ALL_API_KEYS:
logging.warning("⛔️ هشدار: هیچ Secret با نام ALL_GEMINI_API_KEYS یافت نشد! برنامه بدون کلید API کار نخواهد کرد.")
# --- ثابتها ---
FIXED_MODEL_NAME = "gemini-2.5-flash-preview-tts"
DEFAULT_MAX_CHUNK_SIZE = 3800
DEFAULT_SLEEP_BETWEEN_REQUESTS = 8
# --- توابع کمکی فایل و صدا ---
def save_binary_file(file_name, data):
try:
with open(file_name, "wb") as f: f.write(data)
return file_name
except Exception as e:
logging.error(f"❌ خطا در ذخیره فایل {file_name}: {e}")
return None
def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
parameters = parse_audio_mime_type(mime_type)
bits_per_sample, rate = parameters["bits_per_sample"], parameters["rate"]
num_channels, data_size = 1, len(audio_data)
bytes_per_sample, block_align = bits_per_sample // 8, num_channels * (bits_per_sample // 8)
byte_rate, chunk_size = rate * block_align, 36 + data_size
header = struct.pack("<4sI4s4sIHHIIHH4sI", b"RIFF", chunk_size, b"WAVE", b"fmt ", 16, 1, num_channels, rate, byte_rate, block_align, bits_per_sample, b"data", data_size)
return header + audio_data
def parse_audio_mime_type(mime_type: str) -> dict[str, int]:
bits, rate = 16, 24000
for param in mime_type.split(";"):
param = param.strip()
if param.lower().startswith("rate="):
try: rate = int(param.split("=", 1)[1])
except: pass
elif param.startswith("audio/L"):
try: bits = int(param.split("L", 1)[1])
except: pass
return {"bits_per_sample": bits, "rate": rate}
def smart_text_split(text, max_size=3800):
if len(text) <= max_size: return [text]
chunks, current_chunk = [], ""
sentences = re.split(r'(?<=[.!?؟])\s+', text)
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 > max_size:
if current_chunk: chunks.append(current_chunk.strip())
current_chunk = sentence
while len(current_chunk) > max_size:
split_idx = next((i for i in range(max_size - 1, max_size // 2, -1) if current_chunk[i] in ['،', ',', ';', ':', ' ']), -1)
part, current_chunk = (current_chunk[:split_idx+1], current_chunk[split_idx+1:]) if split_idx != -1 else (current_chunk[:max_size], current_chunk[max_size:])
chunks.append(part.strip())
else: current_chunk += (" " if current_chunk else "") + sentence
if current_chunk: chunks.append(current_chunk.strip())
final_chunks = [c for c in chunks if c]
return final_chunks
def merge_audio_files_func(file_paths, output_path):
if not PYDUB_AVAILABLE: logging.warning("⚠️ pydub برای ادغام در دسترس نیست."); return False
try:
combined = AudioSegment.empty()
for i, fp in enumerate(file_paths):
if os.path.exists(fp): combined += AudioSegment.from_file(fp) + (AudioSegment.silent(duration=150) if i < len(file_paths) - 1 else AudioSegment.empty())
else: logging.warning(f"⚠️ فایل برای ادغام پیدا نشد: {fp}")
combined.export(output_path, format="wav")
return True
except Exception as e: logging.error(f"❌ خطا در ادغام فایلهای صوتی: {e}"); return False
def get_next_api_key():
global NEXT_KEY_INDEX, ALL_API_KEYS, KEY_LOCK
with KEY_LOCK:
if not ALL_API_KEYS: return None, None
key_to_use = ALL_API_KEYS[NEXT_KEY_INDEX % len(ALL_API_KEYS)]
key_display_index = (NEXT_KEY_INDEX % len(ALL_API_KEYS)) + 1
NEXT_KEY_INDEX += 1
return key_to_use, key_display_index
# --- منطق اصلی تولید صدا ---
def generate_audio_chunk_with_retry(chunk_text, prompt_text, voice, temp, session_id):
if not ALL_API_KEYS: raise Exception("هیچ کلید API برای پردازش در دسترس نیست.")
for _ in range(len(ALL_API_KEYS)):
selected_api_key, key_idx_display = get_next_api_key()
if not selected_api_key: break
logging.info(f"[{session_id}] ⚙️ تلاش برای تولید قطعه با کلید API شماره {key_idx_display}")
try:
client = genai.Client(api_key=selected_api_key)
final_text = f'"{prompt_text}"\n{chunk_text}' if prompt_text and prompt_text.strip() else chunk_text
contents = [types.Content(role="user", parts=[types.Part.from_text(text=final_text)])]
config = types.GenerateContentConfig(temperature=temp, response_modalities=["audio"],
speech_config=types.SpeechConfig(voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice))))
response = client.models.generate_content(model=FIXED_MODEL_NAME, contents=contents, config=config)
if response.candidates and response.candidates[0].content and response.candidates[0].content.parts and response.candidates[0].content.parts[0].inline_data:
logging.info(f"[{session_id}] ✅ قطعه با موفقیت توسط کلید شماره {key_idx_display} تولید شد.")
return response.candidates[0].content.parts[0].inline_data
except Exception as e:
logging.error(f"[{session_id}] ❌ خطا در تولید قطعه با کلید شماره {key_idx_display}: {e}.")
return None
def core_generate_audio(text_input, prompt_input, selected_voice, temperature_val, session_id):
logging.info(f"[{session_id}] 🚀 شروع فرآیند تولید صدا.")
temp_dir = f"temp_{session_id}"
os.makedirs(temp_dir, exist_ok=True)
output_base_name = f"{temp_dir}/audio_session_{session_id}"
if not text_input or not text_input.strip(): raise ValueError("متن ورودی خالی است.")
text_chunks = smart_text_split(text_input, DEFAULT_MAX_CHUNK_SIZE)
if not text_chunks: raise ValueError("متن قابل پردازش به قطعات کوچکتر نیست.")
generated_files = []
try:
for i, chunk in enumerate(text_chunks):
logging.info(f"[{session_id}] 🔊 پردازش قطعه {i+1}/{len(text_chunks)}...")
inline_data = generate_audio_chunk_with_retry(chunk, prompt_input, selected_voice, temperature_val, session_id)
if inline_data:
data_buffer = inline_data.data
ext = mimetypes.guess_extension(inline_data.mime_type) or ".wav"
if "audio/L" in inline_data.mime_type and ext == ".wav":
data_buffer = convert_to_wav(data_buffer, inline_data.mime_type)
if not ext.startswith("."): ext = "." + ext
fpath = save_binary_file(f"{output_base_name}_part{i+1:03d}{ext}", data_buffer)
if fpath: generated_files.append(fpath)
else:
raise Exception(f"فرآیند متوقف شد زیرا تولید قطعه {i+1} با تمام کلیدهای موجود ناموفق بود.")
if i < len(text_chunks) - 1 and len(text_chunks) > 1: time.sleep(DEFAULT_SLEEP_BETWEEN_REQUESTS)
if not generated_files: raise Exception("هیچ فایل صوتی تولید نشد.")
final_output_path = f"output_{session_id}.wav"
if len(generated_files) > 1:
if PYDUB_AVAILABLE and merge_audio_files_func(generated_files, final_output_path):
final_audio_file = final_output_path
else:
shutil.move(generated_files[0], final_output_path)
final_audio_file = final_output_path
else:
shutil.move(generated_files[0], final_output_path)
final_audio_file = final_output_path
logging.info(f"[{session_id}] ✅ فایل صوتی نهایی با موفقیت تولید شد: {os.path.basename(final_audio_file)}")
return final_audio_file
finally:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# --- END: تعریف تمام توابع کمکی ---
# --- اجرای کدهای اولیه برنامه ---
_init_api_keys()
# --- تعریف اپلیکیشن FastAPI ---
app = FastAPI(title="Alpha TTS Worker API")
class TTSRequest(BaseModel):
text: str
prompt: str | None = ""
speaker: str
temperature: float
@app.post("/generate")
async def generate_audio_endpoint(request: TTSRequest):
session_id = str(uuid.uuid4())[:8]
logging.info(f"[{session_id}] 🏁 درخواست جدید API در این Worker دریافت شد.")
try:
final_path = core_generate_audio(
text_input=request.text,
prompt_input=request.prompt,
selected_voice=request.speaker,
temperature_val=request.temperature,
session_id=session_id
)
if final_path and os.path.exists(final_path):
from fastapi.responses import FileResponse
return FileResponse(path=final_path, media_type='audio/wav', filename=os.path.basename(final_path), background=shutil.rmtree(os.path.dirname(final_path), ignore_errors=True))
else:
raise HTTPException(status_code=500, detail="خطا در تولید فایل صوتی در Worker.")
except Exception as e:
logging.error(f"[{session_id}] ❌ خطای کلی در Worker: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
def health_check():
return {"status": "ok", "message": "TTS Worker is running."}
logging.info("✅✅✅ Application logic initialized successfully. Starting Uvicorn server...")
# --- START: بخش جدید برای اجرای سرور ---
if __name__ == "__main__":
# پورت را از متغیرهای محیطی هاگینگ فیس یا به صورت پیشفرض 7860 بخوان
port = int(os.environ.get("PORT", 7860))
# اجرای سرور Uvicorn از داخل کد پایتون
# reload=False برای محیط production مهم است
uvicorn.run(app, host="0.0.0.0", port=port, reload=False)
# --- END: بخش جدید برای اجرای سرور --- |