"""
RTMP streaming module for FMUS-VID.
This module provides classes for capturing from RTMP streams, outputting to RTMP
streams, and monitoring stream health.
"""
import os
import time
import asyncio
import logging
import subprocess
import threading
import tempfile
from typing import Dict, List, Tuple, Optional, AsyncGenerator, Generator, Union
from pathlib import Path
import numpy as np
import cv2
logger = logging.getLogger(__name__)

# ffmpeg-python is an optional dependency. Record whether it could be imported
# so that importing this module never fails; the classes below check
# FFMPEG_AVAILABLE in their constructors instead.
try:
    import ffmpeg
except ImportError:
    FFMPEG_AVAILABLE = False
    logger.warning("ffmpeg-python not installed. Some RTMP functionality will be limited.")
else:
    FFMPEG_AVAILABLE = True
class RTMPCapture:
    """
    Capture frames and audio from an RTMP stream.

    Video is decoded by an FFmpeg child process that writes raw RGB24 frames
    to a pipe. Audio is extracted by a second FFmpeg process into a temporary
    WAV file which :meth:`get_audio` reads back.

    Note:
        ``buffer_size`` is stored for API compatibility; no frame buffering is
        implemented in this class.
    """

    # Resolution assumed (and forced on FFmpeg) when the source cannot be probed.
    _FALLBACK_SIZE = (1920, 1080)
    # Seconds to wait for an FFmpeg child to exit before it is killed.
    _SHUTDOWN_TIMEOUT = 5.0
    # Cached result of the CUDA probe (None = not probed yet).
    _cuda_probe_result = None

    def __init__(
        self,
        rtmp_url: str,
        width: Optional[int] = None,
        height: Optional[int] = None,
        fps: Optional[float] = None,
        audio_sample_rate: int = 16000,
        buffer_size: int = 10,
        gpu_acceleration: bool = True
    ):
        """
        Initialize RTMP capture.

        Args:
            rtmp_url: RTMP URL to capture from (e.g., 'rtmp://localhost/live/stream')
            width: Output width (None to use source width)
            height: Output height (None to use source height)
            fps: Output frame rate (None to use source frame rate)
            audio_sample_rate: Audio sample rate in Hz
            buffer_size: Size of frame buffer
            gpu_acceleration: Whether to use GPU acceleration if available

        Raises:
            ImportError: If ffmpeg-python is not installed
        """
        self.rtmp_url = rtmp_url
        self.width = width
        self.height = height
        self.fps = fps
        self.audio_sample_rate = audio_sample_rate
        self.buffer_size = buffer_size
        self.gpu_acceleration = gpu_acceleration

        # Internal state
        self._process = None
        self._audio_process = None
        self._frame_buffer = []
        self._audio_buffer = []
        self._running = False
        self._frame_count = 0
        self._last_frame_time = 0
        self._start_time = 0
        self._temp_dir = None
        self._temp_video_file = None
        self._temp_audio_file = None

        if not FFMPEG_AVAILABLE:
            raise ImportError("ffmpeg-python is required for RTMPCapture")

    async def start(self):
        """
        Start capturing from the RTMP stream.

        Missing dimensions are resolved by probing the stream first, so that
        every frame read from the pipe has a known size. The probes are
        blocking subprocess calls and therefore run in the default executor.
        Calling this while already running is a no-op.

        Raises:
            Exception: Whatever FFmpeg raised while starting; the capture is
                left stopped and its temporary directory is removed.
        """
        if self._running:
            return

        self._running = True
        self._start_time = time.time()
        self._frame_count = 0

        # Create temporary directory for intermediate files
        self._temp_dir = tempfile.TemporaryDirectory()
        self._temp_video_file = os.path.join(self._temp_dir.name, "temp_video.mp4")
        self._temp_audio_file = os.path.join(self._temp_dir.name, "temp_audio.wav")

        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._resolve_dimensions)

            input_args = {'loglevel': 'quiet', 'rtmp_buffer': '100000'}
            if self.gpu_acceleration:
                # Only request CUDA decoding when FFmpeg can really open a
                # CUDA device; otherwise decode in software.
                if await loop.run_in_executor(None, self._cuda_available):
                    input_args['hwaccel'] = 'cuda'
                else:
                    logger.warning("GPU acceleration requested but not available")

            output_args = {
                'format': 'rawvideo',
                'pix_fmt': 'rgb24',
                # Always pin the size so it matches what get_frame() reads.
                's': f"{self.width}x{self.height}",
            }
            if self.fps:
                output_args['r'] = str(self.fps)

            # Start the FFmpeg process for video
            self._process = (
                ffmpeg
                .input(self.rtmp_url, **input_args)
                .output('pipe:', **output_args)
                .run_async(pipe_stdout=True)
            )

            # Start the audio extraction in a separate thread
            audio_thread = threading.Thread(
                target=self._extract_audio,
                args=(self.rtmp_url, self._temp_audio_file),
                daemon=True,
            )
            audio_thread.start()

            logger.info(f"Started RTMP capture from {self.rtmp_url}")
        except Exception as e:
            self._running = False
            self._release_resources()
            logger.error(f"Failed to start RTMP capture: {e}")
            raise

    @classmethod
    def _cuda_available(cls) -> bool:
        """Return True if the local FFmpeg can initialise a CUDA device (cached)."""
        if cls._cuda_probe_result is None:
            command = [
                "ffmpeg", "-hide_banner", "-loglevel", "error",
                "-init_hw_device", "cuda",
                "-f", "lavfi", "-i", "nullsrc=s=64x64",
                "-frames:v", "1", "-f", "null", "-",
            ]
            try:
                result = subprocess.run(
                    command,
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=10,
                )
                cls._cuda_probe_result = result.returncode == 0
            except (OSError, subprocess.SubprocessError):
                # Missing ffmpeg binary or a hung probe: treat as "no GPU".
                cls._cuda_probe_result = False
        return cls._cuda_probe_result

    def _resolve_dimensions(self) -> None:
        """Fill in a missing width/height from the source stream (blocking)."""
        if self.width and self.height:
            return

        try:
            info = ffmpeg.probe(self.rtmp_url)
            video = next(
                s for s in info.get('streams', [])
                if s.get('codec_type') == 'video'
            )
            source_width, source_height = int(video['width']), int(video['height'])
        except (ffmpeg.Error, OSError, StopIteration, KeyError, TypeError, ValueError) as e:
            source_width, source_height = self._FALLBACK_SIZE
            # The fallback size is also passed to FFmpeg as the output size,
            # so frames stay well-formed even though they may be rescaled.
            logger.warning(
                f"Could not probe {self.rtmp_url} for its resolution ({e}); "
                f"using {source_width}x{source_height}"
            )

        self.width = self.width or source_width
        self.height = self.height or source_height

    def _extract_audio(self, rtmp_url, output_file):
        """
        Extract audio from RTMP stream in a separate thread.

        The FFmpeg handle is published on ``self._audio_process`` so that
        :meth:`stop` can terminate it; this method then blocks until the
        process exits.

        Args:
            rtmp_url: RTMP URL to read audio from
            output_file: Path of the WAV file to write
        """
        process = None
        try:
            process = (
                ffmpeg
                .input(rtmp_url)
                .output(
                    output_file,
                    format='wav',
                    acodec='pcm_s16le',
                    ar=str(self.audio_sample_rate),
                    ac='1'
                )
                .run_async(pipe_stdout=True, pipe_stderr=True)
            )
            self._audio_process = process
            if not self._running:
                # stop() ran before the handle was published.
                process.terminate()

            # communicate() drains both pipes so FFmpeg cannot block on them.
            _, err = process.communicate()
            if process.returncode != 0 and self._running:
                detail = (
                    err.decode(errors='replace').strip()
                    if err else f"exit code {process.returncode}"
                )
                logger.error(f"Audio extraction failed: {detail}")
        except Exception as e:
            logger.error(f"Audio extraction failed: {e}")
        finally:
            if process is not None and self._audio_process is process:
                self._audio_process = None

    @classmethod
    def _terminate(cls, process) -> None:
        """Terminate a child process and reap it, killing it if it lingers."""
        if process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=cls._SHUTDOWN_TIMEOUT)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

    def _release_resources(self) -> None:
        """Synchronously stop both FFmpeg processes and delete temp files."""
        video = getattr(self, "_process", None)
        audio = getattr(self, "_audio_process", None)
        temp_dir = getattr(self, "_temp_dir", None)
        self._process = None
        self._audio_process = None
        self._temp_dir = None

        try:
            for process in (video, audio):
                if process is not None:
                    self._terminate(process)
            # The audio pipes are closed by communicate() in the audio thread.
            if video is not None and video.stdout is not None:
                video.stdout.close()
        finally:
            if temp_dir is not None:
                temp_dir.cleanup()

    async def stop(self):
        """Stop capturing from the RTMP stream and release all resources."""
        if not self._running:
            return

        self._running = False
        self._release_resources()
        logger.info(f"Stopped RTMP capture from {self.rtmp_url}")

    async def get_frame(self) -> np.ndarray:
        """
        Get the next video frame from the stream.

        The pipe read runs in the default executor so the event loop is not
        blocked while waiting for FFmpeg.

        Returns:
            np.ndarray: RGB image of shape (height, width, 3), dtype uint8.
                The array is a read-only view of the bytes read from the
                pipe; copy it before modifying it in place.

        Raises:
            RuntimeError: If the stream is not running
            EOFError: If the stream ended (including mid-frame)
        """
        if not self._running:
            raise RuntimeError("RTMP capture not running")

        process = self._process
        if not process:
            raise RuntimeError("FFmpeg process not started")

        try:
            if not (self.width and self.height):
                raise RuntimeError("Frame dimensions are not known")
            frame_size = self.width * self.height * 3

            loop = asyncio.get_running_loop()
            in_bytes = await loop.run_in_executor(None, process.stdout.read, frame_size)

            # A short read only happens at end of stream; a truncated frame
            # cannot be reshaped, so report it as EOF too.
            if len(in_bytes) < frame_size:
                raise EOFError("End of stream")

            frame = np.frombuffer(in_bytes, np.uint8).reshape([self.height, self.width, 3])

            self._frame_count += 1
            self._last_frame_time = time.time()
            return frame
        except Exception as e:
            logger.error(f"Error getting frame: {e}")
            raise

    async def get_audio(self, duration: float = 0.1) -> np.ndarray:
        """
        Get audio samples for a specified duration.

        This is a simplified implementation: it reads the most recent samples
        from the temporary WAV file, located by wall-clock time since
        :meth:`start`.

        Args:
            duration: Duration of audio to get in seconds

        Returns:
            np.ndarray: float32 samples of length
                ``int(audio_sample_rate * duration)``; zero-padded, or all
                zeros when no audio is available or reading fails.

        Raises:
            RuntimeError: If the stream is not running
        """
        if not self._running:
            raise RuntimeError("RTMP capture not running")

        # Calculate number of samples to read
        num_samples = int(self.audio_sample_rate * duration)

        try:
            if not os.path.exists(self._temp_audio_file):
                # Return silence if audio file doesn't exist yet
                return np.zeros(num_samples, dtype=np.float32)

            import soundfile as sf

            with sf.SoundFile(self._temp_audio_file, 'r') as f:
                elapsed = time.time() - self._start_time
                # Never seek past what has been written so far.
                target_position = min(int(elapsed * f.samplerate), f.frames)
                f.seek(max(0, target_position - num_samples))
                # Same dtype and length as the silence fallback.
                return f.read(num_samples, dtype='float32', fill_value=0.0)
        except Exception as e:
            logger.error(f"Error getting audio: {e}")
            # Return silence on error
            return np.zeros(num_samples, dtype=np.float32)

    async def get_frame_and_audio(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get the next video frame and corresponding audio.

        Returns:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        frame = await self.get_frame()

        # Estimate audio duration based on frame rate
        if self.fps:
            audio_duration = 1.0 / self.fps
        else:
            audio_duration = 0.033  # ~30fps

        audio = await self.get_audio(audio_duration)
        return frame, audio

    async def aiter_frames(self) -> AsyncGenerator[np.ndarray, None]:
        """
        Async iterator for video frames.

        Starts the capture if needed and always stops it when iteration ends.

        Yields:
            np.ndarray: RGB image as numpy array
        """
        if not self._running:
            await self.start()

        try:
            while self._running:
                frame = await self.get_frame()
                yield frame
        finally:
            await self.stop()

    async def aiter_frames_with_audio(self) -> AsyncGenerator[Tuple[np.ndarray, np.ndarray], None]:
        """
        Async iterator for video frames with corresponding audio.

        Starts the capture if needed and always stops it when iteration ends.

        Yields:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        if not self._running:
            await self.start()

        try:
            while self._running:
                frame, audio = await self.get_frame_and_audio()
                yield frame, audio
        finally:
            await self.stop()

    @staticmethod
    def _drain_sync(agen):
        """
        Drive an async generator to completion from synchronous code.

        A private event loop is used, so this must not be called from a
        thread whose event loop is already running.
        """
        loop = asyncio.new_event_loop()
        try:
            while True:
                try:
                    item = loop.run_until_complete(agen.__anext__())
                except StopAsyncIteration:
                    break
                yield item
        finally:
            try:
                # Runs the async generator's own cleanup (which stops the
                # capture) when the consumer stops iterating early.
                loop.run_until_complete(agen.aclose())
            finally:
                loop.close()

    def iter_frames(self) -> Generator[np.ndarray, None, None]:
        """
        Iterator for video frames (non-async version).

        Yields:
            np.ndarray: RGB image as numpy array
        """
        yield from self._drain_sync(self.aiter_frames())

    def iter_frames_with_audio(self) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
        """
        Iterator for video frames with corresponding audio (non-async version).

        Yields:
            Tuple[np.ndarray, np.ndarray]: RGB image and audio samples
        """
        yield from self._drain_sync(self.aiter_frames_with_audio())

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Finalizers can run at interpreter shutdown or on a thread without an
        # event loop, so clean up synchronously and never let errors escape.
        try:
            self._running = False
            self._release_resources()
        except Exception:
            pass
class RTMPOutput:
    """
    Output frames to an RTMP stream.

    Frames are piped as raw RGB24 video into an FFmpeg child process, which
    encodes them and publishes to the RTMP URL.
    """

    # Preset names accepted by FFmpeg's h264_nvenc encoder. x264-only presets
    # such as "ultrafast" are rejected by NVENC and must not be forwarded.
    _NVENC_PRESETS = frozenset({
        "default", "slow", "medium", "fast", "hp", "hq", "bd",
        "ll", "llhq", "llhp", "lossless", "losslesshp",
        "p1", "p2", "p3", "p4", "p5", "p6", "p7",
    })
    # Seconds to wait for FFmpeg to flush and exit before it is killed.
    _SHUTDOWN_TIMEOUT = 10.0
    # Cached result of the NVENC probe (None = not probed yet).
    _nvenc_probe_result = None

    def __init__(
        self,
        rtmp_url: str,
        width: int,
        height: int,
        fps: float = 30.0,
        bitrate: str = "2500k",
        format: str = "flv",
        vcodec: str = "libx264",
        acodec: str = "aac",
        audio_sample_rate: int = 44100,
        preset: str = "ultrafast",
        gpu_acceleration: bool = True
    ):
        """
        Initialize RTMP output.

        Args:
            rtmp_url: RTMP URL to output to (e.g., 'rtmp://localhost/live/stream')
            width: Width of output video
            height: Height of output video
            fps: Frame rate of output video
            bitrate: Video bitrate
            format: Output format (usually 'flv' for RTMP)
            vcodec: Video codec
            acodec: Audio codec
            audio_sample_rate: Audio sample rate in Hz
            preset: FFmpeg preset for encoding speed vs quality tradeoff
            gpu_acceleration: Whether to use GPU acceleration if available

        Raises:
            ImportError: If ffmpeg-python is not installed
        """
        self.rtmp_url = rtmp_url
        self.width = width
        self.height = height
        self.fps = fps
        self.bitrate = bitrate
        self.format = format
        self.vcodec = vcodec
        self.acodec = acodec
        self.audio_sample_rate = audio_sample_rate
        self.preset = preset
        self.gpu_acceleration = gpu_acceleration

        # Internal state
        self._process = None
        self._running = False
        self._frame_count = 0
        self._start_time = 0

        if not FFMPEG_AVAILABLE:
            raise ImportError("ffmpeg-python is required for RTMPOutput")

    @classmethod
    def _nvenc_available(cls) -> bool:
        """Return True if FFmpeg can encode a test frame with h264_nvenc (cached)."""
        if cls._nvenc_probe_result is None:
            command = [
                "ffmpeg", "-hide_banner", "-loglevel", "error",
                "-f", "lavfi", "-i", "nullsrc=s=256x256",
                "-frames:v", "1", "-c:v", "h264_nvenc",
                "-f", "null", "-",
            ]
            try:
                result = subprocess.run(
                    command,
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=10,
                )
                cls._nvenc_probe_result = result.returncode == 0
            except (OSError, subprocess.SubprocessError):
                # Missing ffmpeg binary or a hung probe: treat as "no GPU".
                cls._nvenc_probe_result = False
        return cls._nvenc_probe_result

    async def start(self):
        """
        Start the RTMP output stream.

        When GPU acceleration is requested for the default ``libx264`` codec,
        NVENC is used only if a one-off test encode succeeds. That probe is a
        blocking subprocess call whose result is cached on the class.
        Calling this while already running is a no-op.

        Raises:
            Exception: Whatever FFmpeg raised while starting; the output is
                left stopped.
        """
        if self._running:
            return

        self._running = True
        self._start_time = time.time()
        self._frame_count = 0

        # Set up FFmpeg command. 'format' becomes FFmpeg's -f option, so no
        # separate 'f' entry is passed: it would override self.format.
        output_args = {
            'format': self.format,
            'pix_fmt': 'yuv420p',  # Standard for most video codecs
            'vcodec': self.vcodec,
            'acodec': self.acodec,
            'ar': str(self.audio_sample_rate),
            'video_bitrate': self.bitrate,
            'r': str(self.fps),
            'preset': self.preset,
        }

        try:
            # GPU acceleration if requested and available
            if self.gpu_acceleration and self.vcodec == 'libx264':
                if self._nvenc_available():
                    output_args['vcodec'] = 'h264_nvenc'
                    if self.preset not in self._NVENC_PRESETS:
                        # Let NVENC use its default instead of failing on an
                        # x264-only preset name.
                        del output_args['preset']
                        logger.info(
                            f"Preset {self.preset!r} is not supported by "
                            "h264_nvenc; using the encoder default"
                        )
                else:
                    logger.warning("GPU acceleration requested but not available")

            # Start the FFmpeg process
            self._process = (
                ffmpeg
                .input(
                    'pipe:',
                    format='rawvideo',
                    pix_fmt='rgb24',
                    s=f"{self.width}x{self.height}",
                    r=str(self.fps)
                )
                .output(self.rtmp_url, **output_args)
                .run_async(pipe_stdin=True)
            )
            logger.info(f"Started RTMP output to {self.rtmp_url}")
        except Exception as e:
            self._running = False
            logger.error(f"Failed to start RTMP output: {e}")
            raise

    def _shutdown_process(self) -> None:
        """Close FFmpeg's stdin and reap the process, killing it if it hangs."""
        process = getattr(self, "_process", None)
        self._process = None
        if process is None:
            return

        try:
            # Closing stdin signals end of input so FFmpeg can finish cleanly.
            if process.stdin is not None:
                process.stdin.close()
        except OSError:
            # FFmpeg already exited (broken pipe); nothing left to flush.
            pass

        try:
            process.wait(timeout=self._SHUTDOWN_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    async def stop(self):
        """Stop the RTMP output stream and wait for FFmpeg to exit."""
        if not self._running:
            return

        self._running = False
        self._shutdown_process()
        logger.info(f"Stopped RTMP output to {self.rtmp_url}")

    async def write_frame(self, frame: np.ndarray):
        """
        Write a video frame to the RTMP stream.

        The stream is started automatically on the first write. Frames whose
        size differs from the configured output size are resized.

        Args:
            frame: RGB image as numpy array of shape (H, W, 3), dtype uint8

        Raises:
            RuntimeError: If the FFmpeg process is not available
            ValueError: If the frame is not a 3-channel uint8 image
        """
        if not self._running:
            await self.start()

        if not self._process:
            raise RuntimeError("FFmpeg process not started")

        try:
            # Validate before touching the pipe: a frame with the wrong byte
            # count would misalign every later frame in the raw stream.
            if frame.ndim != 3 or frame.shape[2] != 3:
                raise ValueError("Frame must be RGB (3 channels)")
            if frame.dtype != np.uint8:
                raise ValueError("Frame must have dtype uint8")

            # Ensure the frame is the right size
            if frame.shape[0] != self.height or frame.shape[1] != self.width:
                frame = cv2.resize(frame, (self.width, self.height))

            # Write the frame to FFmpeg's stdin
            self._process.stdin.write(frame.tobytes())
            self._frame_count += 1
        except Exception as e:
            logger.error(f"Error writing frame: {e}")
            raise

    async def write_frame_with_audio(self, frame: np.ndarray, audio: np.ndarray):
        """
        Write a video frame with audio to the RTMP stream.

        Note: This is a simplified implementation. The audio samples are
        currently ignored; only the video frame is written.

        Args:
            frame: RGB image as numpy array
            audio: Audio samples as numpy array (unused)
        """
        await self.write_frame(frame)

    async def write_frame_async(self, frame: np.ndarray):
        """
        Alias for write_frame to maintain naming consistency with other async methods.

        Args:
            frame: RGB image as numpy array
        """
        await self.write_frame(frame)

    async def close(self):
        """Alias for stop()."""
        await self.stop()

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Finalizers can run at interpreter shutdown or on a thread without an
        # event loop, so clean up synchronously and never let errors escape.
        try:
            self._running = False
            self._shutdown_process()
        except Exception:
            pass
class StreamMonitor:
    """
    Monitor health and performance of video streams.

    A daemon thread samples every attached stream once per second. The thread
    runs only while at least one stream is attached: it is started by
    :meth:`attach_stream` and stopped when the last stream is detached.

    Only the ``fps`` metric is currently computed; other configured metrics
    keep an empty history and read as ``0.0``.
    """

    # Number of samples kept per metric (one per second => last 60 seconds).
    _HISTORY_LIMIT = 60
    # Seconds between two sampling passes.
    _UPDATE_INTERVAL = 1.0

    def __init__(self, metrics: Optional[List[str]] = None):
        """
        Initialize stream monitor.

        Args:
            metrics: List of metrics to monitor (e.g., ["bitrate", "fps", "latency"])
        """
        self.metrics = list(metrics) if metrics else ["fps", "latency", "bitrate"]

        # Initialize metrics storage
        self._metrics_data = {metric: [] for metric in self.metrics}
        self._streams = {}
        self._running = False
        self._monitor_thread = None
        # Guards self._streams against the sampling thread.
        self._lock = threading.Lock()
        # Serialises attach/detach so starting and stopping the thread cannot
        # interleave; never taken by the sampling thread itself.
        self._lifecycle_lock = threading.RLock()
        # Lets the sampling thread be woken immediately when stopping.
        self._stop_event = threading.Event()

    def attach_stream(self, stream: Union["RTMPCapture", "RTMPOutput"], name: Optional[str] = None):
        """
        Attach a stream to monitor.

        Args:
            stream: RTMP stream to monitor
            name: Optional name for the stream; defaults to
                ``capture_<id>`` or ``output_<id>``
        """
        if name is None:
            prefix = "capture" if isinstance(stream, RTMPCapture) else "output"
            name = f"{prefix}_{id(stream)}"

        with self._lifecycle_lock:
            with self._lock:
                self._streams[name] = {
                    "stream": stream,
                    "metrics": {metric: [] for metric in self.metrics},
                    "last_update": time.time(),
                    "frame_count": 0,
                    "alerts": []
                }

            if not self._running:
                self._start_monitoring()

    def detach_stream(self, name: str):
        """
        Detach a stream from monitoring.

        Unknown names are ignored. When no streams remain, the sampling
        thread is stopped.

        Args:
            name: Name of the stream to detach
        """
        with self._lifecycle_lock:
            with self._lock:
                self._streams.pop(name, None)
                idle = not self._streams

            if idle:
                self._stop_monitoring()

    def _start_monitoring(self):
        """Start the monitoring thread."""
        if self._running:
            return

        self._running = True
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()

    def _stop_monitoring(self):
        """Stop the monitoring thread and wait for it to exit."""
        self._running = False
        self._stop_event.set()

        thread = self._monitor_thread
        self._monitor_thread = None
        # A thread cannot join itself; skip the join in that case.
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def _monitor_loop(self):
        """Background thread for monitoring streams."""
        while self._running:
            try:
                self._update_metrics()
            except Exception as e:
                logger.error(f"Error in monitor loop: {e}")

            # Wait outside the try so a failing update cannot turn the loop
            # into a busy spin; returns early when a stop is requested.
            if self._stop_event.wait(self._UPDATE_INTERVAL):
                break

    def _update_metrics(self):
        """Update metrics for all streams."""
        with self._lock:
            current_time = time.time()

            for data in self._streams.values():
                stream = data["stream"]
                history = data["metrics"]

                # Average FPS since the stream was started. Both stream
                # classes expose these counters, so no type check is needed.
                fps_history = history.get("fps")
                frame_count = getattr(stream, "_frame_count", None)
                start_time = getattr(stream, "_start_time", None)
                if fps_history is not None and frame_count is not None and start_time is not None:
                    elapsed = current_time - start_time
                    if elapsed > 0:
                        fps_history.append(frame_count / elapsed)

                # Trim metrics history (keep last 60 seconds)
                for values in history.values():
                    if len(values) > self._HISTORY_LIMIT:
                        del values[:-self._HISTORY_LIMIT]

    def get_metric(self, metric: str, stream_name: Optional[str] = None) -> float:
        """
        Get the current value of a metric.

        Args:
            metric: Name of the metric
            stream_name: Name of the stream (None for average across all streams)

        Returns:
            float: Most recent value of the metric, or 0.0 when no sample exists

        Raises:
            ValueError: If ``stream_name`` is given but not attached
        """
        with self._lock:
            if stream_name is not None:
                if stream_name not in self._streams:
                    raise ValueError(f"Stream {stream_name} not found")
                values = self._streams[stream_name]["metrics"].get(metric, [])
                return values[-1] if values else 0.0

            # Average of the latest sample across all streams that have one
            latest = [
                data["metrics"][metric][-1]
                for data in self._streams.values()
                if data["metrics"].get(metric)
            ]
            if not latest:
                return 0.0
            return sum(latest) / len(latest)

    def get_alerts(self, stream_name: Optional[str] = None) -> List[Dict]:
        """
        Get active alerts for streams.

        The returned dictionaries are copies, so callers cannot modify the
        monitor's internal state through them.

        Args:
            stream_name: Name of the stream (None for all streams)

        Returns:
            List[Dict]: List of alert dictionaries. When querying all streams,
                each alert also carries a ``"stream"`` key with its stream name.
        """
        with self._lock:
            if stream_name is not None:
                if stream_name not in self._streams:
                    return []
                return [dict(alert) for alert in self._streams[stream_name]["alerts"]]

            # All alerts
            all_alerts = []
            for name, data in self._streams.items():
                for alert in data["alerts"]:
                    alert_copy = alert.copy()
                    alert_copy["stream"] = name
                    all_alerts.append(alert_copy)
            return all_alerts

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        try:
            self._stop_monitoring()
        except Exception:
            # Finalizers must not raise, e.g. on a partially built instance.
            pass