Source code for fmusvid.backends.ffmpeg.filters

"""
FFmpeg filter operations.

This module provides implementation for various video filters with caching.
"""

import os
import hashlib
from typing import Dict, Any, Tuple, Optional, Union, List
import logging

logger = logging.getLogger(__name__)

[docs] class FFmpegFilters: """FFmpeg filters implementation with caching mechanism.""" def _generate_filter_cache_key(self, video: Dict[str, Any], filter_name: str, params: Dict[str, Any]) -> str: """ Generate a cache key for filter operations. Args: video: Video dictionary filter_name: Name of the filter params: Filter parameters Returns: Cache key string """ # Create a string with all parameters param_str = f"path={video['path']},filter={filter_name}" if "start" in video: param_str += f",start={video['start']}" if "end" in video and video["end"] is not None: param_str += f",end={video['end']}" # Add all filter parameters for key, value in sorted(params.items()): param_str += f",{key}={value}" # Create a hash of the parameters return hashlib.md5(param_str.encode()).hexdigest()
[docs] def grayscale(self, video: Dict[str, Any]) -> Dict[str, Any]: """ Convert video to grayscale. Args: video: Video dictionary Returns: New video dictionary """ return self.apply_filter(video, "hue=s=0")
[docs] def apply_filter(self, video: Dict[str, Any], filter_string: str) -> Dict[str, Any]: """ Apply a custom FFmpeg filter with caching. Args: video: Video dictionary filter_string: FFmpeg filter string Returns: New video dictionary """ # Generate cache key cache_key = self._generate_filter_cache_key( video, "custom", {"filter": filter_string} ) # Check cache if cache_key in self._filter_cache: logger.debug(f"Using cached filter: {cache_key}") # Create a new video dictionary with the cached file temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"filter_{os.path.basename(video['path'])}") # Build FFmpeg arguments # Start with input options args = [] if video.get("start", 0) > 0: args.extend(["-ss", str(video["start"])]) # Add input and filters args.extend([ "-i", video["path"], "-vf", filter_string, "-c:a", "copy" # Copy audio stream ]) # Add duration (output option, must come before output file) if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args.extend(["-t", str(duration)]) # Add output options args.extend(["-y", temp_file]) # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Cache the result for future use self._filter_cache[cache_key] = temp_file logger.debug(f"Cached filter: {cache_key}") # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, # Reset start time since we've applied it "end": None # Reset end time since we've applied it } return new_video
[docs] def blur(self, video: Dict[str, Any], radius: float) -> Dict[str, Any]: """ Apply Gaussian blur with specified radius. Args: video: Video dictionary radius: Blur radius (1.0 = slight blur, 10.0 = heavy blur) Returns: New video dictionary """ # Generate cache key cache_key = self._generate_filter_cache_key( video, "blur", {"radius": radius} ) # Check cache if cache_key in self._filter_cache: logger.debug(f"Using cached blur: {cache_key}") temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Create filter string - use boxblur for better performance filter_string = f"boxblur={min(20, max(1, int(radius*2)))}:1" # Apply the filter result = self.apply_filter(video, filter_string) # Cache the result self._filter_cache[cache_key] = result["path"] return result
[docs] def brightness(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]: """ Adjust video brightness. Args: video: Video dictionary factor: Brightness factor (1.0 = original, 1.5 = +50%, 0.5 = -50%) Returns: New video dictionary """ # Generate cache key cache_key = self._generate_filter_cache_key( video, "brightness", {"factor": factor} ) # Check cache if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Create filter string filter_string = f"eq=brightness={factor-1}" # Apply the filter result = self.apply_filter(video, filter_string) # Cache the result self._filter_cache[cache_key] = result["path"] return result
[docs] def contrast(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]: """ Adjust video contrast. Args: video: Video dictionary factor: Contrast factor (1.0 = original, 1.5 = +50%, 0.5 = -50%) Returns: New video dictionary """ # Generate cache key cache_key = self._generate_filter_cache_key( video, "contrast", {"factor": factor} ) # Check cache if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Create filter string filter_string = f"eq=contrast={factor}" # Apply the filter result = self.apply_filter(video, filter_string) # Cache the result self._filter_cache[cache_key] = result["path"] return result
[docs] def saturation(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]: """ Adjust video saturation. Args: video: Video dictionary factor: Saturation factor (1.0 = original, 1.5 = +50%, 0.5 = -50%, 0 = grayscale) Returns: New video dictionary """ # Generate cache key cache_key = self._generate_filter_cache_key( video, "saturation", {"factor": factor} ) # Check cache if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Create filter string filter_string = f"eq=saturation={factor}" # Apply the filter result = self.apply_filter(video, filter_string) # Cache the result self._filter_cache[cache_key] = result["path"] return result
[docs] def batch_filters(self, video: Dict[str, Any], filters: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, Any]: """ Apply multiple filters in a single FFmpeg operation. Args: video: Video dictionary filters: List of (filter_name, params) tuples Returns: New video dictionary """ if not filters: return video # Generate a complex cache key for all filters cache_key = "batch_" for filter_name, params in filters: filter_key = self._generate_filter_cache_key( video, filter_name, params ) cache_key += filter_key[:8] + "_" # Check cache if cache_key in self._filter_cache: logger.debug(f"Using cached batch filter: {cache_key}") temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Build filter string filter_parts = [] for filter_name, params in filters: if filter_name == "blur": radius = params.get("radius", 1.0) filter_parts.append(f"boxblur={min(20, max(1, int(radius*2)))}:1") elif filter_name == "brightness": factor = params.get("factor", 1.0) filter_parts.append(f"eq=brightness={factor-1}") elif filter_name == "contrast": factor = params.get("factor", 1.0) filter_parts.append(f"eq=contrast={factor}") elif filter_name == "saturation": factor = params.get("factor", 1.0) filter_parts.append(f"eq=saturation={factor}") elif filter_name == "custom": filter_parts.append(params.get("filter", "")) # Join filter parts with commas filter_string = ",".join(filter_parts) # Create a new temporary file self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"batch_filter_{os.path.basename(video['path'])}") # Build FFmpeg arguments args = [ "-i", video["path"], "-vf", filter_string, "-c:a", "copy", # Copy audio stream "-y", # Overwrite output file if it exists temp_file ] # Add start and end times if specified if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] # Run FFmpeg self._run_ffmpeg(args) # Get info for the new video info = self._run_ffprobe(["-i", temp_file]) # Cache the result for future use self._filter_cache[cache_key] = temp_file logger.debug(f"Cached batch filter: {cache_key}") # Create a new video dictionary new_video = { "path": temp_file, "info": info, "is_temp": True, "start": 0, # Reset start time since we've applied it "end": None # Reset end time since we've applied it } return new_video
[docs] def speed(self, video: Dict[str, Any], factor: float) -> Dict[str, Any]: """ Change video playback speed. Args: video: Video dictionary factor: Speed factor (2.0 = 2x faster, 0.5 = half speed) Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "speed", {"factor": factor} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # For speed change, we need to adjust both video and audio filter_complex = [] filter_complex.append(f"[0:v]setpts=PTS/{factor}[vout]") filter_complex.append(f"[0:a]atempo=1/{factor}[aout]") self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"speed_{factor}_{os.path.basename(video['path'])}") args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: duration = video["end"] - video.get("start", 0) args += ["-t", str(duration)] args += [ "-filter_complex", ";".join(filter_complex), "-map", "[vout]", "-map", "[aout]", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) self._filter_cache[cache_key] = temp_file return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def fade_in(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]: """ Fade video in from black. Args: video: Video dictionary duration: Fade duration in seconds Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "fade_in", {"duration": duration} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } info = self.get_info(video) fps = info.get("fps", 30) fade_frames = int(duration * fps) filter_string = f"fade=t=in:st=0:d={fade_frames}:color=black" result = self.apply_filter(video, filter_string) self._filter_cache[cache_key] = result["path"] return result
[docs] def fade_out(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]: """ Fade video out to black. Args: video: Video dictionary duration: Fade duration in seconds Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "fade_out", {"duration": duration} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } info = self.get_info(video) video_duration = info.get("duration", 10.0) fps = info.get("fps", 30) start_time = max(0, video_duration - duration) fade_frames = int(duration * fps) filter_string = f"fade=t=out:st={start_time}:d={fade_frames}:color=black" result = self.apply_filter(video, filter_string) self._filter_cache[cache_key] = result["path"] return result
[docs] def fade_audio_in(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]: """ Fade audio in. Args: video: Video dictionary duration: Fade duration in seconds Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "fade_audio_in", {"duration": duration} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"fade_audio_in_{duration}_{os.path.basename(video['path'])}") filter_string = f"afade=t=in:st=0:d={duration}" args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: dur = video["end"] - video.get("start", 0) args += ["-t", str(dur)] args += [ "-af", filter_string, "-c:v", "copy", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) self._filter_cache[cache_key] = temp_file return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def fade_audio_out(self, video: Dict[str, Any], duration: float) -> Dict[str, Any]: """ Fade audio out. Args: video: Video dictionary duration: Fade duration in seconds Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "fade_audio_out", {"duration": duration} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } info = self.get_info(video) video_duration = info.get("duration", 10.0) start_time = max(0, video_duration - duration) self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"fade_audio_out_{duration}_{os.path.basename(video['path'])}") filter_string = f"afade=t=out:st={start_time}:d={duration}" args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: dur = video["end"] - video.get("start", 0) args += ["-t", str(dur)] args += [ "-af", filter_string, "-c:v", "copy", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) self._filter_cache[cache_key] = temp_file return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def chroma_key(self, video: Dict[str, Any], color: Tuple[int, int, int], similarity: float = 0.1, blend: float = 0.0) -> Dict[str, Any]: """ Remove green screen / chroma key. Args: video: Video dictionary color: RGB color to key out (e.g., (0, 255, 0) for green) similarity: Color similarity threshold (0.01 to 1.0) blend: Blend factor for semi-transparent edges (0.0 to 1.0) Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "chroma_key", {"color": color, "similarity": similarity, "blend": blend} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } # Convert RGB to hex for chromakey filter color_hex = f"{color[0]:02x}{color[1]:02x}{color[2]:02x}" filter_string = f"chromakey={color_hex}:{similarity}:{blend}" result = self.apply_filter(video, filter_string) self._filter_cache[cache_key] = result["path"] return result
[docs] def normalize_audio(self, video: Dict[str, Any], target_db: float = -16.0) -> Dict[str, Any]: """ Normalize audio to target dB level. Args: video: Video dictionary target_db: Target loudness in dB (typically -16 to -20) Returns: New video dictionary """ cache_key = self._generate_filter_cache_key( video, "normalize_audio", {"target_db": target_db} ) if cache_key in self._filter_cache: temp_file = self._filter_cache[cache_key] info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None } self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"normalize_audio_{target_db}_{os.path.basename(video['path'])}") # Use loudnorm filter for EBU R128 normalization filter_string = f"loudnorm=I={target_db}:TP=-1.5:LRA=11" args = ["-i", video["path"]] if video.get("start", 0) > 0: args = ["-ss", str(video["start"])] + args if video.get("end") is not None: dur = video["end"] - video.get("start", 0) args += ["-t", str(dur)] args += [ "-af", filter_string, "-c:v", "copy", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) self._filter_cache[cache_key] = temp_file return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }
[docs] def crossfade(self, video1: Dict[str, Any], video2: Dict[str, Any], duration: float) -> Dict[str, Any]: """ Crossfade between two videos. Args: video1: First video dictionary video2: Second video dictionary duration: Crossfade duration in seconds Returns: New video dictionary """ if not video1 or not video2: raise ValueError("Both videos required for crossfade") info1 = self.get_info(video1) info2 = self.get_info(video2) fps = info1.get("fps", 30) self._ensure_temp_dir() temp_file = os.path.join(self._temp_dir, f"crossfade_{duration}_{os.path.basename(video1['path'])}") # Calculate offset for xfade filter # First video plays, then crossfade starts at (duration1 - fade_duration) duration1 = info1.get("duration", 5.0) offset = duration1 - duration # xfade transition # Available transitions: fade, wipeleft, slideright, etc. filter_string = f"[0:v][1:v]xfade=transition=fade:duration={duration}:offset={offset}[vout]" args = ["-i", video1["path"], "-i", video2["path"]] args += [ "-filter_complex", filter_string, "-map", "[vout]", "-y", temp_file ] self._run_ffmpeg(args) info = self._run_ffprobe(["-i", temp_file]) return { "path": temp_file, "info": info, "is_temp": True, "start": 0, "end": None }