"""
Translation module for FMUS-VID.
This module provides classes for translating text using local LLMs like Ollama.
"""
import os
import time
import logging
import asyncio
import threading
import queue
import json
from typing import Dict, List, Optional, Union, Set, Any
logger = logging.getLogger(__name__)

# Optional dependency: the Ollama Python client. Its absence is not fatal at
# import time; OLLAMA_AVAILABLE records whether the import succeeded so that
# callers can fail later with a clear message.
try:
    import ollama
except ImportError:
    logger.warning("ollama not installed. Ollama-based translation will not be available.")
    OLLAMA_AVAILABLE = False
else:
    OLLAMA_AVAILABLE = True

# Optional dependency: requests, for HTTP API calls. REQUESTS_AVAILABLE
# records whether the import succeeded.
try:
    import requests
except ImportError:
    logger.warning("requests not installed. HTTP API-based translation will be limited.")
    REQUESTS_AVAILABLE = False
else:
    REQUESTS_AVAILABLE = True
class LLMTranslator:
    """
    Translate text using local large language models.

    This class provides translation capabilities using local LLMs like Ollama.
    Translations can be requested synchronously (``translate``), from a
    coroutine (``translate_async``) or through a background worker thread
    (``batch_translate``). Successful translations are kept in a bounded
    in-memory cache keyed by target language and source text; failed
    translations are never cached.
    """

    def __init__(
        self,
        model: str = "ollama:mistral",
        target_languages: Optional[List[str]] = None,
        source_language: str = "en",
        host: str = "http://localhost:11434",
        prompt_template: Optional[str] = None,
        temperature: float = 0.3,
        context_window: int = 8192,
        max_length: int = 200,
        batch_size: int = 1,
        timeout: float = 10.0,
        throttle_time: float = 0.1,
        cache_size: int = 1000
    ):
        """
        Initialize the LLM translator.

        Args:
            model: LLM model identifier (e.g., 'ollama:mistral', 'ollama:llama2')
            target_languages: List of target language codes (ISO 639-1);
                defaults to ["es", "fr", "de", "zh"] when None or empty
            source_language: Source language code (ISO 639-1)
            host: Host URL for the LLM service
            prompt_template: Custom prompt template (None for default). It is
                formatted with ``source_language``, ``target_language`` and
                ``text`` fields.
            temperature: Sampling temperature (0.0 to 1.0)
            context_window: Maximum context window size in tokens.
                NOTE(review): this value is stored but is not currently
                forwarded to the backend.
            max_length: Maximum length of translation in tokens
            batch_size: Maximum number of queued requests the background
                worker pulls from the queue at once
            timeout: Per-language wait budget, in seconds, used by
                ``batch_translate``
            throttle_time: Time to wait between translation requests
            cache_size: Maximum number of cached translations

        Raises:
            ImportError: If an Ollama model is requested but the ``ollama``
                package is not installed.
            ValueError: If the model identifier has an unsupported prefix.
        """
        self.model = model
        # Copy so that later mutation of the caller's list does not leak in.
        self.target_languages = (
            list(target_languages) if target_languages else ["es", "fr", "de", "zh"]
        )
        self.source_language = source_language
        self.host = host
        self.temperature = temperature
        self.context_window = context_window
        self.max_length = max_length
        self.batch_size = batch_size
        self.timeout = timeout
        self.throttle_time = throttle_time
        self.cache_size = cache_size

        # Set up prompt template
        self.prompt_template = prompt_template or (
            "Translate the following {source_language} text to {target_language}. "
            "Provide only the translation, without any additional explanations or notes.\n\n"
            "Text: {text}\n\n"
            "Translation:"
        )

        # Internal state
        self._backend = None
        self._client = None
        # Insertion-ordered dict used as a bounded cache (oldest entry first).
        self._translation_cache = {}
        self._cache_lock = threading.Lock()
        self._translation_queue = queue.Queue()
        self._result_queues = {}
        self._processing_thread = None
        self._stop_event = None
        # Serializes start/stop of the background worker.
        self._lifecycle_lock = threading.Lock()
        self._running = False

        # Language names for prompts
        self._language_names = {
            "en": "English",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "pt": "Portuguese",
            "ru": "Russian",
            "zh": "Chinese",
            "ja": "Japanese",
            "ko": "Korean",
            "ar": "Arabic",
            "hi": "Hindi",
            "tr": "Turkish",
            "nl": "Dutch",
            "sv": "Swedish",
            "pl": "Polish",
            "vi": "Vietnamese",
            "th": "Thai"
        }

        # Initialize backend based on model prefix
        self._initialize_backend()

    def _initialize_backend(self):
        """
        Initialize the LLM backend based on the model prefix.

        Raises:
            ValueError: If the model prefix is not supported.
            ImportError: If the Ollama backend is selected but the ``ollama``
                package could not be imported.
        """
        if not self.model.startswith("ollama:"):
            raise ValueError(f"Unsupported model: {self.model}")
        if not OLLAMA_AVAILABLE:
            raise ImportError("ollama package is required for Ollama-based translation")

        self._backend = "ollama"
        self.model_name = self.model.split(":", 1)[1]
        # Use a dedicated client bound to the configured host rather than
        # mutating module-level state of the ollama package.
        self._client = ollama.Client(host=self.host)
        logger.info(f"Initialized Ollama backend with model {self.model_name}")

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------

    def _cache_get(self, lang: str, text: str) -> Optional[str]:
        """
        Return the cached translation of ``text`` into ``lang``, or None.

        Args:
            lang: Target language code
            text: Source text

        Returns:
            Cached translation, or None when there is no cache entry
        """
        with self._cache_lock:
            return self._translation_cache.get(f"{lang}:{text}")

    def _cache_put(self, lang: str, text: str, translation: str) -> None:
        """
        Store a translation and evict the oldest entries beyond ``cache_size``.

        Args:
            lang: Target language code
            text: Source text
            translation: Translated text to remember
        """
        key = f"{lang}:{text}"
        limit = max(self.cache_size, 0)
        with self._cache_lock:
            # Remove first so a re-inserted key moves to the "newest" end
            # instead of being tracked twice.
            self._translation_cache.pop(key, None)
            self._translation_cache[key] = translation
            while len(self._translation_cache) > limit:
                oldest = next(iter(self._translation_cache))
                del self._translation_cache[oldest]

    def _split_cached(self, text: str, target_languages: List[str]):
        """
        Resolve what can be answered without calling the backend.

        Args:
            text: Text to translate
            target_languages: Requested target language codes

        Returns:
            A ``(result, uncached)`` pair: ``result`` maps language codes to
            text for the source language (returned unchanged) and for cache
            hits; ``uncached`` lists the remaining languages in request
            order, without duplicates.
        """
        result = {}
        uncached = []
        # dict.fromkeys de-duplicates while preserving request order.
        for lang in dict.fromkeys(target_languages):
            if lang == self.source_language:
                result[lang] = text
                continue
            cached = self._cache_get(lang, text)
            if cached is None:
                uncached.append(lang)
            else:
                result[lang] = cached
        return result, uncached

    # ------------------------------------------------------------------
    # Backend access
    # ------------------------------------------------------------------

    def _format_prompt(self, text: str, target_language: str) -> str:
        """
        Format the translation prompt.

        Language codes without an entry in the internal name table are used
        verbatim in the prompt.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Formatted prompt
        """
        source_lang_name = self._language_names.get(self.source_language, self.source_language)
        target_lang_name = self._language_names.get(target_language, target_language)
        return self.prompt_template.format(
            source_language=source_lang_name,
            target_language=target_lang_name,
            text=text
        )

    def _query_ollama(self, prompt: str) -> str:
        """
        Query Ollama model.

        Args:
            prompt: Prompt to send to the model

        Returns:
            Model response with surrounding whitespace removed

        Raises:
            Exception: Any error raised by the Ollama client is logged and
                re-raised so that callers can report it without caching it.
        """
        try:
            response = self._client.generate(
                model=self.model_name,
                prompt=prompt,
                # Sampling parameters are passed through the options mapping.
                options={
                    "temperature": self.temperature,
                    "num_predict": self.max_length,
                },
            )
        except Exception as e:
            logger.error(f"Ollama query error: {e}")
            raise
        return (response.get("response", "") or "").strip()

    async def _query_ollama_async(self, prompt: str) -> str:
        """
        Query Ollama model asynchronously.

        Args:
            prompt: Prompt to send to the model

        Returns:
            Model response
        """
        # The blocking client call is run in the default executor so that
        # the event loop is not blocked.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._query_ollama, prompt)

    def _translate_with_ollama(self, text: str, target_language: str) -> str:
        """
        Translate text using Ollama.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Translated text
        """
        prompt = self._format_prompt(text, target_language)
        return self._query_ollama(prompt)

    async def _translate_with_ollama_async(self, text: str, target_language: str) -> str:
        """
        Translate text using Ollama asynchronously.

        Args:
            text: Text to translate
            target_language: Target language code

        Returns:
            Translated text
        """
        prompt = self._format_prompt(text, target_language)
        return await self._query_ollama_async(prompt)

    def _translate_text(self, text: str, lang: str) -> str:
        """
        Translate ``text`` into ``lang`` with the configured backend.

        Args:
            text: Text to translate
            lang: Target language code

        Returns:
            Translated text

        Raises:
            ValueError: If the configured backend is not supported.
        """
        if self._backend == "ollama":
            return self._translate_with_ollama(text, lang)
        raise ValueError(f"Unsupported backend: {self._backend}")

    # ------------------------------------------------------------------
    # Public translation API
    # ------------------------------------------------------------------

    def translate(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Translate text to multiple languages.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary. A language
            whose translation failed maps to a string starting with
            ``"Error: "``.
        """
        if not text or not text.strip():
            return {}

        result, uncached_languages = self._split_cached(
            text, target_languages or self.target_languages
        )

        for index, lang in enumerate(uncached_languages):
            # Throttle between consecutive requests to avoid overloading
            # the LLM service.
            if index and self.throttle_time > 0:
                time.sleep(self.throttle_time)
            try:
                start_time = time.time()
                translation = self._translate_text(text, lang)
                result[lang] = translation
                self._cache_put(lang, text, translation)
                elapsed = time.time() - start_time
                logger.debug(f"Translated to {lang} in {elapsed:.2f}s")
            except Exception as e:
                logger.error(f"Translation error for language {lang}: {e}")
                result[lang] = f"Error: {str(e)}"
        return result

    async def translate_async(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Translate text to multiple languages asynchronously.

        All uncached languages are requested concurrently.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary. A language
            whose translation failed maps to a string starting with
            ``"Error: "``.
        """
        if not text or not text.strip():
            return {}

        result, uncached_languages = self._split_cached(
            text, target_languages or self.target_languages
        )

        # Start every translation before awaiting any of them.
        tasks = [
            (lang, asyncio.create_task(self._translate_language_async(text, lang)))
            for lang in uncached_languages
        ]

        for lang, task in tasks:
            try:
                translation = await task
            except Exception as e:
                logger.error(f"Async translation error for language {lang}: {e}")
                result[lang] = f"Error: {str(e)}"
            else:
                result[lang] = translation
                self._cache_put(lang, text, translation)
        return result

    async def _translate_language_async(self, text: str, lang: str) -> str:
        """
        Translate text to a single language asynchronously.

        Errors propagate to the caller, which is responsible for logging
        and reporting them.

        Args:
            text: Text to translate
            lang: Target language code

        Returns:
            Translated text

        Raises:
            ValueError: If the configured backend is not supported.
        """
        start_time = time.time()
        if self._backend != "ollama":
            raise ValueError(f"Unsupported backend: {self._backend}")
        translation = await self._translate_with_ollama_async(text, lang)
        elapsed = time.time() - start_time
        logger.debug(f"Async translated to {lang} in {elapsed:.2f}s")
        return translation

    # ------------------------------------------------------------------
    # Background batch translation
    # ------------------------------------------------------------------

    @staticmethod
    def _drain_queue(pending) -> None:
        """Discard every item currently held by the queue ``pending``."""
        try:
            while True:
                pending.get_nowait()
        except queue.Empty:
            pass

    def start_batch_translation(self):
        """
        Start background thread for batch translation.

        Does nothing when the worker is already running. Any stale items
        left in the request queue or in registered result queues are
        discarded before the worker starts.
        """
        with self._lifecycle_lock:
            if self._running:
                return
            self._running = True

            # Clear queues
            self._drain_queue(self._translation_queue)
            for result_queue in list(self._result_queues.values()):
                self._drain_queue(result_queue)

            # Each worker gets its own stop event, so a worker that outlived
            # a timed-out stop cannot be revived by a later start.
            stop_event = threading.Event()
            worker = threading.Thread(
                target=self._process_translation_queue,
                args=(stop_event,),
            )
            worker.daemon = True
            self._stop_event = stop_event
            self._processing_thread = worker
            worker.start()
        logger.info("Started batch translation thread")

    def stop_batch_translation(self):
        """
        Stop background thread for batch translation.

        Does nothing when the worker is not running. Waits up to two
        seconds for the worker to finish; a worker still busy with a
        request after that is left to exit on its own once the request
        completes.
        """
        with self._lifecycle_lock:
            if not self._running:
                return
            self._running = False
            stop_event, self._stop_event = self._stop_event, None
            worker, self._processing_thread = self._processing_thread, None

        if stop_event is not None:
            stop_event.set()
        # A thread cannot join itself; skip the wait in that case.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=2.0)
        logger.info("Stopped batch translation thread")

    def _process_batch_item(self, item: Dict[str, Any]) -> bool:
        """
        Translate one queued request and publish the outcome.

        Args:
            item: Queued request with ``"id"``, ``"text"`` and
                ``"language"`` keys

        Returns:
            True if a translation was attempted, False if the request was
            skipped because nobody is waiting for its result any more.
        """
        text = item["text"]
        lang = item["language"]
        request_id = item["id"]

        result_queue = self._result_queues.get(request_id)
        if result_queue is None:
            logger.warning(f"No result queue for request {request_id}")
            return False

        try:
            translation = self._translate_text(text, lang)
        except Exception as e:
            logger.error(f"Batch translation error: {e}")
            result_queue.put({
                "language": lang,
                "translation": None,
                "error": str(e)
            })
            return True

        result_queue.put({
            "language": lang,
            "translation": translation,
            "error": None
        })
        self._cache_put(lang, text, translation)
        return True

    def _process_translation_queue(self, stop_event: Optional[threading.Event] = None):
        """
        Background thread for processing batch translations.

        Repeatedly pulls up to ``batch_size`` requests from the request
        queue and translates them one at a time until the translator is
        stopped.

        Args:
            stop_event: Event that tells this particular worker to exit.
                When None, only the ``_running`` flag is consulted.
        """
        def keep_running() -> bool:
            if stop_event is not None and stop_event.is_set():
                return False
            return self._running

        try:
            logger.info("Batch translation thread started")
            while keep_running():
                # Wait briefly for the first item so the stop condition is
                # re-checked regularly.
                try:
                    batch = [self._translation_queue.get(timeout=0.5)]
                except queue.Empty:
                    continue

                # Get more items without blocking
                try:
                    while len(batch) < self.batch_size:
                        batch.append(self._translation_queue.get_nowait())
                except queue.Empty:
                    pass

                for item in batch:
                    attempted = False
                    try:
                        attempted = self._process_batch_item(item)
                    except Exception as e:
                        logger.error(f"Batch processing error: {e}")
                    finally:
                        # Exactly one task_done() per dequeued item.
                        self._translation_queue.task_done()
                    # Throttle to avoid overloading the LLM service
                    if attempted and self.throttle_time > 0:
                        time.sleep(self.throttle_time)
        except Exception as e:
            logger.error(f"Batch translation thread error: {e}")
        finally:
            logger.info("Batch translation thread stopped")

    def batch_translate(self, text: str, target_languages: Optional[List[str]] = None) -> Dict[str, str]:
        """
        Submit text for batch translation.

        The request is handed to the background worker (started on demand)
        and this call blocks until all results arrive or the time budget of
        ``timeout`` seconds per uncached language is used up.

        Args:
            text: Text to translate
            target_languages: List of target language codes (None for default languages)

        Returns:
            Dictionary mapping language codes to translated text. Empty or
            whitespace-only input yields an empty dictionary. A language
            whose translation failed or timed out maps to a string starting
            with ``"Error: "``.
        """
        if not text or not text.strip():
            return {}

        result, uncached_languages = self._split_cached(
            text, target_languages or self.target_languages
        )

        # If all languages are cached, return immediately
        if not uncached_languages:
            return result

        if not self._running:
            self.start_batch_translation()

        # The result queue stays alive for the whole request, so its id is
        # unique among all requests that are registered at the same time.
        result_queue = queue.Queue()
        request_id = f"{time.time()}_{id(result_queue)}"
        self._result_queues[request_id] = result_queue

        try:
            # Submit translations to queue
            for lang in uncached_languages:
                self._translation_queue.put({
                    "id": request_id,
                    "text": text,
                    "language": lang
                })

            # Wait for results
            pending = list(uncached_languages)
            deadline = time.monotonic() + self.timeout * len(uncached_languages)
            while pending:
                remaining = max(0.1, deadline - time.monotonic())
                try:
                    item = result_queue.get(timeout=remaining)
                except queue.Empty:
                    break
                lang = item["language"]
                if item["error"] is None:
                    result[lang] = item["translation"]
                else:
                    result[lang] = f"Error: {item['error']}"
                if lang in pending:
                    pending.remove(lang)

            # Whatever is still pending did not arrive in time.
            for lang in pending:
                logger.warning(f"Timeout waiting for translation to {lang}")
                result[lang] = "Error: Translation timed out"
        finally:
            # Clean up even if something above raised.
            self._result_queues.pop(request_id, None)
        return result

    def __del__(self):
        """Clean up resources when the object is garbage collected."""
        # Best effort only: the instance may be partially initialized, and
        # module globals may already be gone during interpreter shutdown.
        try:
            if getattr(self, "_running", False):
                self.stop_batch_translation()
        except Exception:
            pass