"""
FFmpeg overlay functionality.
This module provides functionality for overlaying videos, images, and text.
"""
import os
import hashlib
from typing import Dict, Any, Tuple, Optional, Union, List
from pathlib import Path
import logging
from PIL import Image, ImageDraw, ImageFont
import numpy as np
logger = logging.getLogger(__name__)
class FFmpegOverlay:
    """FFmpeg overlay functionality with caching capabilities.

    This class defines no ``__init__``. Its methods rely on members that must be
    supplied by the object they run on:

    * ``_overlay_cache``: mapping of cache key -> path of a rendered overlay file
    * ``_temp_dir``: directory output files are written to
    * ``_ensure_temp_dir()``: called before ``_temp_dir`` is used
    * ``_run_ffmpeg(args)`` / ``_run_ffprobe(args)``: command runners
    * ``get_info(video)``: returns a mapping that contains ``"duration"``

    A "video dictionary" is a dict with a ``"path"`` key and optional
    ``"start"`` / ``"end"`` keys (seconds).
    """

    # drawtext x/y expressions for the named text positions.
    _TEXT_ANCHORS = {
        "center": ("(w-text_w)/2", "(h-text_h)/2"),
        "top": ("(w-text_w)/2", "10"),
        "bottom": ("(w-text_w)/2", "h-text_h-10"),
        "left": ("10", "(h-text_h)/2"),
        "right": ("w-text_w-10", "(h-text_h)/2"),
        "top-left": ("10", "10"),
        "top-right": ("w-text_w-10", "10"),
        "bottom-left": ("10", "h-text_h-10"),
        "bottom-right": ("w-text_w-10", "h-text_h-10"),
    }

    # ------------------------------------------------------------------
    # Private helpers shared by the FFmpeg-based methods
    # ------------------------------------------------------------------
    @staticmethod
    def _seek_options(video: Dict[str, Any]) -> Tuple[List[str], List[str]]:
        """Translate a video's ``start``/``end`` keys into FFmpeg options.

        Returns:
            ``(input_options, output_options)``: ``-ss`` belongs before the
            first ``-i``; ``-t`` belongs before the output file name.
        """
        start = video.get("start")
        if start is None:
            start = 0
        input_options = ["-ss", str(start)] if start > 0 else []
        end = video.get("end")
        output_options = ["-t", str(end - start)] if end is not None else []
        return input_options, output_options

    @staticmethod
    def _enable_clause(start_time: Optional[float], duration: Optional[float]) -> str:
        """Return a ``:enable='...'`` filter suffix for a timed overlay ('' if untimed)."""
        begin = 0 if start_time is None else start_time
        if duration is not None:
            return f":enable='between(t,{begin},{begin + duration})'"
        if begin > 0:
            # No duration: visible from start_time until the end of the video.
            return f":enable='gte(t,{begin})'"
        return ""

    @staticmethod
    def _quote_filter_value(value: Any) -> str:
        """Quote an arbitrary string for use as a filter option value.

        Two parsing levels are handled: option parsing (backslash-escape
        ``\\``, ``'`` and ``:``) and filtergraph parsing (wrap in single quotes;
        a literal quote is written as ``'\\''``).
        """
        escaped = str(value)
        for special in ("\\", "'", ":"):
            escaped = escaped.replace(special, "\\" + special)
        return "'" + escaped.replace("'", "'\\''") + "'"

    def _as_temp_video(self, path: str) -> Dict[str, Any]:
        """Probe ``path`` and wrap it in a video dictionary marked as temporary."""
        return {
            "path": path,
            "info": self._run_ffprobe(["-i", path]),
            "is_temp": True,
            "start": 0,  # Reset start time since we've applied it
            "end": None  # Reset end time since we've applied it
        }

    # ------------------------------------------------------------------
    # PIL frame-level operations
    # ------------------------------------------------------------------
    def overlay_frame(self, frame_data: Any, overlay_data: Any,
                      position: Tuple[int, int] = (0, 0), opacity: float = 1.0) -> Any:
        """
        Overlay an image on the frame.

        Args:
            frame_data: Frame data (PIL Image); it is copied, not modified
            overlay_data: Overlay image (PIL Image)
            position: (x, y) coordinates for placement
            opacity: Opacity of overlay (1.0 = fully opaque); values below 0
                are treated as 0, values of 1.0 or more leave alpha untouched

        Returns:
            Combined PIL Image
        """
        result = frame_data.copy()

        # An RGBA overlay already carries the alpha channel used as paste mask.
        if overlay_data.mode == 'RGBA':
            overlay_with_alpha = overlay_data
        else:
            overlay_with_alpha = overlay_data.convert('RGBA')

        if opacity < 1.0:
            factor = max(0.0, opacity)
            r, g, b, a = overlay_with_alpha.split()
            # Lookup-table entries must be integers in 0..255.
            a = a.point(lambda value: int(round(value * factor)))
            overlay_with_alpha = Image.merge('RGBA', (r, g, b, a))

        # The overlay doubles as its own mask so its alpha is respected.
        result.paste(overlay_with_alpha, position, overlay_with_alpha)
        return result

    def add_text_to_frame(self, frame_data: Any, text: str, position: Tuple[int, int],
                          font: str = "Arial", size: int = 24,
                          color: Union[str, Tuple[int, int, int]] = "white") -> Any:
        """
        Add text to the frame.

        Args:
            frame_data: Frame data (PIL Image); it is copied, not modified
            text: Text to add
            position: (x, y) coordinates for placement
            font: Font name or file passed to ``ImageFont.truetype``
            size: Font size (not applied when the default font is used)
            color: Text color (name or RGB tuple)

        Returns:
            PIL Image with text
        """
        result = frame_data.copy()
        draw = ImageDraw.Draw(result)

        try:
            font_obj = ImageFont.truetype(font, size)
        except OSError:
            # Best-effort: the requested font is unavailable, use PIL's default.
            logger.debug("Font %r could not be loaded; using default font", font)
            font_obj = ImageFont.load_default()

        draw.text(position, text, fill=color, font=font_obj)
        return result

    # ------------------------------------------------------------------
    # Video-on-video overlay
    # ------------------------------------------------------------------
    def _generate_overlay_cache_key(self, overlay_video: Dict[str, Any],
                                    position: Tuple[int, int], start: float,
                                    duration: Optional[float], opacity: float,
                                    video: Optional[Dict[str, Any]] = None) -> str:
        """
        Generate a cache key for overlay operations.

        Args:
            overlay_video: Video to overlay
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay
            opacity: Opacity of overlay
            video: Base video the overlay is rendered onto. Optional for
                backward compatibility; when given, its path/start/end are
                part of the key so different base videos never share an entry.

        Returns:
            Cache key string (hex MD5 digest)
        """
        fields = [
            f"path={overlay_video['path']}",
            f"pos={position[0]},{position[1]}",
            f"start={start}",
            f"dur={duration}",
            f"op={opacity}",
        ]
        if "start" in overlay_video:
            fields.append(f"vstart={overlay_video['start']}")
        if overlay_video.get("end") is not None:
            fields.append(f"vend={overlay_video['end']}")
        if video is not None:
            fields.append(f"base={video['path']}")
            fields.append(f"bstart={video.get('start')}")
            fields.append(f"bend={video.get('end')}")
        # A separator keeps adjacent fields from running into each other.
        return hashlib.md5("\x1f".join(fields).encode()).hexdigest()

    def _build_overlay_filter_complex(self, position: Tuple[int, int], start: float,
                                      duration: Optional[float], opacity: float) -> List[str]:
        """
        Build FFmpeg filter complex for overlay operations.

        Args:
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay
            opacity: Opacity of overlay

        Returns:
            Two filter chains: the one preparing input 1 (labelled
            ``[overlay]``) and the one compositing it onto input 0.
        """
        chain = []
        if start > 0:
            # Shift the overlay's timestamps so it begins at `start`.
            chain.append(f"setpts=PTS-STARTPTS+{start}/TB")
        if duration is not None:
            chain.append(f"trim=duration={duration}")
        if opacity < 1.0:
            chain.append(f"format=rgba,colorchannelmixer=aa={opacity}")
        if not chain:
            # A chain needs at least one filter; `null` passes frames through.
            chain.append("null")
        return [
            f"[1:v]{','.join(chain)}[overlay]",
            f"[0:v][overlay]overlay={position[0]}:{position[1]}"
        ]

    def overlay(self, video: Dict[str, Any], overlay_video: Dict[str, Any],
                position: Tuple[int, int] = (0, 0), start: float = 0,
                duration: Optional[float] = None, opacity: float = 1.0) -> Dict[str, Any]:
        """
        Overlay another video (picture-in-picture) with caching.

        Args:
            video: Video dictionary
            overlay_video: Video to overlay
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay (None = full duration)
            opacity: Opacity of overlay (1.0 = fully opaque)

        Returns:
            New video dictionary
        """
        cache_key = self._generate_overlay_cache_key(
            overlay_video, position, start, duration, opacity, video
        )
        if cache_key in self._overlay_cache:
            cached_file = self._overlay_cache[cache_key]
            if os.path.exists(cached_file):
                logger.debug("Using cached overlay: %s", cache_key)
                return self._as_temp_video(cached_file)
            # The rendered file is gone; forget the entry and render again.
            del self._overlay_cache[cache_key]

        self._ensure_temp_dir()
        # The key is part of the file name so that different overlays of the
        # same base video do not overwrite each other's cached output.
        temp_file = os.path.join(
            self._temp_dir,
            f"overlay_{cache_key[:12]}_{os.path.basename(video['path'])}"
        )

        # If duration is not specified, use the overlay's own duration.
        overlay_duration = self.get_info(overlay_video)["duration"]
        overlay_display_duration = duration if duration is not None else overlay_duration

        filter_parts = self._build_overlay_filter_complex(
            position, start, overlay_display_duration, opacity
        )
        filter_complex = ";".join(filter_parts) + "[out]"

        # NOTE(review): overlay_video's own "start"/"end" feed the cache key
        # but are not applied to the second input here.
        input_options, output_options = self._seek_options(video)
        args = input_options + [
            "-i", video["path"],
            "-i", overlay_video["path"],
            "-filter_complex", filter_complex,
            "-map", "[out]",
            "-map", "0:a?",  # Audio from the main video only, if present
            "-c:a", "copy"
        ] + output_options + ["-y", temp_file]

        self._run_ffmpeg(args)
        new_video = self._as_temp_video(temp_file)

        self._overlay_cache[cache_key] = temp_file
        logger.debug("Cached overlay: %s", cache_key)
        return new_video

    def batch_overlay(self, video: Dict[str, Any],
                      overlays: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Apply multiple overlays in a single FFmpeg operation.

        Args:
            video: Video dictionary
            overlays: List of overlay specifications, each with keys:
                - overlay_video: Video to overlay
                - position: (x, y) coordinates for placement
                - start: Start time in seconds
                - duration: Duration to display overlay
                - opacity: Opacity of overlay

        Returns:
            New video dictionary, or ``video`` itself when ``overlays`` is empty
        """
        if not overlays:
            return video

        self._ensure_temp_dir()
        temp_file = os.path.join(
            self._temp_dir, f"batch_overlay_{os.path.basename(video['path'])}"
        )

        # Input 0 is the main video; overlay i is input i + 1.
        filter_complex = ["[0:v]setpts=PTS[main]"]
        last_index = len(overlays) - 1
        for i, overlay_spec in enumerate(overlays):
            overlay_video = overlay_spec["overlay_video"]
            position = overlay_spec.get("position", (0, 0))
            start = overlay_spec.get("start") or 0
            duration = overlay_spec.get("duration")
            opacity = overlay_spec.get("opacity", 1.0)

            # If duration is not specified, use the overlay's own duration.
            if duration is None:
                duration = self.get_info(overlay_video)["duration"]

            overlay_filters = []
            if start > 0:
                overlay_filters.append(f"setpts=PTS-STARTPTS+{start}/TB")
            else:
                overlay_filters.append("setpts=PTS")
            if duration is not None:
                overlay_filters.append(f"trim=duration={duration}")
            if opacity < 1.0:
                overlay_filters.append(f"format=rgba,colorchannelmixer=aa={opacity}")

            overlay_label = f"overlay{i}"
            filter_complex.append(
                f"[{i+1}:v]{','.join(overlay_filters)}[{overlay_label}]"
            )

            # Chain the composites: main -> tmp0 -> tmp1 -> ... -> out
            previous_label = "main" if i == 0 else f"tmp{i-1}"
            next_label = f"tmp{i}" if i < last_index else "out"
            filter_complex.append(
                f"[{previous_label}][{overlay_label}]"
                f"overlay={position[0]}:{position[1]}[{next_label}]"
            )

        input_options, output_options = self._seek_options(video)
        args = input_options + ["-i", video["path"]]
        for overlay_spec in overlays:
            args.extend(["-i", overlay_spec["overlay_video"]["path"]])
        args.extend([
            "-filter_complex", ";".join(filter_complex),
            "-map", "[out]",
            "-map", "0:a?",  # Copy audio from main video if present
            "-c:a", "copy"
        ])
        # Output options must precede the output file name to take effect.
        args.extend(output_options)
        args.extend(["-y", temp_file])

        self._run_ffmpeg(args)
        return self._as_temp_video(temp_file)

    # ------------------------------------------------------------------
    # Text and image overlays
    # ------------------------------------------------------------------
    def add_text(self, video: Dict[str, Any], overlay: Any) -> Dict[str, Any]:
        """
        Add text overlay to video using FFmpeg drawtext filter.

        The text is quoted for the filtergraph, so quotes and colons are safe.
        drawtext's own processing of the text (``%{...}`` expansion and
        backslash sequences) is left untouched.

        Args:
            video: Video dictionary
            overlay: TextOverlay object from operations.text. The attributes
                read are ``text``, ``position`` (named position or (x, y)),
                ``font``, ``size``, ``color`` (string or RGB/RGBA sequence),
                ``duration`` and ``start_time``.

        Returns:
            New video dictionary
        """
        position = overlay.position
        font = overlay.font
        color = overlay.color

        # x and y are separate drawtext options; unknown names fall back to center.
        if isinstance(position, str):
            x_expr, y_expr = self._TEXT_ANCHORS.get(position, self._TEXT_ANCHORS["center"])
        else:
            x_expr, y_expr = position

        if isinstance(color, str):
            color_str = color
        elif isinstance(color, (tuple, list)) and len(color) in (3, 4):
            # FFmpeg color syntax is 0xRRGGBB[AA].
            color_str = "0x" + "".join(
                f"{max(0, min(255, int(channel))):02x}" for channel in color
            )
        else:
            color_str = "white"

        filter_str = (
            f"drawtext=text={self._quote_filter_value(overlay.text)}"
            f":x={x_expr}:y={y_expr}"
            f":fontsize={overlay.size}:fontcolor={color_str}"
        )
        # Anything not starting with "Arial" is passed to drawtext as a font file.
        if font and not font.startswith("Arial"):
            filter_str += f":fontfile={self._quote_filter_value(font)}"
        filter_str += self._enable_clause(overlay.start_time, overlay.duration)

        self._ensure_temp_dir()
        temp_file = os.path.join(
            self._temp_dir, f"text_overlay_{os.path.basename(video['path'])}"
        )

        input_options, output_options = self._seek_options(video)
        args = input_options + ["-i", video["path"]] + output_options + [
            "-vf", filter_str,
            "-c:a", "copy",
            "-y",
            temp_file
        ]
        self._run_ffmpeg(args)
        return self._as_temp_video(temp_file)

    def add_image(self, video: Dict[str, Any], overlay: Any) -> Dict[str, Any]:
        """
        Add image overlay to video using FFmpeg.

        Args:
            video: Video dictionary
            overlay: GraphicsOverlay object from operations.text. The
                attributes read are ``image_path``, ``position`` ((x, y)),
                ``opacity``, ``scale``, ``duration`` and ``start_time``.

        Returns:
            New video dictionary
        """
        image_path = str(overlay.image_path)
        position = overlay.position
        opacity = overlay.opacity
        scale = overlay.scale

        self._ensure_temp_dir()
        temp_file = os.path.join(
            self._temp_dir, f"image_overlay_{os.path.basename(video['path'])}"
        )

        # Prepare the image (input 1) in a single chain labelled [overlay].
        image_chain = []
        if scale is not None and scale != 1.0:
            image_chain.append(f"scale=iw*{scale}:-1")  # -1 keeps the aspect ratio
        if opacity is not None and opacity < 1.0:
            image_chain.append(f"format=rgba,colorchannelmixer=aa={opacity}")
        if not image_chain:
            image_chain.append("null")

        filter_str = (
            f"[1:v]{','.join(image_chain)}[overlay];"
            f"[0:v][overlay]overlay={position[0]}:{position[1]}"
            f"{self._enable_clause(overlay.start_time, overlay.duration)}"
        )

        input_options, output_options = self._seek_options(video)
        args = input_options + ["-i", video["path"], "-i", image_path] + output_options + [
            "-filter_complex", filter_str,
            "-c:a", "copy",
            "-y",
            temp_file
        ]
        self._run_ffmpeg(args)
        return self._as_temp_video(temp_file)