Source code for fmusvid.capture.camera

#!/usr/bin/env python
"""
Camera Capture Module

This module provides functionality for capturing video and images from webcams
and other camera devices using OpenCV.
"""

import os
import time
import logging
import threading
import queue
from typing import Union, Optional, Tuple, List, Dict, Any
from pathlib import Path

import cv2
import numpy as np

logger = logging.getLogger(__name__)

class CameraCapture:
    """
    Class for capturing video and images from webcams and camera devices.

    This class provides an easy-to-use interface for webcam capture with
    features like device selection, resolution control, and both synchronous
    (``get_frame``) and asynchronous (``start_async_capture`` /
    ``get_latest_frame``) capture modes.

    Instances can be used as context managers: the camera is started on
    entry and stopped on exit.
    """

    def __init__(
        self,
        camera_id: int = 0,
        width: Optional[int] = None,
        height: Optional[int] = None,
        fps: float = 30.0,
        backend: Optional[int] = None,
        buffer_size: int = 10,
        auto_start: bool = False
    ):
        """
        Initialize the camera capture.

        Args:
            camera_id: Camera device ID (typically 0 for built-in webcam)
            width: Requested camera width (None uses default)
            height: Requested camera height (None uses default)
            fps: Requested camera frame rate (values <= 0 are not applied)
            backend: OpenCV backend ID (None uses default)
            buffer_size: Size of the frame buffer for async capture; passed
                to ``queue.Queue(maxsize=...)``, so 0 or less is unbounded
            auto_start: Whether to start capturing immediately
        """
        self.camera_id = camera_id
        self.width = width
        self.height = height
        self.fps = fps
        self.backend = backend
        self.buffer_size = buffer_size

        self.cap = None
        self.is_capturing = False
        self.async_thread = None
        # Tells only the background worker to exit, so that stopping async
        # capture does not have to flip ``is_capturing`` (the device stays
        # open for synchronous use).
        self._async_stop = threading.Event()
        self.frame_queue = queue.Queue(maxsize=buffer_size)
        self.last_frame = None
        self.last_frame_time = 0

        # Camera properties as reported by the device after start()
        self.actual_width = 0
        self.actual_height = 0
        self.actual_fps = 0

        # Auto-start if requested
        if auto_start:
            self.start()

    def start(self) -> bool:
        """
        Start the camera capture.

        Opens the device, applies the requested width/height/fps (each one
        independently, when given) and records the values the device
        actually reports.

        Returns:
            True if successfully started (or already running), False otherwise
        """
        if self.is_capturing and self.cap is not None:
            logger.warning("Camera already capturing")
            return True

        try:
            # Never overwrite a live handle: release a stale one first so the
            # device is not left open behind our back.
            if self.cap is not None:
                self.cap.release()
                self.cap = None

            # Initialize camera with specified backend if provided
            if self.backend is not None:
                self.cap = cv2.VideoCapture(self.camera_id, self.backend)
            else:
                self.cap = cv2.VideoCapture(self.camera_id)

            if not self.cap.isOpened():
                logger.error(f"Failed to open camera with ID: {self.camera_id}")
                self.cap.release()
                self.cap = None
                self.is_capturing = False
                return False

            # Set camera properties if specified
            if self.width is not None:
                self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            if self.height is not None:
                self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            if self.fps > 0:
                self.cap.set(cv2.CAP_PROP_FPS, self.fps)

            # Get actual camera properties (the device may not honor requests)
            self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.actual_fps = self.cap.get(cv2.CAP_PROP_FPS)

            logger.info(
                f"Camera started: {self.actual_width}x{self.actual_height} "
                f"at {self.actual_fps} FPS"
            )

            self.is_capturing = True
            return True

        except Exception as e:
            logger.error(f"Error starting camera: {e}")
            self.stop()
            return False

    def stop(self) -> None:
        """Stop the camera capture, the async worker (if any) and drop buffered frames."""
        self.is_capturing = False
        self._async_stop.set()

        # Stop async thread if running (never join ourselves)
        worker = self.async_thread
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(timeout=1.0)

        # Release camera
        if self.cap:
            self.cap.release()
            self.cap = None

        # Clear queue
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

        logger.info("Camera stopped")

    def get_frame(self) -> Optional[np.ndarray]:
        """
        Capture a single frame from the camera.

        Starts the camera first if it is not running yet.

        Returns:
            Frame as numpy array, or None if capture failed
        """
        if (not self.is_capturing or self.cap is None) and not self.start():
            return None

        # Work on a local reference: stop() may clear self.cap concurrently.
        cap = self.cap
        if cap is None:
            return None

        ret, frame = cap.read()
        if not ret:
            logger.warning("Failed to capture frame")
            return None

        self.last_frame = frame
        self.last_frame_time = time.time()
        return frame

    def start_async_capture(self) -> bool:
        """
        Start asynchronous frame capture in a background thread.

        Returns:
            True if async capture started successfully (or was already
            running), False otherwise
        """
        if not self.is_capturing and not self.start():
            return False

        if self.async_thread and self.async_thread.is_alive():
            logger.warning("Async capture already running")
            return True

        # Start capture thread
        self._async_stop.clear()
        self.async_thread = threading.Thread(
            target=self._async_capture_loop,
            daemon=True
        )
        self.async_thread.start()

        logger.info("Async capture started")
        return True

    def stop_async_capture(self) -> None:
        """
        Stop asynchronous frame capture.

        Only the background thread is stopped; the camera itself stays open
        and ``is_capturing`` is left untouched, so ``get_frame`` keeps working
        on the same device handle.
        """
        worker = self.async_thread
        if worker is not None and worker.is_alive():
            self._async_stop.set()
            worker.join(timeout=1.0)
            self.async_thread = None
            logger.info("Async capture stopped")

    def get_latest_frame(self) -> Optional[np.ndarray]:
        """
        Get the latest frame from the async capture queue.

        The queue is FIFO, so every buffered frame is consumed and only the
        newest one is kept; older buffered frames are discarded.

        Returns:
            Latest frame as numpy array, or the previously returned frame
            (possibly None) if no new frames are available
        """
        newest = None
        # Bounded by the current size so a fast producer cannot keep us here.
        for _ in range(max(self.frame_queue.qsize(), 1)):
            try:
                newest = self.frame_queue.get_nowait()
            except queue.Empty:
                break

        if newest is not None:
            self.last_frame = newest
            self.last_frame_time = time.time()

        return self.last_frame

    def _async_capture_loop(self) -> None:
        """Background thread function for async frame capture."""
        while self.is_capturing and not self._async_stop.is_set():
            cap = self.cap
            if cap is None or not cap.isOpened():
                logger.error("Camera not available for async capture")
                break

            ret, frame = cap.read()
            if not ret:
                logger.warning("Failed to capture frame in async mode")
                time.sleep(0.01)  # Small delay to prevent CPU spinning
                continue

            # Add to queue, removing oldest frame if full
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass

            try:
                self.frame_queue.put(frame, block=False)
            except queue.Full:
                pass  # Skip frame if queue is still full

    def capture_image(self, path: Union[str, Path]) -> bool:
        """
        Capture a single image and save it to disk.

        Args:
            path: Output path for the captured image

        Returns:
            True if image was successfully captured and saved, False otherwise
        """
        frame = self.get_frame()
        if frame is None:
            return False

        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

            # Save image
            if cv2.imwrite(str(path), frame):
                logger.info(f"Image saved to: {path}")
                return True

            logger.error(f"Failed to save image to: {path}")
            return False

        except Exception as e:
            logger.error(f"Error saving image: {e}")
            return False

    def record_video(
        self,
        path: Union[str, Path],
        duration: Optional[float] = None,
        codec: str = 'mp4v',
        fps: Optional[float] = None
    ) -> bool:
        """
        Record video from camera and save to disk.

        Blocks until ``duration`` has elapsed or capturing is stopped.

        Args:
            path: Output path for the video file
            duration: Recording duration in seconds (None for indefinite)
            codec: FourCC codec code (default is mp4v)
            fps: Frame rate (None or a non-positive value uses the camera's
                frame rate, or 30.0 if the camera does not report one)

        Returns:
            True if video was successfully recorded, False otherwise
        """
        if not self.is_capturing and not self.start():
            return False

        # Get frame to determine size (this probe frame is not written)
        frame = self.get_frame()
        if frame is None:
            logger.error("Failed to get initial frame for video recording")
            return False

        # Determine video properties; fps is also a divisor in the pacing below
        if fps is None or fps <= 0:
            fps = self.actual_fps if self.actual_fps > 0 else 30.0

        height, width = frame.shape[:2]

        out = None
        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)

            # Create video writer
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out = cv2.VideoWriter(str(path), fourcc, fps, (width, height))

            if not out.isOpened():
                logger.error(f"Failed to create video writer for: {path}")
                return False

            logger.info(f"Started recording to: {path}")

            # Record frames
            start_time = time.time()
            frame_count = 0

            while self.is_capturing:
                # Check if duration reached
                if duration is not None and time.time() - start_time >= duration:
                    break

                # Get frame
                ret, frame = self.cap.read()
                if not ret:
                    logger.warning("Failed to capture frame during recording")
                    time.sleep(0.01)  # Small delay to prevent CPU spinning
                    continue

                # Write frame
                out.write(frame)
                frame_count += 1

                # Maintain desired frame rate
                target_time = start_time + (frame_count / fps)
                current_time = time.time()
                if target_time > current_time:
                    time.sleep(target_time - current_time)

            duration_actual = time.time() - start_time
            # A coarse clock can report zero elapsed time; do not let the
            # summary line turn a successful recording into a failure.
            average_fps = frame_count / duration_actual if duration_actual > 0 else 0.0
            logger.info(
                f"Finished recording: {frame_count} frames in {duration_actual:.2f}s "
                f"({average_fps:.2f} FPS)"
            )

            return True

        except Exception as e:
            logger.error(f"Error recording video: {e}")
            return False

        finally:
            # Release video writer on every path so the file gets finalized
            if out is not None:
                out.release()

    def __enter__(self):
        """Context manager entry: start the camera and return this instance."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stop the camera."""
        self.stop()

    def __del__(self):
        """Destructor to ensure resources are released."""
        try:
            self.stop()
        except Exception:
            # A finalizer can run on a partially initialized instance or
            # during interpreter shutdown; it must never raise.
            pass
