#!/usr/bin/env python
"""
Camera Capture Module
This module provides functionality for capturing video and images from webcams
and other camera devices using OpenCV.
"""
import os
import time
import logging
import threading
import queue
from typing import Union, Optional, Tuple, List, Dict, Any
from pathlib import Path
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class CameraCapture:
    """
    Capture video and still images from webcams and other camera devices.

    The class wraps ``cv2.VideoCapture`` and offers device selection,
    resolution / frame-rate requests, synchronous reads (``get_frame``) and
    an asynchronous mode in which a background thread keeps a bounded queue
    of recent frames (``start_async_capture`` / ``get_latest_frame``).

    It can be used as a context manager: the camera is started on entry and
    stopped on exit.
    """

    def __init__(
        self,
        camera_id: int = 0,
        width: Optional[int] = None,
        height: Optional[int] = None,
        fps: float = 30.0,
        backend: Optional[int] = None,
        buffer_size: int = 10,
        auto_start: bool = False
    ):
        """
        Initialize the camera capture.

        Args:
            camera_id: Camera device ID (typically 0 for built-in webcam)
            width: Requested camera width (None uses default)
            height: Requested camera height (None uses default)
            fps: Requested camera frame rate
            backend: OpenCV backend ID (None uses default)
            buffer_size: Size of the frame buffer for async capture; values
                below 1 are treated as 1 so the buffer is always bounded
            auto_start: Whether to start capturing immediately
        """
        self.camera_id = camera_id
        self.width = width
        self.height = height
        self.fps = fps
        self.backend = backend
        self.buffer_size = buffer_size
        self.cap = None
        self.is_capturing = False
        self.async_thread = None
        # Stop signal owned by the current background reader. Keeping it
        # separate from ``is_capturing`` lets async capture be stopped
        # without marking the (still open) camera as not capturing.
        self._async_stop = threading.Event()
        # queue.Queue treats maxsize <= 0 as "infinite", which would let the
        # reader thread accumulate frames without limit; clamp to >= 1.
        self.frame_queue = queue.Queue(maxsize=max(1, buffer_size))
        self.last_frame = None
        self.last_frame_time = 0
        # Camera properties as reported by the device after start()
        self.actual_width = 0
        self.actual_height = 0
        self.actual_fps = 0
        # Auto-start if requested
        if auto_start:
            self.start()

    def start(self) -> bool:
        """
        Start the camera capture.

        Opens the device, applies the requested width / height / FPS (each
        one independently, when given) and records the values the device
        actually reports.

        Returns:
            True if successfully started (or already capturing), False otherwise
        """
        if self.is_capturing:
            logger.warning("Camera already capturing")
            return True
        try:
            # Never overwrite a live handle: release any leftover device first.
            if self.cap is not None:
                self.cap.release()
                self.cap = None
            # Initialize camera with specified backend if provided
            if self.backend is not None:
                self.cap = cv2.VideoCapture(self.camera_id, self.backend)
            else:
                self.cap = cv2.VideoCapture(self.camera_id)
            if not self.cap.isOpened():
                logger.error("Failed to open camera with ID: %s", self.camera_id)
                self.cap.release()
                self.cap = None
                return False
            # Set camera properties if specified
            if self.width is not None:
                self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            if self.height is not None:
                self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            if self.fps and self.fps > 0:
                self.cap.set(cv2.CAP_PROP_FPS, self.fps)
            # Get actual camera properties (the driver may ignore requests)
            self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
            logger.info(
                "Camera started: %sx%s at %s FPS",
                self.actual_width, self.actual_height, self.actual_fps
            )
            self.is_capturing = True
            return True
        except Exception as e:
            logger.error("Error starting camera: %s", e)
            self.stop()
            return False

    def stop(self) -> None:
        """Stop the camera capture, the async reader (if any) and drop buffered frames."""
        self.is_capturing = False
        # Stop async thread if running
        self._halt_async_thread()
        # Release camera
        if self.cap is not None:
            self.cap.release()
            self.cap = None
        # Clear queue
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break
        logger.info("Camera stopped")

    def get_frame(self) -> Optional[np.ndarray]:
        """
        Capture a single frame from the camera.

        Starts the camera first if it is not capturing yet.

        Returns:
            Frame as numpy array, or None if capture failed
        """
        if (not self.is_capturing or self.cap is None) and not self.start():
            return None
        cap = self.cap
        if cap is None:
            # start() reported success but no device handle exists
            return None
        ret, frame = cap.read()
        if not ret:
            logger.warning("Failed to capture frame")
            return None
        self.last_frame = frame
        self.last_frame_time = time.time()
        return frame

    def start_async_capture(self) -> bool:
        """
        Start asynchronous frame capture in a background thread.

        Returns:
            True if async capture started successfully (or was already
            running), False otherwise
        """
        if not self.is_capturing and not self.start():
            return False
        if self.async_thread is not None and self.async_thread.is_alive():
            logger.warning("Async capture already running")
            return True
        # Each reader gets its own stop event so that a previous reader that
        # is still winding down cannot be revived by this start.
        stop_event = threading.Event()
        self._async_stop = stop_event
        self.async_thread = threading.Thread(
            target=self._async_capture_loop,
            args=(stop_event,),
            daemon=True
        )
        self.async_thread.start()
        logger.info("Async capture started")
        return True

    def stop_async_capture(self) -> None:
        """
        Stop asynchronous frame capture.

        Only the background reader is stopped; the camera itself stays open
        and ``is_capturing`` is left untouched, so ``get_frame`` keeps
        working on the same device afterwards.
        """
        if self._halt_async_thread():
            logger.info("Async capture stopped")

    def get_latest_frame(self) -> Optional[np.ndarray]:
        """
        Get the latest frame from the async capture queue.

        All frames currently buffered are consumed and the newest one is
        returned, so the result is never staler than the buffer's tail.

        Returns:
            Latest frame as numpy array, or the previously returned frame
            (possibly None) if no new frames are available
        """
        newest = None
        received = False
        while True:
            try:
                newest = self.frame_queue.get_nowait()
            except queue.Empty:
                break
            received = True
        if received:
            self.last_frame = newest
            self.last_frame_time = time.time()
        return self.last_frame

    def _halt_async_thread(self) -> bool:
        """
        Signal the background reader to exit and wait briefly for it.

        Returns:
            True if a live reader thread was signalled, False otherwise
        """
        worker = self.async_thread
        self.async_thread = None
        self._async_stop.set()
        if worker is None or not worker.is_alive():
            return False
        # Joining the current thread would raise RuntimeError
        if worker is not threading.current_thread():
            worker.join(timeout=1.0)
        return True

    def _async_capture_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """
        Background thread function for async frame capture.

        Args:
            stop_event: Event that ends the loop when set (defaults to the
                instance's current stop event)
        """
        if stop_event is None:
            stop_event = self._async_stop
        while self.is_capturing and not stop_event.is_set():
            # Snapshot the handle: stop() may clear self.cap concurrently
            cap = self.cap
            if cap is None or not cap.isOpened():
                logger.error("Camera not available for async capture")
                break
            ret, frame = cap.read()
            if not ret:
                logger.warning("Failed to capture frame in async mode")
                time.sleep(0.01)  # Small delay to prevent CPU spinning
                continue
            # Add to queue, removing oldest frame if full
            try:
                self.frame_queue.put_nowait(frame)
            except queue.Full:
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    pass
                try:
                    self.frame_queue.put_nowait(frame)
                except queue.Full:
                    pass  # Skip frame if queue is still full

    def capture_image(self, path: Union[str, Path]) -> bool:
        """
        Capture a single image and save it to disk.

        Args:
            path: Output path for the captured image

        Returns:
            True if image was successfully captured and saved, False otherwise
        """
        frame = self.get_frame()
        if frame is None:
            return False
        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
            # Save image
            if cv2.imwrite(str(path), frame):
                logger.info("Image saved to: %s", path)
                return True
            logger.error("Failed to save image to: %s", path)
            return False
        except Exception as e:
            logger.error("Error saving image: %s", e)
            return False

    def record_video(
        self,
        path: Union[str, Path],
        duration: Optional[float] = None,
        codec: str = 'mp4v',
        fps: Optional[float] = None
    ) -> bool:
        """
        Record video from camera and save to disk.

        Blocks until ``duration`` has elapsed or capturing is stopped (for
        example by calling ``stop()`` from another thread).

        Args:
            path: Output path for the video file
            duration: Recording duration in seconds (None for indefinite)
            codec: FourCC codec code (default is mp4v)
            fps: Frame rate (None or a non-positive value uses the camera's
                frame rate, falling back to 30.0)

        Returns:
            True if video was successfully recorded, False otherwise
        """
        if not self.is_capturing and not self.start():
            return False
        # Get frame to determine size if needed
        frame = self.get_frame()
        if frame is None:
            logger.error("Failed to get initial frame for video recording")
            return False
        # Determine video properties
        if fps is None or fps <= 0:
            fps = self.actual_fps if self.actual_fps > 0 else 30.0
        height, width = frame.shape[:2]
        out = None
        try:
            # Ensure directory exists
            os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
            # Create video writer
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out = cv2.VideoWriter(str(path), fourcc, fps, (width, height))
            if not out.isOpened():
                logger.error("Failed to create video writer for: %s", path)
                return False
            logger.info("Started recording to: %s", path)
            # Record frames
            start_time = time.time()
            frame_count = 0
            while self.is_capturing:
                # Check if duration reached
                if duration is not None and time.time() - start_time >= duration:
                    break
                cap = self.cap
                if cap is None:
                    # Camera was released while recording
                    break
                ret, frame = cap.read()
                if not ret:
                    logger.warning("Failed to capture frame during recording")
                    time.sleep(0.01)  # Small delay to prevent CPU spinning
                    continue
                # Write frame
                out.write(frame)
                frame_count += 1
                # Maintain desired frame rate
                wait = start_time + (frame_count / fps) - time.time()
                if wait > 0:
                    time.sleep(wait)
            elapsed = time.time() - start_time
            # Guard against a zero-length recording (e.g. duration=0)
            measured_fps = frame_count / elapsed if elapsed > 0 else 0.0
            logger.info(
                "Finished recording: %d frames in %.2fs (%.2f FPS)",
                frame_count, elapsed, measured_fps
            )
            return True
        except Exception as e:
            logger.error("Error recording video: %s", e)
            return False
        finally:
            # Always release the writer so the output file is closed
            if out is not None:
                out.release()

    def __enter__(self):
        """Context manager entry: start the camera and return the instance."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stop the camera; exceptions are not suppressed."""
        self.stop()

    def __del__(self):
        """Destructor to ensure resources are released."""
        try:
            self.stop()
        except Exception:
            # A finalizer may run on a partially constructed instance or
            # while the interpreter is shutting down; it must never raise.
            pass
