| import ffmpeg, typer, os, sys, json |
| from loguru import logger |
| from PIL import Image |
| from tqdm import tqdm |
|
|
| logger.remove() |
| logger.add( |
| sys.stderr, |
| format="<d>{time:YYYY-MM-DD ddd HH:mm:ss}</d> | <lvl>{level}</lvl> | <lvl>{message}</lvl>", |
| ) |
| app = typer.Typer(pretty_exceptions_show_locals=False) |
|
|
|
|
| def parse_frame_name(fname: str): |
| """return a tuple of frame_type and frame_index""" |
| fn, fext = os.path.splitext(os.path.basename(fname)) |
| frame_type, frame_index = fn.split("_") |
| return frame_type, int(frame_index) |
|
|
|
|
| def get_fps_ffmpeg(video_path: str): |
| probe = ffmpeg.probe(video_path) |
| |
| video_stream = next( |
| (stream for stream in probe["streams"] if stream["codec_type"] == "video"), None |
| ) |
| if video_stream is None: |
| raise ValueError("No video stream found") |
| |
| r_frame_rate = video_stream["r_frame_rate"] |
| num, denom = map(int, r_frame_rate.split("/")) |
| return num / denom |
|
|
|
|
| @app.command() |
| def get_video_metadata(video_path: str, bverbose: bool = True): |
| """ |
| Extract comprehensive metadata from a video file. |
| |
| Args: |
| video_path (str): Path to the video file |
| |
| Returns: |
| dict: Dictionary containing video metadata including: |
| - width, height: Video dimensions |
| - duration: Video duration in seconds |
| - fps: Frames per second |
| - codec: Video codec name |
| - bitrate: Video bitrate |
| - format_name: Container format |
| - file_size: File size in bytes |
| """ |
| try: |
| probe = ffmpeg.probe(video_path) |
|
|
| |
| video_stream = next( |
| (stream for stream in probe["streams"] if stream["codec_type"] == "video"), |
| None, |
| ) |
|
|
| if video_stream is None: |
| raise ValueError("No video stream found") |
|
|
| |
| width = int(video_stream.get("width", 0)) |
| height = int(video_stream.get("height", 0)) |
| duration = float(video_stream.get("duration", 0)) |
|
|
| |
| r_frame_rate = video_stream.get("r_frame_rate", "0/1") |
| num, denom = map(int, r_frame_rate.split("/")) |
| fps = num / denom if denom != 0 else 0 |
|
|
| |
| codec = video_stream.get("codec_name", "unknown") |
| bitrate = ( |
| int(video_stream.get("bit_rate", 0)) if video_stream.get("bit_rate") else 0 |
| ) |
|
|
| |
| format_info = probe.get("format", {}) |
| format_name = format_info.get("format_name", "unknown") |
| file_size = int(format_info.get("size", 0)) |
|
|
| |
| audio_stream = next( |
| (stream for stream in probe["streams"] if stream["codec_type"] == "audio"), |
| None, |
| ) |
|
|
| audio_codec = audio_stream.get("codec_name", "none") if audio_stream else "none" |
| audio_bitrate = ( |
| int(audio_stream.get("bit_rate", 0)) |
| if audio_stream and audio_stream.get("bit_rate") |
| else 0 |
| ) |
|
|
| metadata = { |
| "width": width, |
| "height": height, |
| "duration": duration, |
| "fps": fps, |
| "video_codec": codec, |
| "video_bitrate": bitrate, |
| "audio_codec": audio_codec, |
| "audio_bitrate": audio_bitrate, |
| "format_name": format_name, |
| "file_size": file_size, |
| "total_streams": len(probe["streams"]), |
| } |
| if bverbose: |
| logger.info(f"Video metadata extracted: {json.dumps(metadata, indent=4)}") |
| return metadata |
|
|
| except Exception as e: |
| logger.error(f"Failed to extract video metadata: {e}") |
| return None |
|
|
|
|
| @app.command() |
| def extract_frames( |
| input_path: str, |
| fps: int = 8, |
| max_short_edge: int = 1080, |
| write_timestamp: bool = True, |
| write_frame_num: bool = True, |
| output_dir: str = None, |
| ): |
| """ |
| Extract frames from a video file using FFmpeg. |
| |
| Args: |
| input_path (str): Path to the input video file. |
| fps (int): Frames per second to extract. |
| max_short_edge (int): Maximum length of the shorter edge of the extracted frames. |
| write_timestamp (bool): Whether to write the timestamp of each frame. |
| write_frame_num (bool): Whether to write the frame number of each frame. |
| output_dir (str): Directory to save the extracted frames. |
| |
| Returns: |
| List of PIL Images |
| """ |
| if output_dir: |
| assert os.path.isdir( |
| output_dir |
| ), f"Output directory {output_dir} does not exist" |
|
|
| |
| vmeta = get_video_metadata(input_path, bverbose=False) |
| org_w, org_h = vmeta["width"], vmeta["height"] |
| max_short_edge = int(max_short_edge) if max_short_edge else min(org_w, org_h) |
| long_edge = int((max(org_h, org_w) / min(org_h, org_w)) * max_short_edge) |
| long_edge += 0 if long_edge % 2 == 0 else 1 |
| duration = vmeta["duration"] |
| org_fps = vmeta["fps"] |
| if fps > org_fps: |
| logger.debug( |
| f"requested fps({fps}) exceeded source fps({org_fps}): fps will be capped to source fps({org_fps})" |
| ) |
| fps = org_fps |
|
|
| |
| total_frames = int(duration * fps) |
|
|
| |
| add_scale_filter = max_short_edge < min(org_w, org_h) |
| w = max_short_edge if org_w < org_h else long_edge |
| h = max_short_edge if org_w > org_h else long_edge |
| logger.debug(f"Video dimensions: {org_w}x{org_h}") |
| if add_scale_filter: |
| logger.debug(f"\tscaling video to {w}x{h}") |
|
|
| |
| drawtext_filter_text = ( |
| r"text='Timestamp\:%{pts\:hms} \|Frame Number\: %{frame_num}'" |
| if write_frame_num |
| else r"text='Timestamp\:%{pts\:hms}'" |
| ) |
|
|
| |
| drawtext_filter = ( |
| f",drawtext={drawtext_filter_text}: x=(w-tw)/2: y=h-(2*lh): fontcolor=white: fontsize=20: box=1: boxcolor=0x00000099: boxborderw=5" |
| if write_timestamp |
| else "" |
| ) |
| scale_filter = ( |
| |
| f",scale='{w}:{h}'" |
| if add_scale_filter |
| else "" |
| ) |
| filter_chain = f"fps={fps}{drawtext_filter}{scale_filter}" |
|
|
| |
| process = ( |
| ffmpeg.input(input_path) |
| .output("pipe:", vf=filter_chain, format="rawvideo", pix_fmt="rgb24") |
| .run_async(pipe_stdout=True, pipe_stderr=True) |
| ) |
| logger.info(f"running ffmpeg with filter:\n{filter_chain}") |
|
|
| frame_size = ( |
| long_edge * max_short_edge * 3 if add_scale_filter else org_w * org_h * 3 |
| ) |
| frames = [] |
|
|
| |
| for _ in tqdm(range(total_frames), desc="Extracting frames with FFMPEG"): |
| in_bytes = process.stdout.read(frame_size) |
| if not in_bytes or len(in_bytes) < frame_size: |
| break |
| frame = Image.frombytes( |
| "RGB", (w, h) if add_scale_filter else (org_w, org_h), in_bytes |
| ) |
| frames.append(frame) |
|
|
| process.stdout.close() |
| process.wait() |
|
|
| if output_dir: |
| vname, _ = os.path.splitext(os.path.basename(input_path)) |
| for i, im in enumerate(tqdm(frames, desc=f"Saving frames to {output_dir}")): |
| output_path = os.path.join(output_dir, f"{vname}_{i}.jpg") |
| im.save(output_path) |
|
|
| return frames |
|
|
|
|
| def extract_specific_frames( |
| input_path: str, |
| timestamps_or_frames: list, |
| max_short_edge: int = 1080, |
| ): |
| """ |
| Extract specific frames from a video file using FFmpeg at given timestamps or frame numbers. |
| |
| Args: |
| input_path (str): Path to the input video file. |
| timestamps_or_frames (list): List of timestamps (in seconds) or frame numbers to extract. |
| max_short_edge (int): Maximum length of the shorter edge of the extracted frames. |
| write_timestamp (bool): Whether to write the timestamp of each frame. |
| write_frame_num (bool): Whether to write the frame number of each frame. |
| use_timestamps (bool): If True, treat input list as timestamps. If False, treat as frame numbers. |
| |
| Returns: |
| List of PIL Images corresponding to the specified timestamps/frames |
| """ |
| |
| vmeta = get_video_metadata(input_path, bverbose=False) |
| org_w, org_h = vmeta["width"], vmeta["height"] |
| max_short_edge = int(max_short_edge) if max_short_edge else min(org_w, org_h) |
| long_edge = int((max(org_h, org_w) / min(org_h, org_w)) * max_short_edge) |
| long_edge += 0 if long_edge % 2 == 0 else 1 |
| duration = vmeta["duration"] |
| org_fps = vmeta["fps"] |
|
|
| |
| add_scale_filter = max_short_edge < min(org_w, org_h) |
| w = max_short_edge if org_w < org_h else long_edge |
| h = max_short_edge if org_w > org_h else long_edge |
| logger.debug(f"Video dimensions: {org_w}x{org_h}") |
| if add_scale_filter: |
| logger.debug(f"\tscaling video to {w}x{h}") |
| scale_filter = f",scale='{w}:{h}'" if add_scale_filter else "" |
|
|
| frames = [] |
|
|
| for target in tqdm(timestamps_or_frames, desc="Extracting specific frames"): |
| try: |
| |
| use_timestamps = isinstance(target, float) |
| if use_timestamps: |
| seek_time = float(target) |
| if seek_time > duration: |
| logger.warning( |
| f"Timestamp {seek_time}s exceeds video duration {duration}s, skipping" |
| ) |
| continue |
| else: |
| |
| seek_time = float(target) / org_fps |
| if seek_time > duration: |
| logger.warning(f"Frame {target} exceeds video duration, skipping") |
| continue |
|
|
| filter_chain = f"fps={org_fps}{scale_filter}" |
|
|
| |
| logger.debug(f"Extracting frame at {seek_time}s") |
| process = ( |
| ffmpeg.input(input_path, ss=seek_time) |
| .output( |
| "pipe:", |
| vf=filter_chain, |
| format="rawvideo", |
| pix_fmt="rgb24", |
| frames=1, |
| ) |
| .run_async(pipe_stdout=True, pipe_stderr=True) |
| ) |
|
|
| frame_size = ( |
| w * h * 3 if add_scale_filter else org_w * org_h * 3 |
| ) |
|
|
| in_bytes = process.stdout.read(frame_size) |
| if in_bytes and len(in_bytes) >= frame_size: |
| frame = Image.frombytes( |
| "RGB", (w, h) if add_scale_filter else (org_w, org_h), in_bytes |
| ) |
| frames.append(frame) |
| else: |
| logger.warning( |
| f"Failed to extract frame at {'timestamp' if use_timestamps else 'frame'} {target}" |
| ) |
| frames.append( |
| None |
| ) |
|
|
| process.stdout.close() |
| process.wait() |
|
|
| except Exception as e: |
| logger.error( |
| f"Error extracting frame at {'timestamp' if use_timestamps else 'frame'} {target}: {e}" |
| ) |
| frames.append(None) |
|
|
| |
| logger.info( |
| f"Successfully extracted {len([f for f in frames if f is not None])} out of {len(timestamps_or_frames)} requested frames" |
| ) |
|
|
| return frames |
|
|
|
|
| @app.command() |
| def extract_audio( |
| video_path: str, |
| output_dir: str = "/tmp/miro/clip_cognition/audio/", |
| overwrite: bool = False, |
| ): |
| """extracting audio of a video file into m4a without re-encoding |
| ref: https://www.baeldung.com/linux/ffmpeg-audio-from-video#1-extracting-audio-without-re-encoding |
| """ |
| |
| vmeta = get_video_metadata(video_path, bverbose=False) |
| if vmeta.get("audio_codec") == "none": |
| logger.error(f"No audio found in {video_path}") |
| return None |
|
|
| |
| output_dir = output_dir if output_dir else os.path.dirname(video_path) |
| vname, vext = os.path.splitext(os.path.basename(video_path)) |
| output_dir = os.path.join(output_dir, vname) |
| output_fname = os.path.join(output_dir, vname + ".mp3") |
| if os.path.isfile(output_fname): |
| if overwrite: |
| os.remove(output_fname) |
| logger.warning(f"removed existing data: {output_fname}") |
| else: |
| logger.error(f"overwrite is false and data already exists!") |
| return None |
| os.makedirs(output_dir, exist_ok=True) |
|
|
| |
| stream = ffmpeg.input(video_path) |
| config_dict = {"map": "0:a", "acodec": "mp3"} |
| stream = ffmpeg.output(stream, output_fname, **config_dict) |
|
|
| |
| try: |
| ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) |
| logger.success(f"audio extracted to {output_fname}") |
| return output_fname |
| except ffmpeg.Error as e: |
| logger.error(f"Error executing FFmpeg command: {e.stderr.decode()}") |
| return None |
|
|
|
|
| if __name__ == "__main__": |
| app() |
|
|