From a64d05cafdfabc47a090a5822137febdf6b5c450 Mon Sep 17 00:00:00 2001 From: Nosko Artem Date: Mon, 25 May 2026 07:58:49 +0300 Subject: [PATCH 1/2] refactor(typing): centralize chat-action throttle in handlers/typing.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pull the per-callsite ``send_chat_action(TYPING) + logger.info(typing_fired)`` blocks (status_polling, session_events, forward_command, caption_forward, inbox_file_forward, voice_handler, text_handler.post_send) behind a single ``fire_typing`` helper with a 4 s per-user throttle. A session emitting events at 1 Hz was burning ~5× the per-chat Telegram budget and contributed to the 429 ``Retry after 71 s`` bans seen during heavy multi-session usage. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ccbot/bot/messages.py | 94 +++------------------------ src/ccbot/bot/session_events.py | 26 +++----- src/ccbot/handlers/status_polling.py | 36 ++++------- src/ccbot/handlers/typing.py | 78 +++++++++++++++++++++++ tests/test_typing_throttle.py | 95 ++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 127 deletions(-) create mode 100644 src/ccbot/handlers/typing.py create mode 100644 tests/test_typing_throttle.py diff --git a/src/ccbot/bot/messages.py b/src/ccbot/bot/messages.py index 81d4fcdf..d1b1ec8d 100644 --- a/src/ccbot/bot/messages.py +++ b/src/ccbot/bot/messages.py @@ -20,7 +20,6 @@ from typing import Any from telegram import Bot, Update -from telegram.constants import ChatAction from telegram.error import BadRequest from telegram.ext import ContextTypes @@ -53,6 +52,7 @@ repost_card, resume_card_view, ) +from ..handlers.typing import fire_typing from ..session_models import Session from ..handlers.inbox import save_inbox_file from ..markdown_v2 import convert_markdown @@ -232,18 +232,7 @@ async def forward_command_handler( logger.info( "Forwarding command %s to window %s (user=%d)", cc_slash, display, user.id ) - await 
update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=forward_command user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "forward_command", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "forward_command", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message): return sess = session_manager.find_session_by_window(wid) @@ -380,18 +369,7 @@ async def unsupported_content_handler( body_parts.extend(hidden_urls) text_to_send = "\n".join(body_parts) - await msg.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=caption_forward user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "caption_forward", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "caption_forward", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, msg): return sess = session_manager.find_session_by_window(wid) @@ -445,23 +423,7 @@ async def _forward_inbox_file( else: rel_path = str(file_path) text_to_send = f"{caption}\n\n{rel_path}" if caption.strip() else rel_path - try: - await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=inbox_file_forward user=%d wid=%s label=%s", - user_id, - wid, - label, - extra={ - "event": "typing_fired", - "source": "inbox_file_forward", - "user_id": user_id, - "window_id": wid, - "label": label, - }, - ) - except Exception: - pass + await fire_typing(bot, user_id, "inbox_file_forward", window_id=wid, label=label) return await session_manager.send_to_window(wid, text_to_send) @@ -638,18 +600,7 @@ async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await safe_reply(update.message, f"⚠ Transcription failed: {e}") return - await update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired 
source=voice_handler user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "voice_handler", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "voice_handler", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message): return @@ -919,24 +870,10 @@ async def _dispatch_text_to_active( # its first event (long tool prelude / thinking) and # ``status_polling`` won't fire typing until the pane enters # the busy-spinner state. Without this early fire the chat - # looks frozen. - try: - await context.bot.send_chat_action( - chat_id=user_id, action=ChatAction.TYPING - ) - logger.info( - "typing_fired source=text_handler.post_send user=%d wid=%s", - user_id, - wid, - extra={ - "event": "typing_fired", - "source": "text_handler.post_send", - "user_id": user_id, - "window_id": wid, - }, - ) - except Exception: - pass + # looks frozen. fire_typing throttles to one call per ~4 s + # per user — if text_handler already fired Typing a moment + # ago, this is a silent no-op (the indicator is still on). + await fire_typing(context.bot, user_id, "text_handler.post_send", window_id=wid) sess = session_manager.find_session_by_window(wid) if sess is not None: @@ -1001,18 +938,7 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No if wid is None: return - await update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=text_handler user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "text_handler", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "text_handler", window_id=wid) # New message pushes pane content down — kill any in-flight bash capture. 
cancel_bash_capture(user.id, wid) diff --git a/src/ccbot/bot/session_events.py b/src/ccbot/bot/session_events.py index 2c7b8820..63094cea 100644 --- a/src/ccbot/bot/session_events.py +++ b/src/ccbot/bot/session_events.py @@ -20,7 +20,6 @@ from pathlib import Path from telegram import Bot -from telegram.constants import ChatAction from ..config import config from ..handlers import bg_status @@ -35,6 +34,7 @@ refresh_panel, update_session_card, ) +from ..handlers.typing import fire_typing from ..session import session_manager from ..session_monitor import NewMessage from ..terminal_parser import extract_interactive_content @@ -96,23 +96,13 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # once events stop. Bg sessions skip — they don't surface in # the chat header. if is_active: - try: - await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=session_events user=%d sess=%s ctype=%s", - user_id, - sess.id, - msg.content_type, - extra={ - "event": "typing_fired", - "source": "session_events", - "user_id": user_id, - "session_id": sess.id, - "content_type": msg.content_type, - }, - ) - except Exception as e: - logger.debug("send_chat_action TYPING failed: %s", e) + await fire_typing( + bot, + user_id, + "session_events", + session_id=sess.id, + content_type=msg.content_type, + ) if not config.show_tool_calls and msg.content_type in ( "tool_use", diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 112b2503..0e9a0de8 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -21,7 +21,6 @@ from typing import TYPE_CHECKING from telegram import Bot -from telegram.constants import ChatAction from ..config import config from ..session import session_manager @@ -50,6 +49,7 @@ maybe_finalize_stalled, refresh_panel, ) +from .typing import fire_typing # Match option lines like " 1. Yes" / " ❯ 2. Yes, and don't ask again". 
_OPTION_LINE_RE = re.compile(r"^[\s❯>]*?(\d+)\.\s+(.+?)\s*$") @@ -350,30 +350,16 @@ async def _drive_typing_indicator( ) if not is_bg_session and not in_menu and (card_busy or pane_busy): - try: - await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=status_polling user=%d sess=%s wid=%s " - "card_busy=%s pane_busy=%s status=%r", - user_id, - sess.id if sess else "-", - window_id, - card_busy, - pane_busy, - status_line[:40] if status_line else "", - extra={ - "event": "typing_fired", - "source": "status_polling", - "user_id": user_id, - "session_id": sess.id if sess else None, - "window_id": window_id, - "card_busy": card_busy, - "pane_busy": pane_busy, - "status_line": status_line[:80] if status_line else "", - }, - ) - except Exception as e: - logger.debug("send_chat_action TYPING failed: %s", e) + await fire_typing( + bot, + user_id, + "status_polling", + session_id=sess.id if sess else None, + window_id=window_id, + card_busy=card_busy, + pane_busy=pane_busy, + status_line=status_line[:80] if status_line else "", + ) async def update_status_message( diff --git a/src/ccbot/handlers/typing.py b/src/ccbot/handlers/typing.py new file mode 100644 index 00000000..47d3df6e --- /dev/null +++ b/src/ccbot/handlers/typing.py @@ -0,0 +1,78 @@ +"""Per-user throttle on send_chat_action(TYPING). + +Telegram's typing indicator stays visible for ~5 s after one +``send_chat_action`` call. Two of our call sites — ``status_polling`` +(every 1 s while a session is busy) and ``session_events`` (every +inbound claude event) — used to re-fire the action many times per +second during heavy tool sequences. Each call counts toward +Telegram's per-chat rate budget and was a measurable contributor to +the 429 ``Retry after 71 s`` bans seen during heavy multi-session +usage. + +``fire_typing`` collapses all callers behind a single per-user +timestamp. 
A call within ``TYPING_REFRESH_INTERVAL`` of the last +successful fire for the same user is a silent no-op — the indicator +is still on, no API call needed. +""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from telegram import Bot +from telegram.constants import ChatAction + +logger = logging.getLogger(__name__) + +# Telegram refreshes the indicator on every chat-action; one call +# keeps it visible for ~5 s. We refresh at 4 s so the indicator +# stays solid for a steadily-emitting session, but every call within +# 4 s of the last one is dropped — that's the entire point. +TYPING_REFRESH_INTERVAL = 4.0 + +_last_fired: dict[int, float] = {} + + +async def fire_typing( + bot: Bot, + user_id: int, + source: str, + **extra: Any, +) -> bool: + """Fire ``send_chat_action(TYPING)`` for ``user_id``, throttled. + + Returns ``True`` iff the chat-action was actually sent. Calls + within ``TYPING_REFRESH_INTERVAL`` of the last successful fire + for this user are dropped silently (the indicator is still + showing — there's nothing to log). + + ``source`` and ``**extra`` are stamped onto the structured + ``typing_fired`` log record for parity with the prior per-site + logging. 
+ """ + now = time.monotonic() + last = _last_fired.get(user_id, 0.0) + if now - last < TYPING_REFRESH_INTERVAL: + return False + try: + await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) + except Exception as e: + logger.debug("send_chat_action TYPING failed: %s", e) + return False + _last_fired[user_id] = now + pretty = " ".join(f"{k}={v}" for k, v in extra.items()) + logger.info( + "typing_fired source=%s user=%d %s", + source, + user_id, + pretty, + extra={ + "event": "typing_fired", + "source": source, + "user_id": user_id, + **extra, + }, + ) + return True diff --git a/tests/test_typing_throttle.py b/tests/test_typing_throttle.py new file mode 100644 index 00000000..080f4053 --- /dev/null +++ b/tests/test_typing_throttle.py @@ -0,0 +1,95 @@ +"""Tests for the per-user typing-indicator throttle. + +``fire_typing`` collapses the many ``send_chat_action(TYPING)`` callers +behind a single per-user timestamp so a session emitting events at +1 Hz doesn't burn 5× the per-chat Telegram budget. Repeated calls +within ``TYPING_REFRESH_INTERVAL`` of the last successful fire are +silent no-ops. Was a measurable contributor to the 429 ``Retry after +71 s`` bans before this throttle landed. 
+""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest + +from ccbot.handlers import typing as typing_mod +from ccbot.handlers.typing import TYPING_REFRESH_INTERVAL, fire_typing + + +@pytest.fixture(autouse=True) +def _clear_state(): + typing_mod._last_fired.clear() + yield + typing_mod._last_fired.clear() + + +@pytest.mark.asyncio +async def test_first_call_fires() -> None: + bot = AsyncMock() + sent = await fire_typing(bot, 42, "test") + assert sent is True + bot.send_chat_action.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_second_call_within_window_is_suppressed() -> None: + bot = AsyncMock() + await fire_typing(bot, 42, "test") + bot.send_chat_action.reset_mock() + sent = await fire_typing(bot, 42, "test") + assert sent is False + bot.send_chat_action.assert_not_called() + + +@pytest.mark.asyncio +async def test_call_after_window_fires_again(monkeypatch) -> None: + bot = AsyncMock() + fake_time = [1000.0] + monkeypatch.setattr(typing_mod.time, "monotonic", lambda: fake_time[0]) + await fire_typing(bot, 42, "test") + fake_time[0] += TYPING_REFRESH_INTERVAL + 0.01 + sent = await fire_typing(bot, 42, "test") + assert sent is True + assert bot.send_chat_action.await_count == 2 + + +@pytest.mark.asyncio +async def test_separate_users_are_throttled_independently() -> None: + bot = AsyncMock() + sent_a = await fire_typing(bot, 1, "test") + sent_b = await fire_typing(bot, 2, "test") + assert sent_a is True + assert sent_b is True + assert bot.send_chat_action.await_count == 2 + + +@pytest.mark.asyncio +async def test_api_failure_does_not_lock_throttle(monkeypatch) -> None: + """A failed send_chat_action must not stamp the cache — + otherwise a transient TG hiccup at t=0 would suppress every + later call until TYPING_REFRESH_INTERVAL expires.""" + bot = AsyncMock() + bot.send_chat_action.side_effect = RuntimeError("network") + sent = await fire_typing(bot, 42, "test") + assert sent is False + # Cache must be 
empty so the next call can retry. + assert 42 not in typing_mod._last_fired + + bot.send_chat_action.side_effect = None + sent = await fire_typing(bot, 42, "test") + assert sent is True + + +@pytest.mark.asyncio +async def test_high_rate_burst_collapses_to_one_call(monkeypatch) -> None: + """20 polls within 1 s — the original spam pattern from + status_polling. Exactly one chat-action should reach Telegram.""" + bot = AsyncMock() + fake_time = [1000.0] + monkeypatch.setattr(typing_mod.time, "monotonic", lambda: fake_time[0]) + for _ in range(20): + await fire_typing(bot, 42, "status_polling") + fake_time[0] += 0.05 # 50 ms between polls + assert bot.send_chat_action.await_count == 1 From 1cdf386a28700c4564ef3421bfe24c516417c1d5 Mon Sep 17 00:00:00 2001 From: Nosko Artem Date: Mon, 25 May 2026 07:59:15 +0300 Subject: [PATCH 2/2] fix(resume-settle): unfreeze bot during session restore (60-200 s outage) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A ``claude --resume`` of a near-limit transcript could leave the bot silent for the full ``CCBOT_RESUME_SETTLE_TIMEOUT=200 s`` after the user typed their first message — the chat looked like Telegram had spam-banned us. Two root causes, both fixed here. 1. parser false-positive on static finishing markers. ``parse_status_line`` recognised ``✻`` (SPINNER_ONLY) anywhere above the chrome separator. A resumed pane almost always ends with ``✻ Cogitated for 2m 23s`` or ``✻ Thought for 14s`` from the previous turn — these are STATIC, not a live spinner, but the parser kept returning them as a "busy" status forever, so ``_wait_for_resume_settle`` never accumulated 4 s of idle. Fix: ``_STATUS_FINISHED_RE`` skips `` for <N>[smh]`` lines for SPINNER_ONLY chars. The SPINNER_AMBIGUOUS path already required time-stats parens and was unaffected. 2. ``send_to_window`` blocked the message-handler hot path for the full settle wait.
``begin_repost_intent`` had already buffered the live card, so during the wait the chat saw zero edits, zero typing indicator after ~5 s, and every subsequent user message stacked behind the same wait. Once the timeout finally cleared, 3–5 buffered messages drained at once and tripped Telegram's per-chat flood limit → AIORateLimiter 429 retry cascade. Fix: ``mark_window_resuming`` now spawns a background ``_watch_resume_settle`` task that * runs the existing settle-wait loop off the hot path, * refreshes ``fire_typing`` every 4 s so the chat stays "live", * drains ``_pending_sends`` in arrival order with a 0.3 s gap between back-to-back ``send_keys`` so claude separates the prompts. ``send_to_window`` now buffers into ``_pending_sends[wid]`` and returns immediately when a window is mid-resume. Restore callers (``handlers/archive.py``, ``bot/_session_create.py``) pass ``bot`` + ``user_id`` so the watcher can fire TYPING. Tests - ``test_terminal_parser``: 5 new cases (4 finishing-marker parametrizations + fallback to live spinner above a marker). - ``test_session.TestResumeSettleGate``: existing 3 cases updated to assert non-blocking semantics; new ``test_multiple_prompts_ buffered_and_drained_in_order`` and ``test_mark_resuming_is_ idempotent``. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ccbot/bot/_session_create.py | 13 ++- src/ccbot/handlers/archive.py | 10 +- src/ccbot/session.py | 163 +++++++++++++++++++++++++--- src/ccbot/terminal_parser.py | 22 +++- tests/ccbot/test_session.py | 68 +++++++++++- tests/ccbot/test_terminal_parser.py | 27 +++++ 6 files changed, 275 insertions(+), 28 deletions(-) diff --git a/src/ccbot/bot/_session_create.py b/src/ccbot/bot/_session_create.py index 01fa5cf0..9ac2259c 100644 --- a/src/ccbot/bot/_session_create.py +++ b/src/ccbot/bot/_session_create.py @@ -86,10 +86,15 @@ async def create_and_activate_session( # SessionStart hook. 
The hook is confirmed (and the fresh session_id # bound) after the paint, before any pending text is forwarded. if resume_session_id: - # A near-limit transcript auto-compacts on resume (60-110s); flag the - # window so the held pending text (and any first message) waits for - # the pane to settle instead of being typed mid-compaction and lost. - session_manager.mark_window_resuming(created_wid) + # A near-limit transcript auto-compacts on resume (60-110s); flag + # the window so any prompt that arrives while we're still + # compacting buffers into _pending_sends instead of being typed + # mid-compaction. The background watcher drains the buffer after + # the pane settles AND refreshes Telegram TYPING in the meantime + # so the chat doesn't look frozen. + session_manager.mark_window_resuming( + created_wid, bot=context.bot, user_id=user.id + ) hook_ok = await session_manager.wait_for_session_map_entry( created_wid, timeout=15.0 ) diff --git a/src/ccbot/handlers/archive.py b/src/ccbot/handlers/archive.py index bb5e6de9..ce5c09fa 100644 --- a/src/ccbot/handlers/archive.py +++ b/src/ccbot/handlers/archive.py @@ -306,7 +306,6 @@ async def restore_session(bot: Bot, user_id: int, sess: Session) -> tuple[bool, Returns (success, message). """ - del bot # unused for now; reserved for future logging if sess.state in ("active", "idle"): return False, "Session already live" workdir = sess.workdir or "" @@ -321,10 +320,13 @@ async def restore_session(bot: Bot, user_id: int, sess: Session) -> tuple[bool, return False, message # A near-limit transcript auto-compacts on resume (60-110s) before it - # accepts input. Flag the window so the first message waits the pane out - # instead of getting typed into a busy pane and dropped. + # accepts input. Flag the window so any prompts that arrive while + # we're still compacting buffer into _pending_sends instead of being + # typed mid-compaction. 
The background watcher drains the buffer + # once the pane settles AND keeps Telegram TYPING refreshed so the + # chat doesn't look frozen during the wait. if sess.claude_session_id: - session_manager.mark_window_resuming(created_wid) + session_manager.mark_window_resuming(created_wid, bot=bot, user_id=user_id) hook_ok = await session_manager.wait_for_session_map_entry( created_wid, timeout=15.0 diff --git a/src/ccbot/session.py b/src/ccbot/session.py index 2dff3e3c..6350d0d4 100644 --- a/src/ccbot/session.py +++ b/src/ccbot/session.py @@ -28,10 +28,13 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar import aiofiles +if TYPE_CHECKING: + from telegram import Bot + from .config import config from .session_models import ClaudeSession, Session, SessionState, WindowState from .terminal_parser import parse_status_line @@ -55,6 +58,12 @@ _RESUME_SETTLE_BUSY_GRACE = 6.0 # s to wait for a compaction spinner to appear _RESUME_SETTLE_IDLE_STABLE = 4.0 # s the pane must stay idle to count as settled _RESUME_SETTLE_POLL = 1.5 # s between pane captures +# Inter-send gap when the background watcher drains buffered prompts — +# claude's TUI needs a tick to separate back-to-back Enter submissions. +_RESUME_SETTLE_DRAIN_GAP = 0.3 +# How often the watcher fires Telegram TYPING while waiting. ~4s matches +# fire_typing's own throttle and Telegram's ~5s indicator decay. +_RESUME_SETTLE_TYPING_REFRESH = 4.0 def key_matches_window(key: str, window_id: str) -> bool: @@ -120,11 +129,18 @@ class SessionManager: # Value = {"summary": str, "mtime": float, "ts": float}. summary_cache: dict[str, dict[str, Any]] = field(default_factory=dict) # Window ids that were just `claude --resume`d and may still be - # auto-compacting. The first ``send_to_window`` to such a window waits - # for the pane to settle before typing (see ``_wait_for_resume_settle``). + # auto-compacting. 
While a window is here, ``send_to_window`` buffers + # prompts into ``_pending_sends`` instead of typing them, and a + # background task in ``_resume_settle_tasks`` drains the buffer once + # the pane settles (see ``_watch_resume_settle``). # In-memory only — never persisted; a restart means compaction has long - # finished anyway. Discarded on the first settled send. + # finished anyway. _resuming_windows: set[str] = field(default_factory=set) + # Prompts buffered while a window is mid-resume. Drained in arrival + # order by ``_watch_resume_settle`` after the pane goes idle. + _pending_sends: dict[str, list[str]] = field(default_factory=dict) + # Background watcher tasks, one per resuming window. + _resume_settle_tasks: dict[str, "asyncio.Task[None]"] = field(default_factory=dict) def __post_init__(self) -> None: self._load_state() @@ -879,11 +895,120 @@ def all_user_sessions_with_claude_id( # --- Tmux helpers --- - def mark_window_resuming(self, window_id: str) -> None: - """Flag a window as freshly ``--resume``d so its first send waits out - any auto-compaction (see ``_wait_for_resume_settle``).""" - if config.resume_settle_timeout > 0: + def mark_window_resuming( + self, + window_id: str, + *, + bot: "Bot | None" = None, + user_id: int | None = None, + ) -> None: + """Flag a window as freshly ``--resume``d and spawn the settle watcher. + + Prompts that arrive while the window is flagged are buffered into + ``_pending_sends`` by ``send_to_window`` and drained by + ``_watch_resume_settle`` once the pane goes idle — the message + handler never blocks on the wait. When ``bot``/``user_id`` are + supplied, the watcher also keeps Telegram's TYPING indicator alive + so the chat doesn't look frozen during a long compaction. + """ + if config.resume_settle_timeout <= 0: + return + if window_id in self._resuming_windows: + return # already being watched + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Called outside an event loop (e.g. 
sync test setup) — keep + # the flag-only fallback so existing behavior is preserved. self._resuming_windows.add(window_id) + return + self._resuming_windows.add(window_id) + self._resume_settle_tasks[window_id] = loop.create_task( + self._watch_resume_settle(window_id, bot, user_id), + name=f"resume-settle:{window_id}", + ) + + async def _watch_resume_settle( + self, + window_id: str, + bot: "Bot | None", + user_id: int | None, + ) -> None: + """Background watcher for a resuming window. + + Polls the pane via ``_wait_for_resume_settle`` until it settles + (or the configured timeout elapses), then drains anything + ``send_to_window`` buffered into ``_pending_sends`` while the + gate was up. Concurrently keeps Telegram's TYPING indicator + refreshed so the user sees the bot is still working. + """ + stop_typing = asyncio.Event() + + async def _typing_keepalive() -> None: + if bot is None or user_id is None: + return + # Local import — handlers.typing pulls in telegram modules + # and ``session`` is imported very early. 
+ from .handlers.typing import fire_typing + + while not stop_typing.is_set(): + try: + await fire_typing( + bot, user_id, "resume_settle", window_id=window_id + ) + except Exception as e: + logger.debug("resume-settle typing keepalive failed: %s", e) + try: + await asyncio.wait_for( + stop_typing.wait(), timeout=_RESUME_SETTLE_TYPING_REFRESH + ) + except asyncio.TimeoutError: + pass + + keepalive_task = asyncio.create_task( + _typing_keepalive(), name=f"resume-settle-typing:{window_id}" + ) + try: + settled = await self._wait_for_resume_settle(window_id) + logger.info( + "resume-settle gate cleared for window %s (settled=%s, background)", + window_id, + settled, + ) + pending = self._pending_sends.pop(window_id, []) + for i, text in enumerate(pending): + ok = await tmux_manager.send_keys(window_id, text) + if not ok: + logger.warning( + "resume-settle: failed to drain pending send #%d " + "for window %s (text_len=%d)", + i, + window_id, + len(text), + ) + if i < len(pending) - 1: + await asyncio.sleep(_RESUME_SETTLE_DRAIN_GAP) + if pending: + logger.info( + "resume-settle: drained %d pending send(s) for window %s", + len(pending), + window_id, + ) + except Exception as e: + logger.exception( + "resume-settle watcher failed for window %s: %s", window_id, e + ) + finally: + stop_typing.set() + try: + await keepalive_task + except Exception: + pass + self._resuming_windows.discard(window_id) + self._resume_settle_tasks.pop(window_id, None) + # Belt-and-suspenders: on exception path, drop any prompts + # that didn't get drained so they can't leak forever. + self._pending_sends.pop(window_id, None) async def _wait_for_resume_settle(self, window_id: str) -> bool: """Block until a just-resumed window is safe to type into. 
@@ -931,7 +1056,14 @@ async def _wait_for_resume_settle(self, window_id: str) -> bool: return False async def send_to_window(self, window_id: str, text: str) -> tuple[bool, str]: - """Send text to a tmux window by ID.""" + """Send text to a tmux window by ID. + + For windows mid-resume (``mark_window_resuming`` was called and + the background watcher hasn't settled yet), the text is buffered + into ``_pending_sends`` and we return success immediately — the + watcher drains the buffer when the pane is ready. This keeps the + message handler off the hot path of a 60-200s compaction wait. + """ display = self.get_display_name(window_id) logger.debug( "send_to_window: window_id=%s (%s), text_len=%d", @@ -942,16 +1074,17 @@ async def send_to_window(self, window_id: str, text: str) -> tuple[bool, str]: window = await tmux_manager.find_window_by_id(window_id) if not window: return False, "Window not found (may have been closed)" - # A freshly-resumed window may still be auto-compacting; hold the - # first send until the pane settles so the prompt isn't dropped. 
if window_id in self._resuming_windows: - settled = await self._wait_for_resume_settle(window_id) - self._resuming_windows.discard(window_id) + queue = self._pending_sends.setdefault(window_id, []) + queue.append(text) logger.info( - "resume-settle gate cleared for window %s (settled=%s)", + "send_to_window buffered: window=%s pending=%d text_len=%d " + "(resume in progress)", window_id, - settled, + len(queue), + len(text), ) + return True, f"Queued for {display} (session restoring)" success = await tmux_manager.send_keys(window.window_id, text) if success: return True, f"Sent to {display}" diff --git a/src/ccbot/terminal_parser.py b/src/ccbot/terminal_parser.py index 59e3c190..b05463f5 100644 --- a/src/ccbot/terminal_parser.py +++ b/src/ccbot/terminal_parser.py @@ -333,6 +333,20 @@ def is_interactive_ui(pane_text: str) -> bool: _STATUS_TIME_STATS_RE = re.compile(r"\(\s*\d+(?:m\s*\d+)?\s*[smh]") +# Post-thinking finishing markers like ``✻ Cogitated for 2m 23s`` or +# ``✻ Thought for 14s`` use the same spinner glyph as a live status line +# but are *static* — they sit on the pane indefinitely after a turn +# closes. A ``claude --resume`` re-renders the previous state, so these +# lines persist on the pane forever and would otherwise read as +# "permanently busy" to ``parse_status_line``, locking +# ``_wait_for_resume_settle`` until its 200s timeout. +# Discriminator: live status uses present-participle (``Cogitating…``); +# finishing marker uses past-tense `` for