"""
FFmpeg conversion operations.
This module provides implementation for various video conversion operations.
"""
import os
from typing import Dict, Any, Tuple, Optional, Union, List
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
[docs]
class FFmpegConversion:
"""FFmpeg implementation for conversion operations."""
[docs]
def trim(self, video: Dict[str, Any], start: float, end: Optional[float] = None) -> Dict[str, Any]:
"""
Trim video to specified time range.
Args:
video: Video dictionary
start: Start time in seconds
end: End time in seconds (None means until the end)
Returns:
New video dictionary
"""
# Calculate absolute start and end times
abs_start = video.get("start", 0) + start
if end is not None:
abs_end = video.get("start", 0) + end
else:
# If end is not specified, use the original end time
if video.get("end") is not None:
abs_end = video["end"]
else:
# If original end is not specified, use the duration
duration = float(video.get("info", {}).get("format", {}).get("duration", 0))
abs_end = duration
# Create a new video dictionary with updated start and end times
# Remove 'info' key to force recalculation with new times
new_video = video.copy()
new_video["start"] = abs_start
new_video["end"] = abs_end
if "info" in new_video:
del new_video["info"]
return new_video
[docs]
def resize(self, video: Dict[str, Any], width: Optional[int] = None,
height: Optional[int] = None, keep_aspect: bool = True) -> Dict[str, Any]:
"""
Resize video to specified dimensions.
Args:
video: Video dictionary
width: Target width (None to auto-calculate from height)
height: Target height (None to auto-calculate from width)
keep_aspect: Maintain aspect ratio if only one dimension is specified
Returns:
New video dictionary
"""
# Get current dimensions
info = self.get_info(video)
current_width = info["width"]
current_height = info["height"]
# Calculate new dimensions
if width is None and height is None:
# No resize needed
return video
if width is None:
# Calculate width from height
if keep_aspect:
width = int(current_width * (height / current_height))
else:
width = current_width
if height is None:
# Calculate height from width
if keep_aspect:
height = int(current_height * (width / current_width))
else:
height = current_height
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"resize_{width}x{height}_{os.path.basename(video['path'])}")
# Build FFmpeg arguments
# Start with input options
args = []
if video.get("start", 0) > 0:
args.extend(["-ss", str(video["start"])])
# Add input and filters
args.extend([
"-i", video["path"],
"-vf", f"scale={width}:{height}",
"-c:a", "copy" # Copy audio stream
])
# Add duration (output option, must come before output file)
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args.extend(["-t", str(duration)])
# Add output options
args.extend(["-y", temp_file])
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0, # Reset start time since we've applied it
"end": None # Reset end time since we've applied it
}
return new_video
[docs]
def crop(self, video: Dict[str, Any], x: int, y: int, width: int, height: int) -> Dict[str, Any]:
"""
Crop video to specified region.
Args:
video: Video dictionary
x: X coordinate of top-left corner
y: Y coordinate of top-left corner
width: Width of crop region
height: Height of crop region
Returns:
New video dictionary
"""
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"crop_{width}x{height}_{os.path.basename(video['path'])}")
# Build FFmpeg arguments
args = [
"-i", video["path"],
"-vf", f"crop={width}:{height}:{x}:{y}",
"-c:a", "copy", # Copy audio stream
"-y", # Overwrite output file if it exists
temp_file
]
# Add start and end times if specified
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0, # Reset start time since we've applied it
"end": None # Reset end time since we've applied it
}
return new_video
[docs]
def rotate(self, video: Dict[str, Any], degrees: float) -> Dict[str, Any]:
"""
Rotate video by specified degrees.
Args:
video: Video dictionary
degrees: Rotation angle in degrees
Returns:
New video dictionary
"""
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"rotate_{degrees}_{os.path.basename(video['path'])}")
# Use transpose filter for 90-degree rotations (more efficient)
# transpose=0: rotate 90 degrees counterclockwise
# transpose=1: rotate 90 degrees clockwise
# transpose=2: rotate 90 degrees counterclockwise and flip
# transpose=3: rotate 90 degrees clockwise and flip
if degrees == 90:
filter_string = "transpose=1"
elif degrees == -90 or degrees == 270:
filter_string = "transpose=2"
elif degrees == 180:
filter_string = "transpose=1,transpose=1"
else:
# For arbitrary angles, use rotate filter
filter_string = f"rotate={degrees}*PI/180"
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
args += [
"-vf", filter_string,
"-c:a", "copy",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def concat(self, videos: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Concatenate multiple videos.
Args:
videos: List of video dictionaries
Returns:
New video dictionary
"""
if not videos:
raise ValueError("No videos to concatenate")
if len(videos) == 1:
return videos[0]
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"concat_{os.path.basename(videos[0]['path'])}")
# Create a temporary file list for FFmpeg
concat_list_path = os.path.join(self._temp_dir, "concat_list.txt")
# Create concatenation file list
with open(concat_list_path, "w") as f:
for video in videos:
# Escape single quotes in path
escaped_path = video["path"].replace("'", "'\\''")
if video.get("start", 0) > 0 or video.get("end") is not None:
# For videos with start/end times, we need to create trimmed versions
trimmed_path = os.path.join(self._temp_dir, f"trim_{os.path.basename(video['path'])}")
trim_args = ["-i", video["path"]]
if video.get("start", 0) > 0:
trim_args = ["-ss", str(video["start"])] + trim_args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
trim_args += ["-t", str(duration)]
trim_args += [
"-c", "copy", # Just copy streams without re-encoding
"-y", # Overwrite output file if it exists
trimmed_path
]
# Run FFmpeg to create the trimmed file
self._run_ffmpeg(trim_args)
# Use the trimmed file in the concat list
escaped_path = trimmed_path.replace("'", "'\\''")
f.write(f"file '{escaped_path}'\n")
# Build FFmpeg arguments for concatenation
args = [
"-f", "concat",
"-safe", "0", # Allow absolute paths
"-i", concat_list_path,
"-c", "copy", # Copy streams without re-encoding
"-y", # Overwrite output file if it exists
temp_file
]
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
return new_video
[docs]
def grid(self, videos: List[Dict[str, Any]], rows: int, cols: int, **kwargs) -> Dict[str, Any]:
"""
Arrange videos in a grid layout.
Args:
videos: List of video dictionaries
rows: Number of rows
cols: Number of columns
**kwargs: Additional options (padding, etc.)
Returns:
New video dictionary
"""
if not videos:
raise ValueError("No videos for grid layout")
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"grid_{os.path.basename(videos[0]['path'])}")
# Calculate the dimensions of each cell
# Use the first video's dimensions as reference
info = self.get_info(videos[0])
# Each cell maintains the original width, height is scaled to fit in rows
cell_width = info["width"]
cell_height = info["height"] // rows
# Total output dimensions
output_width = cell_width * cols
output_height = cell_height * rows
# Build filter complex
filter_complex = []
# First, scale all videos to cell size
for i, video in enumerate(videos):
if i >= rows * cols:
break # Only process videos that fit in the grid
filter_complex.append(f"[{i}:v]scale={cell_width}:{cell_height}[v{i}]")
# Then create the grid
grid_filter = ""
for r in range(rows):
row_inputs = []
for c in range(cols):
idx = r * cols + c
if idx < len(videos) and idx < rows * cols:
row_inputs.append(f"[v{idx}]")
else:
# Create black video for missing cells
black_idx = len(filter_complex)
filter_complex.append(
f"color=black:{cell_width}x{cell_height}:d=999999[black{black_idx}]"
)
row_inputs.append(f"[black{black_idx}]")
# Horizontal stack for this row
row_idx = r
row_outputs = "".join(row_inputs)
filter_complex.append(f"{row_outputs}hstack=inputs={cols}[row{row_idx}]")
# Vertical stack all rows
row_refs = "".join(f"[row{r}]" for r in range(rows))
filter_complex.append(f"{row_refs}vstack=inputs={rows}[v]")
# Build FFmpeg arguments
args = []
# Add all video inputs
for video in videos:
if video.get("start", 0) > 0:
args.extend(["-ss", str(video["start"])])
args.extend(["-i", video["path"]])
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args.extend(["-t", str(duration)])
# Add filter complex and output options
args.extend([
"-filter_complex", ";".join(filter_complex),
"-map", "[v]",
"-an", # No audio for grid layout
"-y", # Overwrite output file if it exists
temp_file
])
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
return new_video
[docs]
def reverse(self, video: Dict[str, Any]) -> Dict[str, Any]:
"""
Reverse video playback.
Args:
video: Video dictionary
Returns:
New video dictionary
"""
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"reverse_{os.path.basename(video['path'])}")
# Use reverse filter for both video and audio
filter_complex = []
filter_complex.append("[0:v]reverse[vout]")
filter_complex.append("[0:a]areverse[aout]")
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
args += [
"-filter_complex", ";".join(filter_complex),
"-map", "[vout]",
"-map", "[aout]",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def replace_audio(self, video: Dict[str, Any], audio_path: Union[str, Path]) -> Dict[str, Any]:
"""
Replace audio track with new audio.
Args:
video: Video dictionary
audio_path: Path to new audio file
Returns:
New video dictionary
"""
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"replace_audio_{os.path.basename(video['path'])}")
args = ["-i", video["path"], "-i", str(audio_path)]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
# Map video from first input, audio from second input
args += [
"-map", "0:v",
"-map", "1:a",
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def export_gif(self, video: Dict[str, Any], output_path: Union[str, Path],
fps: Optional[float] = None, quality: int = 95,
loop: int = 0, width: Optional[int] = None) -> None:
"""
Export video as animated GIF.
Args:
video: Video dictionary
output_path: Output GIF path
fps: Frame rate for GIF (None = use original)
quality: Quality (1-100, higher = better)
loop: Loop count (0 = infinite)
width: Resize width (None = keep original)
"""
output_path = str(output_path)
info = self.get_info(video)
original_fps = info.get("fps", 30)
target_fps = fps if fps is not None else min(original_fps, 15)
# For high quality GIF, we use a two-pass approach with palette generation
self._ensure_temp_dir()
palette_file = os.path.join(self._temp_dir, "palette.png")
# First pass: generate palette
filter_parts = []
if width is not None:
filter_parts.append(f"scale={width}:-1")
filter_parts.append(f"fps={target_fps}")
if filter_parts:
vf1 = ",".join(filter_parts)
else:
vf1 = f"fps={target_fps}"
args1 = ["-i", video["path"]]
if video.get("start", 0) > 0:
args1 = ["-ss", str(video["start"])] + args1
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args1 += ["-t", str(duration)]
args1 += [
"-vf", f"{vf1},palettegen=max_colors=256:reserve_transparent=1",
"-y",
palette_file
]
self._run_ffmpeg(args1)
# Second pass: create GIF using palette
filter_parts2 = filter_parts + [f"fps={target_fps}"]
if filter_parts2:
vf2 = ",".join(filter_parts2 + ["paletteuse"])
else:
vf2 = f"fps={target_fps},paletteuse"
args2 = ["-i", video["path"], "-i", palette_file]
if video.get("start", 0) > 0:
args2 = ["-ss", str(video["start"])] + args2
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args2 += ["-t", str(duration)]
args2 += [
"-filter_complex", f"[0:v]{vf2}[out]",
"-map", "[out]",
"-loop", str(loop),
"-y",
output_path
]
self._run_ffmpeg(args2)
[docs]
def export_frames(self, video: Dict[str, Any], output_dir: Union[str, Path],
prefix: str = "frame_", format: str = "png",
start: Optional[float] = None, end: Optional[float] = None,
fps: Optional[float] = None) -> List[Path]:
"""
Export video frames as images.
Args:
video: Video dictionary
output_dir: Output directory path
prefix: Filename prefix
format: Image format (png, jpg, etc.)
start: Start time in seconds (None = from beginning)
end: End time in seconds (None = to end)
fps: Frame rate for extraction (None = use original fps)
Returns:
List of exported frame paths
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
info = self.get_info(video)
original_fps = info.get("fps", 30)
target_fps = fps if fps is not None else original_fps
# Calculate start and end times
abs_start = video.get("start", 0)
if start is not None:
abs_start += start
if end is not None:
abs_end = video.get("start", 0) + end
elif video.get("end") is not None:
abs_end = video["end"]
else:
abs_end = info.get("duration", 10.0)
duration = abs_end - abs_start
# Build FFmpeg args for frame extraction
args = [
"-ss", str(abs_start),
"-i", video["path"],
"-t", str(duration),
"-vf", f"fps={target_fps}",
"-y",
str(output_dir / f"{prefix}%04d.{format}")
]
self._run_ffmpeg(args)
# Get list of exported frames
frames = sorted(output_dir.glob(f"{prefix}*.{format}"))
return frames