neyugncol commited on
Commit
3dddfe4
·
verified ·
1 Parent(s): 422fca6

Add read video segment tool

Browse files
Files changed (6) hide show
  1. app.py +94 -84
  2. prompt.py +37 -0
  3. rag.py +47 -7
  4. tools.py +51 -38
  5. transcriber.py +12 -7
  6. utils.py +24 -0
app.py CHANGED
@@ -1,85 +1,95 @@
1
- import os
2
- import shutil
3
-
4
- import gradio as gr
5
- from smolagents import ChatMessageToolCall, ActionStep, FinalAnswerStep
6
-
7
- from agent import VideoChatbot
8
- from configs import settings
9
-
10
-
11
- bot = VideoChatbot(
12
- model=settings.CHATBOT_MODEL,
13
- api_base=settings.MODEL_BASE_API,
14
- api_key=os.environ['GEMINI_API_KEY']
15
- )
16
-
17
-
18
- def chat(message: dict, history: list[dict]):
19
-
20
- # move the file to the data directory
21
- message['files'] = [shutil.copy(file, settings.DATA_DIR) for file in message['files']]
22
-
23
- # add the input message to the history
24
- history.extend([{'role': 'user', 'content': {'path': file}} for file in message['files']])
25
- history.append({'role': 'user', 'content': message['text']})
26
- yield history, ''
27
-
28
- for step in bot.chat(message['text'], message['files']):
29
- match step:
30
- case ChatMessageToolCall():
31
- if step.function.name == 'download_video':
32
- history.append({
33
- 'role': 'assistant',
34
- 'content': f'πŸ“₯ Downloading video from {step.function.arguments["url"]}'
35
- })
36
- elif step.function.name == 'add_video':
37
- history.append({
38
- 'role': 'assistant',
39
- 'content': f'πŸŽ₯ Processing and adding video `{step.function.arguments["filename"]}` '
40
- f'to the knowledge base. This may take a while...'
41
- })
42
- elif step.function.name == 'search_in_video':
43
- filename = os.path.basename(bot.video_rag.videos[step.function.arguments["video_id"]]['video_path'])
44
- history.append({
45
- 'role': 'assistant',
46
- 'content': f'πŸ” Searching in video `{filename}` '
47
- f'for query: *{step.function.arguments.get("text_query", step.function.arguments.get("image_query", ""))}*'
48
- })
49
- elif step.function.name == 'final_answer':
50
- continue
51
- yield history, ''
52
- case ActionStep():
53
- yield history, ''
54
- case FinalAnswerStep():
55
- history.append({'role': 'assistant', 'content': step.output})
56
- yield history, ''
57
-
58
-
59
- def clear_chat(chatbot):
60
- chatbot.clear()
61
- return chatbot, gr.update(value='')
62
-
63
-
64
- def main():
65
- with gr.Blocks() as demo:
66
- gr.Markdown('# Video Chatbot Demo')
67
- gr.Markdown('This demo showcases a video chatbot that can process and search videos using '
68
- 'RAG (Retrieval-Augmented Generation). You can upload videos/images or link to YouTube videos, '
69
- 'ask questions, and get answers based on the video content.')
70
- chatbot = gr.Chatbot(type='messages', label='Video Chatbot', height=800, resizable=True)
71
- textbox = gr.MultimodalTextbox(
72
- sources=['upload'],
73
- file_types=['image', '.mp4'],
74
- show_label=False,
75
- placeholder='Type a message or upload an image/video...',
76
-
77
- )
78
- textbox.submit(chat, [textbox, chatbot], [chatbot, textbox])
79
- clear = gr.Button('Clear Chat')
80
- clear.click(clear_chat, [chatbot], [chatbot, textbox])
81
-
82
- demo.launch(debug=True)
83
-
84
- if __name__ == '__main__':
 
 
 
 
 
 
 
 
 
 
85
  main()
 