def list_cameras(max_cameras: int = 10) -> List[Dict[str, Any]]:
    """
    List available camera devices.

    Probes device indices ``0 .. max_cameras - 1`` by opening each one with
    the default backend. Only indices that could be opened are reported.

    Args:
        max_cameras: Maximum number of cameras to check

    Returns:
        List of camera info dictionaries with 'id', 'working', 'name',
        'width', 'height' and 'fps' keys. 'working' tells whether a test
        frame could be read. 'name' is the value of ``getBackendName()``
        (the capture backend, not a device label), or "Camera <id>" when
        that call fails.
    """
    camera_list = []

    for index in range(max_cameras):
        cap = cv2.VideoCapture(index)
        try:
            if not cap.isOpened():
                continue

            # Try to get a frame to verify the camera is actually working
            ret, _ = cap.read()

            # Get camera name if possible
            try:
                name = cap.getBackendName()
            except Exception:
                name = f"Camera {index}"

            camera_list.append({
                'id': index,
                'working': bool(ret),
                'name': name,
                'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                'fps': cap.get(cv2.CAP_PROP_FPS)
            })
        finally:
            # Always give the device back, even if probing raised
            cap.release()

    return camera_list
def get_camera_backends() -> Dict[int, str]:
    """
    Get available OpenCV camera backends.

    The candidate list is built from the ``cv2.CAP_*`` constants that exist
    in this OpenCV build. When the build exposes ``cv2.videoio_registry``,
    the candidates are narrowed to the backends it reports as able to open
    cameras (``CAP_ANY`` is always kept, since it means "auto-select").
    Without the registry every known constant is returned.

    No camera device is opened by this function.

    Returns:
        Dictionary mapping backend IDs to names
    """
    # (cv2 attribute, display name), in the order they should be reported
    known_backends = [
        ("CAP_ANY", "Any"),
        ("CAP_DSHOW", "DirectShow"),
        ("CAP_MSMF", "Media Foundation"),
        ("CAP_V4L2", "Video4Linux2"),
        ("CAP_GSTREAMER", "GStreamer"),
        ("CAP_FFMPEG", "FFmpeg"),
        ("CAP_IMAGES", "Images"),
        ("CAP_AVFOUNDATION", "AVFoundation"),
        ("CAP_ANDROID", "Android"),
        ("CAP_WINRT", "WinRT"),
        ("CAP_OPENCV_MJPEG", "OpenCV MJPEG"),
    ]

    # Only constants present in this build can be offered at all
    candidates = {}
    for attr, label in known_backends:
        if hasattr(cv2, attr):
            candidates[getattr(cv2, attr)] = label

    registry = getattr(cv2, "videoio_registry", None)
    if registry is None or not hasattr(registry, "getCameraBackends"):
        # Cannot query availability on this build: report every known constant
        return candidates

    try:
        available = {int(backend_id) for backend_id in registry.getCameraBackends()}
    except Exception:
        # Best effort: fall back to the full list rather than failing the caller
        return candidates

    if hasattr(cv2, "CAP_ANY"):
        available.add(cv2.CAP_ANY)

    return {
        backend_id: label
        for backend_id, label in candidates.items()
        if backend_id in available
    }
# Simple demonstration of camera functionality
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    print("Available cameras:")
    cameras = list_cameras()
    for info in cameras:
        print(f"Camera {info['id']}: {info['name']} ({info['width']}x{info['height']})")

    if cameras:
        print("\nTesting camera capture...")
        camera = CameraCapture(camera_id=cameras[0]['id'])

        if camera.start():
            try:
                print(f"Camera started: {camera.actual_width}x{camera.actual_height} at {camera.actual_fps} FPS")

                # Capture a test image; only claim success if it was written
                if camera.capture_image("camera_test.jpg"):
                    print("Test image saved to camera_test.jpg")
                else:
                    print("Failed to save test image to camera_test.jpg")

                # Display live preview
                print("Showing live preview (press 'q' to quit)")
                while True:
                    frame = camera.get_frame()
                    if frame is not None:
                        cv2.imshow("Camera Preview", frame)

                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            finally:
                # Runs on normal exit, Ctrl+C and errors alike
                cv2.destroyAllWindows()
                camera.stop()
        else:
            print("Failed to start camera")