"""
FFmpeg filter operations.
This module provides implementation for various video filters with caching.
"""
import os
import hashlib
from typing import Dict, Any, Tuple, Optional, Union, List
import logging
logger = logging.getLogger(__name__)
[docs]
class FFmpegFilters:
"""FFmpeg filters implementation with caching mechanism."""
def _generate_filter_cache_key(self, video: Dict[str, Any],
filter_name: str,
params: Dict[str, Any]) -> str:
"""
Generate a cache key for filter operations.
Args:
video: Video dictionary
filter_name: Name of the filter
params: Filter parameters
Returns:
Cache key string
"""
# Create a string with all parameters
param_str = f"path={video['path']},filter={filter_name}"
if "start" in video:
param_str += f",start={video['start']}"
if "end" in video and video["end"] is not None:
param_str += f",end={video['end']}"
# Add all filter parameters
for key, value in sorted(params.items()):
param_str += f",{key}={value}"
# Create a hash of the parameters
return hashlib.md5(param_str.encode()).hexdigest()
[docs]
def grayscale(self, video: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert video to grayscale.
Args:
video: Video dictionary
Returns:
New video dictionary
"""
return self.apply_filter(video, "hue=s=0")
[docs]
def apply_filter(self, video: Dict[str, Any], filter_string: str) -> Dict[str, Any]:
"""
Apply a custom FFmpeg filter with caching.
Args:
video: Video dictionary
filter_string: FFmpeg filter string
Returns:
New video dictionary
"""
# Generate cache key
cache_key = self._generate_filter_cache_key(
video, "custom", {"filter": filter_string}
)
# Check cache
if cache_key in self._filter_cache:
logger.debug(f"Using cached filter: {cache_key}")
# Create a new video dictionary with the cached file
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"filter_{os.path.basename(video['path'])}")
# Build FFmpeg arguments
# Start with input options
args = []
if video.get("start", 0) > 0:
args.extend(["-ss", str(video["start"])])
# Add input and filters
args.extend([
"-i", video["path"],
"-vf", filter_string,
"-c:a", "copy" # Copy audio stream
])
# Add duration (output option, must come before output file)
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args.extend(["-t", str(duration)])
# Add output options
args.extend(["-y", temp_file])
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Cache the result for future use
self._filter_cache[cache_key] = temp_file
logger.debug(f"Cached filter: {cache_key}")
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0, # Reset start time since we've applied it
"end": None # Reset end time since we've applied it
}
return new_video
[docs]
def blur(self, video: Dict[str, Any], radius: float) -> Dict[str, Any]:
"""
Apply Gaussian blur with specified radius.
Args:
video: Video dictionary
radius: Blur radius (1.0 = slight blur, 10.0 = heavy blur)
Returns:
New video dictionary
"""
# Generate cache key
cache_key = self._generate_filter_cache_key(
video, "blur", {"radius": radius}
)
# Check cache
if cache_key in self._filter_cache:
logger.debug(f"Using cached blur: {cache_key}")
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Create filter string - use boxblur for better performance
filter_string = f"boxblur={min(20, max(1, int(radius*2)))}:1"
# Apply the filter
result = self.apply_filter(video, filter_string)
# Cache the result
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def brightness(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]:
"""
Adjust video brightness.
Args:
video: Video dictionary
factor: Brightness factor (1.0 = original, 1.5 = +50%, 0.5 = -50%)
Returns:
New video dictionary
"""
# Generate cache key
cache_key = self._generate_filter_cache_key(
video, "brightness", {"factor": factor}
)
# Check cache
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Create filter string
filter_string = f"eq=brightness={factor-1}"
# Apply the filter
result = self.apply_filter(video, filter_string)
# Cache the result
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def contrast(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]:
"""
Adjust video contrast.
Args:
video: Video dictionary
factor: Contrast factor (1.0 = original, 1.5 = +50%, 0.5 = -50%)
Returns:
New video dictionary
"""
# Generate cache key
cache_key = self._generate_filter_cache_key(
video, "contrast", {"factor": factor}
)
# Check cache
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Create filter string
filter_string = f"eq=contrast={factor}"
# Apply the filter
result = self.apply_filter(video, filter_string)
# Cache the result
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def saturation(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]:
"""
Adjust video saturation.
Args:
video: Video dictionary
factor: Saturation factor (1.0 = original, 1.5 = +50%, 0.5 = -50%, 0 = grayscale)
Returns:
New video dictionary
"""
# Generate cache key
cache_key = self._generate_filter_cache_key(
video, "saturation", {"factor": factor}
)
# Check cache
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Create filter string
filter_string = f"eq=saturation={factor}"
# Apply the filter
result = self.apply_filter(video, filter_string)
# Cache the result
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def batch_filters(self, video: Dict[str, Any],
filters: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, Any]:
"""
Apply multiple filters in a single FFmpeg operation.
Args:
video: Video dictionary
filters: List of (filter_name, params) tuples
Returns:
New video dictionary
"""
if not filters:
return video
# Generate a complex cache key for all filters
cache_key = "batch_"
for filter_name, params in filters:
filter_key = self._generate_filter_cache_key(
video, filter_name, params
)
cache_key += filter_key[:8] + "_"
# Check cache
if cache_key in self._filter_cache:
logger.debug(f"Using cached batch filter: {cache_key}")
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Build filter string
filter_parts = []
for filter_name, params in filters:
if filter_name == "blur":
radius = params.get("radius", 1.0)
filter_parts.append(f"boxblur={min(20, max(1, int(radius*2)))}:1")
elif filter_name == "brightness":
factor = params.get("factor", 1.0)
filter_parts.append(f"eq=brightness={factor-1}")
elif filter_name == "contrast":
factor = params.get("factor", 1.0)
filter_parts.append(f"eq=contrast={factor}")
elif filter_name == "saturation":
factor = params.get("factor", 1.0)
filter_parts.append(f"eq=saturation={factor}")
elif filter_name == "custom":
filter_parts.append(params.get("filter", ""))
# Join filter parts with commas
filter_string = ",".join(filter_parts)
# Create a new temporary file
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"batch_filter_{os.path.basename(video['path'])}")
# Build FFmpeg arguments
args = [
"-i", video["path"],
"-vf", filter_string,
"-c:a", "copy", # Copy audio stream
"-y", # Overwrite output file if it exists
temp_file
]
# Add start and end times if specified
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
# Run FFmpeg
self._run_ffmpeg(args)
# Get info for the new video
info = self._run_ffprobe(["-i", temp_file])
# Cache the result for future use
self._filter_cache[cache_key] = temp_file
logger.debug(f"Cached batch filter: {cache_key}")
# Create a new video dictionary
new_video = {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0, # Reset start time since we've applied it
"end": None # Reset end time since we've applied it
}
return new_video
[docs]
def speed(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]:
"""
Change video playback speed.
Args:
video: Video dictionary
factor: Speed factor (2.0 = 2x faster, 0.5 = half speed)
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "speed", {"factor": factor}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# For speed change, we need to adjust both video and audio
filter_complex = []
filter_complex.append(f"[0:v]setpts=PTS/{factor}[vout]")
filter_complex.append(f"[0:a]atempo=1/{factor}[aout]")
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"speed_{factor}_{os.path.basename(video['path'])}")
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
duration = video["end"] - video.get("start", 0)
args += ["-t", str(duration)]
args += [
"-filter_complex", ";".join(filter_complex),
"-map", "[vout]",
"-map", "[aout]",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
self._filter_cache[cache_key] = temp_file
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def fade_in(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]:
"""
Fade video in from black.
Args:
video: Video dictionary
duration: Fade duration in seconds
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "fade_in", {"duration": duration}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
info = self.get_info(video)
fps = info.get("fps", 30)
fade_frames = int(duration * fps)
filter_string = f"fade=t=in:st=0:d={fade_frames}:color=black"
result = self.apply_filter(video, filter_string)
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def fade_out(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]:
"""
Fade video out to black.
Args:
video: Video dictionary
duration: Fade duration in seconds
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "fade_out", {"duration": duration}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
info = self.get_info(video)
video_duration = info.get("duration", 10.0)
fps = info.get("fps", 30)
start_time = max(0, video_duration - duration)
fade_frames = int(duration * fps)
filter_string = f"fade=t=out:st={start_time}:d={fade_frames}:color=black"
result = self.apply_filter(video, filter_string)
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def fade_audio_in(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]:
"""
Fade audio in.
Args:
video: Video dictionary
duration: Fade duration in seconds
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "fade_audio_in", {"duration": duration}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"fade_audio_in_{duration}_{os.path.basename(video['path'])}")
filter_string = f"afade=t=in:st=0:d={duration}"
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
dur = video["end"] - video.get("start", 0)
args += ["-t", str(dur)]
args += [
"-af", filter_string,
"-c:v", "copy",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
self._filter_cache[cache_key] = temp_file
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def fade_audio_out(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]:
"""
Fade audio out.
Args:
video: Video dictionary
duration: Fade duration in seconds
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "fade_audio_out", {"duration": duration}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
info = self.get_info(video)
video_duration = info.get("duration", 10.0)
start_time = max(0, video_duration - duration)
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"fade_audio_out_{duration}_{os.path.basename(video['path'])}")
filter_string = f"afade=t=out:st={start_time}:d={duration}"
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
dur = video["end"] - video.get("start", 0)
args += ["-t", str(dur)]
args += [
"-af", filter_string,
"-c:v", "copy",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
self._filter_cache[cache_key] = temp_file
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def chroma_key(self, video: Dict[str, Any], color: Tuple[int, int, int],
similarity: float = 0.1, blend: float = 0.0) -> Dict[str, Any]:
"""
Remove green screen / chroma key.
Args:
video: Video dictionary
color: RGB color to key out (e.g., (0, 255, 0) for green)
similarity: Color similarity threshold (0.01 to 1.0)
blend: Blend factor for semi-transparent edges (0.0 to 1.0)
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "chroma_key", {"color": color, "similarity": similarity, "blend": blend}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
# Convert RGB to hex for chromakey filter
color_hex = f"{color[0]:02x}{color[1]:02x}{color[2]:02x}"
filter_string = f"chromakey={color_hex}:{similarity}:{blend}"
result = self.apply_filter(video, filter_string)
self._filter_cache[cache_key] = result["path"]
return result
[docs]
def normalize_audio(self, video: Dict[str, Any], target_db: float = -16.0) -> Dict[str, Any]:
"""
Normalize audio to target dB level.
Args:
video: Video dictionary
target_db: Target loudness in dB (typically -16 to -20)
Returns:
New video dictionary
"""
cache_key = self._generate_filter_cache_key(
video, "normalize_audio", {"target_db": target_db}
)
if cache_key in self._filter_cache:
temp_file = self._filter_cache[cache_key]
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"normalize_audio_{target_db}_{os.path.basename(video['path'])}")
# Use loudnorm filter for EBU R128 normalization
filter_string = f"loudnorm=I={target_db}:TP=-1.5:LRA=11"
args = ["-i", video["path"]]
if video.get("start", 0) > 0:
args = ["-ss", str(video["start"])] + args
if video.get("end") is not None:
dur = video["end"] - video.get("start", 0)
args += ["-t", str(dur)]
args += [
"-af", filter_string,
"-c:v", "copy",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
self._filter_cache[cache_key] = temp_file
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}
[docs]
def crossfade(self, video1: Dict[str, Any], video2: Dict[str, Any],
duration: float) -> Dict[str, Any]:
"""
Crossfade between two videos.
Args:
video1: First video dictionary
video2: Second video dictionary
duration: Crossfade duration in seconds
Returns:
New video dictionary
"""
if not video1 or not video2:
raise ValueError("Both videos required for crossfade")
info1 = self.get_info(video1)
info2 = self.get_info(video2)
fps = info1.get("fps", 30)
self._ensure_temp_dir()
temp_file = os.path.join(self._temp_dir, f"crossfade_{duration}_{os.path.basename(video1['path'])}")
# Calculate offset for xfade filter
# First video plays, then crossfade starts at (duration1 - fade_duration)
duration1 = info1.get("duration", 5.0)
offset = duration1 - duration
# xfade transition
# Available transitions: fade, wipeleft, slideright, etc.
filter_string = f"[0:v][1:v]xfade=transition=fade:duration={duration}:offset={offset}[vout]"
args = ["-i", video1["path"], "-i", video2["path"]]
args += [
"-filter_complex", filter_string,
"-map", "[vout]",
"-y",
temp_file
]
self._run_ffmpeg(args)
info = self._run_ffprobe(["-i", temp_file])
return {
"path": temp_file,
"info": info,
"is_temp": True,
"start": 0,
"end": None
}