diff --git a/src/ccbot/bot/_session_create.py b/src/ccbot/bot/_session_create.py index 01fa5cf0..9ac2259c 100644 --- a/src/ccbot/bot/_session_create.py +++ b/src/ccbot/bot/_session_create.py @@ -86,10 +86,15 @@ async def create_and_activate_session( # SessionStart hook. The hook is confirmed (and the fresh session_id # bound) after the paint, before any pending text is forwarded. if resume_session_id: - # A near-limit transcript auto-compacts on resume (60-110s); flag the - # window so the held pending text (and any first message) waits for - # the pane to settle instead of being typed mid-compaction and lost. - session_manager.mark_window_resuming(created_wid) + # A near-limit transcript auto-compacts on resume (60-110s); flag + # the window so any prompt that arrives while we're still + # compacting buffers into _pending_sends instead of being typed + # mid-compaction. The background watcher drains the buffer after + # the pane settles AND refreshes Telegram TYPING in the meantime + # so the chat doesn't look frozen. + session_manager.mark_window_resuming( + created_wid, bot=context.bot, user_id=user.id + ) hook_ok = await session_manager.wait_for_session_map_entry( created_wid, timeout=15.0 ) diff --git a/src/ccbot/bot/messages.py b/src/ccbot/bot/messages.py index 81d4fcdf..d1b1ec8d 100644 --- a/src/ccbot/bot/messages.py +++ b/src/ccbot/bot/messages.py @@ -20,7 +20,6 @@ from typing import Any from telegram import Bot, Update -from telegram.constants import ChatAction from telegram.error import BadRequest from telegram.ext import ContextTypes @@ -53,6 +52,7 @@ repost_card, resume_card_view, ) +from ..handlers.typing import fire_typing from ..session_models import Session from ..handlers.inbox import save_inbox_file from ..markdown_v2 import convert_markdown @@ -232,18 +232,7 @@ async def forward_command_handler( logger.info( "Forwarding command %s to window %s (user=%d)", cc_slash, display, user.id ) - await update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=forward_command user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "forward_command", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "forward_command", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message): return sess = session_manager.find_session_by_window(wid) @@ -380,18 +369,7 @@ async def unsupported_content_handler( body_parts.extend(hidden_urls) text_to_send = "\n".join(body_parts) - await msg.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=caption_forward user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "caption_forward", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "caption_forward", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, msg): return sess = session_manager.find_session_by_window(wid) @@ -445,23 +423,7 @@ async def _forward_inbox_file( else: rel_path = str(file_path) text_to_send = f"{caption}\n\n{rel_path}" if caption.strip() else rel_path - try: - await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=inbox_file_forward user=%d wid=%s label=%s", - user_id, - wid, - label, - extra={ - "event": "typing_fired", - "source": "inbox_file_forward", - "user_id": user_id, - "window_id": wid, - "label": label, - }, - ) - except Exception: - pass + await fire_typing(bot, user_id, "inbox_file_forward", window_id=wid, label=label) return await session_manager.send_to_window(wid, text_to_send) @@ -638,18 +600,7 @@ async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await safe_reply(update.message, f"⚠ Transcription failed: {e}") return - await update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=voice_handler user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "voice_handler", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "voice_handler", window_id=wid) if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message): return @@ -919,24 +870,10 @@ async def _dispatch_text_to_active( # its first event (long tool prelude / thinking) and # ``status_polling`` won't fire typing until the pane enters # the busy-spinner state. Without this early fire the chat - # looks frozen. - try: - await context.bot.send_chat_action( - chat_id=user_id, action=ChatAction.TYPING - ) - logger.info( - "typing_fired source=text_handler.post_send user=%d wid=%s", - user_id, - wid, - extra={ - "event": "typing_fired", - "source": "text_handler.post_send", - "user_id": user_id, - "window_id": wid, - }, - ) - except Exception: - pass + # looks frozen. fire_typing throttles to one call per ~4 s + # per user — if text_handler already fired Typing a moment + # ago, this is a silent no-op (the indicator is still on). + await fire_typing(context.bot, user_id, "text_handler.post_send", window_id=wid) sess = session_manager.find_session_by_window(wid) if sess is not None: @@ -1001,18 +938,7 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No if wid is None: return - await update.message.chat.send_action(ChatAction.TYPING) - logger.info( - "typing_fired source=text_handler user=%d wid=%s", - user.id, - wid, - extra={ - "event": "typing_fired", - "source": "text_handler", - "user_id": user.id, - "window_id": wid, - }, - ) + await fire_typing(context.bot, user.id, "text_handler", window_id=wid) # New message pushes pane content down — kill any in-flight bash capture. cancel_bash_capture(user.id, wid) diff --git a/src/ccbot/bot/session_events.py b/src/ccbot/bot/session_events.py index 2c7b8820..63094cea 100644 --- a/src/ccbot/bot/session_events.py +++ b/src/ccbot/bot/session_events.py @@ -20,7 +20,6 @@ from pathlib import Path from telegram import Bot -from telegram.constants import ChatAction from ..config import config from ..handlers import bg_status @@ -35,6 +34,7 @@ refresh_panel, update_session_card, ) +from ..handlers.typing import fire_typing from ..session import session_manager from ..session_monitor import NewMessage from ..terminal_parser import extract_interactive_content @@ -96,23 +96,13 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # once events stop. Bg sessions skip — they don't surface in # the chat header. if is_active: - try: - await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=session_events user=%d sess=%s ctype=%s", - user_id, - sess.id, - msg.content_type, - extra={ - "event": "typing_fired", - "source": "session_events", - "user_id": user_id, - "session_id": sess.id, - "content_type": msg.content_type, - }, - ) - except Exception as e: - logger.debug("send_chat_action TYPING failed: %s", e) + await fire_typing( + bot, + user_id, + "session_events", + session_id=sess.id, + content_type=msg.content_type, + ) if not config.show_tool_calls and msg.content_type in ( "tool_use", diff --git a/src/ccbot/handlers/archive.py b/src/ccbot/handlers/archive.py index bb5e6de9..ce5c09fa 100644 --- a/src/ccbot/handlers/archive.py +++ b/src/ccbot/handlers/archive.py @@ -306,7 +306,6 @@ async def restore_session(bot: Bot, user_id: int, sess: Session) -> tuple[bool, Returns (success, message). """ - del bot # unused for now; reserved for future logging if sess.state in ("active", "idle"): return False, "Session already live" workdir = sess.workdir or "" @@ -321,10 +320,13 @@ async def restore_session(bot: Bot, user_id: int, sess: Session) -> tuple[bool, return False, message # A near-limit transcript auto-compacts on resume (60-110s) before it - # accepts input. Flag the window so the first message waits the pane out - # instead of getting typed into a busy pane and dropped. + # accepts input. Flag the window so any prompts that arrive while + # we're still compacting buffer into _pending_sends instead of being + # typed mid-compaction. The background watcher drains the buffer + # once the pane settles AND keeps Telegram TYPING refreshed so the + # chat doesn't look frozen during the wait. if sess.claude_session_id: - session_manager.mark_window_resuming(created_wid) + session_manager.mark_window_resuming(created_wid, bot=bot, user_id=user_id) hook_ok = await session_manager.wait_for_session_map_entry( created_wid, timeout=15.0 diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 112b2503..0e9a0de8 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -21,7 +21,6 @@ from typing import TYPE_CHECKING from telegram import Bot -from telegram.constants import ChatAction from ..config import config from ..session import session_manager @@ -50,6 +49,7 @@ maybe_finalize_stalled, refresh_panel, ) +from .typing import fire_typing # Match option lines like " 1. Yes" / " ❯ 2. Yes, and don't ask again". _OPTION_LINE_RE = re.compile(r"^[\s❯>]*?(\d+)\.\s+(.+?)\s*$") @@ -350,30 +350,16 @@ async def _drive_typing_indicator( ) if not is_bg_session and not in_menu and (card_busy or pane_busy): - try: - await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) - logger.info( - "typing_fired source=status_polling user=%d sess=%s wid=%s " - "card_busy=%s pane_busy=%s status=%r", - user_id, - sess.id if sess else "-", - window_id, - card_busy, - pane_busy, - status_line[:40] if status_line else "", - extra={ - "event": "typing_fired", - "source": "status_polling", - "user_id": user_id, - "session_id": sess.id if sess else None, - "window_id": window_id, - "card_busy": card_busy, - "pane_busy": pane_busy, - "status_line": status_line[:80] if status_line else "", - }, - ) - except Exception as e: - logger.debug("send_chat_action TYPING failed: %s", e) + await fire_typing( + bot, + user_id, + "status_polling", + session_id=sess.id if sess else None, + window_id=window_id, + card_busy=card_busy, + pane_busy=pane_busy, + status_line=status_line[:80] if status_line else "", + ) async def update_status_message( diff --git a/src/ccbot/handlers/typing.py b/src/ccbot/handlers/typing.py new file mode 100644 index 00000000..47d3df6e --- /dev/null +++ b/src/ccbot/handlers/typing.py @@ -0,0 +1,78 @@ +"""Per-user throttle on send_chat_action(TYPING). + +Telegram's typing indicator stays visible for ~5 s after one +``send_chat_action`` call. Two of our call sites — ``status_polling`` +(every 1 s while a session is busy) and ``session_events`` (every +inbound claude event) — used to re-fire the action many times per +second during heavy tool sequences. Each call counts toward +Telegram's per-chat rate budget and was a measurable contributor to +the 429 ``Retry after 71 s`` bans seen during heavy multi-session +usage. + +``fire_typing`` collapses all callers behind a single per-user +timestamp. A call within ``TYPING_REFRESH_INTERVAL`` of the last +successful fire for the same user is a silent no-op — the indicator +is still on, no API call needed. +""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from telegram import Bot +from telegram.constants import ChatAction + +logger = logging.getLogger(__name__) + +# Telegram refreshes the indicator on every chat-action; one call +# keeps it visible for ~5 s. We refresh at 4 s so the indicator +# stays solid for a steadily-emitting session, but every call within +# 4 s of the last one is dropped — that's the entire point. +TYPING_REFRESH_INTERVAL = 4.0 + +_last_fired: dict[int, float] = {} + + +async def fire_typing( + bot: Bot, + user_id: int, + source: str, + **extra: Any, +) -> bool: + """Fire ``send_chat_action(TYPING)`` for ``user_id``, throttled. + + Returns ``True`` iff the chat-action was actually sent. Calls + within ``TYPING_REFRESH_INTERVAL`` of the last successful fire + for this user are dropped silently (the indicator is still + showing — there's nothing to log). + + ``source`` and ``**extra`` are stamped onto the structured + ``typing_fired`` log record for parity with the prior per-site + logging. + """ + now = time.monotonic() + last = _last_fired.get(user_id, 0.0) + if now - last < TYPING_REFRESH_INTERVAL: + return False + try: + await bot.send_chat_action(chat_id=user_id, action=ChatAction.TYPING) + except Exception as e: + logger.debug("send_chat_action TYPING failed: %s", e) + return False + _last_fired[user_id] = now + pretty = " ".join(f"{k}={v}" for k, v in extra.items()) + logger.info( + "typing_fired source=%s user=%d %s", + source, + user_id, + pretty, + extra={ + "event": "typing_fired", + "source": source, + "user_id": user_id, + **extra, + }, + ) + return True diff --git a/src/ccbot/session.py b/src/ccbot/session.py index 2dff3e3c..6350d0d4 100644 --- a/src/ccbot/session.py +++ b/src/ccbot/session.py @@ -28,10 +28,13 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar import aiofiles +if TYPE_CHECKING: + from telegram import Bot + from .config import config from .session_models import ClaudeSession, Session, SessionState, WindowState from .terminal_parser import parse_status_line @@ -55,6 +58,12 @@ _RESUME_SETTLE_BUSY_GRACE = 6.0 # s to wait for a compaction spinner to appear _RESUME_SETTLE_IDLE_STABLE = 4.0 # s the pane must stay idle to count as settled _RESUME_SETTLE_POLL = 1.5 # s between pane captures +# Inter-send gap when the background watcher drains buffered prompts — +# claude's TUI needs a tick to separate back-to-back Enter submissions. +_RESUME_SETTLE_DRAIN_GAP = 0.3 +# How often the watcher fires Telegram TYPING while waiting. ~4s matches +# fire_typing's own throttle and Telegram's ~5s indicator decay. +_RESUME_SETTLE_TYPING_REFRESH = 4.0 def key_matches_window(key: str, window_id: str) -> bool: @@ -120,11 +129,18 @@ class SessionManager: # Value = {"summary": str, "mtime": float, "ts": float}. summary_cache: dict[str, dict[str, Any]] = field(default_factory=dict) # Window ids that were just `claude --resume`d and may still be - # auto-compacting. The first ``send_to_window`` to such a window waits - # for the pane to settle before typing (see ``_wait_for_resume_settle``). + # auto-compacting. While a window is here, ``send_to_window`` buffers + # prompts into ``_pending_sends`` instead of typing them, and a + # background task in ``_resume_settle_tasks`` drains the buffer once + # the pane settles (see ``_watch_resume_settle``). # In-memory only — never persisted; a restart means compaction has long - # finished anyway. Discarded on the first settled send. + # finished anyway. _resuming_windows: set[str] = field(default_factory=set) + # Prompts buffered while a window is mid-resume. Drained in arrival + # order by ``_watch_resume_settle`` after the pane goes idle. + _pending_sends: dict[str, list[str]] = field(default_factory=dict) + # Background watcher tasks, one per resuming window. + _resume_settle_tasks: dict[str, "asyncio.Task[None]"] = field(default_factory=dict) def __post_init__(self) -> None: self._load_state() @@ -879,11 +895,120 @@ def all_user_sessions_with_claude_id( # --- Tmux helpers --- - def mark_window_resuming(self, window_id: str) -> None: - """Flag a window as freshly ``--resume``d so its first send waits out - any auto-compaction (see ``_wait_for_resume_settle``).""" - if config.resume_settle_timeout > 0: + def mark_window_resuming( + self, + window_id: str, + *, + bot: "Bot | None" = None, + user_id: int | None = None, + ) -> None: + """Flag a window as freshly ``--resume``d and spawn the settle watcher. + + Prompts that arrive while the window is flagged are buffered into + ``_pending_sends`` by ``send_to_window`` and drained by + ``_watch_resume_settle`` once the pane goes idle — the message + handler never blocks on the wait. When ``bot``/``user_id`` are + supplied, the watcher also keeps Telegram's TYPING indicator alive + so the chat doesn't look frozen during a long compaction. + """ + if config.resume_settle_timeout <= 0: + return + if window_id in self._resuming_windows: + return # already being watched + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Called outside an event loop (e.g. sync test setup) — keep + # the flag-only fallback so existing behavior is preserved. self._resuming_windows.add(window_id) + return + self._resuming_windows.add(window_id) + self._resume_settle_tasks[window_id] = loop.create_task( + self._watch_resume_settle(window_id, bot, user_id), + name=f"resume-settle:{window_id}", + ) + + async def _watch_resume_settle( + self, + window_id: str, + bot: "Bot | None", + user_id: int | None, + ) -> None: + """Background watcher for a resuming window. + + Polls the pane via ``_wait_for_resume_settle`` until it settles + (or the configured timeout elapses), then drains anything + ``send_to_window`` buffered into ``_pending_sends`` while the + gate was up. Concurrently keeps Telegram's TYPING indicator + refreshed so the user sees the bot is still working. + """ + stop_typing = asyncio.Event() + + async def _typing_keepalive() -> None: + if bot is None or user_id is None: + return + # Local import — handlers.typing pulls in telegram modules + # and ``session`` is imported very early. + from .handlers.typing import fire_typing + + while not stop_typing.is_set(): + try: + await fire_typing( + bot, user_id, "resume_settle", window_id=window_id + ) + except Exception as e: + logger.debug("resume-settle typing keepalive failed: %s", e) + try: + await asyncio.wait_for( + stop_typing.wait(), timeout=_RESUME_SETTLE_TYPING_REFRESH + ) + except asyncio.TimeoutError: + pass + + keepalive_task = asyncio.create_task( + _typing_keepalive(), name=f"resume-settle-typing:{window_id}" + ) + try: + settled = await self._wait_for_resume_settle(window_id) + logger.info( + "resume-settle gate cleared for window %s (settled=%s, background)", + window_id, + settled, + ) + pending = self._pending_sends.pop(window_id, []) + for i, text in enumerate(pending): + ok = await tmux_manager.send_keys(window_id, text) + if not ok: + logger.warning( + "resume-settle: failed to drain pending send #%d " + "for window %s (text_len=%d)", + i, + window_id, + len(text), + ) + if i < len(pending) - 1: + await asyncio.sleep(_RESUME_SETTLE_DRAIN_GAP) + if pending: + logger.info( + "resume-settle: drained %d pending send(s) for window %s", + len(pending), + window_id, + ) + except Exception as e: + logger.exception( + "resume-settle watcher failed for window %s: %s", window_id, e + ) + finally: + stop_typing.set() + try: + await keepalive_task + except Exception: + pass + self._resuming_windows.discard(window_id) + self._resume_settle_tasks.pop(window_id, None) + # Belt-and-suspenders: on exception path, drop any prompts + # that didn't get drained so they can't leak forever. + self._pending_sends.pop(window_id, None) async def _wait_for_resume_settle(self, window_id: str) -> bool: """Block until a just-resumed window is safe to type into. @@ -931,7 +1056,14 @@ async def _wait_for_resume_settle(self, window_id: str) -> bool: return False async def send_to_window(self, window_id: str, text: str) -> tuple[bool, str]: - """Send text to a tmux window by ID.""" + """Send text to a tmux window by ID. + + For windows mid-resume (``mark_window_resuming`` was called and + the background watcher hasn't settled yet), the text is buffered + into ``_pending_sends`` and we return success immediately — the + watcher drains the buffer when the pane is ready. This keeps the + message handler off the hot path of a 60-200s compaction wait. + """ display = self.get_display_name(window_id) logger.debug( "send_to_window: window_id=%s (%s), text_len=%d", @@ -942,16 +1074,17 @@ async def send_to_window(self, window_id: str, text: str) -> tuple[bool, str]: window = await tmux_manager.find_window_by_id(window_id) if not window: return False, "Window not found (may have been closed)" - # A freshly-resumed window may still be auto-compacting; hold the - # first send until the pane settles so the prompt isn't dropped. if window_id in self._resuming_windows: - settled = await self._wait_for_resume_settle(window_id) - self._resuming_windows.discard(window_id) + queue = self._pending_sends.setdefault(window_id, []) + queue.append(text) logger.info( - "resume-settle gate cleared for window %s (settled=%s)", + "send_to_window buffered: window=%s pending=%d text_len=%d " + "(resume in progress)", window_id, - settled, + len(queue), + len(text), ) + return True, f"Queued for {display} (session restoring)" success = await tmux_manager.send_keys(window.window_id, text) if success: return True, f"Sent to {display}" diff --git a/src/ccbot/terminal_parser.py b/src/ccbot/terminal_parser.py index 59e3c190..b05463f5 100644 --- a/src/ccbot/terminal_parser.py +++ b/src/ccbot/terminal_parser.py @@ -333,6 +333,20 @@ def is_interactive_ui(pane_text: str) -> bool: _STATUS_TIME_STATS_RE = re.compile(r"\(\s*\d+(?:m\s*\d+)?\s*[smh]") +# Post-thinking finishing markers like ``✻ Cogitated for 2m 23s`` or +# ``✻ Thought for 14s`` use the same spinner glyph as a live status line +# but are *static* — they sit on the pane indefinitely after a turn +# closes. A ``claude --resume`` re-renders the previous state, so these +# lines persist on the pane forever and would otherwise read as +# "permanently busy" to ``parse_status_line``, locking +# ``_wait_for_resume_settle`` until its 200s timeout. +# Discriminator: live status uses present-participle (``Cogitating…``); +# finishing marker uses past-tense `` for