Source code for fmusvid.operations.composition

"""
Composition operations for FMUS-VID.

This module provides functions for combining multiple videos or images.
"""

import os
from typing import List, Union, Optional, Tuple
from pathlib import Path

import cv2
import numpy as np

from ..core.video import Video


# Lower-case file extensions treated as images when scanning a directory.
_IMAGE_EXTENSIONS = frozenset({'.png', '.jpg', '.jpeg', '.bmp', '.tiff'})


def _collect_image_files(image_dir: Path) -> List[Path]:
    """Return the image files directly inside *image_dir*, sorted by path."""
    # One case-insensitive pass over the directory. Globbing '*.png' and
    # '*.PNG' separately lists every file twice on case-insensitive
    # filesystems, which would duplicate frames in the output.
    return sorted(
        entry
        for entry in image_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() in _IMAGE_EXTENSIONS
    )


def images_to_video(
    images: Union[List[Union[str, Path]], str, Path],
    output: Union[str, Path],
    fps: float = 30.0,
    **kwargs
) -> "Video":
    """
    Convert a sequence of images to a video.

    The frame size of the video is taken from the first image; any later
    image with a different size is resized to match. Images that cannot be
    read (other than the first) are skipped with a printed warning.

    Args:
        images: List of image paths, or a directory containing images.
            When a directory is given, files with a .png, .jpg, .jpeg,
            .bmp or .tiff extension (any letter case) are used in sorted
            path order.
        output: Path to save the output video. An '.avi' suffix selects the
            XVID codec; every other suffix uses mp4v.
        fps: Frames per second; must be positive.
        **kwargs: Additional options (currently unused).

    Returns:
        Video object for the created video

    Raises:
        ValueError: If fps is not positive, the directory does not exist,
            no images are found, or the first image cannot be read.
        OSError: If the video writer cannot be opened for the output path.

    Example:
        >>> # From list of images
        >>> video = fmusvid.images_to_video(['frame1.png', 'frame2.png', 'frame3.png'], 'output.mp4')
        >>> # From directory
        >>> video = fmusvid.images_to_video('frames_directory', 'output.mp4', fps=24)
    """
    # Validate cheap arguments before touching the filesystem or OpenCV.
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")

    output_path = Path(output)

    # A single str/Path is a directory to scan; anything else is iterated
    # as a sequence of image paths.
    if isinstance(images, (str, Path)):
        image_dir = Path(images)
        if not image_dir.is_dir():
            raise ValueError(f"Image path '{image_dir}' is not a directory")
        image_files = _collect_image_files(image_dir)
    else:
        image_files = [Path(img) for img in images]

    if not image_files:
        raise ValueError("No images found")

    # The first image defines the frame size for the whole video.
    first_image = cv2.imread(str(image_files[0]))
    if first_image is None:
        raise ValueError(f"Could not read image: {image_files[0]}")
    height, width = first_image.shape[:2]

    # '.avi' gets XVID; '.mp4' and every other suffix fall back to mp4v.
    codec = 'XVID' if output_path.suffix.lower() == '.avi' else 'mp4v'
    fourcc = cv2.VideoWriter_fourcc(*codec)

    video_writer = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    try:
        if not video_writer.isOpened():
            raise OSError(f"Could not open video writer for: {output_path}")

        # The first image is already decoded; reuse it instead of reading
        # it from disk a second time.
        video_writer.write(first_image)

        for image_file in image_files[1:]:
            img = cv2.imread(str(image_file))

            # Skip invalid images
            if img is None:
                print(f"Warning: Could not read image: {image_file}")
                continue

            # Every frame must match the size the writer was opened with.
            if img.shape[:2] != (height, width):
                img = cv2.resize(img, (width, height))

            video_writer.write(img)
    finally:
        # Always finalize/close the output, even if a frame failed.
        video_writer.release()

    return Video(output_path)
def side_by_side(
    videos: List["Video"],
    output: Optional[Union[str, Path]] = None,
    **kwargs
) -> "Video":
    """
    Arrange videos horizontally side by side.

    Every clip is scaled to the height of the tallest clip while keeping
    its aspect ratio; the output width is the sum of the scaled widths.
    The output uses the average fps of the inputs and the duration of the
    longest input.

    Args:
        videos: List of Video objects
        output: Path to save the output video (optional). When omitted, the
            result is written to a new temporary '.mp4' file that is not
            deleted automatically.
        **kwargs: Additional options (currently unused).

    Returns:
        New Video object with side-by-side arrangement

    Raises:
        ValueError: If no videos are given, or a video reports a
            non-positive width or height.
        OSError: If the video writer cannot be opened for the output path.

    Example:
        >>> comparison = fmusvid.side_by_side([video1, video2, video3])
        >>> comparison.save("comparison.mp4")
    """
    if not videos:
        raise ValueError("No videos provided")

    infos = [video.info() for video in videos]

    # Guard the aspect-ratio division below.
    if any(info.width <= 0 or info.height <= 0 for info in infos):
        raise ValueError("All videos must have a positive width and height")

    max_height = int(max(info.height for info in infos))

    # Width of each pane after scaling its clip to max_height. These widths
    # are used for the writer size, the resize target and the black
    # fallback frame, so every stacked frame matches the size the writer
    # was opened with.
    pane_widths = [
        max(1, round(max_height * info.width / info.height))
        for info in infos
    ]
    total_width = sum(pane_widths)

    max_duration = max(info.duration for info in infos)
    avg_fps = sum(info.fps for info in infos) / len(infos)

    if output:
        output_path = Path(output)
    else:
        # Reserve a temporary file name; the file is kept (delete=False)
        # so the writer can reopen it by path.
        import tempfile
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            output_path = Path(temp_file.name)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(
        str(output_path), fourcc, avg_fps, (total_width, max_height)
    )

    caps = []
    try:
        if not video_writer.isOpened():
            raise OSError(f"Could not open video writer for: {output_path}")

        # Opened inside the try block so captures opened before a failure
        # are still released.
        for video in videos:
            caps.append(cv2.VideoCapture(str(video._path)))

        total_frames = int(max_duration * avg_fps)

        for frame_idx in range(total_frames):
            panes = []
            for cap, info, pane_width in zip(caps, infos, pane_widths):
                # NOTE(review): the output position is mapped
                # proportionally onto each clip's own duration, so shorter
                # clips are stretched across the whole output rather than
                # padded with black -- confirm this is the intended sync.
                position = frame_idx / total_frames * info.duration
                cap.set(cv2.CAP_PROP_POS_MSEC, position * 1000)

                ret, frame = cap.read()
                if ret:
                    frame = cv2.resize(frame, (pane_width, max_height))
                else:
                    # No frame available: fill this pane with black.
                    frame = np.zeros((max_height, pane_width, 3), dtype=np.uint8)
                panes.append(frame)

            video_writer.write(np.hstack(panes))
    finally:
        # Release every capture and finalize the output even on failure.
        for cap in caps:
            cap.release()
        video_writer.release()

    return Video(output_path)