Source code for fmusvid.backends.ffmpeg.core

"""
Core functionality for the FFmpeg backend.

Contains utility functions for finding ffmpeg/ffprobe, running commands, etc.
"""

import os
import sys
import subprocess
import tempfile
import shutil
import json
import logging
from typing import List, Optional, Callable, Dict, Any, Union
from pathlib import Path

logger = logging.getLogger(__name__)

[docs] class FFmpegCore: """Core FFmpeg functionality.""" def _find_ffmpeg(self) -> str: """ Find the FFmpeg executable. Returns: Path to FFmpeg executable Raises: ValueError: If FFmpeg is not found """ # Check environment variable first env_path = os.environ.get("FMUSVID_FFMPEG_PATH") if env_path and os.path.isfile(env_path): logger.info(f"Found FFmpeg from environment: {env_path}") return env_path # Try to find ffmpeg in PATH ffmpeg_path = shutil.which("ffmpeg") if ffmpeg_path: logger.info(f"Found FFmpeg at: {ffmpeg_path}") return ffmpeg_path # Try common installation locations common_locations = [] if sys.platform == "win32": common_locations = [ r"C:\Program Files\FFmpeg\bin\ffmpeg.exe", r"C:\Program Files (x86)\FFmpeg\bin\ffmpeg.exe", r"C:\Program Files\ImageMagick-7.1.0-Q16-HDRI\ffmpeg.exe", os.path.expanduser(r"~\FFmpeg\bin\ffmpeg.exe") ] elif sys.platform == "darwin": # macOS common_locations = [ "/usr/local/bin/ffmpeg", "/opt/homebrew/bin/ffmpeg", "/opt/local/bin/ffmpeg" ] else: # Linux and others common_locations = [ "/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/opt/ffmpeg/bin/ffmpeg" ] for location in common_locations: if os.path.isfile(location): logger.info(f"Found FFmpeg at: {location}") return location # If we get here, FFmpeg was not found raise ValueError( "FFmpeg not found. Please install FFmpeg and make sure it's in your PATH, " "or specify the path to FFmpeg using FMUSVID_FFMPEG_PATH environment variable." ) def _find_ffprobe(self) -> str: """ Find the FFprobe executable. Returns: Path to FFprobe executable Raises: ValueError: If FFprobe is not found """ # Check environment variable first env_path = os.environ.get("FMUSVID_FFPROBE_PATH") if env_path and os.path.isfile(env_path): logger.info(f"Found FFprobe from environment: {env_path}") return env_path # Try to find ffprobe in PATH ffprobe_path = shutil.which("ffprobe") if ffprobe_path: logger.info(f"Found FFprobe at: {ffprobe_path}") return ffprobe_path # Try to find ffprobe in the same directory as ffmpeg if self._ffmpeg_path: ffmpeg_dir = os.path.dirname(self._ffmpeg_path) ffprobe_candidate = os.path.join(ffmpeg_dir, "ffprobe") if sys.platform == "win32": ffprobe_candidate += ".exe" if os.path.isfile(ffprobe_candidate): logger.info(f"Found FFprobe at: {ffprobe_candidate}") return ffprobe_candidate # Try common installation locations common_locations = [] if sys.platform == "win32": common_locations = [ r"C:\Program Files\FFmpeg\bin\ffprobe.exe", r"C:\Program Files (x86)\FFmpeg\bin\ffprobe.exe", r"C:\Program Files\ImageMagick-7.1.0-Q16-HDRI\ffprobe.exe", os.path.expanduser(r"~\FFmpeg\bin\ffprobe.exe") ] elif sys.platform == "darwin": # macOS common_locations = [ "/usr/local/bin/ffprobe", "/opt/homebrew/bin/ffprobe", "/opt/local/bin/ffprobe" ] else: # Linux and others common_locations = [ "/usr/bin/ffprobe", "/usr/local/bin/ffprobe", "/opt/ffmpeg/bin/ffprobe" ] for location in common_locations: if os.path.isfile(location): logger.info(f"Found FFprobe at: {location}") return location # If we get here, FFprobe was not found # Try to see if we can use ffmpeg -version to get some info instead logger.warning("FFprobe not found. Some features may not work properly.") # Return ffmpeg path as fallback (for basic functionality) if self._ffmpeg_path: return self._ffmpeg_path raise ValueError( "FFprobe not found. Please install FFmpeg (which includes FFprobe) " "and make sure it's in your PATH, or specify the path to FFprobe using " "FMUSVID_FFPROBE_PATH environment variable." ) def _ensure_temp_dir(self): """Ensure that a temporary directory exists.""" if self._temp_dir is None or not os.path.isdir(self._temp_dir): self._temp_dir = tempfile.mkdtemp(prefix="fmusvid_") logger.debug(f"Created temporary directory: {self._temp_dir}") def _run_ffmpeg(self, args: List[str], input_data=None, progress_callback: Optional[Callable[[float], None]] = None) -> bytes: """ Run FFmpeg with the given arguments. Args: args: FFmpeg command-line arguments input_data: Input data to pipe to FFmpeg progress_callback: Function to call with progress (0-1) Returns: FFmpeg output Raises: RuntimeError: If FFmpeg returns a non-zero exit code """ # Add FFmpeg executable to the beginning of the arguments cmd = [self._ffmpeg_path] + args # Add progress monitoring if requested if progress_callback: cmd += ["-progress", "pipe:1"] # Log the command logger.debug(f"Running FFmpeg: {' '.join(cmd)}") # Set up the subprocess stdin = subprocess.PIPE if input_data is not None else None # Run FFmpeg process = subprocess.Popen( cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=False ) # Send input data if provided if input_data is not None: stdout, stderr = process.communicate(input=input_data) else: # Monitor progress if callback provided if progress_callback: stdout, stderr = self._monitor_progress(process, progress_callback) else: stdout, stderr = process.communicate() # Check return code if process.returncode != 0: error_msg = stderr.decode("utf-8", errors="replace") logger.error(f"FFmpeg error (code {process.returncode}): {error_msg}") raise RuntimeError(f"FFmpeg error (code {process.returncode}): {error_msg}") return stdout def _monitor_progress(self, process, progress_callback): """ Monitor FFmpeg progress and call the callback function. Args: process: Subprocess process progress_callback: Function to call with progress (0-1) Returns: Tuple of (stdout, stderr) """ stdout_data = b"" stderr_data = b"" # Read output in a non-blocking way while process.poll() is None: # Read stdout if process.stdout: line = process.stdout.readline() if line: stdout_data += line # Parse progress information try: line_str = line.decode("utf-8", errors="replace").strip() if line_str.startswith("out_time_ms="): time_ms = int(line_str.split("=")[1]) progress = time_ms / 1000 / 60 # Approximate progress progress_callback(min(1.0, max(0.0, progress))) except Exception as e: logger.debug(f"Error parsing progress: {e}") # Read stderr if process.stderr: line = process.stderr.readline() if line: stderr_data += line # Read any remaining output if process.stdout: stdout_data += process.stdout.read() if process.stderr: stderr_data += process.stderr.read() return stdout_data, stderr_data def _run_ffprobe(self, args: List[str]) -> Dict[str, Any]: """ Run FFprobe with the given arguments. Args: args: FFprobe command-line arguments Returns: Parsed JSON output Raises: RuntimeError: If FFprobe returns a non-zero exit code """ # Add FFprobe executable and JSON output format to the arguments cmd = [ self._ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams" ] + args # Log the command logger.debug(f"Running FFprobe: {' '.join(cmd)}") # Run FFprobe process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=False ) stdout, stderr = process.communicate() # Check return code if process.returncode != 0: error_msg = stderr.decode("utf-8", errors="replace") logger.error(f"FFprobe error (code {process.returncode}): {error_msg}") raise RuntimeError(f"FFprobe error (code {process.returncode}): {error_msg}") # Parse JSON output try: result = json.loads(stdout.decode("utf-8")) except json.JSONDecodeError: logger.error("Failed to parse FFprobe JSON output") result = {} return result
[docs] def get_info(self, video: Dict[str, Any]) -> Dict[str, Any]: """ Get video information. Args: video: Video dictionary Returns: Dictionary with video info """ # If info is already in the video dictionary and is processed, return it if "info" in video and isinstance(video["info"], dict) and "width" in video["info"]: return video["info"] # Run FFprobe to get video info if not already available if "info" in video: info = video["info"] else: info = self._run_ffprobe(["-i", video["path"]]) # Find video stream video_stream = None for stream in info.get("streams", []): if stream.get("codec_type") == "video": video_stream = stream break # Find audio stream audio_stream = None for stream in info.get("streams", []): if stream.get("codec_type") == "audio": audio_stream = stream break # Extract video information width = int(video_stream.get("width", 0)) if video_stream else 0 height = int(video_stream.get("height", 0)) if video_stream else 0 # Extract duration duration = float(info.get("format", {}).get("duration", 0)) # Apply start and end times if specified if video.get("start", 0) > 0: duration -= video["start"] if video.get("end") is not None: duration = min(duration, video["end"] - video.get("start", 0)) # Extract FPS fps = 0 if video_stream and "avg_frame_rate" in video_stream: try: fps_fraction = video_stream["avg_frame_rate"].split("/") fps = float(fps_fraction[0]) / float(fps_fraction[1]) except (ValueError, ZeroDivisionError, IndexError): fps = 0 # Extract codec information video_codec = video_stream.get("codec_name", "unknown") if video_stream else "none" audio_codec = audio_stream.get("codec_name", "unknown") if audio_stream else "none" # Extract bitrate bitrate = info.get("format", {}).get("bit_rate") if bitrate: try: bitrate = f"{int(bitrate) // 1000}k" except (ValueError, TypeError): bitrate = "unknown" return { "width": width, "height": height, "duration": duration, "fps": fps, "has_audio": audio_stream is not None, "codec": video_codec, "audio_codec": audio_codec if audio_stream else None, "bitrate": bitrate }