"""
Core functionality for the FFmpeg backend.
Contains utility functions for finding ffmpeg/ffprobe, running commands, etc.
"""
import os
import sys
import subprocess
import tempfile
import shutil
import json
import logging
from typing import List, Optional, Callable, Dict, Any, Union
from pathlib import Path
logger = logging.getLogger(__name__)
[docs]
class FFmpegCore:
"""Core FFmpeg functionality."""
def _find_ffmpeg(self) -> str:
"""
Find the FFmpeg executable.
Returns:
Path to FFmpeg executable
Raises:
ValueError: If FFmpeg is not found
"""
# Check environment variable first
env_path = os.environ.get("FMUSVID_FFMPEG_PATH")
if env_path and os.path.isfile(env_path):
logger.info(f"Found FFmpeg from environment: {env_path}")
return env_path
# Try to find ffmpeg in PATH
ffmpeg_path = shutil.which("ffmpeg")
if ffmpeg_path:
logger.info(f"Found FFmpeg at: {ffmpeg_path}")
return ffmpeg_path
# Try common installation locations
common_locations = []
if sys.platform == "win32":
common_locations = [
r"C:\Program Files\FFmpeg\bin\ffmpeg.exe",
r"C:\Program Files (x86)\FFmpeg\bin\ffmpeg.exe",
r"C:\Program Files\ImageMagick-7.1.0-Q16-HDRI\ffmpeg.exe",
os.path.expanduser(r"~\FFmpeg\bin\ffmpeg.exe")
]
elif sys.platform == "darwin": # macOS
common_locations = [
"/usr/local/bin/ffmpeg",
"/opt/homebrew/bin/ffmpeg",
"/opt/local/bin/ffmpeg"
]
else: # Linux and others
common_locations = [
"/usr/bin/ffmpeg",
"/usr/local/bin/ffmpeg",
"/opt/ffmpeg/bin/ffmpeg"
]
for location in common_locations:
if os.path.isfile(location):
logger.info(f"Found FFmpeg at: {location}")
return location
# If we get here, FFmpeg was not found
raise ValueError(
"FFmpeg not found. Please install FFmpeg and make sure it's in your PATH, "
"or specify the path to FFmpeg using FMUSVID_FFMPEG_PATH environment variable."
)
def _find_ffprobe(self) -> str:
"""
Find the FFprobe executable.
Returns:
Path to FFprobe executable
Raises:
ValueError: If FFprobe is not found
"""
# Check environment variable first
env_path = os.environ.get("FMUSVID_FFPROBE_PATH")
if env_path and os.path.isfile(env_path):
logger.info(f"Found FFprobe from environment: {env_path}")
return env_path
# Try to find ffprobe in PATH
ffprobe_path = shutil.which("ffprobe")
if ffprobe_path:
logger.info(f"Found FFprobe at: {ffprobe_path}")
return ffprobe_path
# Try to find ffprobe in the same directory as ffmpeg
if self._ffmpeg_path:
ffmpeg_dir = os.path.dirname(self._ffmpeg_path)
ffprobe_candidate = os.path.join(ffmpeg_dir, "ffprobe")
if sys.platform == "win32":
ffprobe_candidate += ".exe"
if os.path.isfile(ffprobe_candidate):
logger.info(f"Found FFprobe at: {ffprobe_candidate}")
return ffprobe_candidate
# Try common installation locations
common_locations = []
if sys.platform == "win32":
common_locations = [
r"C:\Program Files\FFmpeg\bin\ffprobe.exe",
r"C:\Program Files (x86)\FFmpeg\bin\ffprobe.exe",
r"C:\Program Files\ImageMagick-7.1.0-Q16-HDRI\ffprobe.exe",
os.path.expanduser(r"~\FFmpeg\bin\ffprobe.exe")
]
elif sys.platform == "darwin": # macOS
common_locations = [
"/usr/local/bin/ffprobe",
"/opt/homebrew/bin/ffprobe",
"/opt/local/bin/ffprobe"
]
else: # Linux and others
common_locations = [
"/usr/bin/ffprobe",
"/usr/local/bin/ffprobe",
"/opt/ffmpeg/bin/ffprobe"
]
for location in common_locations:
if os.path.isfile(location):
logger.info(f"Found FFprobe at: {location}")
return location
# If we get here, FFprobe was not found
# Try to see if we can use ffmpeg -version to get some info instead
logger.warning("FFprobe not found. Some features may not work properly.")
# Return ffmpeg path as fallback (for basic functionality)
if self._ffmpeg_path:
return self._ffmpeg_path
raise ValueError(
"FFprobe not found. Please install FFmpeg (which includes FFprobe) "
"and make sure it's in your PATH, or specify the path to FFprobe using "
"FMUSVID_FFPROBE_PATH environment variable."
)
def _ensure_temp_dir(self):
"""Ensure that a temporary directory exists."""
if self._temp_dir is None or not os.path.isdir(self._temp_dir):
self._temp_dir = tempfile.mkdtemp(prefix="fmusvid_")
logger.debug(f"Created temporary directory: {self._temp_dir}")
def _run_ffmpeg(self, args: List[str], input_data=None,
progress_callback: Optional[Callable[[float], None]] = None) -> bytes:
"""
Run FFmpeg with the given arguments.
Args:
args: FFmpeg command-line arguments
input_data: Input data to pipe to FFmpeg
progress_callback: Function to call with progress (0-1)
Returns:
FFmpeg output
Raises:
RuntimeError: If FFmpeg returns a non-zero exit code
"""
# Add FFmpeg executable to the beginning of the arguments
cmd = [self._ffmpeg_path] + args
# Add progress monitoring if requested
if progress_callback:
cmd += ["-progress", "pipe:1"]
# Log the command
logger.debug(f"Running FFmpeg: {' '.join(cmd)}")
# Set up the subprocess
stdin = subprocess.PIPE if input_data is not None else None
# Run FFmpeg
process = subprocess.Popen(
cmd,
stdin=stdin,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False
)
# Send input data if provided
if input_data is not None:
stdout, stderr = process.communicate(input=input_data)
else:
# Monitor progress if callback provided
if progress_callback:
stdout, stderr = self._monitor_progress(process, progress_callback)
else:
stdout, stderr = process.communicate()
# Check return code
if process.returncode != 0:
error_msg = stderr.decode("utf-8", errors="replace")
logger.error(f"FFmpeg error (code {process.returncode}): {error_msg}")
raise RuntimeError(f"FFmpeg error (code {process.returncode}): {error_msg}")
return stdout
def _monitor_progress(self, process, progress_callback):
"""
Monitor FFmpeg progress and call the callback function.
Args:
process: Subprocess process
progress_callback: Function to call with progress (0-1)
Returns:
Tuple of (stdout, stderr)
"""
stdout_data = b""
stderr_data = b""
# Read output in a non-blocking way
while process.poll() is None:
# Read stdout
if process.stdout:
line = process.stdout.readline()
if line:
stdout_data += line
# Parse progress information
try:
line_str = line.decode("utf-8", errors="replace").strip()
if line_str.startswith("out_time_ms="):
time_ms = int(line_str.split("=")[1])
progress = time_ms / 1000 / 60 # Approximate progress
progress_callback(min(1.0, max(0.0, progress)))
except Exception as e:
logger.debug(f"Error parsing progress: {e}")
# Read stderr
if process.stderr:
line = process.stderr.readline()
if line:
stderr_data += line
# Read any remaining output
if process.stdout:
stdout_data += process.stdout.read()
if process.stderr:
stderr_data += process.stderr.read()
return stdout_data, stderr_data
def _run_ffprobe(self, args: List[str]) -> Dict[str, Any]:
"""
Run FFprobe with the given arguments.
Args:
args: FFprobe command-line arguments
Returns:
Parsed JSON output
Raises:
RuntimeError: If FFprobe returns a non-zero exit code
"""
# Add FFprobe executable and JSON output format to the arguments
cmd = [
self._ffprobe_path,
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams"
] + args
# Log the command
logger.debug(f"Running FFprobe: {' '.join(cmd)}")
# Run FFprobe
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False
)
stdout, stderr = process.communicate()
# Check return code
if process.returncode != 0:
error_msg = stderr.decode("utf-8", errors="replace")
logger.error(f"FFprobe error (code {process.returncode}): {error_msg}")
raise RuntimeError(f"FFprobe error (code {process.returncode}): {error_msg}")
# Parse JSON output
try:
result = json.loads(stdout.decode("utf-8"))
except json.JSONDecodeError:
logger.error("Failed to parse FFprobe JSON output")
result = {}
return result
[docs]
def get_info(self, video: Dict[str, Any]) -> Dict[str, Any]:
"""
Get video information.
Args:
video: Video dictionary
Returns:
Dictionary with video info
"""
# If info is already in the video dictionary and is processed, return it
if "info" in video and isinstance(video["info"], dict) and "width" in video["info"]:
return video["info"]
# Run FFprobe to get video info if not already available
if "info" in video:
info = video["info"]
else:
info = self._run_ffprobe(["-i", video["path"]])
# Find video stream
video_stream = None
for stream in info.get("streams", []):
if stream.get("codec_type") == "video":
video_stream = stream
break
# Find audio stream
audio_stream = None
for stream in info.get("streams", []):
if stream.get("codec_type") == "audio":
audio_stream = stream
break
# Extract video information
width = int(video_stream.get("width", 0)) if video_stream else 0
height = int(video_stream.get("height", 0)) if video_stream else 0
# Extract duration
duration = float(info.get("format", {}).get("duration", 0))
# Apply start and end times if specified
if video.get("start", 0) > 0:
duration -= video["start"]
if video.get("end") is not None:
duration = min(duration, video["end"] - video.get("start", 0))
# Extract FPS
fps = 0
if video_stream and "avg_frame_rate" in video_stream:
try:
fps_fraction = video_stream["avg_frame_rate"].split("/")
fps = float(fps_fraction[0]) / float(fps_fraction[1])
except (ValueError, ZeroDivisionError, IndexError):
fps = 0
# Extract codec information
video_codec = video_stream.get("codec_name", "unknown") if video_stream else "none"
audio_codec = audio_stream.get("codec_name", "unknown") if audio_stream else "none"
# Extract bitrate
bitrate = info.get("format", {}).get("bit_rate")
if bitrate:
try:
bitrate = f"{int(bitrate) // 1000}k"
except (ValueError, TypeError):
bitrate = "unknown"
return {
"width": width,
"height": height,
"duration": duration,
"fps": fps,
"has_audio": audio_stream is not None,
"codec": video_codec,
"audio_codec": audio_codec if audio_stream else None,
"bitrate": bitrate
}