Implement server ingest with flat JSON, background worker, and exponential backoff

This commit is contained in:
Adolfo Reyna
2026-02-26 23:06:24 -05:00
parent a4e17ce896
commit 1403604ce0

View File

@@ -1,4 +1,8 @@
import sys
import time
import requests
import threading
import json
from unittest.mock import MagicMock
# Comprehensive workaround for missing _lzma in some Python builds
@@ -25,13 +29,15 @@ from silero_vad import load_silero_vad, get_speech_timestamps
from transformers import MarianMTModel, MarianTokenizer
# Parameters
# Using the multilingual small model
WHISPER_MODEL = "mlx-community/whisper-small-mlx"
INGEST_URL = "https://emiapi.reynafamily.com/live-captions/ingest"
# Translation models (English -> Target)
# Map to the specific keys requested by the backend
TARGET_LANGS = {
"Spanish": "Helsinki-NLP/opus-mt-en-es",
"French": "Helsinki-NLP/opus-mt-en-fr",
"Arabic": "Helsinki-NLP/opus-mt-en-ar"
"es": "Helsinki-NLP/opus-mt-en-es",
"fr": "Helsinki-NLP/opus-mt-en-fr",
"ar": "Helsinki-NLP/opus-mt-en-ar" # Added Arabic as discussed before
}
CHANNELS = 1
@@ -42,25 +48,55 @@ BUFFER_LIMIT = SAMPLERATE * 30
MIN_SILENCE_DURATION_MS = 500
audio_queue = queue.Queue()
ingest_queue = queue.Queue()
def callback(indata, frames, time, status):
if status:
print(status, file=sys.stderr)
audio_queue.put(indata.copy())
def ingest_worker():
"""Background thread to handle server ingestion with retries."""
while True:
payload = ingest_queue.get()
if payload is None: break
delay = 1
max_delay = 15
success = False
while not success:
try:
response = requests.post(INGEST_URL, json=payload, timeout=5)
if response.status_code == 200:
success = True
else:
print(f"\n[Ingest Error] Server returned {response.status_code}. Retrying in {delay}s...")
except Exception as e:
print(f"\n[Ingest Error] {e}. Retrying in {delay}s...")
if not success:
time.sleep(delay)
delay = min(delay * 2, max_delay)
ingest_queue.task_done()
def main():
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")
# Start ingest thread
threading.Thread(target=ingest_worker, daemon=True).start()
# 1. Load models
print(f"Loading Multilingual Whisper model '{WHISPER_MODEL}'...")
translation_engines = {}
for lang_name, model_id in TARGET_LANGS.items():
print(f"Loading {lang_name} translation model ({model_id})...")
for lang_key, model_id in TARGET_LANGS.items():
print(f"Loading {lang_key} translation model ({model_id})...")
tokenizer = MarianTokenizer.from_pretrained(model_id)
model = MarianMTModel.from_pretrained(model_id).to(device)
translation_engines[lang_name] = (model, tokenizer)
translation_engines[lang_key] = (model, tokenizer)
print("Loading Silero VAD model...")
vad_model = load_silero_vad()
@@ -77,7 +113,7 @@ def main():
print("Invalid input, using default device.")
device_index = None
print(f"\nStarting Multilingual live transcription... (Press Ctrl+C to stop)")
print(f"\nStarting live transcription & server ingest... (Press Ctrl+C to stop)")
audio_buffer = []
speech_started = False
@@ -109,34 +145,45 @@ def main():
if (buffer_len_samples - last_end) > (SAMPLERATE * MIN_SILENCE_DURATION_MS / 1000) or buffer_len_samples > BUFFER_LIMIT:
# 1. Transcribe & Detect Language
# We use task="transcribe" to get the original text and detect language
transcription_result = mlx_whisper.transcribe(current_audio, path_or_hf_repo=WHISPER_MODEL)
original_text = transcription_result['text'].strip()
detected_lang = transcription_result.get('language', 'unknown')
detected_lang = transcription_result.get('language', 'en')
if original_text:
print(f"\n[{detected_lang.upper()} detected]: {original_text}")
print(f"\n[{detected_lang.upper()}]: {original_text}")
# Prepare payload
payload = {"original": original_text}
# Rule 3: include source language key
if detected_lang in TARGET_LANGS or detected_lang == "en":
payload[detected_lang] = original_text
# 2. Bridge to English if not already English
if detected_lang != "en":
# Use Whisper to translate the segment to English
bridge_result = mlx_whisper.transcribe(current_audio, path_or_hf_repo=WHISPER_MODEL, task="translate")
english_text = bridge_result['text'].strip()
print(f"[EN Bridge]: {english_text}")
payload["en"] = english_text
else:
english_text = original_text
# 3. Translate from English to other languages
if english_text:
if len(english_text) > 250:
english_text = english_text[:247] + "..."
# Limit input length
clean_en = english_text[:247] + "..." if len(english_text) > 250 else english_text
for lang_name, (model, tokenizer) in translation_engines.items():
inputs = tokenizer(english_text, return_tensors="pt", padding=True).to(device)
for lang_key, (model, tokenizer) in translation_engines.items():
# Skip if we already filled this (e.g. detected lang was 'es')
if lang_key in payload: continue
inputs = tokenizer(clean_en, return_tensors="pt", padding=True).to(device)
with torch.no_grad():
translated_tokens = model.generate(**inputs, max_new_tokens=150)
translated_text = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
print(f"[{lang_name[:2].upper()}]: {translated_text}")
payload[lang_key] = translated_text
# Queue for background ingestion
ingest_queue.put(payload)
print(f"Sent to ingest: {list(payload.keys())}")
audio_buffer = []
speech_started = False