# Source code for fmusvid.backends.ffmpeg

"""
FFmpeg backend for FMUS-VID.

This module provides the FFmpeg implementation of the Backend interface.
"""

import os
import sys
import subprocess
import tempfile
import shutil
import json
import re
from pathlib import Path
from typing import Union, List, Tuple, Optional, Dict, Any, Callable
import logging
from fractions import Fraction

import numpy as np
from PIL import Image, ImageDraw, ImageFont

from ..backend import Backend
from .core import FFmpegCore
from .filters import FFmpegFilters
from .overlay import FFmpegOverlay
from .conversion import FFmpegConversion
from .subtitle import FFmpegSubtitle
from .audio import FFmpegAudio

logger = logging.getLogger(__name__)

class FFmpegBackend(
    FFmpegCore,
    FFmpegFilters,
    FFmpegOverlay,
    FFmpegConversion,
    FFmpegSubtitle,
    FFmpegAudio,
    Backend,
):
    """FFmpeg implementation of the Backend interface.

    Videos are passed around as plain dictionaries. The methods in this
    class produce and consume the keys ``path``, ``info``, ``is_temp``,
    ``start`` and ``end``; ``save`` additionally reads an optional
    ``filters`` list of FFmpeg video-filter strings.

    Helpers such as ``_find_ffmpeg``, ``_find_ffprobe``,
    ``_ensure_temp_dir``, ``_run_ffmpeg`` and ``get_info`` are not defined
    in this class body; they are expected to come from the base classes.
    """

    def __init__(self):
        """Initialize the FFmpeg backend."""
        # Plain attributes are assigned before the executable lookups so
        # that __del__ always finds them, even if a lookup raises and the
        # instance is discarded half-initialized.
        self._temp_dir = None

        # Initialize overlay and filter caches
        self._overlay_cache = {}
        self._filter_cache = {}

        self._ffmpeg_path = self._find_ffmpeg()
        self._ffprobe_path = self._find_ffprobe()

    def __del__(self):
        """Clean up temporary files on instance destruction."""
        # getattr: __init__ may have failed before _temp_dir was assigned.
        temp_dir = getattr(self, "_temp_dir", None)
        if not temp_dir:
            return

        try:
            if os.path.isdir(temp_dir):
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned up temporary directory: {temp_dir}")
        except Exception as e:
            # Best effort only: a finalizer must never propagate errors.
            logger.warning(f"Failed to clean up temporary directory: {e}")

    def load(self, path: Union[str, Path], **kwargs) -> Dict[str, Any]:
        """
        Load a video file.

        Args:
            path: Path to video file
            **kwargs: Additional options (currently unused by this method)

        Returns:
            Video dictionary with ``path``, ``info`` (as returned by
            ``get_info``), ``is_temp=False``, ``start=0`` and ``end=None``

        Raises:
            ValueError: If ``path`` does not point to an existing file
        """
        path = str(path)

        if not os.path.isfile(path):
            raise ValueError(f"Video file not found: {path}")

        # Get video info
        info = self.get_info({"path": path})

        return {
            "path": path,
            "info": info,
            "is_temp": False,
            "start": 0,
            "end": None,
        }

    def create(self, width: int, height: int, duration: float, fps: float,
               color: Tuple[int, int, int], **kwargs) -> Dict[str, Any]:
        """
        Create a blank video.

        Args:
            width: Width in pixels
            height: Height in pixels
            duration: Duration in seconds
            fps: Frames per second
            color: Background color as RGB tuple (each component 0-255;
                components after the third are ignored)
            **kwargs: Additional options (currently unused by this method)

        Returns:
            Video dictionary pointing at a temporary H.264 file
            (``is_temp=True``)

        Raises:
            ValueError: If ``color`` does not provide three components in
                the range 0-255
        """
        # Validate before touching the filesystem: out-of-range components
        # would otherwise be formatted into a malformed hex string.
        rgb = tuple(color)[:3]
        if len(rgb) != 3 or not all(0 <= c <= 255 for c in rgb):
            raise ValueError(
                f"color must be an RGB tuple of values in 0-255, got {color!r}"
            )

        # Convert RGB to hex for FFmpeg
        color_hex = "{:02x}{:02x}{:02x}".format(*rgb)

        self._ensure_temp_dir()

        # Use a unique file per call. A name built only from the dimensions
        # would let a later blank of the same size overwrite the file that
        # an earlier returned video dictionary still refers to.
        fd, temp_file = tempfile.mkstemp(
            prefix=f"blank_{width}x{height}_",
            suffix=".mp4",
            dir=self._temp_dir,
        )
        os.close(fd)  # FFmpeg reopens the path itself; "-y" overwrites it.

        # Build FFmpeg command to create blank video
        args = [
            "-f", "lavfi",
            "-i", f"color=c={color_hex}:s={width}x{height}:d={duration}:r={fps}",
            "-c:v", "libx264",
            "-t", str(duration),
            "-pix_fmt", "yuv420p",
            "-y", temp_file,
        ]

        self._run_ffmpeg(args)

        return {
            "path": temp_file,
            "info": {
                "width": width,
                "height": height,
                "duration": duration,
                "fps": fps,
                "has_audio": False,
                "codec": "h264",
            },
            "is_temp": True,
            "start": 0,
            "end": None,
        }

    def save(self, video: Dict[str, Any], output_path: Union[str, Path],
             progress_callback: Optional[Callable[[float], None]] = None,
             **kwargs) -> None:
        """
        Save video to file.

        Args:
            video: Video dictionary
            output_path: Output file path
            progress_callback: Function to call with progress (0-1)
            **kwargs: Additional options:
                codec: Video codec (default ``"libx264"``)
                bitrate: Target video bitrate, e.g. ``"2M"``
                preset: Encoder preset (default ``"medium"``)
                crf: Constant rate factor. Defaults to 23 when no
                    ``bitrate`` is given; when ``bitrate`` is given it is
                    only emitted if passed explicitly.

        Raises:
            ValueError: If ``end`` is not greater than ``start``
        """
        output_path = str(output_path)
        # Kept from the original implementation; whether _run_ffmpeg relies
        # on the temp directory is not visible here.
        self._ensure_temp_dir()

        # A missing or None start is treated as the beginning of the file.
        start = video.get("start") or 0
        end = video.get("end")

        # Build FFmpeg arguments; "-ss" goes before "-i" (input seeking).
        args = []
        if start > 0:
            args += ["-ss", str(start)]
        args += ["-i", video["path"]]

        if end is not None:
            duration = end - start
            if duration <= 0:
                raise ValueError(
                    f"Video end ({end}) must be greater than start ({start})"
                )
            args += ["-t", str(duration)]

        # Apply any filters that were accumulated
        filters = video.get("filters")
        if filters:
            args += ["-vf", ",".join(filters)]

        # Add codec options
        codec = kwargs.get("codec", "libx264")
        bitrate = kwargs.get("bitrate")
        preset = kwargs.get("preset", "medium")
        crf = kwargs.get("crf")

        args += ["-c:v", codec, "-preset", str(preset)]

        # With libx264, "-crf" selects constant-quality rate control, which
        # makes a simultaneously requested "-b:v" ineffective. Only fall
        # back to the default CRF when the caller did not ask for a bitrate.
        if crf is None and not bitrate:
            crf = 23
        if crf is not None:
            args += ["-crf", str(crf)]
        if bitrate:
            args += ["-b:v", str(bitrate)]

        args += ["-y", output_path]

        self._run_ffmpeg(args, progress_callback=progress_callback)

    @staticmethod
    def is_available() -> bool:
        """Check if FFmpeg is available on the system.

        Returns:
            True if an ``ffmpeg`` executable is found on ``PATH``.
        """
        return shutil.which("ffmpeg") is not None