"""Always-listening "Jarvis" voice assistant.

Pipeline: microphone (sounddevice) -> Silero VAD -> MLX Whisper transcription
-> wake-word detection -> ``gemini`` CLI -> macOS ``say`` for the spoken reply.
Every transcription is also appended to a daily log file.
"""

import sys
import os
import subprocess
import time
import re
import queue
import threading
from unittest.mock import MagicMock

# Comprehensive workaround for missing _lzma in some Python builds.
# This must run BEFORE the third-party imports below, which may import lzma.
try:
    import lzma  # noqa: F401
except ImportError:
    mock_lzma = MagicMock()
    mock_lzma.FORMAT_XZ = 1
    mock_lzma.FORMAT_ALONE = 2
    mock_lzma.FORMAT_RAW = 3
    mock_lzma.CHECK_NONE = 0
    mock_lzma.CHECK_CRC32 = 1
    mock_lzma.CHECK_CRC64 = 4
    mock_lzma.CHECK_SHA256 = 10
    sys.modules["_lzma"] = MagicMock()
    sys.modules["lzma"] = mock_lzma

import numpy as np
import sounddevice as sd
import torch
import mlx_whisper
from silero_vad import load_silero_vad, get_speech_timestamps
from gtts import gTTS
import pygame
import io

# --- Configuration ---
TRIGGER_WORD = "Jarvis"
WHISPER_MODEL = "mlx-community/whisper-small-mlx"
SAMPLERATE = 16000
BLOCK_SIZE = 512
VAD_THRESHOLD = 0.5
SILENCE_DURATION_MS = 100
MAX_BUFFER_SECONDS = 15
CONTEXT_CHARS = 500  # How much previous text to keep for context
WORKSPACE_DIR = "workspace"
LOGS_DIR = os.path.join(WORKSPACE_DIR, "transcription")
SOUL_PATH = "soul.md"
SYSTEM_SOUND = "/System/Library/Sounds/Tink.aiff"
FOLLOW_UP_SOUND = "/System/Library/Sounds/Submarine.aiff"

# How long the main loop blocks waiting for a new audio chunk before it
# re-checks (keeps Ctrl+C responsive without busy-waiting).
QUEUE_POLL_SECONDS = 0.1

# Wake-word matcher: escaped so a trigger word containing regex
# metacharacters is matched literally, and compiled once instead of on
# every utterance.
_TRIGGER_RE = re.compile(rf"\b{re.escape(TRIGGER_WORD)}\b", re.IGNORECASE)

# Characters dropped from the start of a command, e.g. the comma in
# "Jarvis, what time is it?".
_COMMAND_LEADING_CHARS = " \t\r\n,.!?:;-"

# Markdown markers removed before text is handed to the speech synthesizer.
_MARKDOWN_STRIP_TABLE = str.maketrans("", "", "*#`")

# Ensure workspace and logs exist (exist_ok avoids the check-then-create race).
for d in [WORKSPACE_DIR, LOGS_DIR]:
    os.makedirs(d, exist_ok=True)


def log_transcription(text):
    """Append *text* with a timestamp to today's log file under LOGS_DIR.

    Empty text is ignored. I/O failures are reported on stdout and otherwise
    swallowed so that logging can never take the assistant down.
    """
    if not text:
        return
    date_str = time.strftime("%Y-%m-%d")
    timestamp = time.strftime("%H:%M:%S")
    log_file = os.path.join(LOGS_DIR, f"{date_str}.log")
    try:
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(f"[{timestamp}] {text}\n")
    except OSError as e:
        print(f"Error logging transcription: {e}")


# Global state
audio_queue = queue.Queue()  # raw chunks pushed by audio_callback
rolling_context = ""  # tail of recent transcriptions, passed to Gemini
current_session_id = None  # Gemini session resumed on every command


