# Source code for fmusvid.utils.progress

"""
Progress tracking functionality for FMUS-VID.

This module provides utilities for tracking and reporting operation progress.
"""

from typing import Optional, Callable, Dict, Any
import time
import threading
from enum import Enum
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class OperationType(Enum):
    """Kinds of video operation whose progress can be tracked.

    Each member's value is a human-readable, present-participle label
    (e.g. ``"Encoding"``) suitable for showing in a progress message.
    """

    # Reading and writing media
    LOAD = "Loading"
    SAVE = "Saving"

    # Codec work
    ENCODE = "Encoding"
    DECODE = "Decoding"

    # Frame-level work
    PROCESS = "Processing"
    ANALYZE = "Analyzing"

    # Output delivery
    EXPORT = "Exporting"
    STREAM = "Streaming"
@dataclass
class ProgressStats:
    """Snapshot of an operation's progress at a single point in time."""

    # Field order is part of the constructor signature; do not reorder.
    start_time: float      # time.time() value recorded when tracking began
    current_time: float    # time.time() value when this snapshot was taken
    total_frames: int      # number of frames the operation expects to handle
    processed_frames: int  # number of frames handled so far
    fps: float             # average throughput: processed frames per elapsed second
    eta: timedelta         # estimated time remaining at the current throughput
    percentage: float      # completion on a 0-100 scale
class ProgressTracker:
    """Track the progress of a single frame-based operation.

    When a callback is supplied, progress snapshots are delivered to it from
    two places: from :meth:`update` (rate-limited by ``update_interval``) and
    from a daemon background thread that fires every ``update_interval``
    seconds while the operation is unfinished.  Deliveries are serialised so
    the callback is never entered from both threads at once, and the snapshot
    sent by :meth:`complete` is guaranteed to be the last one the background
    thread can produce.
    """

    def __init__(self,
                 operation: OperationType,
                 total_frames: int,
                 callback: Optional[Callable[[ProgressStats], None]] = None,
                 update_interval: float = 0.1):
        """
        Initialize progress tracker.

        Args:
            operation: Type of operation being tracked
            total_frames: Total number of frames to process
            callback: Function to call with progress updates
            update_interval: Minimum time (seconds) between progress updates
        """
        self.operation = operation
        self.total_frames = total_frames
        self.callback = callback
        self.update_interval = update_interval

        self.processed_frames = 0
        self.start_time = time.time()
        self.last_update = 0
        self.active = True

        # Signals the background thread to exit without waiting out a sleep.
        self._stop_event = threading.Event()
        # Re-entrant so a callback that touches the tracker on the same
        # thread does not deadlock against itself.
        self._send_lock = threading.RLock()
        self._update_thread = None

        # Start update thread if callback provided
        if callback:
            self._update_thread = threading.Thread(target=self._update_loop)
            self._update_thread.daemon = True
            self._update_thread.start()

    def update(self, frames: int = 1) -> None:
        """
        Update progress.

        Args:
            frames: Number of frames processed since last update
        """
        self.processed_frames += frames

        if not self.callback:
            return

        # Call callback only if enough time has passed since the last call
        # made from this method.
        current_time = time.time()
        if current_time - self.last_update >= self.update_interval:
            self._send_update()
            self.last_update = current_time

    @staticmethod
    def _compute_metrics(processed_frames: int, total_frames: int, elapsed: float):
        """Derive ``(fps, eta, percentage)`` from raw counters.

        Args:
            processed_frames: Frames handled so far
            total_frames: Frames expected in total
            elapsed: Seconds since tracking started

        Returns:
            Tuple ``(fps, eta, percentage)`` where ``eta`` is a non-negative
            ``timedelta`` and ``percentage`` is capped at 100.
        """
        if elapsed > 0:
            fps = processed_frames / elapsed
            # Clamp at zero: when more frames arrive than announced the
            # remaining count would go negative and yield a negative ETA.
            remaining_frames = max(0, total_frames - processed_frames)
            eta_seconds = remaining_frames / fps if fps > 0 else 0
            eta = timedelta(seconds=int(eta_seconds))
        else:
            fps = 0.0
            eta = timedelta(0)

        if total_frames > 0:
            percentage = min(100.0, processed_frames / total_frames * 100)
        else:
            percentage = 0.0

        return fps, eta, percentage

    def _build_stats(self) -> ProgressStats:
        """Build a snapshot of the current progress."""
        current_time = time.time()
        fps, eta, percentage = self._compute_metrics(
            self.processed_frames,
            self.total_frames,
            current_time - self.start_time,
        )
        return ProgressStats(
            start_time=self.start_time,
            current_time=current_time,
            total_frames=self.total_frames,
            processed_frames=self.processed_frames,
            fps=fps,
            eta=eta,
            percentage=percentage,
        )

    def _send_update(self) -> None:
        """Send progress update to callback.

        Exceptions raised by the callback are logged and swallowed so that a
        faulty reporter cannot abort the tracked operation.
        """
        with self._send_lock:
            stats = self._build_stats()
            try:
                self.callback(stats)
            except Exception as e:
                # logger.exception keeps the original message and adds the
                # traceback, which the plain error log dropped.
                logger.exception("Error in progress callback: %s", e)

    def _update_loop(self) -> None:
        """Background thread for sending periodic updates."""
        while self.active and not self._stop_event.is_set():
            with self._send_lock:
                # Re-check under the lock: complete() may have finished while
                # this thread was waiting, and its update must stay the last.
                if (self.active
                        and not self._stop_event.is_set()
                        and self.processed_frames < self.total_frames):
                    self._send_update()
            # Wait on the event rather than sleeping so that complete() can
            # wake the thread immediately.
            self._stop_event.wait(self.update_interval)

    def complete(self) -> None:
        """Mark operation as complete.

        Stops the background thread, sets ``processed_frames`` to
        ``total_frames`` and, when a callback is registered, sends one final
        update.
        """
        self.active = False
        self._stop_event.set()

        with self._send_lock:
            # Set unconditionally so trackers without a callback also report
            # completion through their attributes.
            self.processed_frames = self.total_frames
            if self.callback:
                self._send_update()
