Source code for fmusvid.backends.ffmpeg.conversion

"""
FFmpeg conversion operations.

This module provides implementation for various video conversion operations.
"""

import os
from typing import Dict, Any, Tuple, Optional, Union, List
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

[docs] class FFmpegConversion: """FFmpeg implementation for conversion operations."""
[docs] def trim(self, video: Dict[str, Any], start: float, end: Optional[float] = None) -> Dict[str, Any]: """ Trim video to specified time range. Args: video: Video dictionary start: Start time in seconds end: End time in seconds (None means until the end) Returns: New video dictionary """ # Calculate absolute start and end times abs_start = video.get("start", 0) + start if end is not None: abs_end = video.get("start", 0) + end else: # If end is not specified, use the original end time if video.get("end") is not None: abs_end = video["end"] else: # If original end is not specified, use the duration duration = float(video.get("info", {}).get("format", {}).get("duration", 0)) abs_end = duration # Create a new video dictionary with updated start and end times # Remove 'info' key to force recalculation with new times new_video = video.copy() new_video["start"] = abs_start new_video["end"] = abs_end if "info" in new_video: del new_video["info"] return new_video
[docs] def resize(self, video: Dict[str, Any], width: Optional[int] = None, height: Optional[int] = None, keep_aspect: bool = True) -> Dict[str, Any]: """ Resize video to specified dimensions. Args: video: Video dictionary width: Target width (None to auto-calculate from height) height: Target height (None to auto-calculate from width) keep_aspect: Maintain aspect ratio if only one dimension is specified Returns: New video dictionary """ # Get current dimensions info = self.get_info(video) current_width = info["width"] current_height = info["height"] # Calculate new dimensions if width is None and height is None: # No resize needed return video if width is None: # Calculate width from height if keep_aspect: width = int(current_width * (height / current_height)) else: width = current_width if height is None: # Calculate height from width if keep_aspect: height = int(current_height * (width / current_width)) else: height = current_height # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"resize_{width}x{height}_{os.path.basename(video['path'])}") # Build FFmpeg arguments # Start with input options args = [] if video.get("start", 0) > 0: args.extend(["-ss", str(video["start"])]) # Add input and filters args.extend([ "-i", video["path"], "-vf", f"scale={width}:{height}", "-c:a", "copy" # Copy audio stream ]) # Add duration (output option, must come before output file) if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args.extend(["-t", str(duration)]) # Add output options args.extend(["-y", temp_file]) # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, # Reset start time since we've applied it "end": None # Reset end time since we've applied it } return new_video
[docs] def crop(self, video: Dict[str, Any], x: int, y: int, width: int, height: int) -> Dict[str, Any]: """ Crop video to specified region. Args: video: Video dictionary x: X coordinate of top-left corner y: Y coordinate of top-left corner width: Width of crop region height: Height of crop region Returns: New video dictionary """ # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"crop_{width}x{height}_{os.path.basename(video['path'])}") # Build FFmpeg arguments args = [ "-i", video["path"], "-vf", f"crop={width}:{height}:{x}:{y}", "-c:a", "copy", # Copy audio stream "-y", # Overwrite output file if it exists temp_file ] # Add start and end times if specified if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, # Reset start time since we've applied it "end": None # Reset end time since we've applied it } return new_video
[docs] def rotate(self, video: Dict[str, Any], degrees: float) -> Dict[str, Any]: """ Rotate video by specified degrees. Args: video: Video dictionary degrees: Rotation angle in degrees Returns: New video dictionary """ self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"rotate_{degrees}_{os.path.basename(video['path'])}") # Use transpose filter for 90-degree rotations (more efficient) # transpose=0: rotate 90 degrees counterclockwise # transpose=1: rotate 90 degrees clockwise # transpose=2: rotate 90 degrees counterclockwise and flip # transpose=3: rotate 90 degrees clockwise and flip if degrees == 90: filter_string = "transpose=1" elif degrees == -90 or degrees == 270: filter_string = "transpose=2" elif degrees == 180: filter_string = "transpose=1,transpose=1" else: # For arbitrary angles, use rotate filter filter_string = f"rotate={degrees}*PI/180" args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] args += [ "-vf", filter_string, "-c:a", "copy", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def extract_frame(self, video: Dict[str, Any], time: float) -> Any: """ Extract a single frame at specified time. Args: video: Video dictionary time: Time in seconds Returns: PIL Image """ from PIL import Image import io # Calculate absolute time abs_time = video.get("start", 0) + time # Build FFmpeg arguments to extract a single frame args = [ "-ss", str(abs_time), "-i", video["path"], "-vframes", "1", # Extract only one frame "-f", "image2pipe", # Output to pipe "-pix_fmt", "rgb24", # RGB format "-vcodec", "png", # PNG codec "-" # Output to stdout ] # Run FFmpeg and capture output output = self._run_ffmpeg(args) # Create PIL Image from output image = Image.open(io.BytesIO(output)) return image
[docs] def concat(self, videos: List[Dict[str, Any]]) -> Dict[str, Any]: """ Concatenate multiple videos. Args: videos: List of video dictionaries Returns: New video dictionary """ if not videos: raise ValueError("No videos to concatenate") if len(videos) == 1: return videos[0] # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"concat_{os.path.basename(videos[0]['path'])}") # Create a temporary file list for FFmpeg concat_list_path = os.path.join(self._temp_dir, "concat_list.txt") # Create concatenation file list with open(concat_list_path, "w") as f: for video in videos: # Escape single quotes in path escaped_path = video["path"].replace("'", "'\\''") if video.get("start", 0) > 0 or video.get("end") is not None: # For videos with start/end times, we need to create trimmed versions trimmed_path = os.path.join(self._temp_dir, f"trim_{os.path.basename(video['path'])}") trim_args = ["-i", video["path"]] if video.get("start", 0) > 0: trim_args = ["-ss", str(video["start"])] + trim_args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) trim_args += ["-t", str(duration)] trim_args += [ "-c", "copy", # Just copy streams without re-encoding "-y", # Overwrite output file if it exists trimmed_path ] # Run FFmpeg to create the trimmed file self._run_ffmpeg(trim_args) # Use the trimmed file in the concat list escaped_path = trimmed_path.replace("'", "'\\''") f.write(f"file '{escaped_path}'\n") # Build FFmpeg arguments for concatenation args = [ "-f", "concat", "-safe", "0", # Allow absolute paths "-i", concat_list_path, "-c", "copy", # Copy streams without re-encoding "-y", # Overwrite output file if it exists temp_file ] # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } return new_video
[docs] def grid(self, videos: List[Dict[str, Any]], rows: int, cols: int, **kwargs) -> Dict[str, Any]: """ Arrange videos in a grid layout. Args: videos: List of video dictionaries rows: Number of rows cols: Number of columns **kwargs: Additional options (padding, etc.) Returns: New video dictionary """ if not videos: raise ValueError("No videos for grid layout") # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"grid_{os.path.basename(videos[0]['path'])}") # Calculate the dimensions of each cell # Use the first video's dimensions as reference info = self.get_info(videos[0]) # Each cell maintains the original width, height is scaled to fit in rows cell_width = info["width"] cell_height = info["height"] // rows # Total output dimensions output_width = cell_width * cols output_height = cell_height * rows # Build filter complex filter_complex = [] # First, scale all videos to cell size for i, video in enumerate(videos): if i >= rows * cols: break # Only process videos that fit in the grid filter_complex.append(f"[{i}:v]scale={cell_width}:{cell_height}[v{i}]") # Then create the grid grid_filter = "" for r in range(rows): row_inputs = [] for c in range(cols): idx = r * cols + c if idx < len(videos) and idx < rows * cols: row_inputs.append(f"[v{idx}]") else: # Create black video for missing cells black_idx = len(filter_complex) filter_complex.append( f"color=black:{cell_width}x{cell_height}:d=999999[black{black_idx}]" ) row_inputs.append(f"[black{black_idx}]") # Horizontal stack for this row row_idx = r row_outputs = "".join(row_inputs) filter_complex.append(f"{row_outputs}hstack=inputs={cols}[row{row_idx}]") # Vertical stack all rows row_refs = "".join(f"[row{r}]" for r in range(rows)) filter_complex.append(f"{row_refs}vstack=inputs={rows}[v]") # Build FFmpeg arguments args = [] # Add all video inputs for video in videos: if video.get("start", 0) > 0: args.extend(["-ss", str(video["start"])]) args.extend(["-i", video["path"]]) if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args.extend(["-t", str(duration)]) # Add filter complex and output options args.extend([ "-filter_complex", ";".join(filter_complex), "-map", "[v]", "-an", # No audio for grid layout "-y", # Overwrite output file if it exists temp_file ]) # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } return new_video
[docs] def reverse(self, video: Dict[str, Any]) -> Dict[str, Any]: """ Reverse video playback. Args: video: Video dictionary Returns: New video dictionary """ self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"reverse_{os.path.basename(video['path'])}") # Use reverse filter for both video and audio filter_complex = [] filter_complex.append("[0:v]reverse[vout]") filter_complex.append("[0:a]areverse[aout]") args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] args += [ "-filter_complex", ";".join(filter_complex), "-map", "[vout]", "-map", "[aout]", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def replace_audio(self, video: Dict[str, Any], audio_path: Union[str, Path]) -> Dict[str, Any]: """ Replace audio track with new audio. Args: video: Video dictionary audio_path: Path to new audio file Returns: New video dictionary """ self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"replace_audio_{os.path.basename(video['path'])}") args = ["-i", video["path"], "-i", str(audio_path)] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] # Map video from first input, audio from second input args += [ "-map", "0:v", "-map", "1:a", "-c:v", "copy", "-c:a", "aac", "-shortest", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def export_gif(self, video: Dict[str, Any], output_path: Union[str, Path], fps: Optional[float] = None, quality: int = 95, loop: int = 0, width: Optional[int] = None) -> None: """ Export video as animated GIF. Args: video: Video dictionary output_path: Output GIF path fps: Frame rate for GIF (None = use original) quality: Quality (1-100, higher = better) loop: Loop count (0 = infinite) width: Resize width (None = keep original) """ output_path = str(output_path) info = self.get_info(video) original_fps = info.get("fps", 30) target_fps = fps if fps is not None else min(original_fps, 15) # For high quality GIF, we use a two-pass approach with palette generation self._ensure_temp_dir() palette_file = os.path.join(self._temp_dir, "palette.png") # First pass: generate palette filter_parts = [] if width is not None: filter_parts.append(f"scale={width}:-1") filter_parts.append(f"fps={target_fps}") if filter_parts: vf1 = ",".join(filter_parts) else: vf1 = f"fps={target_fps}" args1 = ["-i", video["path"]] if video.get("start", 0) > 0: args1 = ["-ss", str(video["start"])] + args1 if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args1 += ["-t", str(duration)] args1 += [ "-vf", f"{vf1},palettegen=max_colors=256:reserve_transparent=1", "-y", palette_file ] self._run_ffmpeg(args1) # Second pass: create GIF using palette filter_parts2 = filter_parts + [f"fps={target_fps}"] if filter_parts2: vf2 = ",".join(filter_parts2 + ["paletteuse"]) else: vf2 = f"fps={target_fps},paletteuse" args2 = ["-i", video["path"], "-i", palette_file] if video.get("start", 0) > 0: args2 = ["-ss", str(video["start"])] + args2 if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args2 += ["-t", str(duration)] args2 += [ "-filter_complex", f"[0:v]{vf2}[out]", "-map", "[out]", "-loop", str(loop), "-y", output_path ] self._run_ffmpeg(args2)
[docs] def export_frames(self, video: Dict[str, Any], output_dir: Union[str, Path], prefix: str = "frame_", format: str = "png", start: Optional[float] = None, end: Optional[float] = None, fps: Optional[float] = None) -> List[Path]: """ Export video frames as images. Args: video: Video dictionary output_dir: Output directory path prefix: Filename prefix format: Image format (png, jpg, etc.) start: Start time in seconds (None = from beginning) end: End time in seconds (None = to end) fps: Frame rate for extraction (None = use original fps) Returns: List of exported frame paths """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) info = self.get_info(video) original_fps = info.get("fps", 30) target_fps = fps if fps is not None else original_fps # Calculate start and end times abs_start = video.get("start", 0) if start is not None: abs_start += start if end is not None: abs_end = video.get("start", 0) + end elif video.get("end") is not None: abs_end = video["end"] else: abs_end = info.get("duration", 10.0) duration = abs_end - abs_start # Build FFmpeg args for frame extraction args = [ "-ss", str(abs_start), "-i", video["path"], "-t", str(duration), "-vf", f"fps={target_fps}", "-y", str(output_dir / f"{prefix}%04d.{format}") ] self._run_ffmpeg(args) # Get list of exported frames frames = sorted(output_dir.glob(f"{prefix}*.{format}")) return frames