Files
jarvis/jarvis.py
2026-03-03 12:40:16 -05:00

251 lines
9.2 KiB
Python

import sys
import os
import subprocess
import time
import re
import queue
import threading
from unittest.mock import MagicMock
# Fallback for Python builds that lack the optional _lzma extension module:
# when `import lzma` fails, stand-in modules are registered in sys.modules so
# that subsequent imports of lzma (by this file or its dependencies) succeed.
try:
    import lzma
except ImportError:
    mock_lzma = MagicMock()

    # Integrity-check identifiers.
    mock_lzma.CHECK_NONE = 0
    mock_lzma.CHECK_CRC32 = 1
    mock_lzma.CHECK_CRC64 = 4
    mock_lzma.CHECK_SHA256 = 10

    # Container-format identifiers.
    mock_lzma.FORMAT_XZ = 1
    mock_lzma.FORMAT_ALONE = 2
    mock_lzma.FORMAT_RAW = 3

    sys.modules["lzma"] = mock_lzma
    sys.modules["_lzma"] = MagicMock()
import numpy as np
import sounddevice as sd
import torch
import mlx_whisper
from silero_vad import load_silero_vad, get_speech_timestamps
from gtts import gTTS
import pygame
import io
import sys
# --- Configuration ---
# Wake word; main() searches every transcript for it, ignoring case.
TRIGGER_WORD = "Jarvis"
# Model identifier handed to mlx_whisper.transcribe(path_or_hf_repo=...).
WHISPER_MODEL = "mlx-community/whisper-small-mlx"
# Sample rate (Hz) used for the input stream and for the VAD.
SAMPLERATE = 16000
# Number of frames delivered per input-stream callback.
BLOCK_SIZE = 512
# Threshold passed to get_speech_timestamps.
VAD_THRESHOLD = 0.5
# Passed to the VAD as min_silence_duration_ms, and also the amount of
# trailing non-speech audio that main() treats as "the utterance has ended".
SILENCE_DURATION_MS = 100
# main() discards the capture buffer once it grows beyond this many seconds.
MAX_BUFFER_SECONDS = 15
CONTEXT_CHARS = 500  # How much previous text to keep for context
# Working directory for every gemini CLI invocation.
WORKSPACE_DIR = "workspace"
# Directory holding the per-day transcription logs.
LOGS_DIR = os.path.join(WORKSPACE_DIR, "transcription")
# Optional file whose contents become the Gemini system prompt.
SOUL_PATH = "soul.md"
# Sound played when the trigger word is detected.
SYSTEM_SOUND = "/System/Library/Sounds/Tink.aiff"
# NOTE(review): not referenced anywhere in the visible code.
FOLLOW_UP_SOUND = "/System/Library/Sounds/Submarine.aiff"

# Ensure workspace and logs exist. exist_ok=True makes this idempotent and
# avoids the window between an "exists?" check and the actual creation.
for d in [WORKSPACE_DIR, LOGS_DIR]:
    os.makedirs(d, exist_ok=True)
def log_transcription(text):
    """Append a timestamped transcription to the current day's log file.

    The entry is written as ``[HH:MM:SS] text`` to ``LOGS_DIR/YYYY-MM-DD.log``.
    Logging is best-effort: a failure to write is reported on stdout and
    otherwise ignored.

    Args:
        text: The transcription to record. Falsy values (``None``, ``""``)
            are skipped without touching the file system.
    """
    if not text:
        return

    # Take a single clock reading so the file name and the entry's timestamp
    # cannot disagree when the call straddles midnight.
    now = time.localtime()
    date_str = time.strftime("%Y-%m-%d", now)
    timestamp = time.strftime("%H:%M:%S", now)
    log_file = os.path.join(LOGS_DIR, f"{date_str}.log")

    try:
        # Explicit UTF-8 so non-ASCII speech is stored the same way regardless
        # of the platform's default locale encoding.
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(f"[{timestamp}] {text}\n")
    except (OSError, UnicodeError) as e:
        print(f"Error logging transcription: {e}")
# Global state
# Audio blocks produced by audio_callback and consumed by the loop in main().
audio_queue = queue.Queue()
# Tail of the recent transcriptions (capped at CONTEXT_CHARS characters by
# main()); passed to run_gemini as context for the next command.
rolling_context = ""
# Session identifier that run_gemini passes via --resume; stays None until
# run_gemini obtains one from get_latest_session_id().
current_session_id = None
def play_sound(sound_path=SYSTEM_SOUND):
    """Start playing a sound file and return without waiting for it.

    The file is handed to the ``afplay`` command, launched as a child
    process that is not waited on.

    Args:
        sound_path: Path of the sound file to play.
    """
    player_command = ["afplay", sound_path]
    subprocess.Popen(player_command)
