diff --git a/codec_voice.py b/codec_voice.py index 16bac64..4ec7dc2 100644 --- a/codec_voice.py +++ b/codec_voice.py @@ -119,10 +119,13 @@ def _resolve_voice_option_choice( low = raw.lower() # Strip simple punctuation for matching. low = re.sub(r"[^a-z0-9\s]", " ", low).strip() - # 1. Exact substring of any option label (case-insensitive). - for opt in options: - if str(opt).lower() in low: - return opt + # 1. Exact substring of any option label (case-insensitive). Prefer the + # LONGEST matching label so "yes and notify" wins over "yes" when both + # are options and both appear in the transcript (L2 / SR-62: was + # first-match, which mis-routed a non-strict multi-option answer). + substring_matches = [opt for opt in options if str(opt).lower() in low] + if substring_matches: + return max(substring_matches, key=lambda o: len(str(o))) # 2. Synonym map: if any synonym word appears, match the option label # whose lowercase contains the synonym key. low_words = set(low.split()) @@ -254,6 +257,11 @@ def _clear_voice_session_marker() -> None: SAMPLE_RATE = 16000 BYTES_PER_SAMPLE = 2 MIN_SPEECH_BYTES = int(SAMPLE_RATE * BYTES_PER_SAMPLE * VAD_MIN_SPEECH_SECONDS) +# L2 / SR-62: hard upper bound on a single utterance. Continuous mic noise above +# the VAD threshold keeps last_speech_time fresh, so the silence gate would never +# fire and audio_buffer would grow unbounded (~32 KB/s). Force-flush at this cap. +VAD_MAX_UTTERANCE_SECONDS = _vad_cfg.get("max_utterance_seconds", 30) +MAX_UTTERANCE_BYTES = int(SAMPLE_RATE * BYTES_PER_SAMPLE * VAD_MAX_UTTERANCE_SECONDS) # RMS threshold for interrupt detection (slightly lower than VAD to catch early speech) INTERRUPT_THRESHOLD = 1500 # raised from 600 — too sensitive to background noise @@ -383,6 +391,10 @@ def __init__(self, websocket, resume_session_id: str | None = None): self.utterance_queue = asyncio.Queue(maxsize=3) # completed utterances ready to process self.interrupted = asyncio.Event() # set when user speaks mid-response self.processing = False # True while generating/speaking a response + # L2 / SR-62: flipped False the moment the client disconnects, so sends + # in _speak / the crew callback no-op instead of raising against a dead + # socket (which previously aborted a detached crew dispatch mid-run). + self._ws_alive = True self.skills = {} self._http = httpx.AsyncClient(timeout=120.0) @@ -482,19 +494,31 @@ def feed_audio(self, chunk: bytes) -> Optional[bytes]: rms = self._rms(chunk) now = time.monotonic() - # Echo cooldown: ignore mic after Q speaks - if now - self.last_tts_end < VAD_ECHO_COOLDOWN: + # Echo cooldown: ignore mic after Q speaks — but ONLY suppress a NEW + # speech start. If the user is already mid-utterance (is_speaking), keep + # capturing so a barge-in's leading words aren't silently dropped + # (L2 / SR-62: was an unconditional return → truncated transcripts like + # "...end my email" instead of "send my email"). + if (now - self.last_tts_end < VAD_ECHO_COOLDOWN) and not self.is_speaking: return None if rms > VAD_SILENCE_THRESHOLD: self.is_speaking = True self.last_speech_time = now self.audio_buffer.extend(chunk) + # L2 / SR-62: force-flush a runaway utterance so continuous noise + # can't grow the buffer without bound. + if len(self.audio_buffer) >= MAX_UTTERANCE_BYTES: + utterance = bytes(self.audio_buffer) + self.audio_buffer = bytearray() + self.is_speaking = False + return utterance return None if self.is_speaking: self.audio_buffer.extend(chunk) - if now - self.last_speech_time > VAD_SILENCE_DURATION: + if (now - self.last_speech_time > VAD_SILENCE_DURATION + or len(self.audio_buffer) >= MAX_UTTERANCE_BYTES): self.is_speaking = False if len(self.audio_buffer) >= MIN_SPEECH_BYTES: utterance = bytes(self.audio_buffer) @@ -568,6 +592,9 @@ async def _stream_qwen(self, messages: list, max_tokens: int = 2000): # queue (CRITICAL) stays here; the per-token strip stays here too # (Qwen 3.6 may put thinking in content — strip it before yielding). import codec_llm + # L2 / SR-62: reset per-stream error flag. The consumer checks it to + # avoid persisting the spoken error sentinel as a real assistant turn. + self._stream_error = False await llm_queue.acquire(Priority.CRITICAL) try: async for token in codec_llm.astream( @@ -581,6 +608,7 @@ async def _stream_qwen(self, messages: list, max_tokens: int = 2000): yield token except Exception as e: log.error(f"Qwen error: {e}") + self._stream_error = True yield "Sorry, I had a processing error." finally: await llm_queue.release(Priority.CRITICAL) @@ -676,7 +704,12 @@ async def generate_response(self, user_text: str): async for chunk in self._stream_qwen(self._trimmed_messages()): full += chunk yield chunk - self.messages.append({"role": "assistant", "content": full}) + # L2 / SR-62: don't persist the spoken error sentinel as a real + # assistant turn — it would pollute conversation context + memory on + # the next turn (the LLM would "see" a fake apology it never reasoned). + # The user still HEARD it (it was yielded above). + if not getattr(self, "_stream_error", False): + self.messages.append({"role": "assistant", "content": full}) # ── TTS ─────────────────────────────────────────────────────────────── @@ -716,6 +749,32 @@ def _flush_on_boundary(self, buf: str) -> tuple[str, str]: # ── TTS with interruption check ─────────────────────────────────────── + async def _safe_send_bytes(self, data: bytes) -> bool: + """Send bytes only if the socket is still alive; swallow + flag on a + closed-socket error. Returns True if sent. (L2 / SR-62.)""" + if not self._ws_alive: + return False + try: + await self.ws.send_bytes(data) + return True + except Exception as e: + self._ws_alive = False + log.info(f"[Voice] send_bytes on dead socket — stopping: {e}") + return False + + async def _safe_send_json(self, obj: dict) -> bool: + """Send JSON only if the socket is still alive; swallow + flag on a + closed-socket error. Returns True if sent. (L2 / SR-62.)""" + if not self._ws_alive: + return False + try: + await self.ws.send_json(obj) + return True + except Exception as e: + self._ws_alive = False + log.info(f"[Voice] send_json on dead socket — stopping: {e}") + return False + async def _speak(self, text: str) -> bool: """ Synthesize and send one chunk of speech. @@ -727,7 +786,10 @@ async def _speak(self, text: str) -> bool: if self.interrupted.is_set(): return False if audio: - await self.ws.send_bytes(audio) + # L2 / SR-62: guard the send so a client disconnect mid-TTS doesn't + # raise out of every caller (_pipeline, crew dispatch, greeting). + if not await self._safe_send_bytes(audio): + return False self.last_tts_end = self._tts_playback_end_time(audio) return True @@ -775,12 +837,17 @@ async def _poll_pending_question_for_voice(self) -> Optional[dict]: # voice operation. if (rec.get("correlation_id") == self._cid or asked_from in ("crew", "voice")): - self._announced_question_ids.add(rec["id"]) + # L2 / SR-62: do NOT mark announced here — the caller marks it + # only AFTER a successful announce, so a failed/closed-socket + # announce retries next poll instead of silently swallowing the + # question (which left the user answering something unheard). return rec return None - async def _announce_pending_question(self, rec: dict) -> None: - """TTS-announce the question. If options present, list them.""" + async def _announce_pending_question(self, rec: dict) -> bool: + """TTS-announce the question. Returns True only if it was actually + spoken — the caller arms the answer slot + marks it announced on success + only (L2 / SR-62).""" try: agent = rec.get("agent") or "CODEC" question = rec.get("question") or "" @@ -796,9 +863,10 @@ async def _announce_pending_question(self, rec: dict) -> None: f" This is a destructive action — please say the word " f"'{verb}' clearly to confirm, or say 'cancel' to abort." ) - await self._speak(announcement) + return await self._speak(announcement) except Exception as e: log.error(f"ask_user announce failed: {e}") + return False async def _handle_voice_ask_user_answer(self, qid: str, user_text: str) -> None: """Route the user's spoken transcript to /api/agents/answer/{qid} @@ -951,10 +1019,12 @@ async def dispatch_crew_from_voice(self, user_text: str) -> Optional[str]: label = crew_name.replace("_", " ") notify = f"Starting {label}. This will take a few minutes. I'll keep you posted." - await self.ws.send_json({"type": "transcript", "role": "assistant", "text": notify}) + # L2 / SR-62: guarded sends — a disconnect during a multi-minute + # crew must not raise out of the callback (the crew runs detached + # in the background; an unguarded send would surface as an error). + await self._safe_send_json({"type": "transcript", "role": "assistant", "text": notify}) audio = await self.synthesize(notify) - if audio: - await self.ws.send_bytes(audio) + if audio and await self._safe_send_bytes(audio): self.last_tts_end = self._tts_playback_end_time(audio) async def voice_cb(update): @@ -967,10 +1037,10 @@ async def voice_cb(update): msg = f"{agent} is starting, step {update.get('task_num','')} of {update.get('total','')}." else: return - await self.ws.send_json({"type": "transcript", "role": "assistant", "text": msg}) + if not await self._safe_send_json({"type": "transcript", "role": "assistant", "text": msg}): + return a = await self.synthesize(msg) - if a: - await self.ws.send_bytes(a) + if a and await self._safe_send_bytes(a): self.last_tts_end = self._tts_playback_end_time(a) _dash = os.path.dirname(os.path.abspath(__file__)) @@ -998,6 +1068,13 @@ async def voice_cb(update): # ── Memory ──────────────────────────────────────────────────────────── def save_to_memory(self): + # L2 / SR-62: idempotent. BOTH VoicePipeline.run()'s finally AND the + # websocket route's finally (routes/websocket.py) call this — without a + # guard every voice turn was written to memory.db twice per session. + # First successful full save wins; a mid-loop failure leaves the flag + # unset so a later call can retry. + if getattr(self, "_memory_saved", False): + return try: _dash = os.path.dirname(os.path.abspath(__file__)) if _dash not in sys.path: @@ -1016,9 +1093,54 @@ def save_to_memory(self): continue mem.save(self.session_id, role, str(content)[:2000]) saved += 1 + self._memory_saved = True log.info(f"Saved {saved} messages → {self.session_id}") except Exception as e: log.error(f"Memory save error: {e}") + + # ── Concurrency helpers (L2 / SR-62) ─────────────────────────────────── + + def _enqueue_utterance(self, utterance: bytes) -> None: + """Non-blocking enqueue so the receiver loop never parks on a full + queue. `await queue.put()` on the maxsize=3 utterance_queue would block + the receiver while the pipeline is slow — and a blocked receiver stops + reading interrupt / ping control frames (head-of-line block). On + overflow we drop the OLDEST queued utterance to make room for the + newest (most recent speech is the most relevant).""" + try: + self.utterance_queue.put_nowait(utterance) + except asyncio.QueueFull: + try: + self.utterance_queue.get_nowait() + log.warning("[Voice] utterance queue full — dropped oldest to enqueue newest") + self.utterance_queue.put_nowait(utterance) + except (asyncio.QueueEmpty, asyncio.QueueFull): + pass + + @staticmethod + def _log_task_exception(task: "asyncio.Task") -> None: + """done_callback for fire-and-forget tasks: retrieve + log any exception + so it doesn't become an un-retrieved-future warning (and isn't silently + swallowed).""" + try: + exc = task.exception() + except asyncio.CancelledError: + return + if exc is not None: + log.warning(f"[Voice] background task failed: {type(exc).__name__}: {exc}") + + @staticmethod + def _spawn_detached(argv: list) -> None: + """Fire-and-forget a cosmetic side-effect subprocess (camera-shutter + sound, screen overlay). Popen returns immediately, so no event-loop + offload is needed — this replaces the orphaned run_in_executor futures + (+ deprecated get_event_loop()) the overlay used to create. A missing + afplay / tkinter is non-fatal. (L2 / SR-62.)""" + try: + subprocess.Popen(argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except Exception as e: + log.debug(f"[Voice] detached spawn failed (non-fatal): {e}") + # ── Audio receiver task ──────────────────────────────────────────────── async def _audio_receiver(self): @@ -1035,6 +1157,9 @@ async def _audio_receiver(self): if msg_type == "websocket.disconnect": log.info("WebSocket disconnected in receiver") + # L2 / SR-62: flag the socket dead so any in-flight _speak / + # crew-callback send stops instead of raising on the close. + self._ws_alive = False await self.utterance_queue.put(None) # signal pipeline to stop # If unexpected, save state for possible resume if self._disconnect_reason != "user": @@ -1060,7 +1185,7 @@ async def _audio_receiver(self): self.audio_buffer = bytearray() self.is_speaking = False log.info(f"Your-turn: flushing {len(utterance)} bytes") - await self.utterance_queue.put(utterance) + self._enqueue_utterance(utterance) elif self.audio_buffer: # Buffer too short — discard and notify self.audio_buffer = bytearray() @@ -1107,16 +1232,20 @@ async def _audio_receiver(self): # Feed VAD so utterance is buffered (queued once processing ends) utterance = self.feed_audio(raw_bytes) if utterance: - await self.utterance_queue.put(utterance) + self._enqueue_utterance(utterance) continue # Normal VAD feeding — trigger warmup on speech start was_speaking = self.is_speaking utterance = self.feed_audio(raw_bytes) if self.is_speaking and not was_speaking and not self._warmed_up: - asyncio.create_task(self.warmup_llm()) + # L2 / SR-62: keep a ref (so it isn't GC'd mid-flight) + + # attach an exception logger (so a warmup failure isn't an + # un-retrieved-future warning). Cancelled in run()'s finally. + self._warmup_task = asyncio.create_task(self.warmup_llm()) + self._warmup_task.add_done_callback(self._log_task_exception) if utterance: - await self.utterance_queue.put(utterance) + self._enqueue_utterance(utterance) except Exception as e: log.error(f"Receiver error: {type(e).__name__}: {e}") @@ -1184,9 +1313,13 @@ async def _pipeline(self): # via TTS and switch into single-question listen mode for # the NEXT utterance (don't process this one as a command). _q = await self._poll_pending_question_for_voice() - if _q is not None: + if _q is not None and await self._announce_pending_question(_q): + # L2 / SR-62: arm the answer slot + mark announced ONLY after + # the user actually heard it. If the announce failed (TTS + # down / socket closed) we fall through to process this + # utterance normally and re-poll the question next loop. self._awaiting_ask_user = _q.get("id") - await self._announce_pending_question(_q) + self._announced_question_ids.add(_q.get("id")) self.processing = False await self.ws.send_json({"type": "status", "status": "listening"}) continue @@ -1196,10 +1329,8 @@ async def _pipeline(self): log.info("Screen analysis requested") await self.ws.send_json({"type": "status", "status": "analyzing_screen"}) # Camera shutter sound + overlay for visual feedback - asyncio.get_event_loop().run_in_executor(None, lambda: subprocess.Popen( - ["afplay", "/System/Library/Sounds/Tink.aiff"], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)) - asyncio.get_event_loop().run_in_executor(None, lambda: subprocess.Popen( + self._spawn_detached(["afplay", "/System/Library/Sounds/Tink.aiff"]) + self._spawn_detached( [sys.executable, "-c", "import tkinter as tk;r=tk.Tk();r.overrideredirect(1);r.attributes('-topmost',1);" "r.attributes('-alpha',0.95);r.configure(bg='#0a0a0a');" @@ -1208,8 +1339,7 @@ async def _pipeline(self): "c=tk.Canvas(r,bg='#0a0a0a',highlightthickness=0,width=w,height=h);c.pack();" "c.create_rectangle(1,1,w-1,h-1,outline='#00aaff',width=1);" "c.create_text(w//2,h//2,text='\U0001f4f7 Analyzing your screen...',fill='#00aaff',font=('Helvetica',13));" - "r.after(8000,r.destroy);r.mainloop()"], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)) + "r.after(8000,r.destroy);r.mainloop()"]) screenshot_b64 = await self._take_screenshot() log.info(f"Screenshot taken: {'OK' if screenshot_b64 else 'FAILED'}") if screenshot_b64: @@ -1401,6 +1531,11 @@ async def run(self): finally: receiver.cancel() pipeline.cancel() + # L2 / SR-62: cancel the fire-and-forget warmup task if still + # pending (was orphaned before — no ref, no cancel). + _wt = getattr(self, "_warmup_task", None) + if _wt is not None and not _wt.done(): + _wt.cancel() self.save_to_memory() log.info(f"Session ended: {self.session_id}") try: diff --git a/tests/test_voice_ask_user.py b/tests/test_voice_ask_user.py index 7936c6f..73942b8 100644 --- a/tests/test_voice_ask_user.py +++ b/tests/test_voice_ask_user.py @@ -296,7 +296,12 @@ def test_poll_skips_chat_asked_from_with_different_cid(temp_askuser_paths): def test_poll_skips_already_announced_question(temp_askuser_paths): """Once announced this session, the same qid is skipped on the next poll - (avoids re-announcing the same question forever).""" + (avoids re-announcing the same question forever). + + L2 / SR-62: the poll no longer self-marks announced — the caller marks it + only AFTER a successful announce (so a failed announce retries). The dedup + contract is unchanged; here we mark it ourselves to simulate the caller's + post-announce step, then confirm the next poll skips it.""" pq_path, _, _ = temp_askuser_paths _write_pending(pq_path, [{ "id": "q_xxx", "status": "pending", "correlation_id": "session-cid", @@ -305,9 +310,12 @@ def test_poll_skips_already_announced_question(temp_askuser_paths): p = _FakeVoicePipeline("session-cid") rec1 = _await(p._poll_pending_question_for_voice()) assert rec1 is not None - # Second poll — already announced. - rec2 = _await(p._poll_pending_question_for_voice()) - assert rec2 is None + # Poll again BEFORE marking → still returned (the fix: a failed announce + # must be able to retry). + assert _await(p._poll_pending_question_for_voice()) is not None + # Caller marks it announced after speaking it → now skipped. + p._announced_question_ids.add("q_xxx") + assert _await(p._poll_pending_question_for_voice()) is None def test_poll_skips_answered_questions(temp_askuser_paths): diff --git a/tests/test_voice_reliability_l2.py b/tests/test_voice_reliability_l2.py new file mode 100644 index 0000000..697b9aa --- /dev/null +++ b/tests/test_voice_reliability_l2.py @@ -0,0 +1,146 @@ +"""L2 — regression tests for codec_voice.py reliability fixes (review sweep). + + - _resolve_voice_option_choice prefers the LONGEST matching label + - _enqueue_utterance is non-blocking + drops oldest on overflow (no HOL block) + - feed_audio force-flushes a runaway utterance (no unbounded buffer) + - feed_audio keeps capturing a barge-in mid-utterance during echo cooldown + - save_to_memory is idempotent (the run() + route double-call no longer 2x) + - _stream_qwen flags an error so the consumer skips persisting the sentinel +""" +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx + +_REPO = Path(__file__).resolve().parents[1] +if str(_REPO) not in sys.path: + sys.path.insert(0, str(_REPO)) + + +def _make_pipeline(): + ws = AsyncMock() + ws.send_bytes = AsyncMock() + ws.send_json = AsyncMock() + with patch("codec_voice.VoicePipeline._load_skills"): + with patch("codec_voice._build_system_prompt", return_value="sys"): + from codec_voice import VoicePipeline + p = VoicePipeline(ws) + p.skills = {} + p._http = AsyncMock(spec=httpx.AsyncClient) + return p + + +# ── 1. longest-match option resolver ─────────────────────────────────────── +class TestOptionResolver: + def test_longest_label_wins(self): + from codec_voice import _resolve_voice_option_choice + opts = ["yes", "yes and notify"] + # "yes" appears first in the list but "yes and notify" is the real intent + got = _resolve_voice_option_choice("okay, yes and notify me", opts) + assert got == "yes and notify" + + def test_plain_yes_still_matches(self): + from codec_voice import _resolve_voice_option_choice + got = _resolve_voice_option_choice("yes", ["yes", "yes and notify"]) + assert got == "yes" + + def test_strict_mode_bypassed(self): + from codec_voice import _resolve_voice_option_choice + raw = _resolve_voice_option_choice("delete", ["delete", "cancel"], strict=True) + assert raw == "delete" # returned verbatim for the strict-consent gate + + +# ── 2. non-blocking enqueue drops oldest on overflow ─────────────────────── +class TestEnqueueUtterance: + def test_overflow_drops_oldest_keeps_newest(self): + p = _make_pipeline() + # maxsize is 3 + for i in range(3): + p.utterance_queue.put_nowait(bytes([i])) + assert p.utterance_queue.full() + p._enqueue_utterance(b"\xff") # 4th — must not block, drops oldest + assert p.utterance_queue.qsize() == 3 + items = [p.utterance_queue.get_nowait() for _ in range(3)] + assert items[0] == bytes([1]), "oldest (0) should have been dropped" + assert items[-1] == b"\xff", "newest should be present" + + def test_normal_enqueue_when_space(self): + p = _make_pipeline() + p._enqueue_utterance(b"hi") + assert p.utterance_queue.qsize() == 1 + + +# ── 3. feed_audio bounds + barge-in capture ───────────────────────────────── +class TestFeedAudio: + def test_runaway_utterance_force_flushes(self): + import codec_voice as cv + p = _make_pipeline() + # a single chunk above the cap → must flush immediately, not buffer + big = b"\x10\x10" * (cv.MAX_UTTERANCE_BYTES // 2 + 100) + out = p.feed_audio(big) + assert out is not None, "utterance over the cap must force-flush" + assert len(p.audio_buffer) == 0, "buffer must be reset after force-flush" + + def test_barge_in_during_cooldown_still_captures(self): + import time as _t + import codec_voice as cv + p = _make_pipeline() + # simulate: user already mid-utterance, CODEC's TTS cooldown active + p.is_speaking = True + p.last_tts_end = _t.monotonic() + cv.VAD_ECHO_COOLDOWN # cooldown in effect + before = len(p.audio_buffer) + loud = (b"\x00\x40") * 800 # above VAD threshold + p.feed_audio(loud) + # mid-utterance audio must be appended, not dropped by the cooldown + assert len(p.audio_buffer) > before, "barge-in audio was dropped during cooldown" + + def test_new_start_suppressed_during_cooldown(self): + import time as _t + import codec_voice as cv + p = _make_pipeline() + p.is_speaking = False # NOT already speaking + p.last_tts_end = _t.monotonic() + cv.VAD_ECHO_COOLDOWN + loud = (b"\x00\x40") * 800 + out = p.feed_audio(loud) + # a fresh start during cooldown is still suppressed (echo guard intact) + assert out is None + assert len(p.audio_buffer) == 0 + + +# ── 4. save_to_memory idempotency ─────────────────────────────────────────── +def test_save_to_memory_is_idempotent(): + p = _make_pipeline() + p.messages = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + ] + mem = MagicMock() + with patch("codec_memory.CodecMemory", return_value=mem): + p.save_to_memory() + p.save_to_memory() # second call (route + run double-finally) must no-op + assert mem.save.call_count == 2, ( + f"expected 2 saves (one per message, once), got {mem.save.call_count}" + ) + + +# ── 5. _stream_qwen flags error so consumer can skip persisting sentinel ──── +def test_stream_qwen_sets_error_flag_on_failure(): + p = _make_pipeline() + + async def _boom(*a, **k): + raise RuntimeError("qwen down") + yield # pragma: no cover - makes this an async generator + + async def _drive(): + import codec_llm + with patch.object(codec_llm, "astream", _boom): + chunks = [c async for c in p._stream_qwen([{"role": "user", "content": "x"}])] + return chunks + + chunks = asyncio.run(_drive()) + assert p._stream_error is True + assert any("processing error" in c.lower() for c in chunks), "user should still hear the error"