1
+ import os
2
+ import shutil
3
+
4
+ import gradio as gr
5
+ from smolagents import ChatMessageToolCall, ActionStep, FinalAnswerStep
6
+
7
+ import utils
8
+ from agent import VideoChatbot
9
+ from configs import settings
10
+
11
+
12
+ bot = VideoChatbot(
13
+ model=settings.CHATBOT_MODEL,
14
+ api_base=settings.MODEL_BASE_API,
15
+ api_key=os.environ['GEMINI_API_KEY']
16
+ )
17
+
18
+
19
+ def chat(message: dict, history: list[dict]):
20
+
21
+ # move the file to the data directory
22
+ message['files'] = [shutil.copy(file, settings.DATA_DIR) for file in message['files']]
23
+
24
+ # add the input message to the history
25
+ history.extend([{'role': 'user', 'content': {'path': file}} for file in message['files']])
26
+ history.append({'role': 'user', 'content': message['text']})
27
+ yield history, ''
28
+
29
+ for step in bot.chat(message['text'], message['files']):
30
+ match step:
31
+ case ChatMessageToolCall():
32
+ if step.function.name == 'download_video':
33
+ history.append({
34
+ 'role': 'assistant',
35
+ 'content': f'πŸ“₯ Downloading video from {step.function.arguments["url"]}'
36
+ })
37
+ elif step.function.name == 'index_video':
38
+ video_path = os.path.join(settings.DATA_DIR, step.function.arguments['filename'])
39
+ video_duration = utils.seconds_to_hms(int(utils.get_media_duration(video_path)))
40
+ history.append({
41
+ 'role': 'assistant',
42
+ 'content': f'πŸŽ₯ Indexing video `{step.function.arguments["filename"]}` with length *{video_duration}* '
43
+ f'to the knowledge base. This may take a while...'
44
+ })
45
+ elif step.function.name == 'search_video_segments':
46
+ filename = os.path.basename(bot.video_rag.videos[step.function.arguments["video_id"]]['video_path'])
47
+ history.append({
48
+ 'role': 'assistant',
49
+ 'content': f'πŸ” Searching video segments in `{filename}` '
50
+ f'for query: *{step.function.arguments.get("text_query", step.function.arguments.get("image_query", ""))}*'
51
+ })
52
+ elif step.function.name == 'read_video_segment':
53
+ filename = os.path.basename(bot.video_rag.videos[step.function.arguments["video_id"]]['video_path'])
54
+ history.append({
55
+ 'role': 'assistant',
56
+ 'content': f'πŸ“– Reading video segment `{filename}` '
57
+ f'from *{step.function.arguments["start_time"]}* to *{step.function.arguments["end_time"]}*'
58
+ })
59
+ elif step.function.name == 'final_answer':
60
+ continue
61
+ yield history, ''
62
+ case ActionStep():
63
+ yield history, ''
64
+ case FinalAnswerStep():
65
+ history.append({'role': 'assistant', 'content': step.output})
66
+ yield history, ''
67
+
68
+
69
def clear_chat(chatbot):
    """Empty the chat history in place and reset the textbox.

    Args:
        chatbot: The current chat history object; it must provide `clear()`.

    Returns:
        A tuple of the emptied history and an update that sets the textbox value to ''.
    """
    chatbot.clear()
    textbox_reset = gr.update(value='')
    return chatbot, textbox_reset
72
+
73
+
74
def main():
    """Build the Gradio interface for the video chatbot and launch it in debug mode."""
    description = (
        'This demo showcases a video chatbot that can process and search videos using '
        'RAG (Retrieval-Augmented Generation). You can upload videos/images or link to YouTube videos, '
        'ask questions, and get answers based on the video content.'
    )

    with gr.Blocks() as demo:
        gr.Markdown('# Video Chatbot Demo')
        gr.Markdown(description)

        chatbot = gr.Chatbot(type='messages', label='Video Chatbot', height=800, resizable=True)
        textbox = gr.MultimodalTextbox(
            sources=['upload'],
            file_types=['image', '.mp4'],
            show_label=False,
            placeholder='Type a message or upload an image/video...',
        )
        # submitting streams (history, textbox value) pairs produced by `chat`
        textbox.submit(chat, [textbox, chatbot], [chatbot, textbox])

        clear = gr.Button('Clear Chat')
        clear.click(clear_chat, [chatbot], [chatbot, textbox])

    demo.launch(debug=True)