def get_latest_session_id():
    """Return the last session identifier listed by ``gemini --list-sessions``.

    The command runs inside ``WORKSPACE_DIR``; every bracketed run of
    lowercase hex digits and dashes in its stdout is treated as an
    identifier, and the final one is returned.

    Returns:
        The identifier string, or ``None`` when nothing matched or the
        lookup raised (the error is printed, not propagated).
    """
    listing_command = ["gemini", "--list-sessions"]
    try:
        listing = subprocess.run(
            listing_command,
            capture_output=True,
            text=True,
            cwd=WORKSPACE_DIR,
        )
        session_ids = re.findall(r"\[([a-f0-9\-]+)\]", listing.stdout)
    except Exception as e:
        print(f"Error fetching session ID: {e}")
        return None

    return session_ids[-1] if session_ids else None
def speak_text(text):
    """Speak text aloud with the macOS ``say`` command.

    Markdown markers (``*``, ``#`` and backticks) are removed first so they
    are not read out. The call blocks until ``say`` has finished.

    Args:
        text: The text to speak. Nothing happens when it is ``None``, empty,
            whitespace only, or consists solely of the stripped markers.
    """
    if not text or not text.strip():
        return

    clean_text = text.translate(str.maketrans("", "", "*#`"))
    if not clean_text.strip():
        # Only markdown markers were present; there is nothing to say.
        return

    print(f"[Jarvis] Speaking: {clean_text}")
    # Feed the text through stdin rather than as an argument: `say` reads
    # standard input when no message argument is given, which keeps a reply
    # that begins with "-" (e.g. a markdown bullet) from being parsed as a
    # command-line option and avoids argument-length limits on long replies.
    subprocess.run(["say"], input=clean_text, encoding="utf-8")
def run_gemini(command, context="", is_init=False):
    """Send a prompt to the ``gemini`` CLI, then print and speak its reply.

    The CLI is run inside ``WORKSPACE_DIR`` with ``--prompt`` and ``--yolo``,
    plus ``--resume <id>`` once a session identifier is known. When
    ``SOUL_PATH`` exists, its contents are written to
    ``WORKSPACE_DIR/.system_prompt.md`` and that file's absolute path is
    exported to the child process as ``GEMINI_SYSTEM_MD``.

    Args:
        command: The user's request.
        context: Recent transcript text; when non-empty it is prepended to
            the prompt under a "Recent Context" heading.
        is_init: ``True`` for the start-up call. The current date and time
            are then prepended to the system prompt, and afterwards the
            session identifier is looked up and stored in the module-level
            ``current_session_id`` if none is set yet.

    Errors raised while running the CLI or handling its reply are printed
    and swallowed. Errors while reading or writing the system prompt files
    are not caught here.
    """
    global current_session_id

    # Combine context and command if provided
    full_prompt = command
    if context:
        full_prompt = f"Recent Context: {context}\n\nUser Command: {command}"

    args = ["gemini", "--prompt", full_prompt, "--yolo"]
    if current_session_id:
        args.extend(["--resume", current_session_id])

    env = os.environ.copy()
    if os.path.exists(SOUL_PATH):
        with open(SOUL_PATH, "r", encoding="utf-8") as f:
            soul_content = f.read()

        if is_init:
            current_time = time.strftime("%A, %B %d, %Y, %I:%M %p")
            soul_content = (
                f"Temporal Context: The current date and time is {current_time}.\n\n"
                + soul_content
            )
            print("\n[Jarvis] Initializing system protocol...")

        # Absolute path: the child process runs with cwd=WORKSPACE_DIR, so a
        # relative path would be resolved against a different directory.
        system_md_path = os.path.abspath(os.path.join(WORKSPACE_DIR, ".system_prompt.md"))
        with open(system_md_path, "w", encoding="utf-8") as f:
            f.write(soul_content)
        env["GEMINI_SYSTEM_MD"] = system_md_path

    try:
        process = subprocess.run(args, capture_output=True, text=True, cwd=WORKSPACE_DIR, env=env)

        # Surface CLI failures instead of silently producing no reply.
        if process.returncode != 0:
            print(
                f"[Jarvis] gemini exited with status {process.returncode}: "
                f"{process.stderr.strip()}"
            )

        response = process.stdout.strip()
        if response:
            print(f"\n[Gemini Response]:\n{response}")
            speak_text(response)

        if is_init and not current_session_id:
            # Short pause before listing sessions, as in the original flow.
            time.sleep(1)
            current_session_id = get_latest_session_id()
            if current_session_id:
                print(f"[Jarvis] Session protocol established: {current_session_id}")
    except Exception as e:
        print(f"Error running gemini: {e}")
def audio_callback(indata, frames, time, status):
    """Input-stream callback that queues each captured audio block.

    Passed as ``callback`` to ``sd.InputStream`` in ``main``. ``frames`` and
    ``time`` are accepted to satisfy that signature and are not used; note
    that ``time`` shadows the ``time`` module inside this function.

    Args:
        indata: The captured block; a copy of it is queued.
        frames: Unused.
        time: Unused.
        status: Stream status; printed to stderr when truthy.
    """
    if status:
        print(status, file=sys.stderr)

    # Queue a private copy -- presumably because the stream may reuse the
    # underlying buffer after the callback returns (confirm against the
    # sounddevice documentation).
    captured_block = indata.copy()
    audio_queue.put(captured_block)