class ConsoleProgress:
    """Console-based progress reporter.

    Wraps a :class:`ProgressTracker` and renders each progress snapshot as a
    single, self-overwriting status line on standard output.
    """

    def __init__(self, operation: OperationType, total_frames: int):
        """
        Initialize console progress reporter.

        Args:
            operation: Type of operation being tracked
            total_frames: Total number of frames to process
        """
        self.operation = operation
        # Must exist before the tracker is created: the tracker starts a
        # background thread that may invoke the callback right away.
        self._finished = False
        self.tracker = ProgressTracker(
            operation=operation,
            total_frames=total_frames,
            callback=self._update_progress,
        )

    def _update_progress(self, stats: ProgressStats) -> None:
        """Update progress display in console.

        Once a snapshot with ``processed_frames >= total_frames`` has been
        rendered, the line is terminated with a newline and later snapshots
        are ignored, so the finished line is not printed twice.
        """
        if self._finished:
            return

        # Calculate progress bar; clamp so an out-of-range percentage cannot
        # make the bar longer (or shorter) than its fixed width.
        width = 50
        filled = max(0, min(width, int(width * stats.percentage / 100)))
        bar = "=" * filled + "-" * (width - filled)

        # Format statistics
        status = (
            f"\r{self.operation.value}: [{bar}] "
            f"{stats.percentage:.1f}% "
            f"({stats.processed_frames}/{stats.total_frames} frames) "
            f"@ {stats.fps:.1f} fps "
            f"ETA: {stats.eta}"
        )

        # Print status (overwrite previous line)
        print(status, end="", flush=True)

        # Print newline when complete
        if stats.processed_frames >= stats.total_frames:
            self._finished = True
            print()

    def update(self, frames: int = 1) -> None:
        """
        Update progress.

        Args:
            frames: Number of frames processed
        """
        self.tracker.update(frames)

    def complete(self) -> None:
        """Mark operation as complete."""
        self.tracker.complete()
