import html
import os
import tempfile
import time
from urllib.parse import parse_qs, urlparse

import torch
import gradio as gr
import yt_dlp as youtube_dl
import numpy as np
from datasets import Dataset, Audio
from scipy.io import wavfile
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import demucs.api
|
|
|
|
|
|
# Checkpoints: Whisper for speech recognition, Demucs for source separation.
MODEL_NAME = "openai/whisper-large-v3"
DEMUCS_MODEL_NAME = "htdemucs_ft"

# Batch size handed to the ASR pipeline on every call.
BATCH_SIZE = 8
# NOTE(review): not referenced anywhere in the visible part of this file.
FILE_LIMIT_MB = 1000
# Longest YouTube video accepted by `download_yt_audio`, in seconds.
YT_LENGTH_LIMIT_S = 3600

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 0
else:
    device = "cpu"

# Speech-recognition pipeline shared by all requests; audio is processed
# in 30-second windows.
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    device=device,
    chunk_length_s=30,
)

# Demucs separator shared by all requests (see `separate_vocal`).
separator = demucs.api.Separator(model=DEMUCS_MODEL_NAME)
|
|
def separate_vocal(path):
    """Overwrite the audio file at ``path`` with its ``"vocals"`` stem.

    The file is run through the module-level Demucs ``separator`` and the
    stem stored under the ``"vocals"`` key is saved back to the same path,
    using the separator's sample rate.

    Args:
        path: Location of the audio file to process; it is overwritten.

    Returns:
        The ``path`` that was passed in.
    """
    _, stems = separator.separate_audio_file(path)
    vocals = stems["vocals"]
    demucs.api.save_audio(vocals, path, samplerate=separator.samplerate)
    return path
|
|
| |
|
|
| |
| |
| |
| |
| |
|
|
|
|
def transcribe(inputs_path, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken):
    """Transcribe an audio file and push the resulting clips to the Hub.

    The file is transcribed with the module-level ASR pipeline, the
    timestamped chunks are merged by `naive_postprocess_whisper_chunks`,
    each merged clip is written to a temporary WAV file (optionally reduced
    to its vocal stem), and the (audio, transcript) pairs are pushed as a
    dataset.

    Args:
        inputs_path: Path of the audio file to process. It is read with
            ``scipy.io.wavfile``, so it has to be a WAV file.
        task: Value forwarded to the pipeline as ``generate_kwargs["task"]``.
        use_demucs: When equal to ``"separate-audio"``, every clip is passed
            through `separate_vocal` before being added to the dataset.
        dataset_name: Repository name given to ``Dataset.push_to_hub``.
        oauth_token: OAuth token of the logged-in user.

    Returns:
        The full transcription text.

    Raises:
        gr.Error: If ``inputs_path`` is ``None``.
    """
    if inputs_path is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    sampling_rate, inputs = wavfile.read(inputs_path)

    out = pipe(inputs_path, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)
    text = out["text"]

    chunks = naive_postprocess_whisper_chunks(out["chunks"], inputs, sampling_rate)

    # `push_to_hub` expects the raw access-token string, not the OAuth wrapper
    # object; a plain string (or None) is passed through untouched.
    hub_token = getattr(oauth_token, "token", oauth_token)

    transcripts = []
    audios = []
    with tempfile.TemporaryDirectory() as tmpdirname:
        for i, chunk in enumerate(chunks):
            path = os.path.join(tmpdirname, f"{i}.wav")
            wavfile.write(path, sampling_rate, chunk["audio"])

            if use_demucs == "separate-audio":
                print(f"Separating vocals #{i}")
                path = separate_vocal(path)

            audios.append(path)
            transcripts.append(chunk["text"])

        # The dataset references files inside the temporary directory, so it
        # must be built and pushed before the directory is removed.
        dataset = Dataset.from_dict({"audio": audios, "transcript": transcripts}).cast_column("audio", Audio())
        dataset.push_to_hub(dataset_name, token=hub_token)

    return text
