Source code for fmusvid.stream.rtmp

"""
RTMP streaming module for FMUS-VID.

This module provides classes for capturing from RTMP streams, outputting to RTMP
streams, and monitoring stream health.
"""

import os
import time
import asyncio
import logging
import subprocess
import threading
import tempfile
from typing import Dict, List, Tuple, Optional, AsyncGenerator, Generator, Union
from pathlib import Path
import numpy as np
import cv2

# Module-wide logger, named after the importing module.
logger = logging.getLogger(__name__)

# ffmpeg-python is an optional dependency. Record whether the import worked
# instead of failing at import time, so the module can still be loaded.
try:
    import ffmpeg
except ImportError:
    FFMPEG_AVAILABLE = False
    logger.warning("ffmpeg-python not installed. Some RTMP functionality will be limited.")
else:
    FFMPEG_AVAILABLE = True


class RTMPCapture:
    """
    Capture frames and audio from an RTMP stream.

    This class uses FFmpeg to read from an RTMP stream and provides methods
    to access frames and audio data. Video is read as raw RGB frames from an
    FFmpeg subprocess pipe; audio is written by a second FFmpeg subprocess to
    a temporary WAV file that ``get_audio`` reads back.
    """

    # Frame geometry assumed when neither the caller nor ffprobe supplies one.
    _FALLBACK_WIDTH = 1920
    _FALLBACK_HEIGHT = 1080
    # Seconds to wait for an FFmpeg subprocess to exit before killing it.
    _SHUTDOWN_TIMEOUT = 5.0

    def __init__(
        self,
        rtmp_url: str,
        width: Optional[int] = None,
        height: Optional[int] = None,
        fps: Optional[float] = None,
        audio_sample_rate: int = 16000,
        buffer_size: int = 10,
        gpu_acceleration: bool = True
    ):
        """
        Initialize RTMP capture.

        Args:
            rtmp_url: RTMP URL to capture from (e.g., 'rtmp://localhost/live/stream')
            width: Output width (None to use source width)
            height: Output height (None to use source height)
            fps: Output frame rate (None to use source frame rate)
            audio_sample_rate: Audio sample rate in Hz
            buffer_size: Size of frame buffer
            gpu_acceleration: Whether to use GPU acceleration if available

        Raises:
            ImportError: If ffmpeg-python is not installed
        """
        self.rtmp_url = rtmp_url
        self.width = width
        self.height = height
        self.fps = fps
        self.audio_sample_rate = audio_sample_rate
        self.buffer_size = buffer_size
        self.gpu_acceleration = gpu_acceleration

        # Internal state
        self._process = None          # FFmpeg subprocess producing raw video
        self._audio_process = None    # FFmpeg subprocess writing the temp WAV
        self._audio_thread = None
        self._frame_buffer = []
        self._audio_buffer = []
        self._running = False
        self._frame_count = 0
        self._last_frame_time = 0
        self._start_time = 0
        self._temp_dir = None
        self._temp_video_file = None
        self._temp_audio_file = None

        if not FFMPEG_AVAILABLE:
            raise ImportError("ffmpeg-python is required for RTMPCapture")

    @staticmethod
    def _hwaccel_available(name: str) -> bool:
        """Return True if the local ``ffmpeg`` binary lists *name* under ``-hwaccels``."""
        # NOTE(review): this only shows the build knows the method; it does
        # not prove a usable device is present.
        try:
            result = subprocess.run(
                ['ffmpeg', '-hide_banner', '-hwaccels'],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                universal_newlines=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return name in (line.strip() for line in result.stdout.splitlines())

    def _probe_dimensions(self):
        """Set ``width``/``height`` from ffprobe; leave them untouched on failure."""
        try:
            info = ffmpeg.probe(self.rtmp_url)
            video = next(
                s for s in info.get('streams', [])
                if s.get('codec_type') == 'video'
            )
            width, height = int(video['width']), int(video['height'])
        except Exception as e:
            logger.warning(
                f"Could not probe stream dimensions, assuming "
                f"{self._FALLBACK_WIDTH}x{self._FALLBACK_HEIGHT}: {e}"
            )
            return
        self.width, self.height = width, height

    def _terminate(self, proc):
        """Terminate a subprocess, escalating to kill after the shutdown timeout."""
        try:
            proc.terminate()
            try:
                proc.wait(timeout=self._SHUTDOWN_TIMEOUT)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        except OSError as e:
            logger.warning(f"Error terminating FFmpeg process: {e}")
        finally:
            stdout = getattr(proc, 'stdout', None)
            if stdout is not None:
                try:
                    stdout.close()
                except (OSError, ValueError):
                    pass

    def _shutdown(self):
        """Synchronously release subprocesses and temp files; safe to call twice."""
        self._running = False
        for attr in ('_process', '_audio_process'):
            proc = getattr(self, attr, None)
            setattr(self, attr, None)
            if proc is not None:
                self._terminate(proc)
        temp_dir = getattr(self, '_temp_dir', None)
        self._temp_dir = None
        if temp_dir is not None:
            temp_dir.cleanup()

    async def start(self):
        """
        Start capturing from the RTMP stream.

        If width and height were not both given, the stream is probed first
        so that frames can be sliced at the real source size. Does nothing
        when the capture is already running.

        Raises:
            Exception: Whatever FFmpeg start-up raised, after cleaning up.
        """
        if self._running:
            return

        self._running = True
        self._start_time = time.time()
        self._frame_count = 0

        try:
            # Create temporary directory for intermediate files
            self._temp_dir = tempfile.TemporaryDirectory()
            self._temp_video_file = os.path.join(self._temp_dir.name, "temp_video.mp4")
            self._temp_audio_file = os.path.join(self._temp_dir.name, "temp_audio.wav")

            if not (self.width and self.height):
                # Probing opens a network connection; keep it off the event loop.
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, self._probe_dimensions)
                if not self._running:
                    # stop() was called while the probe was in flight.
                    return

            input_args = {'loglevel': 'quiet', 'rtmp_buffer': '100000'}
            output_args = {'format': 'rawvideo', 'pix_fmt': 'rgb24'}

            if self.width and self.height:
                output_args['s'] = f"{self.width}x{self.height}"
            if self.fps:
                output_args['r'] = str(self.fps)

            # Only request CUDA decoding when the ffmpeg build reports it.
            if self.gpu_acceleration:
                if self._hwaccel_available('cuda'):
                    input_args['hwaccel'] = 'cuda'
                else:
                    logger.warning("GPU acceleration requested but not available")

            # Start the FFmpeg process for video
            self._process = (
                ffmpeg
                .input(self.rtmp_url, **input_args)
                .output('pipe:', **output_args)
                .run_async(pipe_stdout=True)
            )

            # Start the audio extraction in a separate thread
            audio_thread = threading.Thread(
                target=self._extract_audio,
                args=(self.rtmp_url, self._temp_audio_file),
                daemon=True,
            )
            audio_thread.start()
            self._audio_thread = audio_thread

            logger.info(f"Started RTMP capture from {self.rtmp_url}")
        except Exception as e:
            # Release the temp dir and any half-started subprocess.
            self._shutdown()
            logger.error(f"Failed to start RTMP capture: {e}")
            raise

    def _extract_audio(self, rtmp_url, output_file):
        """
        Extract audio from RTMP stream in a separate thread.

        The FFmpeg subprocess is stored on the instance so ``stop`` can
        terminate it; this method then blocks until it exits.
        """
        try:
            proc = (
                ffmpeg
                .input(rtmp_url, loglevel='quiet')
                .output(
                    output_file,
                    format='wav',
                    acodec='pcm_s16le',
                    ar=str(self.audio_sample_rate),
                    ac='1'
                )
                .run_async()
            )
        except Exception as e:
            logger.error(f"Audio extraction failed: {e}")
            return

        self._audio_process = proc
        if not self._running:
            # stop() ran while the subprocess was being spawned.
            self._audio_process = None
            self._terminate(proc)
            return

        returncode = proc.wait()
        # A non-zero exit after stop() is just our own terminate().
        if returncode != 0 and self._running:
            logger.error(
                f"Audio extraction failed: ffmpeg exited with code {returncode}"
            )

    async def stop(self):
        """Stop capturing: terminate both FFmpeg processes and remove temp files."""
        if not self._running:
            return

        self._shutdown()
        logger.info(f"Stopped RTMP capture from {self.rtmp_url}")

    async def get_frame(self) -> np.ndarray:
        """
        Get the next video frame from the stream.

        The blocking pipe read runs in the default executor so the event
        loop stays responsive.

        Returns:
            np.ndarray: RGB image as numpy array of shape (height, width, 3)

        Raises:
            RuntimeError: If the stream is not running
            EOFError: If the stream ended, including mid-frame
        """
        if not self._running:
            raise RuntimeError("RTMP capture not running")

        if not self._process:
            raise RuntimeError("FFmpeg process not started")

        try:
            if self.width and self.height:
                width, height = self.width, self.height
            else:
                # Size unknown (probe failed): fall back to the 1080p assumption.
                width, height = self._FALLBACK_WIDTH, self._FALLBACK_HEIGHT
            frame_size = width * height * 3

            stdout = self._process.stdout
            loop = asyncio.get_event_loop()
            in_bytes = await loop.run_in_executor(None, stdout.read, frame_size)

            # A buffered pipe read only comes back short at end of stream.
            if len(in_bytes) < frame_size:
                raise EOFError("End of stream")

            frame = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])

            # Remember the dimensions for future frames
            self.height, self.width = height, width

            self._frame_count += 1
            self._last_frame_time = time.time()

            return frame
        except Exception as e:
            logger.error(f"Error getting frame: {e}")
            raise

    async def get_audio(self, duration: float = 0.1) -> np.ndarray:
        """
        Get audio samples for a specified duration.

        This is a simplified implementation that reads the most recent
        samples from the temporary WAV file. Silence is returned when the
        file does not exist yet or cannot be read.

        Args:
            duration: Duration of audio to get in seconds

        Returns:
            np.ndarray: float32 audio samples, zero-padded to the requested length

        Raises:
            RuntimeError: If the stream is not running
        """
        if not self._running:
            raise RuntimeError("RTMP capture not running")

        # Calculate number of samples to read
        num_samples = int(self.audio_sample_rate * duration)
        silence = np.zeros(num_samples, dtype=np.float32)

        try:
            if not os.path.exists(self._temp_audio_file):
                return silence

            import soundfile as sf

            with sf.SoundFile(self._temp_audio_file, 'r') as f:
                # Position in the file that corresponds to "now"
                elapsed = time.time() - self._start_time
                target_position = int(elapsed * f.samplerate)
                f.seek(max(0, target_position - num_samples))
                # float32 keeps the dtype consistent with the silence fallback.
                audio = f.read(num_samples, dtype='float32')
        except Exception as e:
            logger.error(f"Error getting audio: {e}")
            return silence

        if audio.ndim == 1 and len(audio) < num_samples:
            audio = np.pad(audio, (0, num_samples - len(audio)), mode='constant')
        return audio

    async def get_frame_and_audio(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get the next video frame and corresponding audio.

        Returns:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        frame = await self.get_frame()

        # One frame's worth of audio; ~30fps when no frame rate was given
        audio_duration = 1.0 / self.fps if self.fps else 0.033

        audio = await self.get_audio(audio_duration)
        return frame, audio

    async def aiter_frames(self) -> AsyncGenerator[np.ndarray, None]:
        """
        Async iterator for video frames.

        Starts the capture if needed and always stops it when iteration ends.

        Yields:
            np.ndarray: RGB image as numpy array
        """
        if not self._running:
            await self.start()

        try:
            while self._running:
                frame = await self.get_frame()
                yield frame
        finally:
            await self.stop()

    async def aiter_frames_with_audio(self) -> AsyncGenerator[Tuple[np.ndarray, np.ndarray], None]:
        """
        Async iterator for video frames with corresponding audio.

        Starts the capture if needed and always stops it when iteration ends.

        Yields:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        if not self._running:
            await self.start()

        try:
            while self._running:
                frame, audio = await self.get_frame_and_audio()
                yield frame, audio
        finally:
            await self.stop()

    @staticmethod
    def _iterate_sync(agen):
        """Drive an async generator to completion on a private event loop."""
        loop = asyncio.new_event_loop()
        try:
            while True:
                try:
                    item = loop.run_until_complete(agen.__anext__())
                except StopAsyncIteration:
                    break
                yield item
        finally:
            # Run the async generator's own cleanup even on early exit.
            try:
                loop.run_until_complete(agen.aclose())
            finally:
                loop.close()

    def iter_frames(self) -> Generator[np.ndarray, None, None]:
        """
        Iterator for video frames (non-async version).

        Must not be called from inside a running event loop.

        Yields:
            np.ndarray: RGB image as numpy array
        """
        yield from self._iterate_sync(self.aiter_frames())

    def iter_frames_with_audio(self) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
        """
        Iterator for video frames with corresponding audio (non-async version).

        Must not be called from inside a running event loop.

        Yields:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        yield from self._iterate_sync(self.aiter_frames_with_audio())

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Finalizers may run with no usable event loop, so clean up
        # synchronously and never let an error escape.
        try:
            self._shutdown()
        except Exception:
            pass
class RTMPOutput:
    """
    Output frames to an RTMP stream.

    This class uses FFmpeg to send frames to an RTMP stream. Raw RGB frames
    are written to the stdin pipe of an FFmpeg subprocess, which encodes and
    publishes them.
    """

    # Seconds to wait for FFmpeg to flush and exit before killing it.
    _SHUTDOWN_TIMEOUT = 5.0
    # Preset names accepted by h264_nvenc; x264-only names such as
    # "ultrafast" are rejected by that encoder.
    _NVENC_PRESETS = frozenset({
        'default', 'slow', 'medium', 'fast', 'hp', 'hq', 'bd', 'll', 'llhq',
        'llhp', 'lossless', 'losslesshp',
        'p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7',
    })

    def __init__(
        self,
        rtmp_url: str,
        width: int,
        height: int,
        fps: float = 30.0,
        bitrate: str = "2500k",
        format: str = "flv",
        vcodec: str = "libx264",
        acodec: str = "aac",
        audio_sample_rate: int = 44100,
        preset: str = "ultrafast",
        gpu_acceleration: bool = True
    ):
        """
        Initialize RTMP output.

        Args:
            rtmp_url: RTMP URL to output to (e.g., 'rtmp://localhost/live/stream')
            width: Width of output video
            height: Height of output video
            fps: Frame rate of output video
            bitrate: Video bitrate
            format: Output format (usually 'flv' for RTMP)
            vcodec: Video codec
            acodec: Audio codec
            audio_sample_rate: Audio sample rate in Hz
            preset: FFmpeg preset for encoding speed vs quality tradeoff
            gpu_acceleration: Whether to use GPU acceleration if available

        Raises:
            ImportError: If ffmpeg-python is not installed
        """
        self.rtmp_url = rtmp_url
        self.width = width
        self.height = height
        self.fps = fps
        self.bitrate = bitrate
        self.format = format
        self.vcodec = vcodec
        self.acodec = acodec
        self.audio_sample_rate = audio_sample_rate
        self.preset = preset
        self.gpu_acceleration = gpu_acceleration

        # Internal state
        self._process = None
        self._running = False
        self._frame_count = 0
        self._start_time = 0

        if not FFMPEG_AVAILABLE:
            raise ImportError("ffmpeg-python is required for RTMPOutput")

    @staticmethod
    def _encoder_available(name: str) -> bool:
        """Return True if the local ``ffmpeg`` binary lists encoder *name*."""
        # NOTE(review): a listed encoder can still fail at runtime when no
        # matching GPU or driver is present.
        try:
            result = subprocess.run(
                ['ffmpeg', '-hide_banner', '-encoders'],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                universal_newlines=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            return False
        for line in result.stdout.splitlines():
            fields = line.split()
            # Encoder rows look like "<flags> <name> <description...>".
            if len(fields) >= 2 and fields[1] == name:
                return True
        return False

    def _build_output_args(self) -> dict:
        """Assemble the keyword arguments for the ffmpeg-python output node."""
        output_args = {
            'format': self.format,
            'pix_fmt': 'yuv420p',  # Standard for most video codecs
            'vcodec': self.vcodec,
            'acodec': self.acodec,
            'ar': str(self.audio_sample_rate),
            'video_bitrate': self.bitrate,
            'r': str(self.fps),
            'preset': self.preset,
        }

        # Switch to NVENC only when the ffmpeg build actually offers it.
        if self.gpu_acceleration and self.vcodec == 'libx264':
            if self._encoder_available('h264_nvenc'):
                output_args['vcodec'] = 'h264_nvenc'
                if self.preset not in self._NVENC_PRESETS:
                    # Let NVENC use its own default preset instead of failing.
                    del output_args['preset']
            else:
                logger.warning("GPU acceleration requested but not available")

        return output_args

    def _shutdown(self):
        """Synchronously close the pipe and reap FFmpeg; safe to call twice."""
        self._running = False
        proc = getattr(self, '_process', None)
        self._process = None
        if proc is None:
            return

        try:
            if proc.stdin is not None:
                proc.stdin.close()
        except OSError:
            # FFmpeg already exited; there is nothing left to flush.
            pass

        try:
            proc.wait(timeout=self._SHUTDOWN_TIMEOUT)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

    async def start(self):
        """
        Start the RTMP output stream.

        Does nothing when the output is already running.

        Raises:
            Exception: Whatever FFmpeg start-up raised.
        """
        if self._running:
            return

        self._running = True
        self._start_time = time.time()
        self._frame_count = 0

        try:
            output_args = self._build_output_args()

            # Start the FFmpeg process
            self._process = (
                ffmpeg
                .input(
                    'pipe:',
                    format='rawvideo',
                    pix_fmt='rgb24',
                    s=f"{self.width}x{self.height}",
                    r=str(self.fps)
                )
                .output(self.rtmp_url, **output_args)
                .run_async(pipe_stdin=True)
            )

            logger.info(f"Started RTMP output to {self.rtmp_url}")
        except Exception as e:
            self._running = False
            logger.error(f"Failed to start RTMP output: {e}")
            raise

    async def stop(self):
        """Stop the RTMP output stream and wait for FFmpeg to exit."""
        if not self._running:
            return

        self._shutdown()
        logger.info(f"Stopped RTMP output to {self.rtmp_url}")

    async def write_frame(self, frame: np.ndarray):
        """
        Write a video frame to the RTMP stream.

        Starts the output if it is not running yet. Frames of a different
        size are resized to the configured width and height.

        Args:
            frame: RGB image as numpy array of shape (height, width, 3);
                presumably uint8, since the bytes are sent as rgb24

        Raises:
            RuntimeError: If the FFmpeg process is not started or has exited
            ValueError: If the frame does not have exactly 3 channels
        """
        if not self._running:
            await self.start()

        if not self._process:
            raise RuntimeError("FFmpeg process not started")

        try:
            # Validate before resizing so 2-D frames get a clear error.
            if frame.ndim != 3 or frame.shape[2] != 3:
                raise ValueError("Frame must be RGB (3 channels)")

            # Ensure the frame is the right size
            if frame.shape[0] != self.height or frame.shape[1] != self.width:
                frame = cv2.resize(frame, (self.width, self.height))

            if self._process.poll() is not None:
                raise RuntimeError("FFmpeg process has exited")

            # Write the frame to FFmpeg's stdin
            self._process.stdin.write(frame.tobytes())
            self._frame_count += 1
        except Exception as e:
            logger.error(f"Error writing frame: {e}")
            raise

    async def write_frame_with_audio(self, frame: np.ndarray, audio: np.ndarray):
        """
        Write a video frame with audio to the RTMP stream.

        Note: This is a simplified implementation. The audio argument is
        currently ignored and only the frame is written.

        Args:
            frame: RGB image as numpy array
            audio: Audio samples as numpy array (ignored)
        """
        await self.write_frame(frame)

    async def write_frame_async(self, frame: np.ndarray):
        """
        Alias for write_frame to maintain naming consistency with other async methods.

        Args:
            frame: RGB image as numpy array
        """
        await self.write_frame(frame)

    async def close(self):
        """Alias for stop()."""
        await self.stop()

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Finalizers may run with no usable event loop, so clean up
        # synchronously and never let an error escape.
        try:
            self._shutdown()
        except Exception:
            pass
class StreamMonitor:
    """
    Monitor health and performance of video streams.

    This class provides metrics and alerting for stream health. A background
    daemon thread samples every attached stream about once per second.
    """

    # Number of samples kept per metric (about 60 seconds at one per second).
    _HISTORY_LENGTH = 60
    # Seconds between two sampling passes.
    _UPDATE_INTERVAL = 1.0

    def __init__(self, metrics: Optional[List[str]] = None):
        """
        Initialize stream monitor.

        Args:
            metrics: List of metrics to monitor (e.g., ["bitrate", "fps", "latency"]);
                defaults to ["fps", "latency", "bitrate"]
        """
        self.metrics = metrics or ["fps", "latency", "bitrate"]

        # Initialize metrics storage
        self._metrics_data = {metric: [] for metric in self.metrics}
        self._streams = {}
        self._running = False
        self._monitor_thread = None
        self._lock = threading.Lock()
        # Set to wake the monitor thread immediately when stopping.
        self._stop_event = threading.Event()

    def attach_stream(
        self,
        stream: Union["RTMPCapture", "RTMPOutput"],
        name: Optional[str] = None
    ):
        """
        Attach a stream to monitor and start the monitor thread if needed.

        Args:
            stream: RTMP stream to monitor
            name: Optional name for the stream; generated from the stream
                type and id() when omitted
        """
        if name is None:
            prefix = "capture" if isinstance(stream, RTMPCapture) else "output"
            name = f"{prefix}_{id(stream)}"

        with self._lock:
            self._streams[name] = {
                "stream": stream,
                "metrics": {metric: [] for metric in self.metrics},
                "last_update": time.time(),
                "frame_count": 0,
                "alerts": []
            }

        if not self._running:
            self._start_monitoring()

    def detach_stream(self, name: str):
        """
        Detach a stream from monitoring. Unknown names are ignored.

        Args:
            name: Name of the stream to detach
        """
        with self._lock:
            self._streams.pop(name, None)

    def _start_monitoring(self):
        """Start the monitoring thread."""
        if self._running:
            return

        self._running = True
        self._stop_event.clear()
        # NOTE: the bound-method target keeps this monitor referenced by the
        # thread until _stop_monitoring() is called.
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True
        )
        self._monitor_thread.start()

    def _stop_monitoring(self):
        """Stop the monitoring thread and wait for it to finish."""
        self._running = False
        self._stop_event.set()

        thread = self._monitor_thread
        self._monitor_thread = None
        # A thread cannot join itself (possible when a finalizer runs on it).
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def _monitor_loop(self):
        """Background thread for monitoring streams."""
        while self._running:
            try:
                self._update_metrics()
            except Exception as e:
                logger.error(f"Error in monitor loop: {e}")
            # Wait outside the try so a failing update cannot become a busy loop.
            self._stop_event.wait(self._UPDATE_INTERVAL)

    def _update_metrics(self):
        """Update metrics for all streams and trim their history."""
        with self._lock:
            current_time = time.time()

            for data in self._streams.values():
                stream = data["stream"]
                history = data["metrics"]

                # "fps" is only recorded when it is one of the configured
                # metrics and the stream exposes the counters needed.
                fps_history = history.get("fps")
                if (
                    fps_history is not None
                    and hasattr(stream, "_frame_count")
                    and hasattr(stream, "_start_time")
                ):
                    elapsed = current_time - stream._start_time
                    if elapsed > 0:
                        fps_history.append(stream._frame_count / elapsed)

                # Trim metrics history in place
                for values in history.values():
                    del values[:-self._HISTORY_LENGTH]

    def get_metric(self, metric: str, stream_name: Optional[str] = None) -> float:
        """
        Get the current value of a metric.

        Args:
            metric: Name of the metric
            stream_name: Name of the stream (None for average across all streams)

        Returns:
            float: Latest value of the metric, or 0.0 when no sample exists

        Raises:
            ValueError: If stream_name is given but not attached
        """
        with self._lock:
            if stream_name is not None:
                if stream_name not in self._streams:
                    raise ValueError(f"Stream {stream_name} not found")

                values = self._streams[stream_name]["metrics"].get(metric, [])
                return values[-1] if values else 0.0

            # Average the latest sample of every stream that has one
            latest = [
                data["metrics"][metric][-1]
                for data in self._streams.values()
                if data["metrics"].get(metric)
            ]
            if not latest:
                return 0.0
            return sum(latest) / len(latest)

    def get_alerts(self, stream_name: Optional[str] = None) -> List[Dict]:
        """
        Get active alerts for streams.

        Args:
            stream_name: Name of the stream (None for all streams)

        Returns:
            List[Dict]: For a named stream, that stream's own alert list
            (empty list if the stream is unknown). For all streams, copies
            of every alert with a "stream" key added.
        """
        with self._lock:
            if stream_name is not None:
                if stream_name not in self._streams:
                    return []
                # The live list is returned unchanged for compatibility.
                return self._streams[stream_name]["alerts"]

            all_alerts = []
            for name, data in self._streams.items():
                for alert in data["alerts"]:
                    alert_copy = alert.copy()
                    alert_copy["stream"] = name
                    all_alerts.append(alert_copy)
            return all_alerts

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Never let an error escape from a finalizer.
        try:
            self._stop_monitoring()
        except Exception:
            pass