"""
Core Video class for FMUS-VID.
This module provides the main Video class that users interact with.
"""
from typing import Union, List, Tuple, Optional, Dict, Any, Callable
from pathlib import Path
import asyncio
import logging
from dataclasses import dataclass
import os
import numpy as np
from ..backends.manager import get_backend, Backend
from ..operations.subtitle import SubtitleParser, SubtitleEntry, SubtitleRenderer
logger = logging.getLogger(__name__)
@dataclass
class VideoInfo:
    """
    Video information container.

    Attributes:
        width: Frame width in pixels.
        height: Frame height in pixels.
        duration: Duration in seconds.
        fps: Frame rate.
        has_audio: Whether the video has an audio track.
        codec: Video codec name.
        audio_codec: Audio codec name, if known.
        bitrate: Bitrate, if known.
    """

    width: int
    height: int
    duration: float
    fps: float
    has_audio: bool
    codec: str
    audio_codec: Optional[str] = None
    bitrate: Optional[str] = None

    def __str__(self) -> str:
        """Return a one-line, human-readable summary of the video."""
        if self.has_audio:
            # An audio track may be present without a reported codec; avoid
            # printing the literal "None" in that case.
            audio_part = f", audio: {self.audio_codec or 'unknown'}"
        else:
            audio_part = ", no audio"
        return (f"{self.width}x{self.height}, {self.duration:.2f}s, {self.fps} fps, "
                f"codec: {self.codec}" + audio_part)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary for API compatibility."""
        return {
            "width": self.width,
            "height": self.height,
            "duration": self.duration,
            "fps": self.fps,
            "has_audio": self.has_audio,
            "codec": self.codec,
            "audio_codec": self.audio_codec,
            "bitrate": self.bitrate,
        }
class Video:
    """
    Main Video class for FMUS-VID.

    This class provides a unified interface for all video operations.

    Editing methods (``trim``, ``resize``, ``blur``, ...) do not touch the
    video immediately: they append an operation to an internal queue and
    return ``self`` so calls can be chained. The queue is applied through the
    backend whenever a result is needed (``save``, ``get_frame``, the info
    properties, ...). Applying the queue never mutates this object, so the
    same queue is re-applied from the original video on every such call.
    """

    def __init__(self, path: Optional[Union[str, Path]] = None,
                 backend: Union[str, 'Backend'] = "auto", **kwargs):
        """
        Initialize a Video object.

        Args:
            path: Path to the video file (None for blank video)
            backend: Backend to use
            **kwargs: Additional options, forwarded to the backend's ``load``
        """
        self._path = Path(path) if path else None
        self._backend_name = backend
        self._backend = None
        self._video = None
        self._operations = []

        # Load the video if path is provided
        if self._path:
            self._load_video(**kwargs)

    def _load_video(self, **kwargs):
        """Load the video using the selected backend."""
        self._backend = get_backend(self._backend_name)
        self._video = self._backend.load(self._path, **kwargs)

    def _ensure_loaded(self):
        """
        Ensure the video is loaded.

        Raises:
            ValueError: If no backend video object is attached.
        """
        if self._video is None:
            raise ValueError("No video loaded. Use fmusvid.load() or fmusvid.create() first.")

    def _add_operation(self, name: str, **kwargs):
        """
        Add an operation to the queue.

        Args:
            name: Operation name (must match a method name on the backend)
            **kwargs: Operation parameters

        Returns:
            Self for method chaining
        """
        self._operations.append((name, kwargs))
        return self

    def _apply_operations(self):
        """
        Apply all queued operations.

        The queue is left intact and ``self`` is not modified.

        Returns:
            New Video object with operations applied

        Raises:
            ValueError: If no video is loaded, or the backend has no method
                for one of the queued operations.
        """
        self._ensure_loaded()

        # Create a new Video object sharing this object's backend
        result = Video()
        result._backend = self._backend
        result._backend_name = self._backend_name
        result._video = self._video

        # Apply each operation in the order it was queued
        for name, kwargs in self._operations:
            operation = getattr(self._backend, name, None)
            if operation is None:
                raise ValueError(f"Operation {name} not supported by backend {self._backend_name}")
            result._video = operation(result._video, **kwargs)
        return result

    @classmethod
    def _new_for_backend(cls, backend, processed_videos):
        """Create an empty Video bound to the backend that will combine ``processed_videos``."""
        if backend == "auto":
            # Reuse the backend that produced the first video so the backend
            # video objects are combined by the same implementation that
            # created them, instead of resolving a backend again by name.
            first = processed_videos[0]
            result = cls(backend=first._backend_name)
            if first._backend is not None:
                result._backend = first._backend
            else:
                result._backend = get_backend(first._backend_name)
        else:
            result = cls(backend=backend)
            result._backend = get_backend(backend)
        return result

    def save(self, path: Union[str, Path], **kwargs) -> 'Video':
        """
        Save video to file.

        Args:
            path: Output file path
            **kwargs: Export options (codec, bitrate, etc.)

        Returns:
            Self for method chaining

        Example:
            >>> video.save("output.mp4")
            >>> video.save("output.mp4", codec="h264", bitrate="5M")
        """
        # Apply all operations
        result = self._apply_operations()

        # Save the video
        result._backend.save(result._video, path, **kwargs)
        return self

    async def save_async(self, path: Union[str, Path],
                         progress_callback: Optional[Callable[[float], None]] = None,
                         **kwargs) -> 'Video':
        """
        Save video asynchronously.

        The queued operations are applied in the calling coroutine; only the
        backend ``save`` call runs in the event loop's default executor.

        Args:
            path: Output file path
            progress_callback: Function to call with progress (0-1); forwarded
                to the backend's ``save`` as a keyword argument
            **kwargs: Export options (codec, bitrate, etc.)

        Returns:
            Self for method chaining

        Example:
            >>> await video.save_async("output.mp4",
            ...     progress_callback=lambda p: print(f"{p*100:.1f}%"))
        """
        # Apply all operations
        result = self._apply_operations()

        def _export():
            return result._backend.save(
                result._video,
                path,
                progress_callback=progress_callback,
                **kwargs
            )

        # Inside a coroutine the running loop is the one to use;
        # get_event_loop() is deprecated for this purpose.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _export)
        return self

    def info(self) -> 'VideoInfo':
        """
        Get video information.

        Describes the loaded video itself; queued operations are not applied.
        Keys missing from the backend's info dictionary fall back to neutral
        defaults (0, False, "unknown", None).

        Returns:
            VideoInfo object

        Example:
            >>> info = video.info()
            >>> print(f"Resolution: {info.width}x{info.height}")
        """
        self._ensure_loaded()
        info_dict = self._backend.get_info(self._video)
        return VideoInfo(
            width=info_dict.get("width", 0),
            height=info_dict.get("height", 0),
            duration=info_dict.get("duration", 0.0),
            fps=info_dict.get("fps", 0.0),
            has_audio=info_dict.get("has_audio", False),
            codec=info_dict.get("codec", "unknown"),
            audio_codec=info_dict.get("audio_codec"),
            bitrate=info_dict.get("bitrate")
        )

    def get_info(self) -> Dict[str, Any]:
        """
        Get video information as a dictionary.

        For CLI compatibility. Unlike ``info()``, queued operations are
        applied first.

        Returns:
            Dictionary of video information

        Example:
            >>> info_dict = video.get_info()
            >>> print(f"Resolution: {info_dict['width']}x{info_dict['height']}")
        """
        return self._get_result_info().to_dict()

    def _get_result_info(self) -> 'VideoInfo':
        """Get info with operations applied."""
        if self._operations:
            return self._apply_operations().info()
        return self.info()

    @property
    def duration(self) -> float:
        """Get video duration in seconds."""
        return self._get_result_info().duration

    @property
    def width(self) -> int:
        """Get video width in pixels."""
        return self._get_result_info().width

    @property
    def height(self) -> int:
        """Get video height in pixels."""
        return self._get_result_info().height

    @property
    def fps(self) -> float:
        """Get video frame rate."""
        return self._get_result_info().fps

    @property
    def bitrate(self) -> Optional[str]:
        """Get video bitrate."""
        return self._get_result_info().bitrate

    @property
    def has_audio(self) -> bool:
        """Check if video has audio track."""
        return self._get_result_info().has_audio

    def trim(self, start: float, end: Optional[float] = None) -> 'Video':
        """
        Trim video to specified time range.

        Args:
            start: Start time in seconds
            end: End time in seconds (None means until the end)

        Returns:
            Self for method chaining

        Example:
            >>> video.trim(10, 20)  # Keep only seconds 10-20
        """
        return self._add_operation("trim", start=start, end=end)

    def resize(self, width: Optional[int] = None, height: Optional[int] = None,
               keep_aspect: bool = True) -> 'Video':
        """
        Resize video to specified dimensions.

        Args:
            width: Target width (None to auto-calculate from height)
            height: Target height (None to auto-calculate from width)
            keep_aspect: Maintain aspect ratio if only one dimension is specified

        Returns:
            Self for method chaining

        Example:
            >>> video.resize(720)  # Width 720 (height auto-calculated)
            >>> video.resize(width=1280, height=720)  # Resize to 720p
        """
        return self._add_operation("resize", width=width, height=height, keep_aspect=keep_aspect)

    def crop(self, x: int, y: int, width: int, height: int) -> 'Video':
        """
        Crop video to specified region.

        Args:
            x: X coordinate of top-left corner
            y: Y coordinate of top-left corner
            width: Width of crop region
            height: Height of crop region

        Returns:
            Self for method chaining

        Example:
            >>> video.crop(100, 100, 500, 300)  # Crop to 500x300 region
        """
        return self._add_operation("crop", x=x, y=y, width=width, height=height)

    def rotate(self, degrees: float) -> 'Video':
        """
        Rotate video by specified degrees.

        Args:
            degrees: Rotation angle in degrees

        Returns:
            Self for method chaining

        Example:
            >>> video.rotate(90)  # Rotate 90 degrees clockwise
        """
        return self._add_operation("rotate", degrees=degrees)

    def grayscale(self) -> 'Video':
        """
        Convert video to grayscale.

        Returns:
            Self for method chaining

        Example:
            >>> video.grayscale()  # Convert to black and white
        """
        return self._add_operation("grayscale")

    def blur(self, radius: float) -> 'Video':
        """
        Apply Gaussian blur with specified radius.

        Args:
            radius: Blur radius

        Returns:
            Self for method chaining

        Raises:
            ValueError: If ``radius`` is negative.

        Example:
            >>> video.blur(5)  # Apply blur with radius 5
        """
        if radius < 0:
            raise ValueError("Blur radius must be >= 0")
        return self._add_operation("blur", radius=radius)

    def brightness(self, factor: float) -> 'Video':
        """
        Adjust video brightness.

        Args:
            factor: Brightness factor (1.0 = original, 1.5 = +50%, 0.5 = -50%)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If ``factor`` is negative.

        Example:
            >>> video.brightness(1.2)  # Increase brightness by 20%
        """
        if factor < 0:
            raise ValueError("Brightness factor must be >= 0")
        return self._add_operation("brightness", factor=factor)

    def contrast(self, factor: float) -> 'Video':
        """
        Adjust video contrast.

        Args:
            factor: Contrast factor (1.0 = original, 1.5 = +50%, 0.5 = -50%)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If ``factor`` is zero or negative.

        Example:
            >>> video.contrast(1.2)  # Increase contrast by 20%
        """
        if factor <= 0:
            raise ValueError("Contrast factor must be > 0")
        return self._add_operation("contrast", factor=factor)

    def mute(self) -> 'Video':
        """
        Remove audio track.

        Returns:
            Self for method chaining

        Example:
            >>> video.mute()  # Remove audio
        """
        return self._add_operation("mute")

    def volume(self, level: float) -> 'Video':
        """
        Set audio volume.

        Args:
            level: Volume level (1.0 = original, 0.5 = 50%, 2.0 = 200%)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If ``level`` is negative.

        Example:
            >>> video.volume(0.8)  # Reduce volume to 80%
        """
        if level < 0:
            raise ValueError("Volume level must be >= 0")
        return self._add_operation("volume", level=level)

    def add_audio(self, audio_path: Union[str, Path], start: float = 0,
                  volume: float = 1.0) -> 'Video':
        """
        Add audio track to the video.

        Args:
            audio_path: Path to audio file
            start: Start time for the audio (seconds)
            volume: Volume level (1.0 = original)

        Returns:
            Self for method chaining

        Example:
            >>> video.add_audio("music.mp3", start=5, volume=0.8)
        """
        return self._add_operation("add_audio", audio_path=audio_path, start=start, volume=volume)

    def add_subtitles(self, subtitle_path: Union[str, Path],
                      font: str = "Arial", size: int = 24,
                      color: Union[str, Tuple[int, int, int]] = "white",
                      position: Optional[Tuple[int, int]] = None) -> 'Video':
        """
        Add subtitles to the video.

        The subtitle file is parsed immediately; only rendering is queued.
        The format passed to the parser is the lower-cased file extension.

        Args:
            subtitle_path: Path to subtitle file (.srt or .vtt)
            font: Font name or path
            size: Font size in pixels
            color: Text color (name or RGB tuple)
            position: Optional (x, y) position (None for bottom center)

        Returns:
            Self for method chaining

        Raises:
            Exception: Whatever the subtitle parser raises; it is logged and
                re-raised unchanged.

        Example:
            >>> video.add_subtitles("subtitles.srt")
            >>> video.add_subtitles("subtitles.vtt", font="DejaVuSans.ttf", size=32)
        """
        subtitle_path = Path(subtitle_path)
        subtitle_format = subtitle_path.suffix.lower().lstrip('.')
        try:
            entries = SubtitleParser.parse(subtitle_path, subtitle_format)
        except Exception as e:
            logger.error(f"Failed to add subtitles: {e}")
            raise
        return self._add_operation("add_subtitles", entries=entries, font=font,
                                   size=size, color=color, position=position)

    def add_subtitle_text(self, text: str, start_time: float, end_time: float,
                          font: str = "Arial", size: int = 24,
                          color: Union[str, Tuple[int, int, int]] = "white",
                          position: Optional[Tuple[int, int]] = None) -> 'Video':
        """
        Add a single subtitle entry directly to the video.

        Args:
            text: Subtitle text
            start_time: Start time in seconds
            end_time: End time in seconds
            font: Font name or path
            size: Font size in pixels
            color: Text color (name or RGB tuple)
            position: Optional (x, y) position (None for bottom center)

        Returns:
            Self for method chaining

        Example:
            >>> video.add_subtitle_text("Hello world", 5.0, 10.0)
        """
        # The position travels inside the entry, not as a separate parameter.
        entry = SubtitleEntry(start_time=start_time, end_time=end_time,
                              text=text, position=position)
        return self._add_operation("add_subtitle_text", entry=entry, font=font,
                                   size=size, color=color)

    def get_frame(self, time: float):
        """
        Extract a single frame at specified time (alias for extract_frame).

        Queued operations are applied before the frame is extracted.

        Args:
            time: Time in seconds

        Returns:
            Frame object (numpy array)

        Example:
            >>> frame = video.get_frame(1.0)
            >>> frame.shape  # (height, width, channels)
        """
        # _apply_operations() already verifies that a video is loaded.
        result = self._apply_operations()
        frame_data = result._backend.extract_frame(result._video, time)
        return np.array(frame_data)

    def sharpen(self, amount: float = 1.5) -> 'Video':
        """
        Sharpen video (not yet implemented).

        Args:
            amount: Sharpening amount

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Sharpen effect is not yet implemented")

    def denoise(self, strength: float = 0.5) -> 'Video':
        """
        Denoise video (not yet implemented).

        Args:
            strength: Denoising strength

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Denoise effect is not yet implemented")

    def animate(self, prop: str, start: float, end: float,
                duration: float) -> 'Video':
        """
        Animate a property (not yet implemented).

        Args:
            prop: Property to animate
            start: Start value
            end: End value
            duration: Animation duration

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Animation effects are not yet implemented")

    def overlay(self, video: 'Video', position: Tuple[int, int] = (0, 0),
                start: float = 0, duration: Optional[float] = None,
                opacity: float = 1.0) -> 'Video':
        """
        Overlay another video (picture-in-picture).

        The overlay video's own queued operations are applied immediately, at
        the time this method is called.

        Args:
            video: Video to overlay
            position: (x, y) coordinates for placement
            start: Start time in seconds
            duration: Duration to display overlay (None = full duration)
            opacity: Opacity of overlay (1.0 = fully opaque)

        Returns:
            Self for method chaining

        Example:
            >>> main_video.overlay(pip_video, position=(50, 50), opacity=0.8)
        """
        # Process operations on overlay video first
        overlay_result = video._apply_operations()
        return self._add_operation(
            "overlay",
            overlay_video=overlay_result._video,
            position=position,
            start=start,
            duration=duration,
            opacity=opacity
        )

    @classmethod
    def create(cls, width: int, height: int, duration: float, fps: float = 30,
               color: Tuple[int, int, int] = (0, 0, 0),
               backend: Union[str, 'Backend'] = "auto", **kwargs) -> 'Video':
        """
        Create a blank video canvas.

        Args:
            width: Width in pixels
            height: Height in pixels
            duration: Duration in seconds
            fps: Frames per second
            color: Background color as RGB tuple
            backend: Backend to use
            **kwargs: Additional options

        Returns:
            Video object

        Example:
            >>> canvas = Video.create(1920, 1080, 10)  # 10 second blank video
        """
        video = cls(backend=backend)
        video._backend = get_backend(backend)
        video._video = video._backend.create(width, height, duration, fps, color, **kwargs)
        return video

    @classmethod
    def concat(cls, videos: List['Video'], backend: Union[str, 'Backend'] = "auto", **kwargs) -> 'Video':
        """
        Concatenate multiple videos.

        Args:
            videos: List of Video objects
            backend: Backend to use ("auto" reuses the first video's backend)
            **kwargs: Additional options

        Returns:
            New concatenated Video object

        Raises:
            ValueError: If ``videos`` is empty.

        Example:
            >>> combined = Video.concat([video1, video2, video3])
        """
        if not videos:
            raise ValueError("No videos provided for concatenation")

        # Process operations on all videos
        processed_videos = [v._apply_operations() for v in videos]

        # Create a new Video object on the appropriate backend
        result = cls._new_for_backend(backend, processed_videos)

        # Concatenate the videos
        result._video = result._backend.concat([v._video for v in processed_videos], **kwargs)
        return result

    @classmethod
    def grid(cls, videos: List['Video'], rows: int, cols: int,
             backend: Union[str, 'Backend'] = "auto", **kwargs) -> 'Video':
        """
        Arrange videos in a grid layout.

        Args:
            videos: List of Video objects
            rows: Number of rows
            cols: Number of columns
            backend: Backend to use ("auto" reuses the first video's backend)
            **kwargs: Additional options

        Returns:
            New grid Video object

        Raises:
            ValueError: If ``videos`` is empty or ``rows``/``cols`` is not
                positive.

        Example:
            >>> grid_video = Video.grid([video1, video2, video3, video4], rows=2, cols=2)
        """
        if not videos:
            raise ValueError("No videos provided for grid layout")
        if rows < 1 or cols < 1:
            raise ValueError("Grid rows and cols must be >= 1")

        cells = rows * cols
        if len(videos) > cells:
            logger.warning(f"More videos ({len(videos)}) than grid cells ({rows}x{cols}). "
                           f"Extra videos will be ignored.")
            # Actually drop the extras, so they are neither processed nor
            # handed to the backend.
            videos = videos[:cells]

        # Process operations on all videos
        processed_videos = [v._apply_operations() for v in videos]

        # Create a new Video object on the appropriate backend
        result = cls._new_for_backend(backend, processed_videos)

        # Create the grid
        result._video = result._backend.grid([v._video for v in processed_videos], rows, cols, **kwargs)
        return result

    def speed(self, factor: float) -> 'Video':
        """
        Change video playback speed.

        Args:
            factor: Speed factor (2.0 = 2x faster, 0.5 = half speed)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If ``factor`` is zero or negative.

        Example:
            >>> video.speed(2.0)  # 2x speed
        """
        if factor <= 0:
            raise ValueError("Speed factor must be > 0")
        return self._add_operation("speed", factor=factor)

    def saturation(self, factor: float) -> 'Video':
        """
        Adjust video saturation.

        Args:
            factor: Saturation factor (1.0 = original, 1.5 = +50%, 0.5 = -50%, 0 = grayscale)

        Returns:
            Self for method chaining

        Example:
            >>> video.saturation(1.5)  # Increase saturation
        """
        return self._add_operation("saturation", factor=factor)

    def fade_in(self, duration: float) -> 'Video':
        """
        Fade video in from black.

        Args:
            duration: Fade duration in seconds

        Returns:
            Self for method chaining

        Example:
            >>> video.fade_in(1.0)  # 1 second fade in
        """
        return self._add_operation("fade_in", duration=duration)

    def fade_out(self, duration: float) -> 'Video':
        """
        Fade video out to black.

        Args:
            duration: Fade duration in seconds

        Returns:
            Self for method chaining

        Example:
            >>> video.fade_out(1.0)  # 1 second fade out
        """
        return self._add_operation("fade_out", duration=duration)

    def reverse(self) -> 'Video':
        """
        Reverse video playback.

        Returns:
            Self for method chaining

        Example:
            >>> video.reverse()
        """
        return self._add_operation("reverse")

    def chroma_key(self, color: Tuple[int, int, int],
                   similarity: float = 0.1, blend: float = 0.0) -> 'Video':
        """
        Remove green screen / chroma key.

        Args:
            color: RGB color to key out (e.g., (0, 255, 0) for green screen)
            similarity: Color similarity threshold (0.01 to 1.0)
            blend: Blend factor for semi-transparent edges (0.0 to 1.0)

        Returns:
            Self for method chaining

        Example:
            >>> video.chroma_key((0, 255, 0))  # Remove green screen
        """
        return self._add_operation("chroma_key", color=color, similarity=similarity, blend=blend)

    def crossfade(self, other_video: 'Video', duration: float) -> 'Video':
        """
        Crossfade between this video and another.

        The other video's own queued operations are applied immediately, at
        the time this method is called.

        Args:
            other_video: Video to crossfade to
            duration: Crossfade duration in seconds

        Returns:
            Self for method chaining

        Example:
            >>> video1.crossfade(video2, 1.0)
        """
        other_result = other_video._apply_operations()
        return self._add_operation("crossfade", video2=other_result._video, duration=duration)

    def fade_audio_in(self, duration: float) -> 'Video':
        """
        Fade audio in.

        Args:
            duration: Fade duration in seconds

        Returns:
            Self for method chaining

        Example:
            >>> video.fade_audio_in(1.0)
        """
        return self._add_operation("fade_audio_in", duration=duration)

    def fade_audio_out(self, duration: float) -> 'Video':
        """
        Fade audio out.

        Args:
            duration: Fade duration in seconds

        Returns:
            Self for method chaining

        Example:
            >>> video.fade_audio_out(1.0)
        """
        return self._add_operation("fade_audio_out", duration=duration)

    def normalize_audio(self, target_db: float = -16.0) -> 'Video':
        """
        Normalize audio to target dB level.

        Args:
            target_db: Target loudness in dB (typically -16 to -20)

        Returns:
            Self for method chaining

        Example:
            >>> video.normalize_audio(-16.0)
        """
        return self._add_operation("normalize_audio", target_db=target_db)

    def replace_audio(self, audio_path: Union[str, Path]) -> 'Video':
        """
        Replace audio track with new audio.

        Args:
            audio_path: Path to new audio file

        Returns:
            Self for method chaining

        Example:
            >>> video.replace_audio("new_audio.mp3")
        """
        return self._add_operation("replace_audio", audio_path=audio_path)

    def to_gif(self, path: Union[str, Path], fps: Optional[float] = None,
               quality: int = 95, loop: int = 0, width: Optional[int] = None) -> None:
        """
        Export video as animated GIF.

        Args:
            path: Output GIF path
            fps: Frame rate for GIF (None = use original, capped at 15)
            quality: Quality (1-100, higher = better)
            loop: Loop count (0 = infinite)
            width: Resize width (None = keep original)

        Example:
            >>> video.to_gif("output.gif", fps=10, width=320)
        """
        result = self._apply_operations()
        result._backend.export_gif(result._video, path, fps, quality, loop, width)

    def to_image_sequence(self, output_dir: Union[str, Path], prefix: str = "frame_",
                          format: str = "png", start: Optional[float] = None,
                          end: Optional[float] = None, fps: Optional[float] = None) -> List[Path]:
        """
        Export video frames as images.

        Args:
            output_dir: Output directory path
            prefix: Filename prefix
            format: Image format (png, jpg, etc.)
            start: Start time in seconds (None = from beginning)
            end: End time in seconds (None = to end)
            fps: Frame rate for extraction (None = use original fps)

        Returns:
            List of exported frame paths

        Example:
            >>> frames = video.to_image_sequence("frames/", prefix="img_", format="jpg")
        """
        result = self._apply_operations()
        return result._backend.export_frames(result._video, output_dir, prefix, format, start, end, fps)

    def add_text(self, text: str, position: Union[Tuple[int, int], str] = "center",
                 font: str = "Arial", size: int = 32,
                 color: Union[str, Tuple[int, int, int]] = "white",
                 duration: Optional[float] = None,
                 start_time: float = 0) -> 'Video':
        """
        Add text overlay to the video.

        Args:
            text: Text to display
            position: Position as (x, y) tuple or preset ("center", "top", "bottom", etc.)
            font: Font name or path to font file
            size: Font size in pixels
            color: Text color (name or RGB tuple)
            duration: Duration to display text (None = entire video)
            start_time: When to start displaying text

        Returns:
            Self for method chaining

        Example:
            >>> video.add_text("Hello World", position="center", size=48)
        """
        from ..operations.text import TextOverlay

        overlay = TextOverlay(
            text=text,
            position=position,
            font=font,
            size=size,
            color=color,
            duration=duration,
            start_time=start_time
        )
        return self._add_operation("add_text", overlay=overlay)

    def add_image(self, image_path: Union[str, Path],
                  position: Tuple[int, int] = (0, 0),
                  opacity: float = 1.0,
                  scale: float = 1.0,
                  duration: Optional[float] = None,
                  start_time: float = 0) -> 'Video':
        """
        Add image overlay to the video.

        Args:
            image_path: Path to image file
            position: (x, y) coordinates for placement
            opacity: Opacity of overlay (0.0 to 1.0)
            scale: Scale factor for image
            duration: Duration to display image (None = entire video)
            start_time: When to start displaying image

        Returns:
            Self for method chaining

        Example:
            >>> video.add_image("logo.png", position=(10, 10), opacity=0.8)
        """
        from ..operations.text import GraphicsOverlay

        overlay = GraphicsOverlay(
            image_path=image_path,
            position=position,
            opacity=opacity,
            scale=scale,
            duration=duration,
            start_time=start_time
        )
        return self._add_operation("add_image", overlay=overlay)

    def apply_filter(self, filter_string: str) -> 'Video':
        """
        Apply custom FFmpeg filter string.

        Args:
            filter_string: FFmpeg filter string

        Returns:
            Self for method chaining

        Example:
            >>> video.apply_filter("negate")  # Invert colors
        """
        return self._add_operation("apply_filter", filter=filter_string)

    def detect_scenes(self, threshold: float = 30.0,
                      min_scene_length: int = 15) -> List[Any]:
        """
        Detect scene changes in the video.

        Queued operations are applied before detection.

        Args:
            threshold: Difference threshold for scene detection
            min_scene_length: Minimum frames between scene changes

        Returns:
            List of scene change objects

        Example:
            >>> scenes = video.detect_scenes()
        """
        from ..operations.analysis import SceneDetector

        result = self._apply_operations()
        detector = SceneDetector(threshold=threshold, min_scene_length=min_scene_length)
        return detector.detect_from_video_object(result)

    def analyze_audio(self) -> Any:
        """
        Analyze audio content of the video.

        The audio track is extracted with the ``ffmpeg`` executable into a
        private temporary directory, loaded with librosa, and the temporary
        files are removed before returning.

        Returns:
            Dictionary with ``volume_rms`` (mean RMS), ``volume_peak``
            (maximum absolute sample) and ``tempo`` as floats

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits with an error.

        Example:
            >>> analysis = video.analyze_audio()
        """
        import subprocess
        import tempfile

        import librosa

        result = self._apply_operations()

        # A private temporary directory avoids collisions between concurrent
        # calls (the file name is otherwise fixed) and guarantees cleanup.
        with tempfile.TemporaryDirectory() as tmp_dir:
            audio_path = os.path.join(tmp_dir, "temp_audio.wav")

            # Extract audio first.
            # NOTE(review): assumes the backend video object is a mapping with
            # a "path" key -- confirm for every backend.
            subprocess.run([
                "ffmpeg", "-i", result._video["path"],
                "-q:a", "0", "-map", "a",
                "-y", audio_path
            ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # Load while the temporary file still exists
            y, sr = librosa.load(audio_path)

        rms = librosa.feature.rms(y=y)[0]
        peak = np.max(np.abs(y))

        # Get tempo; depending on the librosa version this is a scalar or a
        # one-element array, so normalise before converting to float.
        tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
        return {
            "volume_rms": float(np.mean(rms)),
            "volume_peak": float(peak),
            "tempo": float(np.atleast_1d(tempo)[0])
        }

    def stream_to(self, rtmp_url: str, **kwargs) -> None:
        """
        Stream video to RTMP server.

        Implemented by handing the URL to the backend's ``save`` as the
        output path.

        Args:
            rtmp_url: RTMP stream URL
            **kwargs: Additional streaming options

        Example:
            >>> video.stream_to("rtmp://live.example.com/stream/key")
        """
        result = self._apply_operations()
        result._backend.save(result._video, rtmp_url, **kwargs)

    def __repr__(self) -> str:
        """String representation of the Video object."""
        if self._video is None:
            return "Video(not loaded)"
        try:
            info = self.info()
            return f"Video({info.width}x{info.height}, {info.duration:.2f}s, operations: {len(self._operations)})"
        except Exception as e:
            # repr() must never raise; report the problem inline instead.
            return f"Video(error getting info: {e})"