|
|
|
|
| def _return_yt_html_embed(yt_url): |
| video_id = yt_url.split("?v=")[-1] |
| HTML_str = ( |
| f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>' |
| " </center>" |
| ) |
| return HTML_str |
|
|
def download_yt_audio(yt_url, filename):
    """Download a YouTube video to ``filename`` after checking its length.

    The video metadata is fetched first so that videos longer than
    ``YT_LENGTH_LIMIT_S`` are rejected before anything is downloaded.

    Args:
        yt_url: URL of the YouTube video.
        filename: Output path template handed to yt-dlp as ``outtmpl``.

    Raises:
        gr.Error: If the metadata cannot be fetched, the duration is
            unavailable, the video is too long, or the download fails.
    """
    with youtube_dl.YoutubeDL() as info_loader:
        try:
            info = info_loader.extract_info(yt_url, download=False)
        except youtube_dl.utils.DownloadError as err:
            raise gr.Error(str(err)) from err

    duration_string = info.get("duration_string")
    if not duration_string:
        # Without a duration the length limit below cannot be enforced.
        raise gr.Error("Could not determine the length of this YouTube video.")

    # "S", "M:SS" or "H:MM:SS" -> left-pad to [hours, minutes, seconds].
    file_h_m_s = [int(sub_length) for sub_length in duration_string.split(":")]
    file_h_m_s = [0] * (3 - len(file_h_m_s)) + file_h_m_s
    hours, minutes, seconds = file_h_m_s[:3]
    file_length_s = hours * 3600 + minutes * 60 + seconds

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

    ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"}

    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([yt_url])
        except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
            # yt-dlp reports failed downloads as DownloadError; ExtractorError
            # is still caught so the previous handling keeps working.
            raise gr.Error(str(err)) from err
|
|
|
|
def yt_transcribe(yt_url, task, use_demucs, dataset_name, oauth_token: gr.OAuthToken, max_filesize=75.0, dataset_sampling_rate = 24000):
    """Transcribe a YouTube video and push the resulting clips to the Hub.

    The video is downloaded to a temporary directory, decoded at the ASR
    model's sampling rate for transcription, then decoded again at
    ``dataset_sampling_rate`` to cut the dataset clips. Merged clips are
    written to temporary WAV files (optionally reduced to their vocal stem)
    and pushed as a dataset together with their transcripts.

    Args:
        yt_url: URL of the YouTube video.
        task: Value forwarded to the pipeline as ``generate_kwargs["task"]``.
        use_demucs: When equal to ``"separate-audio"``, every clip is passed
            through `separate_vocal` before being added to the dataset.
        dataset_name: Repository name given to ``Dataset.push_to_hub``.
        oauth_token: OAuth token of the logged-in user.
        max_filesize: Not used by the current implementation.
        dataset_sampling_rate: Sampling rate (Hz) of the clips stored in the
            dataset.

    Returns:
        A tuple ``(html_embed_str, text)``: the embedded-player HTML and the
        full transcription text.
    """
    html_embed_str = _return_yt_html_embed(yt_url)

    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        download_yt_audio(yt_url, filepath)
        with open(filepath, "rb") as f:
            media_bytes = f.read()

    asr_sampling_rate = pipe.feature_extractor.sampling_rate
    inputs = {"array": ffmpeg_read(media_bytes, asr_sampling_rate), "sampling_rate": asr_sampling_rate}

    out = pipe(inputs, batch_size=BATCH_SIZE, generate_kwargs={"task": task}, return_timestamps=True)
    text = out["text"]

    # Timestamps are in seconds, so the clips can be cut from a second decode
    # of the same media at the dataset's own sampling rate.
    dataset_audio = ffmpeg_read(media_bytes, dataset_sampling_rate)
    chunks = naive_postprocess_whisper_chunks(out["chunks"], dataset_audio, dataset_sampling_rate)

    # `push_to_hub` expects the raw access-token string, not the OAuth wrapper
    # object; a plain string (or None) is passed through untouched.
    hub_token = getattr(oauth_token, "token", oauth_token)

    transcripts = []
    audios = []
    with tempfile.TemporaryDirectory() as tmpdirname:
        for i, chunk in enumerate(chunks):
            path = os.path.join(tmpdirname, f"{i}.wav")
            wavfile.write(path, dataset_sampling_rate, chunk["audio"])

            if use_demucs == "separate-audio":
                print(f"Separating vocals #{i}")
                path = separate_vocal(path)

            audios.append(path)
            transcripts.append(chunk["text"])

        # The dataset references files inside the temporary directory, so it
        # must be built and pushed before the directory is removed.
        dataset = Dataset.from_dict({"audio": audios, "transcript": transcripts}).cast_column("audio", Audio())
        dataset.push_to_hub(dataset_name, token=hub_token)

    return html_embed_str, text