def _drain_audio_queue():
    """Return every block currently waiting in ``audio_queue``, flattened, without blocking."""
    pending = []
    while True:
        try:
            pending.append(audio_queue.get_nowait().flatten())
        except queue.Empty:
            return pending


def _handle_transcript(text, trigger_pattern):
    """Run the spoken command if ``text`` contains the trigger word, then update the context.

    Returns ``True`` when Gemini was invoked, ``False`` otherwise.
    """
    global rolling_context

    trigger_match = trigger_pattern.search(text)
    if trigger_match:
        print("[Jarvis] Trigger word detected!")
        play_sound(SYSTEM_SOUND)

        # Extract command (text after trigger word)
        command = text[trigger_match.end():].strip()
        if not command:
            print("[Jarvis] No command following trigger word. Using full text.")
            command = text

        # Gemini sees the context as it was *before* this utterance.
        run_gemini(command, context=rolling_context)

    # Every transcript, triggered or not, joins the rolling context.
    rolling_context = (rolling_context + " " + text)[-CONTEXT_CHARS:].strip()
    return trigger_match is not None


def main():
    """Run the assistant: listen on the microphone, transcribe speech, obey commands.

    After loading the VAD model and sending the start-up prompt to Gemini,
    the function opens an input stream and loops until interrupted:

    1. Wait for captured audio and append it to the current buffer.
    2. Discard the buffer if it exceeds ``MAX_BUFFER_SECONDS``.
    3. Run voice-activity detection over the buffer. If speech was found
       and is followed by more than ``SILENCE_DURATION_MS`` of non-speech,
       transcribe the buffer, log the text, and pass it to
       ``_handle_transcript``.
    4. If no speech has been seen and more than two seconds have
       accumulated, discard the buffer.

    ``KeyboardInterrupt`` ends the loop quietly; any other exception is
    printed as a fatal error.
    """
    print("[Jarvis] Loading models...")
    device = "mps" if torch.backends.mps.is_available() else "cpu"
    print(f"[Jarvis] Using device: {device}")
    vad_model = load_silero_vad()
    print("[Jarvis] Models loaded.")

    print("[Jarvis] Booting system protocols...")
    run_gemini("System initialization complete. Awaiting orders, Sir.", is_init=True)
    print(f"[Jarvis] Always-active mic enabled. Listening for '{TRIGGER_WORD}'...")

    # Loop invariants: compile the trigger pattern once (escaped, so a trigger
    # word containing regex metacharacters is matched literally) and
    # precompute the end-of-speech gap in samples.
    trigger_pattern = re.compile(rf"\b{re.escape(TRIGGER_WORD)}\b", re.IGNORECASE)
    end_of_speech_gap = SAMPLERATE * SILENCE_DURATION_MS / 1000

    audio_buffer = []
    speech_started = False

    try:
        with sd.InputStream(samplerate=SAMPLERATE, channels=1, callback=audio_callback, blocksize=BLOCK_SIZE):
            while True:
                # Block until new audio arrives instead of spinning; without
                # new samples the buffer is unchanged and re-running the VAD
                # could not produce a different result.
                try:
                    first_block = audio_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                audio_buffer.append(first_block.flatten())
                audio_buffer.extend(_drain_audio_queue())

                current_audio = np.concatenate(audio_buffer)

                # Watchdog to prevent buffer bloat (checked before the VAD so
                # an oversized buffer is not analysed just to be thrown away).
                if len(current_audio) / SAMPLERATE > MAX_BUFFER_SECONDS:
                    print("[Jarvis] Buffer limit reached. Resetting...")
                    audio_buffer = []
                    speech_started = False
                    continue

                speech_timestamps = get_speech_timestamps(
                    torch.from_numpy(current_audio),
                    vad_model,
                    sampling_rate=SAMPLERATE,
                    threshold=VAD_THRESHOLD,
                    min_silence_duration_ms=SILENCE_DURATION_MS,
                )

                if not speech_timestamps:
                    # Clear buffer if no speech detected for 2 seconds
                    if not speech_started and len(current_audio) > SAMPLERATE * 2:
                        audio_buffer = []
                    continue

                speech_started = True

                # Check if speech has ended (silence after last speech)
                trailing_samples = len(current_audio) - speech_timestamps[-1]['end']
                if trailing_samples <= end_of_speech_gap:
                    continue

                print("[Jarvis] Transcribing...")
                result = mlx_whisper.transcribe(current_audio, path_or_hf_repo=WHISPER_MODEL)
                text = result['text'].strip()

                if text:
                    print(f"[Caption]: {text}")
                    log_transcription(text)
                    if _handle_transcript(text, trigger_pattern):
                        # The stream kept recording while Gemini answered and
                        # the reply was spoken; drop that audio so the
                        # assistant does not transcribe its own voice.
                        _drain_audio_queue()

                # Reset buffer after processing
                audio_buffer = []
                speech_started = False
    except KeyboardInterrupt:
        print("\n[Jarvis] Shutting down...")
    except Exception as e:
        print(f"\n[Jarvis] Fatal Error: {e}")
# Start the assistant only when this file is executed as a script; importing
# it as a module must not open the microphone.
if __name__ == "__main__":
    main()