# Source code for fmusvid.backends.ffmpeg.overlay

"""
FFmpeg overlay functionality.

This module provides functionality for overlaying videos, images, and text.
"""

import os
import hashlib
from typing import Dict, Any, Tuple, Optional, Union, List
from pathlib import Path
import logging
from PIL import Image, ImageDraw, ImageFont

import numpy as np

# Module-level logger named after this module (via __name__).
logger = logging.getLogger(__name__)

class FFmpegOverlay:
    """FFmpeg overlay functionality with caching capabilities.

    The methods here call ``self._ensure_temp_dir()``, ``self._run_ffmpeg()``,
    ``self._run_ffprobe()`` and ``self.get_info()`` and read
    ``self._temp_dir``; none of those are defined in this class, so it is
    expected to be combined with a class that provides them.
    """

    # drawtext (x, y) expressions for the named text positions.
    _TEXT_POSITIONS: Dict[str, Tuple[str, str]] = {
        "center": ("(w-text_w)/2", "(h-text_h)/2"),
        "top": ("(w-text_w)/2", "10"),
        "bottom": ("(w-text_w)/2", "h-text_h-10"),
        "left": ("10", "(h-text_h)/2"),
        "right": ("w-text_w-10", "(h-text_h)/2"),
        "top-left": ("10", "10"),
        "top-right": ("w-text_w-10", "10"),
        "bottom-left": ("10", "h-text_h-10"),
        "bottom-right": ("w-text_w-10", "h-text_h-10"),
    }

    # ------------------------------------------------------------------
    # Frame-level (PIL) operations
    # ------------------------------------------------------------------

    def overlay_frame(self, frame_data: Any, overlay_data: Any,
                      position: Tuple[int, int] = (0, 0),
                      opacity: float = 1.0) -> Any:
        """
        Overlay an image on the frame.

        Args:
            frame_data: Frame data (PIL Image)
            overlay_data: Overlay image (PIL Image)
            position: (x, y) coordinates for placement
            opacity: Opacity of overlay (1.0 = fully opaque). Values below
                0 are treated as 0; values >= 1.0 leave the alpha untouched.

        Returns:
            Combined PIL Image (the input frame is not modified)
        """
        result = frame_data.copy()

        # Reuse an existing alpha channel, otherwise create one.
        if overlay_data.mode == 'RGBA':
            overlay_with_alpha = overlay_data
        else:
            overlay_with_alpha = overlay_data.convert('RGBA')

        # Opacity scales the alpha channel whether or not the overlay
        # already had one.
        if opacity < 1.0:
            alpha_scale = max(0.0, opacity)
            r, g, b, a = overlay_with_alpha.split()
            a = a.point(lambda value: value * alpha_scale)
            overlay_with_alpha = Image.merge('RGBA', (r, g, b, a))

        # The overlay doubles as its own paste mask so alpha is honoured.
        result.paste(overlay_with_alpha, position, overlay_with_alpha)
        return result

    def add_text_to_frame(self, frame_data: Any, text: str,
                          position: Tuple[int, int], font: str = "Arial",
                          size: int = 24,
                          color: Union[str, Tuple[int, int, int]] = "white") -> Any:
        """
        Add text to the frame.

        Args:
            frame_data: Frame data (PIL Image)
            text: Text to add
            position: (x, y) coordinates for placement
            font: Font name or font file passed to ``ImageFont.truetype``
            size: Font size
            color: Text color (name or RGB tuple)

        Returns:
            PIL Image with text (the input frame is not modified)
        """
        result = frame_data.copy()
        draw = ImageDraw.Draw(result)

        try:
            font_obj = ImageFont.truetype(font, size)
        except OSError:
            # Font could not be opened: fall back to PIL's built-in font.
            font_obj = ImageFont.load_default()

        draw.text(position, text, fill=color, font=font_obj)
        return result

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _escape_filter_value(value: Any, expand: bool = False) -> str:
        """Quote ``value`` so it can be embedded as a filter option value.

        Escaping is applied for the filter-option level (``\\``, ``'``,
        ``:``) and the result is wrapped in single quotes for the
        filtergraph level. With ``expand=True`` the value is additionally
        protected against drawtext text expansion (``\\`` and ``%``).
        Plain values come out as ``'value'``.
        """
        escaped = str(value)
        if expand:
            escaped = escaped.replace("\\", "\\\\").replace("%", "\\%")
        escaped = (
            escaped.replace("\\", "\\\\")
            .replace("'", "\\'")
            .replace(":", "\\:")
        )
        # A quote cannot appear inside a quoted section: close, escape, reopen.
        return "'" + escaped.replace("'", "'\\''") + "'"

    @staticmethod
    def _overlay_command(video: Dict[str, Any], extra_inputs: List[str],
                         output_options: List[str],
                         temp_file: str) -> List[str]:
        """Assemble FFmpeg arguments: seek, inputs, options, trim, output.

        ``-ss`` goes before the first input and ``-t`` before the output
        path; options placed after the output path are not applied to it.
        """
        start = video.get("start") or 0
        end = video.get("end")

        args: List[str] = []
        if start > 0:
            args.extend(["-ss", str(start)])
        args.extend(["-i", video["path"]])
        for input_path in extra_inputs:
            args.extend(["-i", input_path])
        args.extend(output_options)
        if end is not None:
            args.extend(["-t", str(end - start)])
        args.extend(["-y", temp_file])
        return args

    def _overlay_output_path(self, prefix: str, video: Dict[str, Any]) -> str:
        """Ensure the temp dir exists and return ``<temp>/<prefix>_<basename>``."""
        self._ensure_temp_dir()
        return os.path.join(
            self._temp_dir, f"{prefix}_{os.path.basename(video['path'])}"
        )

    def _overlay_result(self, temp_file: str) -> Dict[str, Any]:
        """Probe ``temp_file`` and wrap it in a fresh video dictionary."""
        return {
            "path": temp_file,
            "info": self._run_ffprobe(["-i", temp_file]),
            "is_temp": True,
            "start": 0,   # start/end were applied while rendering
            "end": None,
        }

    def _generate_overlay_cache_key(self, overlay_video: Dict[str, Any],
                                    position: Tuple[int, int], start: float,
                                    duration: Optional[float], opacity: float,
                                    video: Optional[Dict[str, Any]] = None) -> str:
        """
        Generate a cache key for overlay operations.

        Args:
            overlay_video: Video to overlay
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay
            opacity: Opacity of overlay
            video: Main video the overlay is rendered onto. When given, its
                path/start/end become part of the key so results for
                different main videos never share a cache entry. When
                omitted the key is the same as before this parameter
                existed.

        Returns:
            Cache key string (MD5 hex digest of the parameters)
        """
        params = (
            f"path={overlay_video['path']}"
            f"pos={position[0]},{position[1]}"
            f"start={start}"
            f"dur={duration}"
            f"op={opacity}"
        )
        if "start" in overlay_video:
            params += f"vstart={overlay_video['start']}"
        if "end" in overlay_video and overlay_video["end"] is not None:
            params += f"vend={overlay_video['end']}"

        if video is not None:
            params += f"main={video['path']}"
            if video.get("start") is not None:
                params += f"mstart={video['start']}"
            if video.get("end") is not None:
                params += f"mend={video['end']}"

        return hashlib.md5(params.encode()).hexdigest()

    def _build_overlay_filter_complex(self, position: Tuple[int, int],
                                      start: float, duration: Optional[float],
                                      opacity: float) -> List[str]:
        """
        Build FFmpeg filter complex for overlay operations.

        Args:
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay
            opacity: Opacity of overlay

        Returns:
            List of filter complex parts (join with ``;``)
        """
        filters: List[str] = []
        if start > 0:
            filters.append(f"setpts=PTS-STARTPTS+{start}/TB")
        if duration is not None:
            filters.append(f"trim=duration={duration}")
        if opacity < 1.0:
            filters.append(f"format=rgba,colorchannelmixer=aa={opacity}")

        # A labelled chain needs at least one filter; "null" is a pass-through.
        chain = ",".join(filters) or "null"

        return [
            f"[1:v]{chain}[overlay]",
            # eof_action=pass: once the trimmed overlay ends, show the main
            # video alone instead of repeating the overlay's last frame.
            f"[0:v][overlay]overlay={position[0]}:{position[1]}:eof_action=pass",
        ]

    def _build_batch_filter_complex(self, overlays: List[Dict[str, Any]]) -> List[str]:
        """Build the chained filter graph for ``batch_overlay``.

        Input ``i + 1`` is overlay ``i``; intermediate results are labelled
        ``tmp<i>`` and the final one ``out``. ``self.get_info`` is only
        consulted for overlays that do not specify a duration.
        """
        parts = ["[0:v]setpts=PTS[main]"]
        last_index = len(overlays) - 1

        for i, spec in enumerate(overlays):
            position = spec.get("position", (0, 0))
            start = spec.get("start", 0)
            duration = spec.get("duration")
            opacity = spec.get("opacity", 1.0)

            if duration is None:
                duration = self.get_info(spec["overlay_video"])["duration"]

            filters = [
                f"setpts=PTS-STARTPTS+{start}/TB" if start > 0 else "setpts=PTS"
            ]
            if duration is not None:
                filters.append(f"trim=duration={duration}")
            if opacity < 1.0:
                filters.append(f"format=rgba,colorchannelmixer=aa={opacity}")

            chain = ",".join(filters)
            overlay_label = f"overlay{i}"
            previous_label = "main" if i == 0 else f"tmp{i - 1}"
            next_label = "out" if i == last_index else f"tmp{i}"

            parts.append(f"[{i + 1}:v]{chain}[{overlay_label}]")
            parts.append(
                f"[{previous_label}][{overlay_label}]"
                f"overlay={position[0]}:{position[1]}:eof_action=pass"
                f"[{next_label}]"
            )

        return parts

    @classmethod
    def _build_drawtext_filter(cls, text: Any, position: Any, font: Any,
                               size: Any, color: Any,
                               duration: Optional[float],
                               start_time: Optional[float]) -> str:
        """Build the ``drawtext=...`` filter string used by ``add_text``."""
        if isinstance(position, str):
            # Unknown names fall back to the centre.
            x_expr, y_expr = cls._TEXT_POSITIONS.get(
                position, cls._TEXT_POSITIONS["center"]
            )
        else:
            x_expr, y_expr = position

        if isinstance(color, str):
            color_str = color
        elif isinstance(color, tuple):
            r, g, b = color[:3]
            # FFmpeg colour syntax is 0xRRGGBB.
            color_str = f"0x{r:02x}{g:02x}{b:02x}"
        else:
            color_str = "white"

        options = [
            f"text={cls._escape_filter_value(text, expand=True)}",
            # x and y are separate options; "x=X:Y" leaves Y without a key.
            f"x={x_expr}",
            f"y={y_expr}",
            f"fontsize={size}",
            f"fontcolor={color_str}",
        ]

        # Anything other than the default "Arial" is passed on as a font file.
        if font and not font.startswith("Arial"):
            options.append(f"fontfile={cls._escape_filter_value(font)}")

        if duration is not None:
            begin = start_time or 0
            options.append(f"enable='between(t,{begin},{begin + duration})'")

        return "drawtext=" + ":".join(options)

    @staticmethod
    def _build_image_overlay_filter(position: Tuple[int, int], opacity: float,
                                    scale: float, duration: Optional[float],
                                    start_time: Optional[float]) -> str:
        """Build the ``-filter_complex`` string used by ``add_image``."""
        filters: List[str] = []
        if scale != 1.0:
            filters.append(f"scale=iw*{scale}:ih*{scale}")
        if opacity < 1.0:
            filters.append(f"format=rgba,colorchannelmixer=aa={opacity}")
        chain = ",".join(filters) or "null"

        overlay_options = f"overlay={position[0]}:{position[1]}"
        if duration is not None:
            begin = start_time or 0
            overlay_options += f":enable='between(t,{begin},{begin + duration})'"

        return f"[1:v]{chain}[overlay];[0:v][overlay]{overlay_options}"

    # ------------------------------------------------------------------
    # Video-level (FFmpeg) operations
    # ------------------------------------------------------------------

    def overlay(self, video: Dict[str, Any], overlay_video: Dict[str, Any],
                position: Tuple[int, int] = (0, 0), start: float = 0,
                duration: Optional[float] = None,
                opacity: float = 1.0) -> Dict[str, Any]:
        """
        Overlay another video (picture-in-picture) with caching.

        Args:
            video: Video dictionary
            overlay_video: Video to overlay
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay (None = full duration)
            opacity: Opacity of overlay (1.0 = fully opaque)

        Returns:
            New video dictionary
        """
        cache = getattr(self, "_overlay_cache", None)
        if cache is None:
            cache = self._overlay_cache = {}

        # The main video is part of the key: the cached file is the rendered
        # combination, not just the processed overlay.
        cache_key = self._generate_overlay_cache_key(
            overlay_video, position, start, duration, opacity, video
        )

        cached_file = cache.get(cache_key)
        if cached_file is not None:
            if os.path.exists(cached_file):
                logger.debug("Using cached overlay: %s", cache_key)
                return self._overlay_result(cached_file)
            # The cached file is gone: forget the entry and render again.
            del cache[cache_key]

        # The key in the file name keeps different overlays of the same main
        # video from overwriting each other's cached output.
        temp_file = self._overlay_output_path(f"overlay_{cache_key}", video)

        # If duration is not specified, use the overlay's own duration.
        if duration is not None:
            display_duration = duration
        else:
            display_duration = self.get_info(overlay_video)["duration"]

        filter_complex = ";".join(
            self._build_overlay_filter_complex(
                position, start, display_duration, opacity
            )
        )

        args = self._overlay_command(
            video,
            [overlay_video["path"]],
            [
                "-filter_complex", filter_complex,
                # Take audio from the main video only (if it has any); the
                # unlabeled filter output is still added to the output file.
                "-map", "0:a?",
                "-c:a", "copy",
            ],
            temp_file,
        )
        self._run_ffmpeg(args)

        new_video = self._overlay_result(temp_file)

        cache[cache_key] = temp_file
        logger.debug("Cached overlay: %s", cache_key)

        return new_video

    def batch_overlay(self, video: Dict[str, Any],
                      overlays: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Apply multiple overlays in a single FFmpeg operation.

        Args:
            video: Video dictionary
            overlays: List of overlay specifications, each with keys:
                - overlay_video: Video to overlay
                - position: (x, y) coordinates for placement
                - start: Start time in seconds
                - duration: Duration to display overlay
                - opacity: Opacity of overlay

        Returns:
            New video dictionary (``video`` itself when ``overlays`` is empty)
        """
        if not overlays:
            return video

        temp_file = self._overlay_output_path("batch_overlay", video)
        filter_complex = self._build_batch_filter_complex(overlays)

        args = self._overlay_command(
            video,
            [spec["overlay_video"]["path"] for spec in overlays],
            [
                "-filter_complex", ";".join(filter_complex),
                "-map", "[out]",
                "-map", "0:a?",  # Copy audio from main video if present
                "-c:a", "copy",
            ],
            temp_file,
        )
        self._run_ffmpeg(args)

        return self._overlay_result(temp_file)

    def add_text(self, video: Dict[str, Any], overlay: Any) -> Dict[str, Any]:
        """
        Add text overlay to video using FFmpeg drawtext filter.

        Args:
            video: Video dictionary
            overlay: Object exposing ``text``, ``position``, ``font``,
                ``size``, ``color``, ``duration`` and ``start_time``
                (TextOverlay object from operations.text)

        Returns:
            New video dictionary
        """
        filter_str = self._build_drawtext_filter(
            overlay.text, overlay.position, overlay.font, overlay.size,
            overlay.color, overlay.duration, overlay.start_time,
        )

        temp_file = self._overlay_output_path("text_overlay", video)
        args = self._overlay_command(
            video, [], ["-vf", filter_str, "-c:a", "copy"], temp_file
        )
        self._run_ffmpeg(args)

        return self._overlay_result(temp_file)

    def add_image(self, video: Dict[str, Any], overlay: Any) -> Dict[str, Any]:
        """
        Add image overlay to video using FFmpeg.

        Args:
            video: Video dictionary
            overlay: Object exposing ``image_path``, ``position``,
                ``opacity``, ``scale``, ``duration`` and ``start_time``
                (GraphicsOverlay object from operations.text)

        Returns:
            New video dictionary
        """
        image_path = str(overlay.image_path)
        filter_str = self._build_image_overlay_filter(
            overlay.position, overlay.opacity, overlay.scale,
            overlay.duration, overlay.start_time,
        )

        temp_file = self._overlay_output_path("image_overlay", video)
        args = self._overlay_command(
            video,
            [image_path],
            ["-filter_complex", filter_str, "-c:a", "copy"],
            temp_file,
        )
        self._run_ffmpeg(args)

        return self._overlay_result(temp_file)