From a9961808585d755b3df221d56c1ac218d81404d3 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 09:17:52 +0000 Subject: [PATCH 1/2] fix: gateway resume can silently lose events - add integrity checks (fixes #2153) - Track oldest retained cursor in session buffer - Add resync_required flag when requested cursor is below oldest retained - Send authoritative snapshot instead of partial replay when gap detected - Add top-level seq field to all events for client-side gap detection - Update wire protocol documentation with new resume fields Co-authored-by: MervinPraison --- .../praisonaiagents/gateway/protocols.py | 13 ++++ src/praisonai/praisonai/gateway/server.py | 60 +++++++++++++++++-- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/gateway/protocols.py b/src/praisonai-agents/praisonaiagents/gateway/protocols.py index b250ed49b..7185f94ea 100644 --- a/src/praisonai-agents/praisonaiagents/gateway/protocols.py +++ b/src/praisonai-agents/praisonaiagents/gateway/protocols.py @@ -175,6 +175,19 @@ class GatewayEvent: source: Source identifier (agent_id, client_id, etc.) target: Target identifier (optional, for directed events) sequence: Monotonic sequence number for gap detection (optional) + + Wire Protocol Extensions: + When events are sent over the gateway, additional fields are added: + - seq: Top-level monotonic sequence number for gap detection + - cursor: Event cursor position (also stored in data['cursor']) + + Resume Protocol: + The 'joined' acknowledgment includes: + - cursor: Current head cursor position + - oldest_cursor: Cursor of the oldest event still in the buffer (equals cursor when the buffer is empty) + - resync_required: True if requested 'since' is below oldest_cursor + + When resync_required=true, a 'snapshot' message follows with full state. 
""" type: Union[EventType, str] diff --git a/src/praisonai/praisonai/gateway/server.py b/src/praisonai/praisonai/gateway/server.py index f04750954..e1521f608 100644 --- a/src/praisonai/praisonai/gateway/server.py +++ b/src/praisonai/praisonai/gateway/server.py @@ -127,10 +127,40 @@ def add_event(self, event: GatewayEvent) -> int: self._events = self._events[-self._max_messages:] return self._event_cursor + def get_oldest_cursor(self) -> int: + """Get the oldest event cursor still retained in the buffer.""" + if self._events: + return self._events[0].data.get('cursor', self._event_cursor) + return self._event_cursor + def get_events_since(self, cursor: int) -> List[GatewayEvent]: """Get events since the given cursor.""" return [e for e in self._events if e.data.get('cursor', 0) > cursor] + def check_resync_required(self, since_cursor: Optional[int]) -> bool: + """Check if resync is required based on the requested cursor.""" + if since_cursor is None: + return False + oldest_cursor = self.get_oldest_cursor() + return since_cursor < oldest_cursor + + def get_snapshot(self) -> Dict[str, Any]: + """Get a snapshot of the current session state for resync.""" + return { + "session_id": self._session_id, + "agent_id": self._agent_id, + "state": dict(self._state), + "messages": [{ + "content": msg.content, + "sender_id": msg.sender_id, + "session_id": msg.session_id, + "message_id": msg.message_id, + "timestamp": msg.timestamp, + "metadata": msg.metadata, + } for msg in self._messages], + "event_cursor": self._event_cursor, + } + def to_dict(self) -> Dict[str, Any]: """Serialize session to dictionary for persistence.""" # Drain inbox queue to a list for serialization @@ -1327,6 +1357,10 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) -> self._client_sessions[client_id] = session.session_id + # Check if resync is required + resync_required = session.check_resync_required(since_cursor) + oldest_cursor = session.get_oldest_cursor() + # Build 
presence snapshot presence_snapshot = [] if hasattr(self, '_presence_manager'): @@ -1340,13 +1374,15 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) -> if replay_events and replay_events[0].sequence is not None: joined_sequence = replay_events[0].sequence - 1 - # Send join confirmation with protocol info and snapshot + # Send join confirmation with protocol info, integrity checks, and snapshot await self._send_to_client(client_id, { "type": "joined", "session_id": session.session_id, "agent_id": agent_id, "resumed": session._was_resumed, "cursor": session._event_cursor, + "oldest_cursor": oldest_cursor, + "resync_required": resync_required, "sequence": joined_sequence, # Sequence aligned with replay events "protocol_version": negotiated_version, "server_min_version": MIN_PROTOCOL_VERSION, @@ -1355,12 +1391,24 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) -> "health": self.health(), # Health status }) - # Replay missed events if any - for event in replay_events: + if resync_required: + # Send authoritative snapshot instead of partial replay + snapshot = session.get_snapshot() await self._send_to_client(client_id, { - "type": "replay", - "event": event.to_dict(), + "type": "snapshot", + "state": snapshot, }) + else: + # Replay missed events if any + for event in replay_events: + event_data = event.to_dict() + # Include top-level sequence number from the cursor + seq = event.data.get('cursor', 0) + await self._send_to_client(client_id, { + "type": "replay", + "event": event_data, + "seq": seq, + }) # If session was resumed with pending messages or was executing, restart processing if session._was_resumed and (not session._inbox.empty() or session._is_executing): @@ -1576,6 +1624,8 @@ async def _send_to_client(self, client_id: str, data: Dict[str, Any]) -> None: cursor = session.add_event(event) # Add cursor to the data BEFORE sending data["cursor"] = cursor + # Add top-level sequence number for 
integrity checking + data["seq"] = cursor # Send ONCE with cursor already attached if applicable await ws.send_json(data) From ae2928c8a3687b17626698c73ec4b3557ec692a6 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 09:32:31 +0000 Subject: [PATCH 2/2] fix: address reviewer feedback on gateway integrity checks - Add input validation for 'since' cursor to prevent type errors - Include token_stream and tool_call_stream events in seq stamping - Clarify get_oldest_cursor() semantics with documentation - Document resume_or_create_session() replay_events caveat Co-authored-by: Mervin Praison --- src/praisonai/praisonai/gateway/server.py | 33 ++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/praisonai/praisonai/gateway/server.py b/src/praisonai/praisonai/gateway/server.py index e1521f608..a5b3aab4c 100644 --- a/src/praisonai/praisonai/gateway/server.py +++ b/src/praisonai/praisonai/gateway/server.py @@ -128,7 +128,12 @@ def add_event(self, event: GatewayEvent) -> int: return self._event_cursor def get_oldest_cursor(self) -> int: - """Get the oldest event cursor still retained in the buffer.""" + """Get the oldest event cursor still retained in the buffer. + + When the buffer is empty, returns the current cursor position, + which correctly indicates that any cursor < _event_cursor would + require resync (since no events are retained). 
+ """ if self._events: return self._events[0].data.get('cursor', self._event_cursor) return self._event_cursor @@ -1342,7 +1347,18 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) -> # Support reconnection with existing session session_id = data.get("session_id") # Optional: existing session to resume - since_cursor = data.get("since") # Optional: cursor for event replay + # Parse and validate the 'since' parameter + since_raw = data.get("since") # Optional: cursor for event replay + since_cursor = None + if since_raw is not None: + try: + since_cursor = int(since_raw) + except (TypeError, ValueError): + await self._send_to_client(client_id, { + "type": "error", + "message": "Invalid 'since' cursor. Must be an integer.", + }) + return # Resume or create session session, replay_events = self.resume_or_create_session( @@ -1610,7 +1626,14 @@ async def _send_to_client(self, client_id: str, data: Dict[str, Any]) -> None: if ws: try: # Track event in session BEFORE sending if it's a response or important event - if data.get("type") in ["response", "message", "stream_end", "error"]: + if data.get("type") in [ + "response", + "message", + "stream_end", + "error", + "token_stream", + "tool_call_stream", + ]: session_id = self._client_sessions.get(client_id) if session_id: session = self._sessions.get(session_id) @@ -1915,7 +1938,9 @@ def resume_or_create_session( Returns: Tuple of (session, replay_events) where replay_events are events - that occurred after since_cursor + that occurred after since_cursor. Note: Callers must check + session.check_resync_required(since_cursor) before using replay_events, + as they may not cover the full gap if the buffer was trimmed. """ replay_events = []