93
+
94
+ if __name__ == '__main__':
95
  main()
prompt.py CHANGED
@@ -1,5 +1,7 @@
1
  import os
2
 
 
 
3
 
4
  def image_to_text_prompt(image_path: str, metadata: dict = None) -> str:
5
  """Generate a text prompt to represent a image file with its metadata."""
@@ -25,3 +27,38 @@ Filename: {os.path.basename(video_path)}
25
  Metadata: {metadata_lines}
26
  </video>
27
  '''
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
 
3
+ import utils
4
+
5
 
6
  def image_to_text_prompt(image_path: str, metadata: dict = None) -> str:
7
  """Generate a text prompt to represent a image file with its metadata."""
 
27
  Metadata: {metadata_lines}
28
  </video>
29
  '''
30
+
31
+
32
def video_segment_to_text_prompt(
        start: float,
        end: float,
        transcript_segments: list[dict],
        frame_paths: list[str]
) -> str:
    """Render a video segment as a `<video_segment>` text block.

    Args:
        start: Segment start time in seconds.
        end: Segment end time in seconds.
        transcript_segments: Dicts with 'start', 'end' and 'text' keys.
        frame_paths: Paths of the frame images belonging to the segment.

    Returns:
        The text block containing the timespan, one bullet per transcript
        segment and one `<image>` tag per frame.
    """
    # timespan header, always rendered with hours
    span_start = utils.seconds_to_hms(int(start))
    span_end = utils.seconds_to_hms(int(end))

    # one bullet per transcript segment, rendered without hours
    bullets = [
        f'- {utils.seconds_to_hms(int(item["start"]), drop_hours=True)}'
        f'-{utils.seconds_to_hms(int(item["end"]), drop_hours=True)}: {item["text"]}'
        for item in transcript_segments
    ]
    # the bullets start on their own line, below the "Transcript:" label
    transcript_block = ('\n' + '\n'.join(bullets)) if bullets else ''

    # one <image> tag per frame
    frames_block = '\n'.join(f'<image>{path}</image>' for path in frame_paths)

    return (
        '<video_segment>\n'
        f'Timespan: {span_start} - {span_end}\n'
        f'Transcript: {transcript_block}\n'
        f'{frames_block}\n'
        '</video_segment>\n'
    )
rag.py CHANGED
@@ -76,19 +76,19 @@ class VideoRAG:
76
  raise ValueError(f'Video with ID {video_id} not found.')
77
  return self.videos[video_id]
78
 
79
- def add_video(self, video_path: str) -> str:
80
- """Add a video to the RAG system by processing its frames and transcripts.
81
 
82
  Args:
83
- video_path (str): The path to the video file to be added.
84
 
85
  Returns:
86
- str: A unique video ID generated for the added video.
87
  """
88
  # create a unique video ID
89
  video_id = uuid.uuid4().hex[:8]
90
 
91
- print(f'Adding video "{video_path}" with ID {video_id} to the RAG system...')
92
 
93
  print('Extracting video frames')
94
  # process video frames
@@ -149,12 +149,13 @@ class VideoRAG:
149
  # add video metadata to the database
150
  self.videos[video_id] = {
151
  'video_path': video_path,
 
152
  'frame_dir': f'{video_path}_frames',
153
  'video_frame_rate': self.video_frame_rate,
154
  'transcript_segments': segments,
155
  }
156
 
157
- print(f'Video "{video_path}" added with ID {video_id}.')
158
  return video_id
159
 
160
  def search(self, video_id: str, text: str = None, image: str | Image.Image = None, limit: int = 10) -> list[dict]:
@@ -274,6 +275,45 @@ class VideoRAG:
274
 
275
  return timespans
276
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
  def clear(self):
278
  """Clear the RAG system by dropping all tables and resetting video metadata."""
