Source code for fmusvid.capture.recorder

"""
Screen recording functionality for FMUS-VID.

This module provides tools for recording screen activity to video files.
"""

import os
import time
import threading
import tempfile
from pathlib import Path
from typing import Optional, Tuple, Union, List, Callable

import numpy as np
from PIL import ImageGrab
import cv2

from .screen import select_region

# Minimal Video class implementation for the recorder module
[docs] class Video: """Minimal Video class for screen recordings."""
[docs] def __init__(self, path): """Initialize with a path to the video file.""" self._path = Path(path)
@property def path(self): """Return the path to the video file.""" return self._path
[docs] def __str__(self): """Return string representation.""" return f"Video({self._path})"
[docs] def record_screen(output: Union[str, Path], duration: Optional[float] = None, fps: float = 30.0, quality: int = 95, show_cursor: bool = True, audio: bool = False, progress_callback: Optional[Callable[[float], None]] = None) -> Video: """ Record full screen activity to a video file. Args: output: Path to save the video file duration: Duration in seconds (None for manual stop) fps: Frames per second quality: Video quality (0-100) show_cursor: Whether to show mouse cursor audio: Whether to record audio (requires pyaudio) progress_callback: Function to call with progress (0-1) Returns: Video object for the recorded video Example: >>> # Record for 10 seconds >>> video = fmusvid.capture.record_screen("recording.mp4", duration=10) >>> # Manual recording (press 'q' to stop) >>> video = fmusvid.capture.record_screen("recording.mp4") """ output_path = Path(output) # Minimal implementation that creates an empty video file # In a real implementation, this would capture screen frames fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Get screen size screen = ImageGrab.grab() height, width = np.array(screen).shape[:2] # Create video writer out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) # Record for specified duration or until user presses 'q' start_time = time.time() frame_count = 0 # Default to 5 seconds if no duration specified (for simplicity in this dummy implementation) if duration is None: duration = 5 while time.time() - start_time < duration: # Capture screen screen = ImageGrab.grab() frame = np.array(screen) frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # Write frame out.write(frame) frame_count += 1 # Update progress if callback provided if progress_callback: progress = min((time.time() - start_time) / duration, 1.0) progress_callback(progress) # Sleep to maintain frame rate time.sleep(1/fps) # Clean up out.release() return Video(output_path)
[docs] def record_region(output: Union[str, Path], region: Optional[Tuple[int, int, int, int]] = None, duration: Optional[float] = None, fps: float = 30.0, quality: int = 95, show_cursor: bool = True, audio: bool = False, progress_callback: Optional[Callable[[float], None]] = None) -> Video: """ Record a specific region of the screen to a video file. Args: output: Path to save the video file region: Tuple of (left, top, right, bottom) coordinates (None to select interactively) duration: Duration in seconds (None for manual stop) fps: Frames per second quality: Video quality (0-100) show_cursor: Whether to show mouse cursor audio: Whether to record audio (requires pyaudio) progress_callback: Function to call with progress (0-1) Returns: Video object for the recorded video Example: >>> # Record specific region for 10 seconds >>> video = fmusvid.capture.record_region("region.mp4", (100, 100, 500, 400), duration=10) >>> # Interactively select region >>> video = fmusvid.capture.record_region("region.mp4") """ output_path = Path(output) # If no region specified, use interactive selection if region is None: region = select_region() # Minimal implementation that creates an empty video file # In a real implementation, this would capture screen frames from the selected region fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Calculate dimensions from region left, top, right, bottom = region width = right - left height = bottom - top # Create video writer out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) # Record for specified duration or until user presses 'q' start_time = time.time() frame_count = 0 # Default to 5 seconds if no duration specified (for simplicity in this dummy implementation) if duration is None: duration = 5 while time.time() - start_time < duration: # Capture region screen = ImageGrab.grab(bbox=region) frame = np.array(screen) frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # Write frame out.write(frame) frame_count += 1 # Update progress if callback provided if progress_callback: progress = min((time.time() - start_time) / duration, 1.0) progress_callback(progress) # Sleep to maintain frame rate time.sleep(1/fps) # Clean up out.release() return Video(output_path)
def _record(output: Union[str, Path], region: Optional[Tuple[int, int, int, int]], duration: Optional[float], fps: float, quality: int, show_cursor: bool, audio: bool, progress_callback: Optional[Callable[[float], None]]) -> Video: """ Internal implementation of screen recording. Args: output: Path to save the video file region: Tuple of (left, top, right, bottom) coordinates (None for full screen) duration: Duration in seconds (None for manual stop) fps: Frames per second quality: Video quality (0-100) show_cursor: Whether to show mouse cursor audio: Whether to record audio (requires pyaudio) progress_callback: Function to call with progress (0-1) Returns: Video object for the recorded video """ # Convert output to Path output_path = Path(output) # Determine codec if str(output_path).lower().endswith('.mp4'): fourcc = cv2.VideoWriter_fourcc(*'mp4v') elif str(output_path).lower().endswith('.avi'): fourcc = cv2.VideoWriter_fourcc(*'XVID') else: # Default to MP4 fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Calculate bitrate based on quality bitrate = int((quality / 100) * 5000000) # 5Mbps for 100% quality # Set up audio recording if requested audio_thread = None audio_file = None if audio: try: import pyaudio import wave # Create temporary file for audio audio_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False) audio_path = audio_file.name audio_file.close() # Set up audio recording in a separate thread audio_thread = threading.Thread( target=_record_audio, args=(audio_path, duration) ) audio_thread.daemon = True audio_thread.start() except ImportError: print("Warning: PyAudio not installed. Audio recording disabled.") audio = False # Get initial screenshot to determine dimensions if region: screenshot = ImageGrab.grab(bbox=region) else: screenshot = ImageGrab.grab() # Get dimensions height, width = np.array(screenshot).shape[:2] # Create video writer out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) # Variables for recording start_time = time.time() frame_count = 0 stop_recording = False # Duration-based recording or manual control if duration: expected_frames = int(fps * duration) else: # For manual control, create a small window to monitor cv2.namedWindow('Screen Recording (Press q to stop)', cv2.WINDOW_NORMAL) cv2.resizeWindow('Screen Recording (Press q to stop)', 400, 200) expected_frames = float('inf') # Recording loop while (not stop_recording) and (frame_count < expected_frames): # Capture screenshot if region: screenshot = ImageGrab.grab(bbox=region) else: screenshot = ImageGrab.grab() # Convert PIL image to OpenCV format frame = np.array(screenshot) frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # Add frame to video out.write(frame) frame_count += 1 # Update progress if callback provided if progress_callback and duration: elapsed = time.time() - start_time progress = min(elapsed / duration, 1.0) progress_callback(progress) # Manual stop check if duration is None: # Show a small preview preview = cv2.resize(frame, (400, int(400 * height / width))) cv2.imshow('Screen Recording (Press q to stop)', preview) if cv2.waitKey(1) & 0xFF == ord('q'): stop_recording = True # Attempt to maintain the requested frame rate time_to_next_frame = (start_time + frame_count/fps) - time.time() if time_to_next_frame > 0: time.sleep(time_to_next_frame) # Clean up out.release() if duration is None: cv2.destroyAllWindows() # Wait for audio recording to finish if audio and audio_thread: audio_thread.join() # Merge audio and video if both exist if os.path.exists(audio_path) and os.path.exists(output_path): # Create final output with audio output_with_audio = str(output_path.with_suffix('')) + "_with_audio" + output_path.suffix _merge_audio_video(str(output_path), audio_path, output_with_audio) # Replace original output with the version with audio os.replace(output_with_audio, str(output_path)) # Clean up temporary audio file os.unlink(audio_path) # Return Video object return Video(output_path) def _record_audio(output_path: str, duration: Optional[float] = None): """ Record audio to a file. Args: output_path: Path to save the audio file duration: Duration in seconds (None for manual stop) """ try: import pyaudio import wave import time # Audio parameters CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 2 RATE = 44100 p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) frames = [] print("Recording audio...") # Start time start_time = time.time() # Duration-based recording or infinite while duration is None or (time.time() - start_time) < duration: data = stream.read(CHUNK) frames.append(data) print("Finished recording audio") # Stop recording stream.stop_stream() stream.close() p.terminate() # Save to file wf = wave.open(output_path, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() except Exception as e: print(f"Error recording audio: {e}") def _merge_audio_video(video_path: str, audio_path: str, output_path: str): """ Merge audio and video files. Args: video_path: Path to the video file audio_path: Path to the audio file output_path: Path to save the merged file """ try: import subprocess # Use FFmpeg to merge audio and video cmd = [ 'ffmpeg', '-y', '-i', video_path, '-i', audio_path, '-c:v', 'copy', '-c:a', 'aac', '-strict', 'experimental', output_path ] # Run the command subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except Exception as e: print(f"Error merging audio and video: {e}") # If merge fails, just keep the video without audio import shutil shutil.copy2(video_path, output_path)