Source code for fmusvid.core.video

"""
Core Video class for FMUS-VID.

This module provides the main Video class that users interact with.
"""

from typing import Union, List, Tuple, Optional, Dict, Any, Callable
from pathlib import Path
import asyncio
import logging
from dataclasses import dataclass
import os
import numpy as np

from ..backends.manager import get_backend, Backend
from ..operations.subtitle import SubtitleParser, SubtitleEntry, SubtitleRenderer

logger = logging.getLogger(__name__)

@dataclass
class VideoInfo:
    """Video information container."""
    width: int
    height: int
    duration: float
    fps: float
    has_audio: bool
    codec: str
    audio_codec: Optional[str] = None
    bitrate: Optional[str] = None

    def __str__(self) -> str:
        """String representation of video info."""
        return (f"{self.width}x{self.height}, {self.duration:.2f}s, {self.fps} fps, "
                f"codec: {self.codec}" +
                (f", audio: {self.audio_codec}" if self.has_audio else ", no audio"))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API compatibility."""
        return {
            "width": self.width,
            "height": self.height,
            "duration": self.duration,
            "fps": self.fps,
            "has_audio": self.has_audio,
            "codec": self.codec,
            "audio_codec": self.audio_codec,
            "bitrate": self.bitrate
        }

[docs] class Video: """ Main Video class for FMUS-VID. This class provides a unified interface for all video operations. """
[docs] def __init__(self, path: Optional[Union[str, Path]] = None, backend: Union[str, Backend] = "auto", **kwargs): """ Initialize a Video object. Args: path: Path to the video file (None for blank video) backend: Backend to use **kwargs: Additional options """ self._path = Path(path) if path else None self._backend_name = backend self._backend = None self._video = None self._operations = [] # Load the video if path is provided if self._path: self._load_video(**kwargs)
def _load_video(self, **kwargs): """Load the video using the selected backend.""" self._backend = get_backend(self._backend_name) self._video = self._backend.load(self._path, **kwargs) def _ensure_loaded(self): """Ensure the video is loaded.""" if self._video is None: raise ValueError("No video loaded. Use fmusvid.load() or fmusvid.create() first.") def _add_operation(self, name: str, **kwargs): """ Add an operation to the queue. Args: name: Operation name **kwargs: Operation parameters Returns: Self for method chaining """ self._operations.append((name, kwargs)) return self def _apply_operations(self): """ Apply all queued operations. Returns: New Video object with operations applied """ self._ensure_loaded() # Create a new Video object result = Video() result._backend = self._backend result._backend_name = self._backend_name result._video = self._video # Apply each operation for name, kwargs in self._operations: if not hasattr(self._backend, name): raise ValueError(f"Operation {name} not supported by backend {self._backend_name}") operation = getattr(self._backend, name) result._video = operation(result._video, **kwargs) return result
[docs] def save(self, path: Union[str, Path], **kwargs) -> None: """ Save video to file. Args: path: Output file path **kwargs: Export options (codec, bitrate, etc.) Example: >>> video.save("output.mp4") >>> video.save("output.mp4", codec="h264", bitrate="5M") """ # Apply all operations result = self._apply_operations() # Save the video result._backend.save(result._video, path, **kwargs) return self
[docs] async def save_async(self, path: Union[str, Path], progress_callback: Optional[Callable[[float], None]] = None, **kwargs) -> None: """ Save video asynchronously. Args: path: Output file path progress_callback: Function to call with progress (0-1) **kwargs: Export options (codec, bitrate, etc.) Example: >>> await video.save_async("output.mp4", ... progress_callback=lambda p: print(f"{p*100:.1f}%")) """ # Apply all operations result = self._apply_operations() # Save the video in a thread pool loop = asyncio.get_event_loop() await loop.run_in_executor( None, lambda: result._backend.save( result._video, path, progress_callback=progress_callback, **kwargs ) ) return self
[docs] def info(self) -> VideoInfo: """ Get video information. Returns: VideoInfo object Example: >>> info = video.info() >>> print(f"Resolution: {info.width}x{info.height}") """ self._ensure_loaded() info_dict = self._backend.get_info(self._video) return VideoInfo( width=info_dict.get("width", 0), height=info_dict.get("height", 0), duration=info_dict.get("duration", 0.0), fps=info_dict.get("fps", 0.0), has_audio=info_dict.get("has_audio", False), codec=info_dict.get("codec", "unknown"), audio_codec=info_dict.get("audio_codec"), bitrate=info_dict.get("bitrate") )
[docs] def get_info(self) -> Dict[str, Any]: """ Get video information as a dictionary. For CLI compatibility. Returns: Dictionary of video information Example: >>> info_dict = video.get_info() >>> print(f"Resolution: {info_dict['width']}x{info_dict['height']}") """ return self._get_result_info().to_dict()
def _get_result_info(self) -> VideoInfo: """Get info with operations applied.""" if self._operations: result = self._apply_operations() return result.info() return self.info() @property def duration(self) -> float: """Get video duration in seconds.""" return self._get_result_info().duration @property def width(self) -> int: """Get video width in pixels.""" return self._get_result_info().width @property def height(self) -> int: """Get video height in pixels.""" return self._get_result_info().height @property def fps(self) -> float: """Get video frame rate.""" return self._get_result_info().fps @property def bitrate(self) -> Optional[str]: """Get video bitrate.""" return self._get_result_info().bitrate @property def has_audio(self) -> bool: """Check if video has audio track.""" return self._get_result_info().has_audio
[docs] def trim(self, start: float, end: Optional[float] = None) -> 'Video': """ Trim video to specified time range. Args: start: Start time in seconds end: End time in seconds (None means until the end) Returns: Self for method chaining Example: >>> video.trim(10, 20) # Keep only seconds 10-20 """ return self._add_operation("trim", start=start, end=end)
[docs] def resize(self, width: Optional[int] = None, height: Optional[int] = None, keep_aspect: bool = True) -> 'Video': """ Resize video to specified dimensions. Args: width: Target width (None to auto-calculate from height) height: Target height (None to auto-calculate from width) keep_aspect: Maintain aspect ratio if only one dimension is specified Returns: Self for method chaining Example: >>> video.resize(720) # Resize to 720p (height auto-calculated) >>> video.resize(width=1280, height=720) # Resize to 720p """ return self._add_operation("resize", width=width, height=height, keep_aspect=keep_aspect)
[docs] def crop(self, x: int, y: int, width: int, height: int) -> 'Video': """ Crop video to specified region. Args: x: X coordinate of top-left corner y: Y coordinate of top-left corner width: Width of crop region height: Height of crop region Returns: Self for method chaining Example: >>> video.crop(100, 100, 500, 300) # Crop to 500x300 region """ return self._add_operation("crop", x=x, y=y, width=width, height=height)
[docs] def rotate(self, degrees: float) -> 'Video': """ Rotate video by specified degrees. Args: degrees: Rotation angle in degrees Returns: Self for method chaining Example: >>> video.rotate(90) # Rotate 90 degrees clockwise """ return self._add_operation("rotate", degrees=degrees)
[docs] def grayscale(self) -> 'Video': """ Convert video to grayscale. Returns: Self for method chaining Example: >>> video.grayscale() # Convert to black and white """ return self._add_operation("grayscale")
[docs] def blur(self, radius: float) -> 'Video': """ Apply Gaussian blur with specified radius. Args: radius: Blur radius Returns: Self for method chaining Example: >>> video.blur(5) # Apply blur with radius 5 """ if radius < 0: raise ValueError("Blur radius must be >= 0") return self._add_operation("blur", radius=radius)
[docs] def brightness(self, factor: float) -> 'Video': """ Adjust video brightness. Args: factor: Brightness factor (1.0 = original, 1.5 = +50%, 0.5 = -50%) Returns: Self for method chaining Example: >>> video.brightness(1.2) # Increase brightness by 20% """ if factor < 0: raise ValueError("Brightness factor must be >= 0") return self._add_operation("brightness", factor=factor)
[docs] def contrast(self, factor: float) -> 'Video': """ Adjust video contrast. Args: factor: Contrast factor (1.0 = original, 1.5 = +50%, 0.5 = -50%) Returns: Self for method chaining Example: >>> video.contrast(1.2) # Increase contrast by 20% """ if factor <= 0: raise ValueError("Contrast factor must be > 0") return self._add_operation("contrast", factor=factor) """ Adjust video contrast. Args: factor: Contrast factor (1.0 = original, 1.5 = +50%, 0.5 = -50%) Returns: Self for method chaining Example: >>> video.contrast(1.2) # Increase contrast by 20% """ return self._add_operation("contrast", factor=factor)
[docs] def mute(self) -> 'Video': """ Remove audio track. Returns: Self for method chaining Example: >>> video.mute() # Remove audio """ return self._add_operation("mute")
[docs] def volume(self, level: float) -> 'Video': """ Set audio volume. Args: level: Volume level (1.0 = original, 0.5 = 50%, 2.0 = 200%) Returns: Self for method chaining Example: >>> video.volume(0.8) # Reduce volume to 80% """ if level < 0: raise ValueError("Volume level must be >= 0") return self._add_operation("volume", level=level)
[docs] def add_audio(self, audio_path: Union[str, Path], start: float = 0, volume: float = 1.0) -> 'Video': """ Add audio track to the video. Args: audio_path: Path to audio file start: Start time for the audio (seconds) volume: Volume level (1.0 = original) Returns: Self for method chaining Example: >>> video.add_audio("music.mp3", start=5, volume=0.8) """ return self._add_operation("add_audio", audio_path=audio_path, start=start, volume=volume)
[docs] def add_subtitles(self, subtitle_path: Union[str, Path], font: str = "Arial", size: int = 24, color: Union[str, Tuple[int, int, int]] = "white", position: Optional[Tuple[int, int]] = None) -> 'Video': """ Add subtitles to the video. Args: subtitle_path: Path to subtitle file (.srt or .vtt) font: Font name or path size: Font size in pixels color: Text color (name or RGB tuple) position: Optional (x, y) position (None for bottom center) Returns: Self for method chaining Example: >>> video.add_subtitles("subtitles.srt") >>> video.add_subtitles("subtitles.vtt", font="DejaVuSans.ttf", size=32) """ subtitle_path = Path(subtitle_path) format = subtitle_path.suffix.lower().lstrip('.') try: entries = SubtitleParser.parse(subtitle_path, format) return self._add_operation("add_subtitles", entries=entries, font=font, size=size, color=color, position=position) except Exception as e: logger.error(f"Failed to add subtitles: {e}") raise
[docs] def add_subtitle_text(self, text: str, start_time: float, end_time: float, font: str = "Arial", size: int = 24, color: Union[str, Tuple[int, int, int]] = "white", position: Optional[Tuple[int, int]] = None) -> 'Video': """ Add a single subtitle entry directly to the video. Args: text: Subtitle text start_time: Start time in seconds end_time: End time in seconds font: Font name or path size: Font size in pixels color: Text color (name or RGB tuple) position: Optional (x, y) position (None for bottom center) Returns: Self for method chaining Example: >>> video.add_subtitle_text("Hello world", 5.0, 10.0) """ entry = SubtitleEntry(start_time=start_time, end_time=end_time, text=text, position=position) return self._add_operation("add_subtitle_text", entry=entry, font=font, size=size, color=color)
[docs] def extract_frame(self, time: float): """ Extract a single frame at specified time. Args: time: Time in seconds Returns: Frame object Example: >>> frame = video.extract_frame(10.5) >>> frame.save("thumbnail.jpg") """ self._ensure_loaded() result = self._apply_operations() from .frame import Frame frame_data = result._backend.extract_frame(result._video, time) return Frame(frame_data, backend=result._backend)
[docs] def get_frame(self, time: float): """ Extract a single frame at specified time (alias for extract_frame). Args: time: Time in seconds Returns: Frame object (numpy array) Example: >>> frame = video.get_frame(1.0) >>> frame.shape # (height, width, channels) """ self._ensure_loaded() result = self._apply_operations() frame_data = result._backend.extract_frame(result._video, time) return np.array(frame_data)
[docs] def sharpen(self, amount: float = 1.5) -> 'Video': """ Sharpen video (not yet implemented). Args: amount: Sharpening amount Returns: Self for method chaining """ raise NotImplementedError("Sharpen effect is not yet implemented")
[docs] def denoise(self, strength: float = 0.5) -> 'Video': """ Denoise video (not yet implemented). Args: strength: Denoising strength Returns: Self for method chaining """ raise NotImplementedError("Denoise effect is not yet implemented")
[docs] def animate(self, prop: str, start: float, end: float, duration: float) -> 'Video': """ Animate a property (not yet implemented). Args: prop: Property to animate start: Start value end: End value duration: Animation duration Returns: Self for method chaining """ raise NotImplementedError("Animation effects are not yet implemented")
[docs] def overlay(self, video: 'Video', position: Tuple[int, int] = (0, 0), start: float = 0, duration: Optional[float] = None, opacity: float = 1.0) -> 'Video': """ Overlay another video (picture-in-picture). Args: video: Video to overlay position: (x, y) coordinates for placement start: Start time in seconds duration: Duration to display overlay (None = full duration) opacity: Opacity of overlay (1.0 = fully opaque) Returns: Self for method chaining Example: >>> main_video.overlay(pip_video, position=(50, 50), opacity=0.8) """ # Process operations on overlay video first overlay_result = video._apply_operations() return self._add_operation( "overlay", overlay_video=overlay_result._video, position=position, start=start, duration=duration, opacity=opacity )
[docs] @classmethod def create(cls, width: int, height: int, duration: float, fps: float = 30, color: Tuple[int, int, int] = (0, 0, 0), backend: Union[str, Backend] = "auto", **kwargs) -> 'Video': """ Create a blank video canvas. Args: width: Width in pixels height: Height in pixels duration: Duration in seconds fps: Frames per second color: Background color as RGB tuple backend: Backend to use **kwargs: Additional options Returns: Video object Example: >>> canvas = Video.create(1920, 1080, 10) # 10 second blank video """ video = cls(backend=backend) video._backend = get_backend(backend) video._video = video._backend.create(width, height, duration, fps, color, **kwargs) return video
[docs] @classmethod def concat(cls, videos: List['Video'], backend: Union[str, Backend] = "auto", **kwargs) -> 'Video': """ Concatenate multiple videos. Args: videos: List of Video objects backend: Backend to use **kwargs: Additional options Returns: New concatenated Video object Example: >>> combined = Video.concat([video1, video2, video3]) """ if not videos: raise ValueError("No videos provided for concatenation") # Process operations on all videos processed_videos = [v._apply_operations() for v in videos] # Use the backend from the first video if not specified if backend == "auto" and videos: backend = processed_videos[0]._backend_name # Create a new Video object result = cls(backend=backend) result._backend = get_backend(backend) # Concatenate the videos result._video = result._backend.concat([v._video for v in processed_videos], **kwargs) return result
[docs] @classmethod def grid(cls, videos: List['Video'], rows: int, cols: int, backend: Union[str, Backend] = "auto", **kwargs) -> 'Video': """ Arrange videos in a grid layout. Args: videos: List of Video objects rows: Number of rows cols: Number of columns backend: Backend to use **kwargs: Additional options Returns: New grid Video object Example: >>> grid_video = Video.grid([video1, video2, video3, video4], rows=2, cols=2) """ if not videos: raise ValueError("No videos provided for grid layout") if len(videos) > rows * cols: logger.warning(f"More videos ({len(videos)}) than grid cells ({rows}x{cols}). " f"Extra videos will be ignored.") # Process operations on all videos processed_videos = [v._apply_operations() for v in videos] # Use the backend from the first video if not specified if backend == "auto" and videos: backend = processed_videos[0]._backend_name # Create a new Video object result = cls(backend=backend) result._backend = get_backend(backend) # Create the grid result._video = result._backend.grid([v._video for v in processed_videos], rows, cols, **kwargs) return result
[docs] def speed(self, factor: float) -> 'Video': """ Change video playback speed. Args: factor: Speed factor (2.0 = 2x faster, 0.5 = half speed) Returns: Self for method chaining Example: >>> video.speed(2.0) # 2x speed """ return self._add_operation("speed", factor=factor)
[docs] def saturation(self, factor: float) -> 'Video': """ Adjust video saturation. Args: factor: Saturation factor (1.0 = original, 1.5 = +50%, 0.5 = -50%, 0 = grayscale) Returns: Self for method chaining Example: >>> video.saturation(1.5) # Increase saturation """ return self._add_operation("saturation", factor=factor)
[docs] def fade_in(self, duration: float) -> 'Video': """ Fade video in from black. Args: duration: Fade duration in seconds Returns: Self for method chaining Example: >>> video.fade_in(1.0) # 1 second fade in """ return self._add_operation("fade_in", duration=duration)
[docs] def fade_out(self, duration: float) -> 'Video': """ Fade video out to black. Args: duration: Fade duration in seconds Returns: Self for method chaining Example: >>> video.fade_out(1.0) # 1 second fade out """ return self._add_operation("fade_out", duration=duration)
[docs] def reverse(self) -> 'Video': """ Reverse video playback. Returns: Self for method chaining Example: >>> video.reverse() """ return self._add_operation("reverse")
[docs] def chroma_key(self, color: Tuple[int, int, int], similarity: float = 0.1, blend: float = 0.0) -> 'Video': """ Remove green screen / chroma key. Args: color: RGB color to key out (e.g., (0, 255, 0) for green screen) similarity: Color similarity threshold (0.01 to 1.0) blend: Blend factor for semi-transparent edges (0.0 to 1.0) Returns: Self for method chaining Example: >>> video.chroma_key((0, 255, 0)) # Remove green screen """ return self._add_operation("chroma_key", color=color, similarity=similarity, blend=blend)
[docs] def crossfade(self, other_video: 'Video', duration: float) -> 'Video': """ Crossfade between this video and another. Args: other_video: Video to crossfade to duration: Crossfade duration in seconds Returns: Self for method chaining Example: >>> video1.crossfade(video2, 1.0) """ other_result = other_video._apply_operations() return self._add_operation("crossfade", video2=other_result._video, duration=duration)
[docs] def fade_audio_in(self, duration: float) -> 'Video': """ Fade audio in. Args: duration: Fade duration in seconds Returns: Self for method chaining Example: >>> video.fade_audio_in(1.0) """ return self._add_operation("fade_audio_in", duration=duration)
[docs] def fade_audio_out(self, duration: float) -> 'Video': """ Fade audio out. Args: duration: Fade duration in seconds Returns: Self for method chaining Example: >>> video.fade_audio_out(1.0) """ return self._add_operation("fade_audio_out", duration=duration)
[docs] def normalize_audio(self, target_db: float = -16.0) -> 'Video': """ Normalize audio to target dB level. Args: target_db: Target loudness in dB (typically -16 to -20) Returns: Self for method chaining Example: >>> video.normalize_audio(-16.0) """ return self._add_operation("normalize_audio", target_db=target_db)
[docs] def replace_audio(self, audio_path: Union[str, Path]) -> 'Video': """ Replace audio track with new audio. Args: audio_path: Path to new audio file Returns: Self for method chaining Example: >>> video.replace_audio("new_audio.mp3") """ return self._add_operation("replace_audio", audio_path=audio_path)
[docs] def to_gif(self, path: Union[str, Path], fps: Optional[float] = None, quality: int = 95, loop: int = 0, width: Optional[int] = None) -> None: """ Export video as animated GIF. Args: path: Output GIF path fps: Frame rate for GIF (None = use original, capped at 15) quality: Quality (1-100, higher = better) loop: Loop count (0 = infinite) width: Resize width (None = keep original) Example: >>> video.to_gif("output.gif", fps=10, width=320) """ result = self._apply_operations() result._backend.export_gif(result._video, path, fps, quality, loop, width)
[docs] def to_image_sequence(self, output_dir: Union[str, Path], prefix: str = "frame_", format: str = "png", start: Optional[float] = None, end: Optional[float] = None, fps: Optional[float] = None) -> List[Path]: """ Export video frames as images. Args: output_dir: Output directory path prefix: Filename prefix format: Image format (png, jpg, etc.) start: Start time in seconds (None = from beginning) end: End time in seconds (None = to end) fps: Frame rate for extraction (None = use original fps) Returns: List of exported frame paths Example: >>> frames = video.to_image_sequence("frames/", prefix="img_", format="jpg") """ result = self._apply_operations() return result._backend.export_frames(result._video, output_dir, prefix, format, start, end, fps)
[docs] def add_text(self, text: str, position: Union[Tuple[int, int], str] = "center", font: str = "Arial", size: int = 32, color: Union[str, Tuple[int, int, int]] = "white", duration: Optional[float] = None, start_time: float = 0) -> 'Video': """ Add text overlay to the video. Args: text: Text to display position: Position as (x, y) tuple or preset ("center", "top", "bottom", etc.) font: Font name or path to font file size: Font size in pixels color: Text color (name or RGB tuple) duration: Duration to display text (None = entire video) start_time: When to start displaying text Returns: Self for method chaining Example: >>> video.add_text("Hello World", position="center", size=48) """ from ..operations.text import TextOverlay overlay = TextOverlay( text=text, position=position, font=font, size=size, color=color, duration=duration, start_time=start_time ) return self._add_operation("add_text", overlay=overlay)
[docs] def add_image(self, image_path: Union[str, Path], position: Tuple[int, int] = (0, 0), opacity: float = 1.0, scale: float = 1.0, duration: Optional[float] = None, start_time: float = 0) -> 'Video': """ Add image overlay to the video. Args: image_path: Path to image file position: (x, y) coordinates for placement opacity: Opacity of overlay (0.0 to 1.0) scale: Scale factor for image duration: Duration to display image (None = entire video) start_time: When to start displaying image Returns: Self for method chaining Example: >>> video.add_image("logo.png", position=(10, 10), opacity=0.8) """ from ..operations.text import GraphicsOverlay overlay = GraphicsOverlay( image_path=image_path, position=position, opacity=opacity, scale=scale, duration=duration, start_time=start_time ) return self._add_operation("add_image", overlay=overlay)
[docs] def apply_filter(self, filter_string: str) -> 'Video': """ Apply custom FFmpeg filter string. Args: filter_string: FFmpeg filter string Returns: Self for method chaining Example: >>> video.apply_filter("negate") # Invert colors """ return self._add_operation("apply_filter", filter=filter_string)
[docs] def detect_scenes(self, threshold: float = 30.0, min_scene_length: int = 15) -> List[Any]: """ Detect scene changes in the video. Args: threshold: Difference threshold for scene detection min_scene_length: Minimum frames between scene changes Returns: List of scene change objects Example: >>> scenes = video.detect_scenes() """ from ..operations.analysis import SceneDetector result = self._apply_operations() detector = SceneDetector(threshold=threshold, min_scene_length=min_scene_length) return detector.detect_from_video_object(result)
[docs] def analyze_audio(self) -> Any: """ Analyze audio content of the video. Returns: Audio analysis results Example: >>> analysis = video.analyze_audio() """ from ..operations.analysis import AudioAnalyzer result = self._apply_operations() analyzer = AudioAnalyzer() # Extract audio first import tempfile import subprocess audio_path = os.path.join(tempfile.gettempdir(), "temp_audio.wav") subprocess.run([ "ffmpeg", "-i", result._video["path"], "-q:a", "0", "-map", "a", "-y", audio_path ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Load and analyze import librosa y, sr = librosa.load(audio_path) # Calculate expected format rms = librosa.feature.rms(y=y)[0] peak = np.max(np.abs(y)) # Get tempo tempo, _ = librosa.beat.beat_track(y=y, sr=sr) return { "volume_rms": float(np.mean(rms)), "volume_peak": float(peak), "tempo": float(tempo) }
[docs] def stream_to(self, rtmp_url: str, **kwargs) -> None: """ Stream video to RTMP server. Args: rtmp_url: RTMP stream URL **kwargs: Additional streaming options Example: >>> video.stream_to("rtmp://live.example.com/stream/key") """ result = self._apply_operations() result._backend.save(result._video, rtmp_url, **kwargs)
[docs] def __repr__(self) -> str: """String representation of the Video object.""" if self._video is None: return "Video(not loaded)" try: info = self.info() return f"Video({info.width}x{info.height}, {info.duration:.2f}s, operations: {len(self._operations)})" except Exception as e: return f"Video(error getting info: {e})"