# Source code for fmusvid.ai.translation

"""
Translation module for FMUS-VID.

This module provides classes for translating text using local LLMs like Ollama.
"""

import os
import time
import logging
import asyncio
import threading
import queue
import json
from typing import Dict, List, Optional, Union, Set, Any

logger = logging.getLogger(__name__)

# Optional dependency: the Ollama Python client. The module must stay
# importable without it, so a failed import only flips the availability flag.
try:
    import ollama
except ImportError:
    logger.warning("ollama not installed. Ollama-based translation will not be available.")
    OLLAMA_AVAILABLE = False
else:
    OLLAMA_AVAILABLE = True

# Optional dependency: requests, used for HTTP API calls.
try:
    import requests
except ImportError:
    logger.warning("requests not installed. HTTP API-based translation will be limited.")
    REQUESTS_AVAILABLE = False
else:
    REQUESTS_AVAILABLE = True


class LLMTranslator:
    """
    Translate text using local large language models.

    This class provides translation capabilities using local LLMs like Ollama.

    Three entry points are offered: blocking (``translate``), asyncio
    (``translate_async``) and a background worker thread (``batch_translate``).
    All of them return a dictionary mapping language codes to translated text;
    a language whose translation failed maps to a string starting with
    ``"Error: "``. Successful translations are kept in a bounded, lock-protected
    FIFO cache; failures are never cached.
    """

    def __init__(
        self,
        model: str = "ollama:mistral",
        target_languages: Optional[List[str]] = None,
        source_language: str = "en",
        host: str = "http://localhost:11434",
        prompt_template: Optional[str] = None,
        temperature: float = 0.3,
        context_window: int = 8192,
        max_length: int = 200,
        batch_size: int = 1,
        timeout: float = 10.0,
        throttle_time: float = 0.1,
        cache_size: int = 1000
    ):
        """
        Initialize the LLM translator.

        Args:
            model: LLM model identifier (e.g., 'ollama:mistral', 'ollama:llama2')
            target_languages: List of target language codes (ISO 639-1)
            source_language: Source language code (ISO 639-1)
            host: Host URL for the LLM service
            prompt_template: Custom prompt template (None for default). It is
                formatted with ``source_language``, ``target_language`` and
                ``text`` fields.
            temperature: Sampling temperature (0.0 to 1.0)
            context_window: Maximum context window size in tokens
            max_length: Maximum length of translation in tokens
            batch_size: Number of translations to process in a batch
            timeout: Timeout for translation requests in seconds
            throttle_time: Time to wait between translation requests
            cache_size: Size of translation cache

        Raises:
            ImportError: If an Ollama model is requested but the ``ollama``
                package is not installed.
            ValueError: If the model prefix is not supported.
        """
        self.model = model
        self.target_languages = target_languages or ["es", "fr", "de", "zh"]
        self.source_language = source_language
        self.host = host
        self.temperature = temperature
        self.context_window = context_window
        self.max_length = max_length
        self.batch_size = batch_size
        self.timeout = timeout
        self.throttle_time = throttle_time
        self.cache_size = cache_size

        # Set up prompt template
        self.prompt_template = prompt_template or (
            "Translate the following {source_language} text to {target_language}. "
            "Provide only the translation, without any additional explanations or notes.\n\n"
            "Text: {text}\n\n"
            "Translation:"
        )

        # Internal state
        self._backend = None
        self._client = None
        self._translation_cache = {}
        self._cache_keys = []  # insertion order of cache keys, oldest first
        self._cache_lock = threading.Lock()
        self._translation_queue = queue.Queue()
        self._result_queues = {}  # request id -> queue the worker answers on
        self._processing_thread = None
        self._running = False

        # Language names for prompts
        self._language_names = {
            "en": "English",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "pt": "Portuguese",
            "ru": "Russian",
            "zh": "Chinese",
            "ja": "Japanese",
            "ko": "Korean",
            "ar": "Arabic",
            "hi": "Hindi",
            "tr": "Turkish",
            "nl": "Dutch",
            "sv": "Swedish",
            "pl": "Polish",
            "vi": "Vietnamese",
            "th": "Thai"
        }

        # Initialize backend based on model prefix
        self._initialize_backend()

    def _initialize_backend(self):
        """
        Initialize the LLM backend based on the model prefix.

        Raises:
            ImportError: If the ``ollama`` package is missing.
            ValueError: If the model prefix is not ``ollama:``.
        """
        if self.model.startswith("ollama:"):
            if not OLLAMA_AVAILABLE:
                raise ImportError("ollama package is required for Ollama-based translation")

            self._backend = "ollama"
            self.model_name = self.model.split(":", 1)[1]

            # A dedicated client is how the ollama package targets a specific
            # host; extra keyword arguments (here the request timeout) are
            # forwarded to the underlying HTTP client.
            self._client = ollama.Client(host=self.host, timeout=self.timeout)

            logger.info(f"Initialized Ollama backend with model {self.model_name}")
        else:
            raise ValueError(f"Unsupported model: {self.model}")

    def _format_prompt(self, text: str, target_language: str) -> str:
        """
        Format the translation prompt.

        Language codes missing from the name table are used verbatim.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Formatted prompt
        """
        source_lang_name = self._language_names.get(self.source_language, self.source_language)
        target_lang_name = self._language_names.get(target_language, target_language)

        return self.prompt_template.format(
            source_language=source_lang_name,
            target_language=target_lang_name,
            text=text
        )

    def _query_ollama(self, prompt: str) -> str:
        """
        Query Ollama model.

        Args:
            prompt: Prompt to send to the model

        Returns:
            Model response with surrounding whitespace removed

        Raises:
            Exception: Whatever the client raised. The error is logged and
                re-raised so that callers can report it without caching it.
        """
        try:
            response = self._client.generate(
                model=self.model_name,
                prompt=prompt,
                # Sampling parameters are model options, not top-level arguments
                options={
                    "temperature": self.temperature,
                    "num_predict": self.max_length
                }
            )
        except Exception as e:
            logger.error(f"Ollama query error: {e}")
            raise

        return (response.get("response") or "").strip()

    async def _query_ollama_async(self, prompt: str) -> str:
        """
        Query Ollama model asynchronously.

        Args:
            prompt: Prompt to send to the model

        Returns:
            Model response
        """
        # The blocking client call is pushed to the default executor so the
        # event loop stays responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._query_ollama, prompt)

    def _translate_with_ollama(self, text: str, target_language: str) -> str:
        """
        Translate text using Ollama.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Translated text
        """
        prompt = self._format_prompt(text, target_language)
        return self._query_ollama(prompt)

    async def _translate_with_ollama_async(self, text: str, target_language: str) -> str:
        """
        Translate text using Ollama asynchronously.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Translated text
        """
        prompt = self._format_prompt(text, target_language)
        return await self._query_ollama_async(prompt)

    def _translate_sync(self, text: str, lang: str) -> str:
        """
        Dispatch a single blocking translation to the active backend.

        Raises:
            ValueError: If no supported backend is configured.
        """
        if self._backend == "ollama":
            return self._translate_with_ollama(text, lang)
        raise ValueError(f"Unsupported backend: {self._backend}")

    @staticmethod
    def _cache_key(text: str, lang: str) -> str:
        """Build the cache key for a (text, target language) pair."""
        return f"{lang}:{text}"

    def _cache_get(self, text: str, lang: str) -> Optional[str]:
        """Return the cached translation, or None when there is none."""
        with self._cache_lock:
            return self._translation_cache.get(self._cache_key(text, lang))

    def _cache_put(self, text: str, lang: str, translation: str) -> None:
        """
        Store a translation and evict the oldest entries beyond ``cache_size``.

        A key that is already cached is updated in place rather than being
        recorded a second time, so the key list and the cache stay in sync.
        """
        key = self._cache_key(text, lang)
        with self._cache_lock:
            if key not in self._translation_cache:
                self._cache_keys.append(key)
            self._translation_cache[key] = translation

            # Trim cache if needed
            while self._cache_keys and len(self._cache_keys) > self.cache_size:
                self._translation_cache.pop(self._cache_keys.pop(0), None)

    def _lookup_cached(self, text: str, target_languages: List[str]):
        """
        Split the requested languages into already-known and still-missing.

        Returns:
            Tuple ``(result, uncached)``: ``result`` maps languages that need
            no request (the source language itself, or a cache hit) to their
            text; ``uncached`` lists the remaining languages once each, in
            request order.
        """
        result = {}
        uncached = []
        for lang in target_languages:
            if lang == self.source_language:
                result[lang] = text
                continue

            cached = self._cache_get(text, lang)
            if cached is not None:
                result[lang] = cached
            elif lang not in uncached:
                uncached.append(lang)
        return result, uncached

    def translate(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Translate text to multiple languages.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary; a failed
            language maps to an ``"Error: ..."`` string.
        """
        if not text or not text.strip():
            return {}

        target_languages = target_languages or self.target_languages
        result, uncached_languages = self._lookup_cached(text, target_languages)

        # Translate to languages not in cache
        for index, lang in enumerate(uncached_languages):
            # Throttle between consecutive requests to avoid overloading the
            # LLM service (no wait before the first one)
            if index and self.throttle_time > 0:
                time.sleep(self.throttle_time)

            try:
                start_time = time.monotonic()
                translation = self._translate_sync(text, lang)
                self._cache_put(text, lang, translation)
                result[lang] = translation

                elapsed = time.monotonic() - start_time
                logger.debug(f"Translated to {lang} in {elapsed:.2f}s")
            except Exception as e:
                logger.error(f"Translation error for language {lang}: {e}")
                result[lang] = f"Error: {str(e)}"

        return result

    async def translate_async(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Translate text to multiple languages asynchronously.

        All missing languages are requested concurrently.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary; a failed
            language maps to an ``"Error: ..."`` string.
        """
        if not text or not text.strip():
            return {}

        target_languages = target_languages or self.target_languages
        result, uncached_languages = self._lookup_cached(text, target_languages)
        if not uncached_languages:
            return result

        # Run every missing language concurrently; failures come back as
        # exception objects so one bad language does not abort the others.
        outcomes = await asyncio.gather(
            *(self._translate_language_async(text, lang) for lang in uncached_languages),
            return_exceptions=True
        )

        for lang, outcome in zip(uncached_languages, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(f"Async translation error for language {lang}: {outcome}")
                result[lang] = f"Error: {str(outcome)}"
            else:
                self._cache_put(text, lang, outcome)
                result[lang] = outcome

        return result

    async def _translate_language_async(self, text: str, lang: str) -> str:
        """
        Translate text to a single language asynchronously.

        Args:
            text: Text to translate
            lang: Target language code

        Returns:
            Translated text

        Raises:
            ValueError: If no supported backend is configured.
            Exception: Any backend error, after it has been logged.
        """
        start_time = time.monotonic()

        try:
            # Translate using appropriate backend
            if self._backend == "ollama":
                translation = await self._translate_with_ollama_async(text, lang)
            else:
                raise ValueError(f"Unsupported backend: {self._backend}")

            elapsed = time.monotonic() - start_time
            logger.debug(f"Async translated to {lang} in {elapsed:.2f}s")

            return translation
        except Exception as e:
            logger.error(f"Async translation error for language {lang}: {e}")
            raise

    @staticmethod
    def _drain_queue(pending) -> None:
        """Discard every item currently waiting in ``pending``."""
        try:
            while True:
                pending.get_nowait()
        except queue.Empty:
            pass

    def start_batch_translation(self):
        """Start background thread for batch translation (no-op if running)."""
        if self._running:
            return

        self._running = True

        # Clear queues so the new worker does not see stale work
        self._drain_queue(self._translation_queue)
        for result_queue in list(self._result_queues.values()):
            self._drain_queue(result_queue)

        # Start processing thread
        self._processing_thread = threading.Thread(target=self._process_translation_queue)
        self._processing_thread.daemon = True
        self._processing_thread.start()

        logger.info("Started batch translation thread")

    def stop_batch_translation(self):
        """Stop background thread for batch translation (no-op if stopped)."""
        if not self._running:
            return

        self._running = False

        # Wait for processing thread to finish; the wait is bounded so a
        # request that is still in flight cannot block the caller forever.
        if self._processing_thread:
            self._processing_thread.join(timeout=2.0)
            self._processing_thread = None

        logger.info("Stopped batch translation thread")

    def _process_batch_item(self, item) -> None:
        """
        Translate one queued request and answer on its result queue.

        Every answer is a dict with ``language``, ``translation`` and
        ``error`` keys; exactly one of the last two is None.
        """
        text = item["text"]
        lang = item["language"]
        request_id = item["id"]

        result_queue = self._result_queues.get(request_id)
        if result_queue is None:
            # The requester has already removed its queue (for example after
            # timing out), so nobody is waiting for this answer.
            logger.warning(f"No result queue for request {request_id}")
            return

        try:
            translation = self._translate_sync(text, lang)
        except Exception as e:
            logger.error(f"Batch translation error: {e}")
            result_queue.put({
                "language": lang,
                "translation": None,
                "error": str(e)
            })
            return

        self._cache_put(text, lang, translation)
        result_queue.put({
            "language": lang,
            "translation": translation,
            "error": None
        })

    def _process_translation_queue(self):
        """Background thread for processing batch translations."""
        try:
            logger.info("Batch translation thread started")

            while self._running:
                # Block briefly for the first item so the loop can notice
                # a stop request, then top the batch up without blocking.
                try:
                    batch = [self._translation_queue.get(timeout=0.5)]
                except queue.Empty:
                    continue

                while len(batch) < self.batch_size:
                    try:
                        batch.append(self._translation_queue.get_nowait())
                    except queue.Empty:
                        break

                for item in batch:
                    try:
                        self._process_batch_item(item)
                    except Exception as e:
                        logger.error(f"Batch processing error: {e}")
                    finally:
                        # Exactly one task_done() per dequeued item
                        self._translation_queue.task_done()

                    # Throttle to avoid overloading the LLM service
                    if self.throttle_time > 0:
                        time.sleep(self.throttle_time)
        except Exception as e:
            logger.error(f"Batch translation thread error: {e}")
        finally:
            logger.info("Batch translation thread stopped")

    def batch_translate(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Submit text for batch translation.

        The call blocks until the background worker has answered for every
        missing language or the overall deadline (``timeout`` seconds per
        missing language) has passed. The worker is started on demand, and
        only when at least one language actually needs a request.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary; a failed or
            timed-out language maps to an ``"Error: ..."`` string.
        """
        if not text or not text.strip():
            return {}

        target_languages = target_languages or self.target_languages
        result, uncached_languages = self._lookup_cached(text, target_languages)

        # If all languages are cached, return immediately
        if not uncached_languages:
            return result

        if not self._running:
            self.start_batch_translation()

        # Create a request ID and result queue. The queue's identity is
        # unique for as long as the queue is registered.
        result_queue = queue.Queue()
        request_id = f"{time.time()}_{id(result_queue)}"
        self._result_queues[request_id] = result_queue

        pending = set(uncached_languages)
        try:
            # Submit translations to queue
            for lang in uncached_languages:
                self._translation_queue.put({
                    "id": request_id,
                    "text": text,
                    "language": lang
                })

            # Wait for results, sharing one overall deadline
            deadline = time.monotonic() + self.timeout * len(uncached_languages)
            while pending:
                remaining = max(0.1, deadline - time.monotonic())
                try:
                    item = result_queue.get(timeout=remaining)
                except queue.Empty:
                    break

                if item["error"] is None:
                    result[item["language"]] = item["translation"]
                else:
                    result[item["language"]] = f"Error: {item['error']}"
                pending.discard(item["language"])
        finally:
            # Clean up even if waiting was interrupted
            self._result_queues.pop(request_id, None)

        # Whatever never arrived is reported against its own language
        for lang in uncached_languages:
            if lang in pending:
                logger.warning(f"Timeout waiting for translation to {lang}")
                result[lang] = "Error: Translation timed out"

        return result

    def __del__(self):
        """
        Clean up resources when the object is garbage collected.

        A running worker thread holds a reference to this object through its
        bound-method target, so this normally fires only once the worker has
        been stopped or at interpreter shutdown.
        """
        try:
            # getattr guards against a partially constructed instance
            if getattr(self, "_running", False):
                self.stop_batch_translation()
        except Exception:
            # Never let a finalizer raise, e.g. during interpreter shutdown
            pass