"""
Progress tracking functionality for FMUS-VID.
This module provides utilities for tracking and reporting operation progress.
"""
from typing import Optional, Callable, Dict, Any
import time
import threading
from enum import Enum
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
# Module-level logger; ProgressTracker uses it to report exceptions raised by
# user-supplied progress callbacks.
logger = logging.getLogger(__name__)
# --- Operation types ---
class OperationType(Enum):
    """Kinds of video operation whose progress can be tracked.

    Each member's value is a human-readable label; ``ConsoleProgress`` prints
    it as the prefix of its status line.
    """

    LOAD = "Loading"
    SAVE = "Saving"
    ENCODE = "Encoding"
    DECODE = "Decoding"
    PROCESS = "Processing"
    ANALYZE = "Analyzing"
    EXPORT = "Exporting"
    STREAM = "Streaming"
# --- Progress statistics ---
@dataclass
class ProgressStats:
    """Point-in-time snapshot of an operation's progress."""

    # ``time.time()`` value recorded when tracking began.
    start_time: float
    # ``time.time()`` value at the moment this snapshot was taken.
    current_time: float
    # Number of frames the whole operation is expected to process.
    total_frames: int
    # Number of frames reported as processed so far.
    processed_frames: int
    # Average throughput: processed frames divided by elapsed seconds.
    fps: float
    # Estimated time remaining, derived from the remaining frames and ``fps``.
    eta: timedelta
    # Completion percentage (processed / total * 100).
    percentage: float
# --- Single-operation tracker ---
class ProgressTracker:
    """Track the progress of one frame-based operation.

    The owner reports work with :meth:`update` and finishes with
    :meth:`complete`. When a callback is supplied it receives
    ``ProgressStats`` snapshots from two places: inline from :meth:`update`
    (rate-limited by ``update_interval``) and from a daemon thread that
    reports periodically while the operation is still short of
    ``total_frames``.

    Callback invocations are serialized by an internal re-entrant lock, so
    the callback is never run by both threads at once and no background
    update can be delivered after the final one sent by :meth:`complete`.
    """

    def __init__(self, operation: OperationType, total_frames: int,
                 callback: Optional[Callable[[ProgressStats], None]] = None,
                 update_interval: float = 0.1):
        """
        Initialize progress tracker.

        Args:
            operation: Type of operation being tracked
            total_frames: Total number of frames to process
            callback: Function to call with progress updates
            update_interval: Minimum time (seconds) between progress updates
        """
        self.operation = operation
        self.total_frames = total_frames
        self.callback = callback
        self.update_interval = update_interval
        self.processed_frames = 0
        self.start_time = time.time()
        # 0.0 guarantees that the first update() call reports immediately.
        self.last_update = 0.0
        self.active = True

        # Re-entrant so a callback may itself call update()/complete()
        # without deadlocking on its own thread.
        self._lock = threading.RLock()
        self._update_thread: Optional[threading.Thread] = None

        # Start update thread if callback provided. Everything the loop reads
        # is already initialised above, so it is safe to start it here.
        if callback:
            self._update_thread = threading.Thread(
                target=self._update_loop, daemon=True
            )
            self._update_thread.start()

    def get_stats(self) -> ProgressStats:
        """
        Build a snapshot of the current progress.

        The remaining-frame count is clamped at zero and the percentage to
        the 0-100 range, so over-reporting frames never produces a negative
        ETA or a percentage above 100.

        Returns:
            Current progress statistics
        """
        now = time.time()
        elapsed = now - self.start_time
        # Read the counters once so every derived figure agrees with the
        # others even if another thread updates them meanwhile.
        done = self.processed_frames
        total = self.total_frames

        fps = done / elapsed if elapsed > 0 else 0.0
        remaining = max(total - done, 0)
        eta_seconds = remaining / fps if fps > 0 else 0
        if total > 0:
            percentage = min(max(done / total * 100, 0.0), 100.0)
        else:
            percentage = 0.0

        return ProgressStats(
            start_time=self.start_time,
            current_time=now,
            total_frames=total,
            processed_frames=done,
            fps=fps,
            eta=timedelta(seconds=int(eta_seconds)),
            percentage=percentage,
        )

    def update(self, frames: int = 1) -> None:
        """
        Update progress.

        Args:
            frames: Number of frames processed since last update
        """
        self.processed_frames += frames
        if not self.callback:
            return

        # Call callback only if enough time has passed
        now = time.time()
        if now - self.last_update < self.update_interval:
            return

        # Never stall the caller: if the background thread is in the middle
        # of a dispatch, an update is already being delivered, so skip ours.
        if self._lock.acquire(blocking=False):
            try:
                # Stamp before dispatching so a callback that calls update()
                # re-entrantly is rate-limited instead of recursing.
                self.last_update = now
                self._send_update()
            finally:
                self._lock.release()

    def _send_update(self) -> None:
        """Send progress update to callback; callback errors are logged."""
        callback = self.callback
        if not callback:
            return
        stats = self.get_stats()
        try:
            callback(stats)
        except Exception as exc:
            # A faulty callback must not abort the tracked operation.
            logger.exception("Error in progress callback: %s", exc)

    def _update_loop(self) -> None:
        """Background thread for sending periodic updates."""
        while self.active:
            with self._lock:
                # Re-check under the lock: complete() may have run while we
                # were waiting, and nothing may follow its final update.
                if self.active and self.processed_frames < self.total_frames:
                    self._send_update()
            time.sleep(self.update_interval)

    def complete(self) -> None:
        """
        Mark operation as complete.

        Stops the background thread and, when a callback is set, reports one
        final update with ``processed_frames`` set to ``total_frames``.
        Calling it again is a no-op.
        """
        with self._lock:
            if not self.active:
                return
            self.active = False
            if self.callback:
                self.processed_frames = self.total_frames
                self._send_update()