def list_cameras(max_cameras: int = 10) -> List[Dict[str, Any]]:
    """
    List available camera devices.

    Device indices ``0 .. max_cameras - 1`` are probed in order; every index
    that can be opened yields one entry. Each probed device is released
    again before moving on, even if probing raises.

    Args:
        max_cameras: Maximum number of cameras to check

    Returns:
        List of camera info dictionaries with 'id', 'working', 'name',
        'width', 'height' and 'fps' keys
    """
    camera_list: List[Dict[str, Any]] = []
    for index in range(max_cameras):
        cap = cv2.VideoCapture(index)
        try:
            if not cap.isOpened():
                continue
            # Try to get a frame to verify the camera is actually working
            ret, _ = cap.read()
            # Get camera name if possible
            try:
                name = cap.getBackendName()
            except Exception:
                name = f"Camera {index}"
            camera_list.append({
                'id': index,
                'working': bool(ret),
                'name': name,
                'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                'fps': cap.get(cv2.CAP_PROP_FPS)
            })
        finally:
            cap.release()
    return camera_list
def get_camera_backends() -> Dict[int, str]:
    """
    Get available OpenCV camera backends.

    Every known backend constant that exists in this OpenCV build is probed
    by opening camera 0 with it; backends whose probe raises are skipped.

    Returns:
        Dictionary mapping backend IDs to names
    """
    # (cv2 attribute name, display name). Looked up by name so that a build
    # lacking any of these constants skips it instead of raising.
    candidates = (
        ("CAP_ANY", "Any"),
        ("CAP_DSHOW", "DirectShow"),
        ("CAP_MSMF", "Media Foundation"),
        ("CAP_V4L2", "Video4Linux2"),
        ("CAP_GSTREAMER", "GStreamer"),
        ("CAP_FFMPEG", "FFmpeg"),
        ("CAP_IMAGES", "Images"),
        ("CAP_AVFOUNDATION", "AVFoundation"),
        ("CAP_ANDROID", "Android"),
        ("CAP_WINRT", "WinRT"),
        ("CAP_OPENCV_MJPEG", "OpenCV MJPEG"),
    )
    backends: Dict[int, str] = {}
    for attr_name, label in candidates:
        # Compare against None explicitly: CAP_ANY is a valid, falsy ID.
        backend_id = getattr(cv2, attr_name, None)
        if backend_id is None:
            continue
        try:
            cap = cv2.VideoCapture(0, backend_id)
            try:
                # If we can open it or at least the constant exists.
                # NOTE(review): backend_id was read from cv2 itself, so the
                # membership test is expected to succeed for every candidate;
                # in practice only backends that raise get filtered out.
                if cap.isOpened() or backend_id in vars(cv2).values():
                    backends[backend_id] = label
            finally:
                cap.release()
        except Exception as e:
            # Skip backends that cause errors
            logger.debug("Skipping camera backend %s: %s", label, e)
    return backends
# Simple demonstration of camera functionality
def main() -> None:
    """List the detected cameras, save one test image and show a live preview."""
    logging.basicConfig(level=logging.INFO)
    print("Available cameras:")
    cameras = list_cameras()
    for info in cameras:
        print(f"Camera {info['id']}: {info['name']} ({info['width']}x{info['height']})")
    if not cameras:
        return
    print("\nTesting camera capture...")
    camera = CameraCapture(camera_id=cameras[0]['id'])
    if not camera.start():
        print("Failed to start camera")
        return
    try:
        print(f"Camera started: {camera.actual_width}x{camera.actual_height} at {camera.actual_fps} FPS")
        # Capture a test image; only claim success if the file was written
        if camera.capture_image("camera_test.jpg"):
            print("Test image saved to camera_test.jpg")
        else:
            print("Failed to save test image to camera_test.jpg")
        # Display live preview
        print("Showing live preview (press 'q' to quit)")
        while True:
            frame = camera.get_frame()
            if frame is not None:
                cv2.imshow("Camera Preview", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cv2.destroyAllWindows()
        camera.stop()


if __name__ == "__main__":
    main()