|
|
|
|
def naive_postprocess_whisper_chunks(chunks, audio_array, sampling_rate, stop_chars = ".!:;?", min_duration = 5):
    """Merge timestamped transcription chunks into longer, sentence-like clips.

    Consecutive chunks are concatenated until the accumulated text ends with
    one of ``stop_chars`` *and* the accumulated audio lasts at least
    ``min_duration`` seconds, or until no chunks are left.

    Args:
        chunks: Sequence of dicts with a ``"timestamp"`` pair ``(begin, end)``
            in seconds and a ``"text"`` string. It is not modified. An
            ``end`` of ``None`` is read as "until the end of the audio".
        audio_array: Audio samples, sliced along the first axis.
        sampling_rate: Number of samples per second in ``audio_array``.
        stop_chars: Characters that mark the end of a clip.
        min_duration: Minimum clip length in seconds.

    Returns:
        A list of dicts with the stripped merged ``"text"`` and the
        concatenated ``"audio"`` samples of each clip.
    """
    min_samples = int(min_duration * sampling_rate)
    stop_suffixes = tuple(stop_chars)
    total_samples = len(audio_array)

    def _sample_span(timestamp):
        # Convert a (begin, end) pair in seconds to sample indices.
        begin, end = timestamp
        begin = int(begin * sampling_rate)
        end = total_samples if end is None else int(end * sampling_rate)
        return begin, end

    new_chunks = []
    index = 0
    n_chunks = len(chunks)
    while index < n_chunks:
        current_chunk = chunks[index]
        index += 1

        begin, end = _sample_span(current_chunk["timestamp"])
        current_dur = end - begin
        text = current_chunk["text"]
        chunk_to_concat = [audio_array[begin:end]]

        # `endswith` is False for empty text, so blank chunks keep merging
        # instead of raising on `text[-1]`.
        while index < n_chunks and (not text.endswith(stop_suffixes) or current_dur < min_samples):
            ch = chunks[index]
            index += 1

            begin, end = _sample_span(ch["timestamp"])
            current_dur += end - begin
            text = "".join([text, ch["text"]])
            chunk_to_concat.append(audio_array[begin:end])

        new_chunks.append({
            "text": text.strip(),
            "audio": np.concatenate(chunk_to_concat),
        })
        print(f"LENGTH CHUNK #{len(new_chunks)}: {current_dur/sampling_rate}s")

    return new_chunks
| |
| |
| |
# Tab for microphone recordings and uploaded audio files. The OAuth token
# parameter of `transcribe` is not listed in `inputs`.
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(type="filepath"),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
        gr.Radio(["no-post-processing", "separate-audio"], label="Audio separation and cleaning (takes longer - use it if your samples are not cleaned (background noise and music))", value="separate-audio"),
        gr.Textbox(lines=1, placeholder="Place your new dataset name here", label="Dataset name"),
    ],
    outputs="text",
    theme="huggingface",
    title="Create your own TTS dataset using your own recordings",
    description=(
        # Each continuation literal starts with a space so the sentences do
        # not run together once the literals are concatenated.
        "This demo allows you to create a text-to-speech dataset from an input audio snippet and push it to the hub to keep track of it."
        f" Demo uses the checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to automatically transcribe audio files"
        " of arbitrary length. It then merges chunks of audio and pushes them to the hub."
    ),
    allow_flagging="never",
)
|
|
# Tab for YouTube URLs. NOTE: this rebinds the module-level name
# `yt_transcribe` from the function to the interface; `fn` is evaluated
# before the assignment, so it still receives the function.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
        gr.Radio(["no-post-processing", "separate-audio"], label="Audio separation and cleaning (takes longer - use it if your samples are not cleaned (background noise and music))", value="separate-audio"),
        gr.Textbox(lines=1, placeholder="Place your new dataset name here", label="Dataset name"),
    ],
    outputs=["html", "text"],
    theme="huggingface",
    title="Create your own TTS dataset using Youtube",
    description=(
        # Each continuation literal starts with a space so the sentences do
        # not run together once the literals are concatenated.
        "This demo allows you to create a text-to-speech dataset from an input audio snippet and push it to the hub to keep track of it."
        f" Demo uses the checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to automatically transcribe audio files"
        " of arbitrary length. It then merges chunks of audio and pushes them to the hub."
    ),
    allow_flagging="never",
)
|
|
# Final app layout: login/logout buttons side by side, followed by the two
# interfaces as tabs.
with gr.Blocks() as demo:
    with gr.Row():
        gr.LoginButton()
        gr.LogoutButton()
    gr.TabbedInterface(
        [mf_transcribe, yt_transcribe],
        ["Microphone or Audio file", "YouTube"],
    )

# Start the server with debug output enabled.
demo.launch(debug=True)
|
|