# --- Console reporter ---
class ConsoleProgress:
    """Console-based progress reporter.

    Wraps a ``ProgressTracker`` and renders every update as a single,
    carriage-return-overwritten status line on standard output.
    """

    def __init__(self, operation: OperationType, total_frames: int,
                 width: int = 50):
        """
        Initialize console progress reporter.

        Args:
            operation: Type of operation being tracked
            total_frames: Total number of frames to process
            width: Width of the progress bar in characters (negative values
                are treated as 0)
        """
        self.operation = operation
        self.width = max(0, width)
        # Set once the terminating newline has been printed, so the finished
        # line is not printed a second time (e.g. by complete()).
        self._finished = False
        # The tracker may invoke the callback from its background thread as
        # well as from the caller's thread; keep the output from interleaving.
        self._print_lock = threading.Lock()
        # Create the tracker last: it may start calling _update_progress
        # immediately, which needs the attributes above.
        self.tracker = ProgressTracker(
            operation=operation,
            total_frames=total_frames,
            callback=self._update_progress,
        )

    def _update_progress(self, stats: ProgressStats) -> None:
        """Update progress display in console."""
        # Calculate progress bar; clamp so a percentage outside 0-100 can
        # never make the bar longer or shorter than ``width``.
        filled = int(self.width * stats.percentage / 100)
        filled = min(max(filled, 0), self.width)
        bar = "=" * filled + "-" * (self.width - filled)

        # Format statistics
        status = (
            f"\r{self.operation.value}: [{bar}] "
            f"{stats.percentage:.1f}% "
            f"({stats.processed_frames}/{stats.total_frames} frames) "
            f"@ {stats.fps:.1f} fps "
            f"ETA: {stats.eta}"
        )

        with self._print_lock:
            if self._finished:
                return
            # Print status (overwrite previous line)
            print(status, end="", flush=True)
            # Print newline when complete
            if stats.processed_frames >= stats.total_frames:
                print()
                self._finished = True

    def update(self, frames: int = 1) -> None:
        """
        Update progress.

        Args:
            frames: Number of frames processed
        """
        self.tracker.update(frames)

    def complete(self) -> None:
        """Mark operation as complete."""
        self.tracker.complete()
# --- Multi-operation manager ---
class ProgressManager:
    """Manager for tracking multiple operations, keyed by name."""

    def __init__(self):
        """Initialize progress manager."""
        self.operations: Dict[str, ProgressTracker] = {}

    def start_operation(self, name: str, operation: OperationType,
                        total_frames: int,
                        callback: Optional[Callable[[ProgressStats], None]] = None,
                        update_interval: float = 0.1) -> ProgressTracker:
        """
        Start tracking a new operation.

        If an operation with the same name is still registered, its tracker
        is deactivated (so its background update thread stops) before being
        replaced.

        Args:
            name: Operation identifier
            operation: Type of operation
            total_frames: Total frames to process
            callback: Progress callback function
            update_interval: Minimum time (seconds) between progress updates

        Returns:
            Progress tracker for the operation
        """
        previous = self.operations.get(name)
        if previous is not None:
            logger.warning(
                "Replacing progress tracker %r that was never completed", name
            )
            # The tracker's background loop runs while ``active`` is true;
            # without this the orphaned thread would keep running.
            previous.active = False

        tracker = ProgressTracker(operation, total_frames, callback,
                                  update_interval)
        self.operations[name] = tracker
        return tracker

    def update(self, name: str, frames: int = 1) -> None:
        """
        Update operation progress. Unknown names are ignored.

        Args:
            name: Operation identifier
            frames: Frames processed
        """
        tracker = self.operations.get(name)
        if tracker is not None:
            tracker.update(frames)

    def complete(self, name: str) -> None:
        """
        Mark operation as complete and forget it. Unknown names are ignored.

        Args:
            name: Operation identifier
        """
        # Unregister first so a callback that calls complete(name) again
        # during the final update finds nothing and cannot recurse.
        tracker = self.operations.pop(name, None)
        if tracker is not None:
            tracker.complete()

    def get_stats(self, name: str) -> Optional[ProgressStats]:
        """
        Get current statistics for an operation.

        Args:
            name: Operation identifier

        Returns:
            Current progress statistics or None if operation not found
        """
        tracker = self.operations.get(name)
        if tracker is None:
            return None
        return self._snapshot(tracker)

    @staticmethod
    def _snapshot(tracker: ProgressTracker) -> ProgressStats:
        """Compute a ProgressStats snapshot from a tracker's counters."""
        now = time.time()
        elapsed = now - tracker.start_time
        # Read the counters once so the derived figures agree with each other.
        done = tracker.processed_frames
        total = tracker.total_frames

        fps = done / elapsed if elapsed > 0 else 0.0
        # Clamp so over-reported frames cannot yield a negative ETA or a
        # percentage above 100.
        remaining = max(total - done, 0)
        eta_seconds = remaining / fps if fps > 0 else 0
        if total > 0:
            percentage = min(max(done / total * 100, 0.0), 100.0)
        else:
            percentage = 0.0

        return ProgressStats(
            start_time=tracker.start_time,
            current_time=now,
            total_frames=total,
            processed_frames=done,
            fps=fps,
            eta=timedelta(seconds=int(eta_seconds)),
            percentage=percentage,
        )