def play_sound(sound_path=SYSTEM_SOUND):
    """Play a system sound asynchronously via ``afplay`` (fire and forget)."""
    try:
        subprocess.Popen(
            ["afplay", sound_path],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError as e:
        # A missing player must not abort the listening loop.
        print(f"Error playing sound: {e}")


def get_latest_session_id():
    """Return the id of the most recent Gemini session, or None.

    Runs ``gemini --list-sessions`` in WORKSPACE_DIR and returns the last
    bracketed hex/dash token found in its stdout.
    """
    try:
        result = subprocess.run(
            ["gemini", "--list-sessions"],
            capture_output=True,
            text=True,
            cwd=WORKSPACE_DIR,
        )
    except (OSError, subprocess.SubprocessError, UnicodeError) as e:
        print(f"Error fetching session ID: {e}")
        return None
    matches = re.findall(r"\[([a-f0-9\-]+)\]", result.stdout)
    return matches[-1] if matches else None


def speak_text(text):
    """Speak *text* aloud with the macOS ``say`` command (blocking).

    Markdown markers (``*``, ``#``, backtick) are removed first. Nothing is
    spoken when the remaining text is empty.
    """
    if not text:
        return
    # Check emptiness AFTER stripping markdown: a reply such as "***" would
    # otherwise invoke `say` with nothing to speak.
    clean_text = text.translate(_MARKDOWN_STRIP_TABLE).strip()
    if not clean_text:
        return
    print(f"[Jarvis] Speaking: {clean_text}")
    try:
        # Feed the text on stdin rather than argv so a reply that begins with
        # "-" (e.g. a markdown bullet) is not parsed as a command-line option.
        subprocess.run(["say"], input=clean_text, text=True)
    except OSError as e:
        print(f"Error speaking text: {e}")


def _prepare_system_prompt(env, is_init):
    """Write the soul file into the workspace and point *env* at it.

    When SOUL_PATH is missing, *env* is left untouched. On initialization the
    current date/time is prepended to the prompt.
    """
    try:
        with open(SOUL_PATH, "r", encoding="utf-8") as f:
            soul_content = f.read()
    except FileNotFoundError:
        return
    except OSError as e:
        print(f"Error reading {SOUL_PATH}: {e}")
        return

    if is_init:
        current_time = time.strftime("%A, %B %d, %Y, %I:%M %p")
        soul_content = (
            f"Temporal Context: The current date and time is {current_time}.\n\n"
            + soul_content
        )
        print("\n[Jarvis] Initializing system protocol...")

    # Absolute path: the CLI runs with cwd=WORKSPACE_DIR, so a relative path
    # would be resolved against the wrong directory.
    system_md_path = os.path.abspath(
        os.path.join(WORKSPACE_DIR, ".system_prompt.md")
    )
    try:
        with open(system_md_path, "w", encoding="utf-8") as f:
            f.write(soul_content)
    except OSError as e:
        print(f"Error writing system prompt: {e}")
        return
    env["GEMINI_SYSTEM_MD"] = system_md_path


def run_gemini(command, context="", is_init=False):
    """Send *command* to the ``gemini`` CLI, print the reply and speak it.

    Args:
        command: The user's request.
        context: Recent transcription text, prepended to the prompt if given.
        is_init: True for the boot call; adds the current time to the system
            prompt and captures the id of the session that was just created.

    Errors are reported on stdout; nothing is raised to the caller.
    """
    global current_session_id

    # Combine context and command if provided
    full_prompt = command
    if context:
        full_prompt = f"Recent Context: {context}\n\nUser Command: {command}"

    args = ["gemini", "--prompt", full_prompt, "--yolo"]
    if current_session_id:
        args.extend(["--resume", current_session_id])

    env = os.environ.copy()
    _prepare_system_prompt(env, is_init)

    try:
        process = subprocess.run(
            args, capture_output=True, text=True, cwd=WORKSPACE_DIR, env=env
        )
        if process.returncode != 0:
            # Surface CLI failures instead of silently saying nothing.
            print(
                f"[Jarvis] gemini exited with status {process.returncode}: "
                f"{process.stderr.strip()}"
            )
        response = process.stdout.strip()
        if response:
            print(f"\n[Gemini Response]:\n{response}")
            speak_text(response)

        if is_init and not current_session_id:
            # Short pause before listing sessions so the one just created by
            # the call above is (presumably) persisted by the CLI.
            time.sleep(1)
            current_session_id = get_latest_session_id()
            if current_session_id:
                print(
                    f"[Jarvis] Session protocol established: {current_session_id}"
                )
    except Exception as e:
        print(f"Error running gemini: {e}")


def audio_callback(indata, frames, time, status):
    """sounddevice input callback: queue a copy of each captured block.

    NOTE: the ``time`` parameter (kept for signature compatibility) shadows
    the ``time`` module inside this function; do not call ``time.*`` here.
    """
    if status:
        print(status, file=sys.stderr)
    # Copy: the buffer behind `indata` is only valid during the callback.
    audio_queue.put(indata.copy())


def _drain_audio_queue():
    """Return every chunk currently queued (flattened), without blocking."""
    chunks = []
    while True:
        try:
            chunks.append(audio_queue.get_nowait().flatten())
        except queue.Empty:
            return chunks


def _process_utterance(text):
    """Log one transcription and, if it contains the wake word, run it.

    Returns True when the wake word was detected and a command was sent to
    Gemini, False otherwise. The rolling context is updated in both cases.
    """
    global rolling_context

    print(f"[Caption]: {text}")
    log_transcription(text)

    trigger_match = _TRIGGER_RE.search(text)
    triggered = trigger_match is not None
    if triggered:
        print("[Jarvis] Trigger word detected!")
        play_sound(SYSTEM_SOUND)

        # Command = text after the wake word, minus the punctuation that
        # typically follows it in a transcript ("Jarvis, ...").
        command = text[trigger_match.end():].lstrip(_COMMAND_LEADING_CHARS).rstrip()
        if not command:
            print("[Jarvis] No command following trigger word. Using full text.")
            command = text

        # The context passed is the one from BEFORE this utterance.
        run_gemini(command, context=rolling_context)

    rolling_context = (rolling_context + " " + text)[-CONTEXT_CHARS:].strip()
    return triggered


def main():
    """Load the models, boot the Gemini session and run the listening loop."""
    print("[Jarvis] Loading models...")
    device = "mps" if torch.backends.mps.is_available() else "cpu"
    print(f"[Jarvis] Using device: {device}")

    vad_model = load_silero_vad()
    print("[Jarvis] Models loaded.")

    print("[Jarvis] Booting system protocols...")
    run_gemini("System initialization complete. Awaiting orders, Sir.", is_init=True)

    print(f"[Jarvis] Always-active mic enabled. Listening for '{TRIGGER_WORD}'...")

    # Loop invariants, hoisted.
    silence_samples = SAMPLERATE * SILENCE_DURATION_MS / 1000
    max_buffer_samples = MAX_BUFFER_SECONDS * SAMPLERATE
    idle_clear_samples = SAMPLERATE * 2

    audio_buffer = []
    speech_started = False

    try:
        with sd.InputStream(
            samplerate=SAMPLERATE,
            channels=1,
            callback=audio_callback,
            blocksize=BLOCK_SIZE,
        ):
            while True:
                # Block until new audio arrives. Without this the loop spins
                # at full CPU and re-runs VAD on an unchanged buffer.
                try:
                    first_chunk = audio_queue.get(timeout=QUEUE_POLL_SECONDS)
                except queue.Empty:
                    continue
                audio_buffer.append(first_chunk.flatten())
                audio_buffer.extend(_drain_audio_queue())

                current_audio = np.concatenate(audio_buffer)

                # Watchdog to prevent buffer bloat. Checked before VAD so an
                # oversized buffer is not analysed just to be thrown away.
                if len(current_audio) > max_buffer_samples:
                    print("[Jarvis] Buffer limit reached. Resetting...")
                    audio_buffer = []
                    speech_started = False
                    continue

                speech_timestamps = get_speech_timestamps(
                    torch.from_numpy(current_audio),
                    vad_model,
                    sampling_rate=SAMPLERATE,
                    threshold=VAD_THRESHOLD,
                    min_silence_duration_ms=SILENCE_DURATION_MS,
                )

                if speech_timestamps:
                    speech_started = True
                    trailing_silence = len(current_audio) - speech_timestamps[-1]["end"]

                    # Speech has ended once enough silence follows it.
                    if trailing_silence > silence_samples:
                        print("[Jarvis] Transcribing...")
                        result = mlx_whisper.transcribe(
                            current_audio, path_or_hf_repo=WHISPER_MODEL
                        )
                        text = result["text"].strip()

                        if text and _process_utterance(text):
                            # Discard audio captured while Gemini ran and the
                            # reply was spoken, so the assistant does not
                            # transcribe (and trigger on) its own voice.
                            _drain_audio_queue()

                        # Reset buffer after processing
                        audio_buffer = []
                        speech_started = False
                elif not speech_started and len(current_audio) > idle_clear_samples:
                    # Clear buffer if no speech detected for 2 seconds
                    audio_buffer = []

    except KeyboardInterrupt:
        print("\n[Jarvis] Shutting down...")
    except Exception as e:
        print(f"\n[Jarvis] Fatal Error: {e}")


if __name__ == "__main__":
    main()