"""
Screen recording functionality for FMUS-VID.
This module provides tools for recording screen activity to video files.
"""
import os
import time
import threading
import tempfile
from pathlib import Path
from typing import Optional, Tuple, Union, List, Callable
import numpy as np
from PIL import ImageGrab
import cv2
from .screen import select_region
# Minimal Video class implementation for the recorder module
[docs]
class Video:
    """Lightweight handle to a video file produced by the recorder."""

    def __init__(self, path):
        """Remember where the video lives.

        Args:
            path: Location of the video file; converted with ``pathlib.Path``.
        """
        self._path = Path(path)

    @property
    def path(self):
        """Location of the video file as a ``pathlib.Path`` (read-only)."""
        return self._path

    def __str__(self):
        """Render the handle as ``Video(<path>)``."""
        return "Video({})".format(self._path)
[docs]
def record_screen(output: Union[str, Path],
                  duration: Optional[float] = None,
                  fps: float = 30.0,
                  quality: int = 95,
                  show_cursor: bool = True,
                  audio: bool = False,
                  progress_callback: Optional[Callable[[float], None]] = None) -> Video:
    """
    Record full screen activity to a video file.

    Frames are grabbed with ``ImageGrab.grab()`` and written through a
    ``cv2.VideoWriter`` using the ``mp4v`` codec until ``duration`` seconds
    have elapsed.

    Args:
        output: Path to save the video file
        duration: Duration in seconds. ``None`` falls back to 5 seconds in
            this minimal implementation (there is no manual stop here).
        fps: Frames per second; must be greater than zero
        quality: Video quality (0-100). Accepted for API compatibility;
            not used by this implementation.
        show_cursor: Whether to show mouse cursor. Accepted for API
            compatibility; not used by this implementation.
        audio: Whether to record audio. Accepted for API compatibility;
            not used by this implementation.
        progress_callback: Function to call with progress (0-1). It is called
            after every written frame and once more with ``1.0`` when the
            recording has finished.

    Returns:
        Video object for the recorded video

    Raises:
        ValueError: If ``fps`` is not positive.
        OSError: If the video writer could not be opened for ``output``.

    Example:
        >>> # Record for 10 seconds
        >>> video = fmusvid.capture.record_screen("recording.mp4", duration=10)
        >>> # No duration given: records for the 5 second default
        >>> video = fmusvid.capture.record_screen("recording.mp4")
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    output_path = Path(output)

    # Default to 5 seconds if no duration specified (minimal implementation).
    if duration is None:
        duration = 5

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # Size the writer from a real screenshot so every frame matches it.
    height, width = np.array(ImageGrab.grab()).shape[:2]
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    if not out.isOpened():
        out.release()
        raise OSError(f"Could not open video writer for {output_path}")

    start_time = time.monotonic()
    frame_count = 0
    try:
        while time.monotonic() - start_time < duration:
            # PIL delivers RGB, OpenCV expects BGR.
            frame = cv2.cvtColor(np.array(ImageGrab.grab()), cv2.COLOR_RGB2BGR)
            out.write(frame)
            frame_count += 1

            if progress_callback:
                elapsed = time.monotonic() - start_time
                progress_callback(min(elapsed / duration, 1.0))

            # Sleep only for what is left of this frame's time slot. Sleeping
            # a full 1/fps on top of the capture time would push the real
            # frame rate below the fps stored in the file, so playback would
            # run faster than real time.
            time_to_next_frame = (start_time + frame_count / fps) - time.monotonic()
            if time_to_next_frame > 0:
                time.sleep(time_to_next_frame)
    finally:
        # Always finalize the file, even if a grab or write raised.
        out.release()

    if progress_callback:
        progress_callback(1.0)

    return Video(output_path)
[docs]
def record_region(output: Union[str, Path],
                  region: Optional[Tuple[int, int, int, int]] = None,
                  duration: Optional[float] = None,
                  fps: float = 30.0,
                  quality: int = 95,
                  show_cursor: bool = True,
                  audio: bool = False,
                  progress_callback: Optional[Callable[[float], None]] = None) -> Video:
    """
    Record a specific region of the screen to a video file.

    Frames are grabbed with ``ImageGrab.grab(bbox=region)`` and written
    through a ``cv2.VideoWriter`` using the ``mp4v`` codec until ``duration``
    seconds have elapsed.

    Args:
        output: Path to save the video file
        region: Tuple of (left, top, right, bottom) coordinates. ``None``
            calls ``select_region()`` to obtain one.
        duration: Duration in seconds. ``None`` falls back to 5 seconds in
            this minimal implementation (there is no manual stop here).
        fps: Frames per second; must be greater than zero
        quality: Video quality (0-100). Accepted for API compatibility;
            not used by this implementation.
        show_cursor: Whether to show mouse cursor. Accepted for API
            compatibility; not used by this implementation.
        audio: Whether to record audio. Accepted for API compatibility;
            not used by this implementation.
        progress_callback: Function to call with progress (0-1). It is called
            after every written frame and once more with ``1.0`` when the
            recording has finished.

    Returns:
        Video object for the recorded video

    Raises:
        ValueError: If ``fps`` is not positive or the region is empty
            (``right <= left`` or ``bottom <= top``).
        OSError: If the video writer could not be opened for ``output``.

    Example:
        >>> # Record specific region for 10 seconds
        >>> video = fmusvid.capture.record_region("region.mp4", (100, 100, 500, 400), duration=10)
        >>> # Interactively select region, record for the 5 second default
        >>> video = fmusvid.capture.record_region("region.mp4")
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    output_path = Path(output)

    # If no region specified, use interactive selection
    if region is None:
        region = select_region()

    left, top, right, bottom = region
    if right <= left or bottom <= top:
        raise ValueError(f"region must have positive width and height, got {region!r}")

    # Default to 5 seconds if no duration specified (minimal implementation).
    if duration is None:
        duration = 5

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # Size the writer from an actual grab rather than from region arithmetic:
    # the writer needs every frame to have the size it was opened with, and
    # the grabbed image may not be exactly (right - left) x (bottom - top),
    # e.g. under display scaling.
    height, width = np.array(ImageGrab.grab(bbox=region)).shape[:2]
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    if not out.isOpened():
        out.release()
        raise OSError(f"Could not open video writer for {output_path}")

    start_time = time.monotonic()
    frame_count = 0
    try:
        while time.monotonic() - start_time < duration:
            # PIL delivers RGB, OpenCV expects BGR.
            frame = cv2.cvtColor(np.array(ImageGrab.grab(bbox=region)),
                                 cv2.COLOR_RGB2BGR)
            out.write(frame)
            frame_count += 1

            if progress_callback:
                elapsed = time.monotonic() - start_time
                progress_callback(min(elapsed / duration, 1.0))

            # Sleep only for what is left of this frame's time slot. Sleeping
            # a full 1/fps on top of the capture time would push the real
            # frame rate below the fps stored in the file, so playback would
            # run faster than real time.
            time_to_next_frame = (start_time + frame_count / fps) - time.monotonic()
            if time_to_next_frame > 0:
                time.sleep(time_to_next_frame)
    finally:
        # Always finalize the file, even if a grab or write raised.
        out.release()

    if progress_callback:
        progress_callback(1.0)

    return Video(output_path)