class ProgressManager:
    """Manager for tracking multiple operations.

    Operations are registered under a caller-chosen name and removed again
    when they are completed.
    """

    def __init__(self):
        """Initialize progress manager."""
        self.operations: Dict[str, ProgressTracker] = {}

    def start_operation(self,
                        name: str,
                        operation: OperationType,
                        total_frames: int,
                        callback: Optional[Callable[[ProgressStats], None]] = None
                        ) -> ProgressTracker:
        """
        Start tracking a new operation.

        If ``name`` is already registered, the previous tracker is
        deactivated (so its background update thread stops) before being
        replaced.

        Args:
            name: Operation identifier
            operation: Type of operation
            total_frames: Total frames to process
            callback: Progress callback function

        Returns:
            Progress tracker for the operation
        """
        previous = self.operations.get(name)
        if previous is not None:
            # Without this the replaced tracker's update loop would keep
            # running, since nothing else references it any more.
            logger.warning("Replacing unfinished progress operation %r", name)
            previous.active = False

        tracker = ProgressTracker(operation, total_frames, callback)
        self.operations[name] = tracker
        return tracker

    def update(self, name: str, frames: int = 1) -> None:
        """
        Update operation progress.

        Unknown names are ignored.

        Args:
            name: Operation identifier
            frames: Frames processed
        """
        tracker = self.operations.get(name)
        if tracker is not None:
            tracker.update(frames)

    def complete(self, name: str) -> None:
        """
        Mark operation as complete.

        Unknown names are ignored.

        Args:
            name: Operation identifier
        """
        tracker = self.operations.get(name)
        if tracker is None:
            return
        try:
            # The entry stays registered while the final callback runs.
            tracker.complete()
        finally:
            # Always unregister, even if completing the tracker raised.
            self.operations.pop(name, None)

    @staticmethod
    def _compute_metrics(processed_frames: int, total_frames: int, elapsed: float):
        """Derive ``(fps, eta, percentage)`` from raw counters.

        Args:
            processed_frames: Frames handled so far
            total_frames: Frames expected in total
            elapsed: Seconds since tracking started

        Returns:
            Tuple ``(fps, eta, percentage)`` where ``eta`` is a non-negative
            ``timedelta`` and ``percentage`` is capped at 100.
        """
        if elapsed > 0:
            fps = processed_frames / elapsed
            # Clamp at zero so an overshoot does not produce a negative ETA.
            remaining_frames = max(0, total_frames - processed_frames)
            eta_seconds = remaining_frames / fps if fps > 0 else 0
            eta = timedelta(seconds=int(eta_seconds))
        else:
            fps = 0.0
            eta = timedelta(0)

        if total_frames > 0:
            percentage = min(100.0, processed_frames / total_frames * 100)
        else:
            percentage = 0.0

        return fps, eta, percentage

    def get_stats(self, name: str) -> Optional[ProgressStats]:
        """
        Get current statistics for an operation.

        Args:
            name: Operation identifier

        Returns:
            Current progress statistics or None if operation not found
        """
        tracker = self.operations.get(name)
        if tracker is None:
            return None

        current_time = time.time()
        fps, eta, percentage = self._compute_metrics(
            tracker.processed_frames,
            tracker.total_frames,
            current_time - tracker.start_time,
        )
        return ProgressStats(
            start_time=tracker.start_time,
            current_time=current_time,
            total_frames=tracker.total_frames,
            processed_frames=tracker.processed_frames,
            fps=fps,
            eta=eta,
            percentage=percentage,
        )