Files
whisper-translation/transcribe.py
T

203 lines
8.0 KiB
Python

import sys
import time
import requests
import threading
import json
from unittest.mock import MagicMock
# Comprehensive workaround for missing _lzma in some Python builds
try:
import lzma
except ImportError:
mock_lzma = MagicMock()
mock_lzma.FORMAT_XZ = 1
mock_lzma.FORMAT_ALONE = 2
mock_lzma.FORMAT_RAW = 3
mock_lzma.CHECK_NONE = 0
mock_lzma.CHECK_CRC32 = 1
mock_lzma.CHECK_CRC64 = 4
mock_lzma.CHECK_SHA256 = 10
sys.modules["_lzma"] = MagicMock()
sys.modules["lzma"] = mock_lzma
import mlx_whisper
import numpy as np
import sounddevice as sd
import queue
import torch
from silero_vad import load_silero_vad, get_speech_timestamps
from transformers import MarianMTModel, MarianTokenizer
# Parameters
WHISPER_MODEL = "mlx-community/whisper-small-mlx"
INGEST_URL = "https://emiapi.reynafamily.com/live-captions/ingest"
# Translation models (English -> Target)
# Map to the specific keys requested by the backend
TARGET_LANGS = {
"es": "Helsinki-NLP/opus-mt-en-es",
"fr": "Helsinki-NLP/opus-mt-en-fr",
"ar": "Helsinki-NLP/opus-mt-en-ar" # Added Arabic as discussed before
}
CHANNELS = 1
SAMPLERATE = 16000
BLOCK_SIZE = 512
VAD_THRESHOLD = 0.5
BUFFER_LIMIT = SAMPLERATE * 30
MIN_SILENCE_DURATION_MS = 500
audio_queue = queue.Queue()
ingest_queue = queue.Queue()
def callback(indata, frames, time, status):
if status:
print(status, file=sys.stderr)
audio_queue.put(indata.copy())
def ingest_worker():
"""Background thread to handle server ingestion with retries."""
while True:
payload = ingest_queue.get()
if payload is None: break
delay = 1
max_delay = 15
success = False
while not success:
try:
response = requests.post(INGEST_URL, json=payload, timeout=5)
if response.status_code == 200:
success = True
else:
print(f"\n[Ingest Error] Server returned {response.status_code}. Retrying in {delay}s...")
except Exception as e:
print(f"\n[Ingest Error] {e}. Retrying in {delay}s...")
if not success:
time.sleep(delay)
delay = min(delay * 2, max_delay)
ingest_queue.task_done()
def main():
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")
# Start ingest thread
threading.Thread(target=ingest_worker, daemon=True).start()
# 1. Load models
print(f"Loading Multilingual Whisper model '{WHISPER_MODEL}'...")
translation_engines = {}
for lang_key, model_id in TARGET_LANGS.items():
print(f"Loading {lang_key} translation model ({model_id})...")
tokenizer = MarianTokenizer.from_pretrained(model_id)
model = MarianMTModel.from_pretrained(model_id).to(device)
translation_engines[lang_key] = (model, tokenizer)
print("Loading Silero VAD model...")
vad_model = load_silero_vad()
print("Models loaded.")
# 2. Select Audio Device
print("\nAvailable Audio Devices:")
print(sd.query_devices())
try:
device_input = input("\nSelect input device index (or press Enter for default): ")
device_index = int(device_input) if device_input.strip() else None
except ValueError:
print("Invalid input, using default device.")
device_index = None
print(f"\nStarting live transcription & server ingest... (Press Ctrl+C to stop)")
audio_buffer = []
speech_started = False
try:
with sd.InputStream(samplerate=SAMPLERATE, channels=CHANNELS, callback=callback, blocksize=BLOCK_SIZE, device=device_index):
while True:
while not audio_queue.empty():
data = audio_queue.get()
audio_buffer.append(data.flatten())
if len(audio_buffer) > 0:
current_audio = np.concatenate(audio_buffer)
audio_tensor = torch.from_numpy(current_audio)
speech_timestamps = get_speech_timestamps(
audio_tensor,
vad_model,
sampling_rate=SAMPLERATE,
threshold=VAD_THRESHOLD,
min_silence_duration_ms=MIN_SILENCE_DURATION_MS
)
if len(speech_timestamps) > 0:
speech_started = True
last_end = speech_timestamps[-1]['end']
buffer_len_samples = len(current_audio)
if (buffer_len_samples - last_end) > (SAMPLERATE * MIN_SILENCE_DURATION_MS / 1000) or buffer_len_samples > BUFFER_LIMIT:
# 1. Transcribe & Detect Language
transcription_result = mlx_whisper.transcribe(current_audio, path_or_hf_repo=WHISPER_MODEL)
original_text = transcription_result['text'].strip()
detected_lang = transcription_result.get('language', 'en')
if original_text:
print(f"\n[{detected_lang.upper()}]: {original_text}")
# Prepare payload
payload = {"original": original_text}
# Rule 3: include source language key
if detected_lang in TARGET_LANGS or detected_lang == "en":
payload[detected_lang] = original_text
# 2. Bridge to English if not already English
if detected_lang != "en":
bridge_result = mlx_whisper.transcribe(current_audio, path_or_hf_repo=WHISPER_MODEL, task="translate")
english_text = bridge_result['text'].strip()
payload["en"] = english_text
else:
english_text = original_text
# 3. Translate from English to other languages
if english_text:
# Limit input length
clean_en = english_text[:247] + "..." if len(english_text) > 250 else english_text
for lang_key, (model, tokenizer) in translation_engines.items():
# Skip if we already filled this (e.g. detected lang was 'es')
if lang_key in payload: continue
inputs = tokenizer(clean_en, return_tensors="pt", padding=True).to(device)
with torch.no_grad():
translated_tokens = model.generate(**inputs, max_new_tokens=150)
translated_text = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
payload[lang_key] = translated_text
# Queue for background ingestion
ingest_queue.put(payload)
print(f"Sent to ingest: {list(payload.keys())}")
audio_buffer = []
speech_started = False
elif not speech_started and len(current_audio) > SAMPLERATE * 2:
audio_buffer = []
except KeyboardInterrupt:
print("\nStopped by user.")
except Exception as e:
print(f"\nError: {e}")
if __name__ == "__main__":
import multiprocessing
multiprocessing.freeze_support()
main()