def _record(output: Union[str, Path],
            region: Optional[Tuple[int, int, int, int]],
            duration: Optional[float],
            fps: float,
            quality: int,
            show_cursor: bool,
            audio: bool,
            progress_callback: Optional[Callable[[float], None]]) -> Video:
    """
    Internal implementation of screen recording.

    Grabs frames with ``ImageGrab.grab`` and writes them with a
    ``cv2.VideoWriter`` (``XVID`` for ``.avi`` outputs, ``mp4v`` otherwise).
    When ``duration`` is ``None`` a small preview window is shown and the
    recording runs until ``q`` is pressed in it. When audio is requested and
    PyAudio can be imported, audio is recorded on a background thread into a
    temporary WAV file and merged into the output afterwards.

    Args:
        output: Path to save the video file
        region: Tuple of (left, top, right, bottom) coordinates (None for full screen)
        duration: Duration in seconds (None for manual stop)
        fps: Frames per second; must be greater than zero
        quality: Video quality (0-100). Not used by this implementation.
        show_cursor: Whether to show mouse cursor. Not used by this implementation.
        audio: Whether to record audio (requires pyaudio)
        progress_callback: Function to call with progress (0-1); only called
            for duration-based recordings

    Returns:
        Video object for the recorded video

    Raises:
        ValueError: If ``fps`` is not positive.
        OSError: If the video writer could not be opened for ``output``.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    output_path = Path(output)
    # Decide the mode once; mixing truthiness and ``is None`` checks would
    # leave ``duration=0`` in a loop with neither a frame limit nor a key check.
    manual_stop = duration is None
    window_title = 'Screen Recording (Press q to stop)'
    # A falsy region (None or empty) means full screen.
    bbox = region if region else None

    # Determine codec (anything that is not .avi defaults to MP4)
    if str(output_path).lower().endswith('.avi'):
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
    else:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # Prepare (but do not start yet) audio recording if requested
    audio_thread = None
    audio_path = None
    audio_started = False
    stop_audio = threading.Event()
    if audio:
        try:
            import pyaudio  # noqa: F401 -- availability probe only
        except ImportError:
            print("Warning: PyAudio not installed. Audio recording disabled.")
        else:
            # Create temporary file for audio
            tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
            audio_path = tmp.name
            tmp.close()
            audio_thread = threading.Thread(
                target=_record_audio,
                args=(audio_path, duration, stop_audio)
            )
            audio_thread.daemon = True

    try:
        # Initial screenshot determines the frame size for the writer
        height, width = np.array(ImageGrab.grab(bbox=bbox)).shape[:2]
        out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
        try:
            if not out.isOpened():
                raise OSError(f"Could not open video writer for {output_path}")

            if manual_stop:
                # For manual control, create a small window to monitor
                cv2.namedWindow(window_title, cv2.WINDOW_NORMAL)
                cv2.resizeWindow(window_title, 400, 200)
                preview_size = (400, max(1, int(400 * height / width)))
                expected_frames = float('inf')
            else:
                expected_frames = int(fps * duration)

            # Start audio as late as possible so it lines up with the first
            # video frame and is never started if the setup above failed.
            if audio_thread is not None:
                audio_thread.start()
                audio_started = True

            start_time = time.time()
            frame_count = 0
            while frame_count < expected_frames:
                # Convert PIL image (RGB) to OpenCV format (BGR)
                frame = cv2.cvtColor(np.array(ImageGrab.grab(bbox=bbox)),
                                     cv2.COLOR_RGB2BGR)
                out.write(frame)
                frame_count += 1

                if manual_stop:
                    # Show a small preview and poll for the stop key
                    cv2.imshow(window_title, cv2.resize(frame, preview_size))
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                elif progress_callback:
                    elapsed = time.time() - start_time
                    progress_callback(min(elapsed / duration, 1.0))

                # Attempt to maintain the requested frame rate
                time_to_next_frame = (start_time + frame_count / fps) - time.time()
                if time_to_next_frame > 0:
                    time.sleep(time_to_next_frame)
        finally:
            out.release()
            if manual_stop:
                cv2.destroyAllWindows()
            if audio_started:
                # Without this signal a manual-stop recording would leave the
                # audio thread running forever and join() would never return.
                stop_audio.set()
                audio_thread.join()

        # Merge audio and video if both were actually produced
        if (audio_started
                and os.path.exists(output_path)
                and os.path.exists(audio_path)
                and os.path.getsize(audio_path) > 0):
            output_with_audio = (str(output_path.with_suffix(''))
                                 + "_with_audio" + output_path.suffix)
            _merge_audio_video(str(output_path), audio_path, output_with_audio)
            # Replace original output with the version with audio
            os.replace(output_with_audio, str(output_path))
    finally:
        # Clean up temporary audio file on success and on failure alike
        if audio_path is not None and os.path.exists(audio_path):
            os.unlink(audio_path)

    return Video(output_path)


def _record_audio(output_path: str, duration: Optional[float] = None,
                  stop_event: Optional[threading.Event] = None):
    """
    Record audio to a WAV file.

    Reads 16-bit, 2-channel, 44100 Hz audio from the default PyAudio input
    stream and writes it to ``output_path`` once recording stops. Any error
    (including PyAudio not being importable) is printed and swallowed.

    Args:
        output_path: Path to save the audio file
        duration: Duration in seconds (None for no time limit)
        stop_event: Optional event; recording stops as soon as it is set.
            If both ``duration`` and ``stop_event`` are ``None`` the
            recording never stops on its own.
    """
    # Audio parameters
    CHUNK = 1024
    CHANNELS = 2
    RATE = 44100

    try:
        import pyaudio
        import wave

        FORMAT = pyaudio.paInt16
        frames = []

        p = pyaudio.PyAudio()
        try:
            # Queried before terminate() so it is read from a live instance.
            sample_width = p.get_sample_size(FORMAT)
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
            try:
                print("Recording audio...")
                start_time = time.time()
                while duration is None or (time.time() - start_time) < duration:
                    if stop_event is not None and stop_event.is_set():
                        break
                    frames.append(stream.read(CHUNK))
                print("Finished recording audio")
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

        # Save to file
        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(sample_width)
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))
    except Exception as e:
        # This runs as a thread target, so an audio failure is reported and
        # swallowed here rather than aborting the video recording.
        print(f"Error recording audio: {e}")
def _merge_audio_video(video_path: str, audio_path: str, output_path: str):
    """
    Merge audio and video files.

    Runs ``ffmpeg`` to copy the video stream and encode the audio as AAC
    into ``output_path``. If ffmpeg cannot be started or exits with an
    error, the problem is printed (with the tail of ffmpeg's stderr when
    available) and the video is copied to ``output_path`` without audio.

    Args:
        video_path: Path to the video file
        audio_path: Path to the audio file
        output_path: Path to save the merged file
    """
    import shutil
    import subprocess

    # Use FFmpeg to merge audio and video
    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-strict', 'experimental',
        output_path
    ]

    try:
        # stdout is not needed; stderr is kept so a failure can be diagnosed.
        subprocess.run(cmd, check=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except (OSError, subprocess.SubprocessError) as e:
        # OSError covers a missing or non-executable ffmpeg binary;
        # SubprocessError covers a non-zero exit status.
        print(f"Error merging audio and video: {e}")
        captured = getattr(e, "stderr", None)
        if captured:
            tail = captured.decode(errors="replace").strip().splitlines()[-5:]
            print("\n".join(tail))
        # If merge fails, just keep the video without audio
        shutil.copy2(video_path, output_path)