279
  self._init_db()
@@ -332,4 +372,4 @@ def merge_searched_timespans(timespans: list[dict], threshold: float) -> list[di
332
 
333
  # Add the last span
334
  merged_spans.append(current_span)
335
- return merged_spans
 
76
  raise ValueError(f'Video with ID {video_id} not found.')
77
  return self.videos[video_id]
78
 
79
+ def index(self, video_path: str) -> str:
80
+ """Index a video file into the RAG system by extracting frames, transcribing audio, and computing embeddings.
81
 
82
  Args:
83
+ video_path (str): The path to the video file to be indexed.
84
 
85
  Returns:
86
+ str: A unique video ID generated for the indexed video.
87
  """
88
  # create a unique video ID
89
  video_id = uuid.uuid4().hex[:8]
90
 
91
+ print(f'Indexing video "{video_path}" with ID {video_id} to the RAG system...')
92
 
93
  print('Extracting video frames')
94
  # process video frames
 
149
  # add video metadata to the database
150
  self.videos[video_id] = {
151
  'video_path': video_path,
152
+ 'video_duration': utils.get_media_duration(video_path),
153
  'frame_dir': f'{video_path}_frames',
154
  'video_frame_rate': self.video_frame_rate,
155
  'transcript_segments': segments,
156
  }
157
 
158
+ print(f'Video "{video_path}" indexed with ID {video_id}.')
159
  return video_id
160
 
161
  def search(self, video_id: str, text: str = None, image: str | Image.Image = None, limit: int = 10) -> list[dict]:
 
275
 
276
  return timespans
277
 
278
def read(self, video_id: str, start: float, end: float) -> dict:
    """Read a segment of the video by its ID and time range.

    Args:
        video_id (str): The ID of the video to read.
        start (float): The start time of the segment in seconds.
        end (float): The end time of the segment in seconds.

    Returns:
        dict: The segment with 'start', 'end', 'frame_paths' and 'transcript_segments' keys.

    Raises:
        ValueError: If the time range is negative, empty or reversed, or exceeds
            the video duration. `get_video` is expected to raise for an unknown ID.
    """
    video_metadata = self.get_video(video_id)
    duration = video_metadata['video_duration']

    if start < 0 or end <= start:
        raise ValueError(
            f'Invalid time range: start ({start}) must be non-negative and smaller than end ({end}).')
    if start > duration or end > duration:
        raise ValueError(f'Start ({start}) or end ({end}) time exceeds video duration ({duration}).')

    # Prefer the frame rate stored with the video at index time, so frame indices
    # stay correct even if the instance-level rate differs from it.
    frame_rate = video_metadata.get('video_frame_rate', self.video_frame_rate)

    # frame files are numbered from 1, hence the `+ 1`
    frame_paths = [
        os.path.join(video_metadata['frame_dir'], f'{frame_index + 1}.jpg')
        for frame_index in range(int(start * frame_rate), int(end * frame_rate))
    ]

    # keep every transcript segment that overlaps the requested range
    transcript_segments = [
        segment
        for segment in video_metadata['transcript_segments']
        if utils.span_iou((segment['start'], segment['end']), (start, end)) > 0
    ]

    return {
        'start': start,
        'end': end,
        'frame_paths': frame_paths,
        'transcript_segments': transcript_segments,
    }
315
+
316
+
317
  def clear(self):
318
  """Clear the RAG system by dropping all tables and resetting video metadata."""
319
  self._init_db()
 
372
 
373
  # Add the last span
374
  merged_spans.append(current_span)
375
+ return merged_spans
tools.py CHANGED
@@ -4,7 +4,7 @@ from smolagents import tool, Tool
4
 
5
  import utils
6
  from configs import settings
7
- from prompt import video_to_text_prompt
8
  from rag import VideoRAG
9
 
10
 
@@ -45,32 +45,32 @@ def download_video(url: str) -> str:
45
  def create_video_rag_tools(video_rag: VideoRAG) -> list[Tool]:
46
 
47
  @tool
48
- def add_video(filename: str) -> str:
49
  """
50
- Add a video file to the RAG knowledge-base for further search and analysis.
51
 
52
  Args:
53
- filename (str): The video filename to add.
54
 
55
  Returns:
56
- str: The video ID if added successfully, or an error message.
57
  """
58
  try:
59
- video_id = video_rag.add_video(os.path.join(settings.DATA_DIR, filename))
60
- return f'Video added with ID: {video_id}'
61
  except Exception as e:
62
- return f'Error adding video: {e.__class__.__name__}: {e}'
63
 
64
 
65
  @tool
66
- def search_in_video(video_id: str, text_query: str = None, image_query: str = None) -> str:
67
  """
68
- Search for relevant video frames and transcripts based on text or image query. Allows searching within a specific video added to the RAG knowledge-base.
69
  At least one of `text_query` or `image_query` must be provided.
70
  The image frames of the retrieved video segments will be output at a frame rate of 1 frame per second. The order of the frames is according to the returned video segments.
71
 
72
  Args:
73
- video_id (str): The ID of the video to search in. This should be the ID returned by `add_video`.
74
  text_query (str, optional): The text query to search for in the video transcripts.
75
  image_query (str, optional): The image query to search for in the video frames. This is the filename of the image.
76
 
@@ -79,7 +79,7 @@ def create_video_rag_tools(video_rag: VideoRAG) -> list[Tool]:
79
  """
80
 
81
  if not video_rag.is_video_exists(video_id):
82
- return f'Video with ID "{video_id}" not found in the knowledge-base. Please add the video first using `add_video` tool.'
83
  if not text_query and not image_query:
84
  return 'Please provide at least one of `text_query` or `image_query` to search in the video.'
85
  if image_query:
@@ -101,32 +101,45 @@ def create_video_rag_tools(video_rag: VideoRAG) -> list[Tool]:
101
  # build the output message
102
  output = f'Search results for video ID {video_id}:\n'
103
  for result in results:
104
- # include timespans
105
- timespan_text = f'{utils.seconds_to_hms(int(result["start"]))} - {utils.seconds_to_hms(int(result["end"]))}'
106
-
107
- # include transcript segments
108
- transcript_texts = []
109
- for segment in result['transcript_segments']:
110
- transcript_texts.append(
111
- f'- {utils.seconds_to_hms(int(segment["start"]), drop_hours=True)}'
112
- f'-{utils.seconds_to_hms(int(segment["end"]), drop_hours=True)}: {segment["text"]}')
113
- transcript_lines = '\n'.join(transcript_texts)
114
- if transcript_lines:
115
- transcript_lines = '\n' + transcript_lines
116
-
117
- # include frame images
118
- image_tags = []
119
- for frame_path in result['frame_paths']:
120
- image_tags.append(f'<image>{frame_path}</image>')
121
- frame_images_lines = '\n'.join(image_tags)
122
-
123
- output += f'''<video_segment>
124
- Timespan: {timespan_text}
125
- Transcript: {transcript_lines}
126
- {frame_images_lines}
127
- </video_segment>
128
- '''
129
 
130
  return output
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
 
132
- return [add_video, search_in_video]
 
4
 
5
  import utils
6
  from configs import settings
7
+ from prompt import video_to_text_prompt, video_segment_to_text_prompt
8
  from rag import VideoRAG
9
 
10
 
 
45
  def create_video_rag_tools(video_rag: VideoRAG) -> list[Tool]:
46
 
47
  @tool
48
def index_video(filename: str) -> str:
    """
    Index a video file to the RAG knowledge-base for further search and analysis.

    Args:
        filename (str): The video filename to index.

    Returns:
        str: The video ID if indexed successfully, or an error message.
    """
    try:
        # resolve the filename against the configured data directory
        video_path = os.path.join(settings.DATA_DIR, filename)
        video_id = video_rag.index(video_path)
    except Exception as exc:
        # failures are returned as text instead of being raised
        return f'Error indexing video: {type(exc).__name__}: {exc}'
    else:
        return f'Video indexed with ID: {video_id}'
63
 
64
 
65
  @tool
66
+ def search_video_segments(video_id: str, text_query: str = None, image_query: str = None) -> str:
67
  """
68
+ Search for relevant video frames and transcripts based on text or image query. Allows searching within a specific video indexed to the RAG knowledge-base.
69
  At least one of `text_query` or `image_query` must be provided.
70
  The image frames of the retrieved video segments will be output at a frame rate of 1 frame per second. The order of the frames is according to the returned video segments.
71
 
72
  Args:
73
+ video_id (str): The ID of the video to search in. This should be the ID returned by `index_video`.
74
  text_query (str, optional): The text query to search for in the video transcripts.
75
  image_query (str, optional): The image query to search for in the video frames. This is the filename of the image.
76
 
 
79
  """
80
 
81
  if not video_rag.is_video_exists(video_id):
82
+ return f'Video with ID "{video_id}" not found in the knowledge-base. Please index the video first using `index_video` tool.'
83
  if not text_query and not image_query:
84
  return 'Please provide at least one of `text_query` or `image_query` to search in the video.'
85
  if image_query:
 
101
  # build the output message
102
  output = f'Search results for video ID {video_id}:\n'
103
  for result in results:
104
+ output += video_segment_to_text_prompt(
105
+ start=result['start'],
106
+ end=result['end'],
107
+ transcript_segments=result['transcript_segments'],
108
+ frame_paths=result['frame_paths']
109
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
  return output
112
+
113
@tool
def read_video_segment(video_id: str, start: str, end: str) -> str:
    """
    Read a specific segment of a video by its ID and time range. Use this tool when you want to read a specific segment of a video for further analysis. Don't use this tool to search for video segments, use `search_video_segments` instead. Don't read too long segments.

    Args:
        video_id (str): The ID of the video to read. This should be the ID returned by `index_video`.
        start (str): The start time in HH:MM:SS or MM:SS format. (e.g., "00:01:30" or "01:30" for 1 minute 30 seconds)
        end (str): The end time in HH:MM:SS or MM:SS format. (e.g., "00:02:00" or "02:00" for 2 minutes)

    Returns:
        str: The video segment's timespan, transcript and frame images, or an error message if the video is not found or the time range is invalid.
    """
    if not video_rag.is_video_exists(video_id):
        return f'Video with ID "{video_id}" not found in the knowledge-base. Please index the video first using `index_video` tool.'

    try:
        # Parsing happens inside the `try` so that a malformed time string is
        # returned as an error message instead of being raised.
        start_seconds = utils.hms_to_seconds(start)
        end_seconds = utils.hms_to_seconds(end)
        result = video_rag.read(video_id, start_seconds, end_seconds)
    except Exception as e:
        return f'Error reading video segment: {e.__class__.__name__}: {e}'

    segment_text = video_segment_to_text_prompt(
        start=start_seconds,
        end=end_seconds,
        transcript_segments=result['transcript_segments'],
        frame_paths=result['frame_paths']
    )
    return f'Read video segment of video ID {video_id}:\n{segment_text}'
144
 
145
+ return [index_video, search_video_segments, read_video_segment]
transcriber.py CHANGED
@@ -4,6 +4,9 @@ from typing import Any
4
  from google import genai
5
  from google.genai import types
6
 
 
 
 
7
  class AudioTranscriber:
8
  """A class to transcribe audio files"""
9
 
@@ -33,14 +36,14 @@ Your response MUST be a valid JSON object with the following structure:
33
  - A "segment" is defined as a continuous section of speech from a single speaker include multiple sentences or phrases.
34
  - Each segment object MUST contain `text`, `start`, `end`, and `speaker` fields.
35
  - `text`: The verbatim transcription of the speech within that segment.
36
- - `start`: The precise start time of the segment in seconds, represented as a floating-point number (e.g., 0.0, 5.25).
37
- - `end`: The precise end time of the segment in seconds, represented as a floating-point number (e.g., 4.9, 10.12).
38
  - `speaker`: An integer representing the speaker ID.
39
  + Speaker IDs start at `0` for the first detected speaker.
40
  + The speaker ID MUST increment by 1 each time a new, distinct speaker is identified in the audio. Do not reuse speaker IDs within the same transcription.
41
  + If the same speaker talks again after another speaker, they retain their original speaker ID.
42
  + **Segment Splitting Rule**: A segment for the same speaker should only be split if there is a period of silence lasting more than 5 seconds. Otherwise, continuous speech from the same speaker, even with short pauses, should remain within a single segment.
43
-
44
  2. Language:
45
  - `language`: A two-letter ISO 639-1 code representing the primary language of the transcribed text (e.g., "en" for English, "es" for Spanish, "fr" for French).
46
  - If multiple languages are detected in the audio, you MUST select and output only the ISO 639-1 code for the primary language used throughout the audio.
@@ -60,11 +63,11 @@ Your response MUST be a valid JSON object with the following structure:
60
  'description': 'The transcribed text for the segment.'
61
  },
62
  'start': {
63
- 'type': 'number',
64
  'description': 'The start time of the segment in seconds.'
65
  },
66
  'end': {
67
- 'type': 'number',
68
  'description': 'The end time of the segment in seconds.'
69
  },
70
  'speaker': {
@@ -117,9 +120,11 @@ Your response MUST be a valid JSON object with the following structure:
117
  if uploaded_file.state == 'FAILED':
118
  raise ValueError('Failed to upload the audio file')
119
 
 
 
120
  response = self.client.models.generate_content(
121
  model=self.model,
122
- contents=uploaded_file,
123
  config=types.GenerateContentConfig(
124
  system_instruction=self.SYSTEM_INSTRUCTION,
125
  temperature=0.2,
@@ -131,4 +136,4 @@ Your response MUST be a valid JSON object with the following structure:
131
  if response.parsed is None:
132
  raise ValueError('Failed to transcribe the audio file')
133
 
134
- return response.parsed # type: ignore
 
4
  from google import genai
5
  from google.genai import types
6
 
7
+ import utils
8
+
9
+
10
  class AudioTranscriber:
11
  """A class to transcribe audio files"""
12
 
 
36
  - A "segment" is defined as a continuous section of speech from a single speaker include multiple sentences or phrases.
37
  - Each segment object MUST contain `text`, `start`, `end`, and `speaker` fields.
38
  - `text`: The verbatim transcription of the speech within that segment.
39
+ - `start`: The precise start time of the segment in seconds, represented as a integer number (e.g., 1, 5)
40
+ - `end`: The precise end time of the segment in seconds, represented as a integer number (e.g., 2, 6)
41
  - `speaker`: An integer representing the speaker ID.
42
  + Speaker IDs start at `0` for the first detected speaker.
43
  + The speaker ID MUST increment by 1 each time a new, distinct speaker is identified in the audio. Do not reuse speaker IDs within the same transcription.
44
  + If the same speaker talks again after another speaker, they retain their original speaker ID.
45
  + **Segment Splitting Rule**: A segment for the same speaker should only be split if there is a period of silence lasting more than 5 seconds. Otherwise, continuous speech from the same speaker, even with short pauses, should remain within a single segment.
46
+
47
  2. Language:
48
  - `language`: A two-letter ISO 639-1 code representing the primary language of the transcribed text (e.g., "en" for English, "es" for Spanish, "fr" for French).
49
  - If multiple languages are detected in the audio, you MUST select and output only the ISO 639-1 code for the primary language used throughout the audio.
 
63
  'description': 'The transcribed text for the segment.'
64
  },
65
  'start': {
66
+ 'type': 'integer',
67
  'description': 'The start time of the segment in seconds.'
68
  },
69
  'end': {
70
+ 'type': 'integer',
71
  'description': 'The end time of the segment in seconds.'
72
  },
73
  'speaker': {
 
120
  if uploaded_file.state == 'FAILED':
121
  raise ValueError('Failed to upload the audio file')
122
 
123
+ audio_duration = utils.get_media_duration(audio_path)
124
+
125
  response = self.client.models.generate_content(
126
  model=self.model,
127
+ contents=[uploaded_file, f'Audio duration: {int(audio_duration)} seconds'],
128
  config=types.GenerateContentConfig(
129
  system_instruction=self.SYSTEM_INSTRUCTION,
130
  temperature=0.2,
 
136
  if response.parsed is None:
137
  raise ValueError('Failed to transcribe the audio file')
138
 
139
+ return response.parsed # type: ignore
utils.py CHANGED
@@ -3,6 +3,7 @@ import os.path
3
  import subprocess
4
 
5
  from yt_dlp import YoutubeDL
 
6
 
7
  from configs import settings
8
 
@@ -149,6 +150,16 @@ def split_media_file(file_path: str, output_dir: str, segment_length: int = 60)
149
  return sorted(glob.glob(f'{output_dir}/*{base_name}_*.{extension}'))
150
 
151
 
 
 
 
 
 
 
 
 
 
 
152
  def span_iou(span1: tuple[float, float], span2: tuple[float, float]) -> float:
153
  """Calculate the Intersection over Union (IoU) of two spans."""
154
  start1, end1 = span1
@@ -179,3 +190,16 @@ def seconds_to_hms(total_seconds: int, drop_hours: bool = False) -> str:
179
  return f'{minutes:02d}:{seconds:02d}'
180
 
181
  return f'{hours:02d}:{minutes:02d}:{seconds:02d}'
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  import subprocess
4
 
5
  from yt_dlp import YoutubeDL
6
+ from pymediainfo import MediaInfo
7
 
8
  from configs import settings
9
 
 
150
  return sorted(glob.glob(f'{output_dir}/*{base_name}_*.{extension}'))
151
 
152
 
153
def get_media_duration(file_path: str) -> float:
    """Get the duration of a media file in seconds.

    Args:
        file_path: Path to the media file.

    Returns:
        The duration reported by the file's General track, in seconds.

    Raises:
        ValueError: If no General track with a duration is found.
    """
    # use pymediainfo to get the duration
    media_info = MediaInfo.parse(file_path)
    for track in media_info.tracks:
        # A General track without a duration yields None; skip it so the
        # ValueError below is raised instead of a TypeError from `None / 1000.0`.
        if track.track_type == 'General' and track.duration is not None:
            # the reported value is divided by 1000 to obtain seconds
            return float(track.duration) / 1000.0
    raise ValueError(f'Could not determine duration for file: {file_path}')
161
+
162
+
163
  def span_iou(span1: tuple[float, float], span2: tuple[float, float]) -> float:
164
  """Calculate the Intersection over Union (IoU) of two spans."""
165
  start1, end1 = span1
 
190
  return f'{minutes:02d}:{seconds:02d}'
191
 
192
  return f'{hours:02d}:{minutes:02d}:{seconds:02d}'
193
+
194
+
195
def hms_to_seconds(hms: str) -> int:
    """Convert a time string formatted as HH:MM:SS or MM:SS to total seconds.

    Args:
        hms: The time string; surrounding whitespace is ignored.

    Returns:
        The total number of seconds.

    Raises:
        ValueError: If the string does not have two or three colon-separated
            parts, or a part is not a non-negative integer.
    """
    parts = hms.strip().split(':')
    if len(parts) not in (2, 3):
        raise ValueError('Invalid time format. Use HH:MM:SS or MM:SS.')

    try:
        values = [int(part) for part in parts]
    except ValueError:
        # replace int()'s generic message with one that names the expected format
        raise ValueError(f'Invalid time format: "{hms}". Use HH:MM:SS or MM:SS.') from None

    # a negative component would silently produce a mixed-sign total
    if any(value < 0 for value in values):
        raise ValueError(f'Invalid time format: "{hms}". Time components must not be negative.')

    # fold left to right: every step promotes the running total by a factor of 60
    total = 0
    for value in values:
        total = total * 60 + value
    return total