"""
Composition operations for FMUS-VID.
This module provides functions for combining multiple videos or images.
"""
import os
from typing import List, Union, Optional, Tuple
from pathlib import Path
import cv2
import numpy as np
from ..core.video import Video
# (Stray "[docs]" source-link marker from the HTML listing removed; it is not Python code.)
def images_to_video(
    images: Union[List[Union[str, Path]], str, Path],
    output: Union[str, Path],
    fps: float = 30.0,
    **kwargs
) -> Video:
    """
    Convert a sequence of images to a video.

    The frame size of the video is taken from the first image; every other
    image whose size differs is resized to match. Images that cannot be read
    are skipped with a warning printed to stdout.

    Args:
        images: List of image paths, or a directory containing images. When a
            directory is given, files with a .png/.jpg/.jpeg/.bmp/.tiff
            extension (any letter case) are used, in sorted path order.
        output: Path to save the output video. An '.avi' suffix selects the
            XVID codec; anything else uses mp4v.
        fps: Frames per second (must be positive)
        **kwargs: Additional options (currently unused)

    Returns:
        Video object for the created video

    Raises:
        ValueError: If fps is not positive, the directory does not exist, no
            images are found, or the first image cannot be read.
        RuntimeError: If the video writer cannot be opened for the output.

    Example:
        >>> # From list of images
        >>> video = fmusvid.images_to_video(['frame1.png', 'frame2.png', 'frame3.png'], 'output.mp4')
        >>> # From directory
        >>> video = fmusvid.images_to_video('frames_directory', 'output.mp4', fps=24)
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")

    output_path = Path(output)

    # A single str/Path is treated as a directory; anything else as a list.
    if isinstance(images, (str, Path)):
        image_dir = Path(images)
        if not image_dir.is_dir():
            raise ValueError(f"Image path '{image_dir}' is not a directory")
        # Compare suffixes case-insensitively in one pass. Globbing both
        # '*.png' and '*.PNG' returns every file twice on case-insensitive
        # filesystems, which would duplicate frames in the output.
        image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff'}
        image_files = sorted(
            entry for entry in image_dir.iterdir()
            if entry.is_file() and entry.suffix.lower() in image_extensions
        )
    else:
        image_files = [Path(img) for img in images]

    if not image_files:
        raise ValueError("No images found")

    # The first image defines the frame size of the whole video.
    first_image = cv2.imread(str(image_files[0]))
    if first_image is None:
        raise ValueError(f"Could not read image: {image_files[0]}")
    height, width = first_image.shape[:2]

    # Determine codec based on output extension (mp4v is the default).
    codec = 'XVID' if output_path.suffix.lower() == '.avi' else 'mp4v'
    fourcc = cv2.VideoWriter_fourcc(*codec)

    video_writer = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    if not video_writer.isOpened():
        # Without this check every write() below would be a silent no-op.
        video_writer.release()
        raise RuntimeError(f"Could not open video writer for: {output_path}")

    try:
        for index, image_file in enumerate(image_files):
            # Reuse the already-decoded first image instead of reading it twice.
            img = first_image if index == 0 else cv2.imread(str(image_file))

            # Skip invalid images
            if img is None:
                print(f"Warning: Could not read image: {image_file}")
                continue

            # The writer only accepts frames of the size it was created with.
            if img.shape[0] != height or img.shape[1] != width:
                img = cv2.resize(img, (width, height))

            video_writer.write(img)
    finally:
        # Always finalize the file, even if reading/resizing raised.
        video_writer.release()

    return Video(output_path)
# (Stray "[docs]" source-link marker from the HTML listing removed; it is not Python code.)
def side_by_side(videos: List[Video], output: Optional[Union[str, Path]] = None, **kwargs) -> Video:
    """
    Arrange videos horizontally side by side.

    Every input is scaled to the height of the tallest video while keeping its
    aspect ratio; the output width is the sum of the scaled widths. The output
    runs at the average fps of the inputs for the duration of the longest one.

    Args:
        videos: List of Video objects
        output: Path to save the output video (optional). When omitted, a
            temporary '.mp4' file is created and kept on disk.
        **kwargs: Additional options (currently unused)

    Returns:
        New Video object with side-by-side arrangement

    Raises:
        ValueError: If no videos are given, or a video reports a non-positive
            width, height or the average fps is not positive.
        RuntimeError: If the video writer cannot be opened for the output.

    Example:
        >>> comparison = fmusvid.side_by_side([video1, video2, video3])
        >>> comparison.save("comparison.mp4")
    """
    if not videos:
        raise ValueError("No videos provided")

    infos = [video.info() for video in videos]
    for info in infos:
        if info.width <= 0 or info.height <= 0:
            raise ValueError(
                f"Video has invalid dimensions: {info.width}x{info.height}"
            )

    max_height = max(info.height for info in infos)
    max_duration = max(info.duration for info in infos)
    avg_fps = sum(info.fps for info in infos) / len(infos)
    if avg_fps <= 0:
        raise ValueError(f"Average fps must be positive, got {avg_fps}")

    # Width of each panel AFTER scaling to max_height. The writer must be
    # sized from these values (not the original widths), otherwise the stacked
    # frame does not match the writer's frame size and is not encoded.
    # Multiplying before dividing keeps the result exact when no scaling is
    # needed (e.g. 1920 * 1080 / 1080 == 1920.0).
    panel_widths = [
        max(1, round(info.width * max_height / info.height)) for info in infos
    ]
    total_width = sum(panel_widths)

    if output:
        output_path = Path(output)
    else:
        # No output given: reserve a temporary .mp4 that outlives this call.
        import tempfile
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            output_path = Path(temp_file.name)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(
        str(output_path), fourcc, avg_fps, (total_width, max_height)
    )
    if not video_writer.isOpened():
        # Without this check every write() below would be a silent no-op.
        video_writer.release()
        raise RuntimeError(f"Could not open video writer for: {output_path}")

    caps = []
    try:
        for video in videos:
            caps.append(cv2.VideoCapture(str(video._path)))

        total_frames = int(max_duration * avg_fps)

        for frame_idx in range(total_frames):
            frames = []
            for cap, info, panel_width in zip(caps, infos, panel_widths):
                # Seek proportionally into each clip.
                # NOTE(review): this spreads every clip over the full output
                # duration, so shorter clips play slower rather than ending
                # early — confirm this is the intended behavior.
                position = frame_idx / total_frames * info.duration
                cap.set(cv2.CAP_PROP_POS_MSEC, position * 1000)

                ret, frame = cap.read()
                if ret:
                    frame = cv2.resize(frame, (panel_width, max_height))
                else:
                    # No frame available: fill this panel with black, using
                    # the same scaled size so the stacked width stays constant.
                    frame = np.zeros((max_height, panel_width, 3), dtype=np.uint8)
                frames.append(frame)

            # Combine panels horizontally and write the result.
            video_writer.write(np.hstack(frames))
    finally:
        # Release captures and finalize the file even if processing failed.
        for cap in caps:
            cap.release()
        video_writer.release()

    return Video(output_path)