Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Generate a video from a text prompt and optionally extend it multiple times. | |
| Final length = duration * (num_extend + 1). | |
| Extension only works with VEO-generated videos (API rejects non-VEO sources). | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from google import genai | |
| from google.genai import types | |
| def strip_audio(video_path: Path) -> None: | |
| """Remove audio track from video using ffmpeg (video stream copied, no re-encode).""" | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f: | |
| temp_path = Path(f.name) | |
| try: | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", str(video_path), "-an", "-c:v", "copy", str(temp_path)], | |
| check=True, | |
| capture_output=True, | |
| ) | |
| temp_path.replace(video_path) | |
| finally: | |
| if temp_path.exists(): | |
| temp_path.unlink() | |
| def load_image(image_path: Path): | |
| """Load an image file into a types.Image for video conditioning.""" | |
| if not image_path.exists(): | |
| raise FileNotFoundError(f"Input image not found: {image_path}") | |
| try: | |
| return types.Image.from_file(location=str(image_path)) | |
| except TypeError: | |
| return types.Image.from_file(str(image_path)) | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser( | |
| description="Generate a video from a text prompt and optionally extend it (VEO only)." | |
| ) | |
| parser.add_argument( | |
| "--prompt", | |
| action="append", | |
| required=True, | |
| help="Prompt(s) for video. Pass once for all segments, or num_extend+1 times for initial + each extension.", | |
| ) | |
| parser.add_argument( | |
| "--model", | |
| default="veo-3.0-fast-generate-001", | |
| help="Video generation model name.", | |
| ) | |
| parser.add_argument("--name", default="generated_video", help="Base output filename.") | |
| parser.add_argument( | |
| "--output-dir", | |
| "--output_dir", | |
| dest="output_dir", | |
| default="output_dir", | |
| help="Directory to save outputs (default: output_dir).", | |
| ) | |
| parser.add_argument("--resolution", default="1080p", help="e.g. 720p, 1080p, 4k") | |
| parser.add_argument("--duration", type=int, default=8, help="Video length in seconds.") | |
| parser.add_argument( | |
| "--aspect-ratio", | |
| default="16:9", | |
| help="Aspect ratio (e.g. 16:9, 9:16, 1:1).", | |
| ) | |
| parser.add_argument( | |
| "--number-of-videos", | |
| type=int, | |
| default=1, | |
| help="How many videos to generate. When num-extend > 0, only the first is extended.", | |
| ) | |
| parser.add_argument( | |
| "--num-extend", | |
| type=int, | |
| default=0, | |
| help="How many times to extend the video. Final length = duration * (num_extend + 1).", | |
| ) | |
| parser.add_argument( | |
| "--start-image", | |
| "--start_image", | |
| dest="start_image", | |
| default=None, | |
| help="Path to image used as the first frame (initial generation only).", | |
| ) | |
| parser.add_argument( | |
| "--end-image", | |
| "--end_image", | |
| dest="end_image", | |
| default=None, | |
| help="Path to image used as the last frame (initial generation only; extensions do not support image conditioning).", | |
| ) | |
| parser.add_argument( | |
| "--poll-seconds", | |
| type=int, | |
| default=10, | |
| help="Polling interval while generation is running.", | |
| ) | |
| return parser.parse_args() | |
| def main() -> int: | |
| args = parse_args() | |
| if not os.getenv("GEMINI_API_KEY"): | |
| print("Missing GEMINI_API_KEY environment variable.", file=sys.stderr) | |
| return 1 | |
| if args.num_extend < 0: | |
| print("--num-extend must be >= 0.", file=sys.stderr) | |
| return 1 | |
| prompts: list[str] = args.prompt | |
| if len(prompts) > 1: | |
| expected = args.num_extend + 1 | |
| if len(prompts) != expected: | |
| print( | |
| f"With {len(prompts)} prompts, expected num_extend+1 = {expected}. " | |
| f"Got num_extend={args.num_extend}.", | |
| file=sys.stderr, | |
| ) | |
| return 1 | |
| else: | |
| prompts = [prompts[0]] * (args.num_extend + 1) | |
| client = genai.Client() | |
| first_image = None | |
| if args.start_image: | |
| start_path = Path(args.start_image).expanduser().resolve() | |
| first_image = load_image(start_path) | |
| print(f"Using start image: {start_path}") | |
| last_image = None | |
| if args.end_image: | |
| end_path = Path(args.end_image).expanduser().resolve() | |
| last_image = load_image(end_path) | |
| print(f"Using end image: {end_path}") | |
| config_kwargs = { | |
| "resolution": args.resolution, | |
| "duration_seconds": args.duration, | |
| "aspect_ratio": args.aspect_ratio, | |
| "number_of_videos": args.number_of_videos, | |
| } | |
| if last_image is not None: | |
| config_kwargs["last_frame"] = last_image | |
| config = types.GenerateVideosConfig(**config_kwargs) | |
| # Initial generation | |
| print("Generating initial video...") | |
| gen_kwargs = {"model": args.model, "prompt": prompts[0], "config": config} | |
| if first_image is not None: | |
| gen_kwargs["image"] = first_image | |
| operation = client.models.generate_videos(**gen_kwargs) | |
| started_at = time.time() | |
| while not operation.done: | |
| elapsed_seconds = int(time.time() - started_at) | |
| print(f"Waiting for video generation... elapsed: {elapsed_seconds}s") | |
| time.sleep(args.poll_seconds) | |
| operation = client.operations.get(operation) | |
| if operation.response is None: | |
| err = getattr(operation, "error", None) | |
| print(f"API returned no response. Error: {err}", file=sys.stderr) | |
| return 2 | |
| generated = operation.response.generated_videos | |
| if not generated: | |
| print("No videos returned by API.", file=sys.stderr) | |
| return 2 | |
| out_dir = Path(args.output_dir) | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| base_name = args.name | |
| saved_files = [] | |
| # Save initial video as _1 (when extending, only first is used; when not, save all) | |
| if args.num_extend > 0: | |
| video_obj = generated[0].video | |
| client.files.download(file=video_obj) | |
| out_path = out_dir / f"{base_name}_1.mp4" | |
| video_obj.save(str(out_path)) | |
| strip_audio(out_path) | |
| saved_files.append(str(out_path.resolve())) | |
| print(f"Saved video: {out_path.resolve()}") | |
| else: | |
| for idx, item in enumerate(generated, start=1): | |
| video_obj = item.video | |
| client.files.download(file=video_obj) | |
| out_path = out_dir / f"{base_name}_{idx}.mp4" | |
| video_obj.save(str(out_path)) | |
| strip_audio(out_path) | |
| saved_files.append(str(out_path.resolve())) | |
| print(f"Saved video: {out_path.resolve()}") | |
| # Extend num_extend times (only extends the first video; each stage saved as _2, _3, ...) | |
| for ext_idx in range(args.num_extend): | |
| print(f"Extending video ({ext_idx + 1}/{args.num_extend})...") | |
| video_to_extend = generated[0].video | |
| client.files.download(file=video_to_extend) | |
| extend_config = types.GenerateVideosConfig( | |
| number_of_videos=1, | |
| resolution=args.resolution, | |
| ) | |
| operation = client.models.generate_videos( | |
| model=args.model, | |
| video=video_to_extend, | |
| prompt=prompts[ext_idx + 1], | |
| config=extend_config, | |
| ) | |
| started_at = time.time() | |
| while not operation.done: | |
| elapsed_seconds = int(time.time() - started_at) | |
| print(f"Waiting for extension... elapsed: {elapsed_seconds}s") | |
| time.sleep(args.poll_seconds) | |
| operation = client.operations.get(operation) | |
| if operation.response is None: | |
| err = getattr(operation, "error", None) | |
| print(f"Extension API returned no response. Error: {err}", file=sys.stderr) | |
| return 2 | |
| generated = operation.response.generated_videos | |
| if not generated: | |
| print("No videos returned by extension API.", file=sys.stderr) | |
| return 2 | |
| # Save this extended video as _2, _3, _4, etc. | |
| video_idx = ext_idx + 2 | |
| video_obj = generated[0].video | |
| client.files.download(file=video_obj) | |
| out_path = out_dir / f"{base_name}_{video_idx}.mp4" | |
| video_obj.save(str(out_path)) | |
| strip_audio(out_path) | |
| saved_files.append(str(out_path.resolve())) | |
| print(f"Saved video: {out_path.resolve()}") | |
| final_duration_approx = args.duration * (args.num_extend + 1) | |
| metadata_path = out_dir / f"{base_name}.json" | |
| metadata = { | |
| "prompts": prompts, | |
| "model": args.model, | |
| "config": { | |
| "resolution": args.resolution, | |
| "duration_seconds": args.duration, | |
| "num_extend": args.num_extend, | |
| "final_duration_approx_seconds": final_duration_approx, | |
| "aspect_ratio": args.aspect_ratio, | |
| "number_of_videos": args.number_of_videos, | |
| "poll_seconds": args.poll_seconds, | |
| "start_image": str(Path(args.start_image).expanduser().resolve()) if args.start_image else None, | |
| "end_image": str(Path(args.end_image).expanduser().resolve()) if args.end_image else None, | |
| }, | |
| "saved_videos": saved_files, | |
| } | |
| metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8") | |
| print(f"Saved metadata: {metadata_path.resolve()}") | |
| print(f"Final length (approx): {final_duration_approx}s") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |