fix: gateway session data loss and restore deleted _delivery module#2102
- Resume gateway sessions from the latest session_data snapshot instead of the initial empty one written at session creation
- Persist active sessions on gateway stop() via close_session()
- Restore bots/_delivery.py accidentally removed in #2054; imports for DurableDelivery and DurableAdapterMixin were still live
- Add regression tests for resume, shutdown persist, and delivery imports

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
📝 Walkthrough
Adds a new

Changes
Durable Bot Delivery Module
Gateway Session Persistence Fixes and Tests

Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/praisonai/praisonai/bots/_delivery.py (1)
247-254: ⚡ Quick win
Rate limiter not using per-channel limiting.
The RateLimiter.acquire() method supports an optional channel_id parameter for per-channel throttling, but _acquire_rate_limit doesn't pass it. This reduces effectiveness: multiple sends to the same channel won't respect the per-channel delays documented in the rate limiter.
♻️ Suggested fix
-    async def _acquire_rate_limit(self) -> None:
-        """Acquire rate limit token if platform needs it."""
+    async def _acquire_rate_limit(self, channel_id: Optional[str] = None) -> None:
+        """Acquire rate limit token if platform needs it.
+
+        Args:
+            channel_id: Optional channel ID for per-channel limiting
+        """
         if self._rate_limiter is None:
             # Use platform-specific rate limiter
             self._rate_limiter = RateLimiter.for_platform(self.bot.platform)
         if self._rate_limiter:
-            await self._rate_limiter.acquire()
+            await self._rate_limiter.acquire(channel_id)
Then update call sites to pass channel_id:
-        await self._acquire_rate_limit()
+        await self._acquire_rate_limit(channel_id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/praisonai/praisonai/bots/_delivery.py` around lines 247 - 254, The _acquire_rate_limit method is not leveraging the per-channel throttling capability of the rate limiter. Modify the _acquire_rate_limit method to accept a channel_id parameter and pass it to the self._rate_limiter.acquire() call as an optional argument. Then update all call sites that invoke _acquire_rate_limit to provide the appropriate channel_id value so that the rate limiter can properly enforce per-channel delays as documented.
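To make the per-channel point concrete, here is a minimal sketch of a limiter that keys its delay on the channel. `ChannelRateLimiter` and `min_interval` are hypothetical names for illustration; this is not the project's `RateLimiter`, and the only thing borrowed from the comment above is the idea that `acquire()` takes an optional `channel_id`.

```python
import asyncio
import time
from typing import Dict, Optional, Tuple


class ChannelRateLimiter:
    """Hypothetical limiter: enforces a minimum gap between sends per channel."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        # Last send time per channel; None is the shared "no channel" bucket.
        self._last_sent: Dict[Optional[str], float] = {}

    async def acquire(self, channel_id: Optional[str] = None) -> float:
        """Wait until this channel's interval has elapsed; return seconds waited."""
        now = time.monotonic()
        last = self._last_sent.get(channel_id)
        wait = 0.0 if last is None else max(0.0, last + self.min_interval - now)
        if wait:
            await asyncio.sleep(wait)
        self._last_sent[channel_id] = time.monotonic()
        return wait


async def demo() -> Tuple[float, float, float]:
    limiter = ChannelRateLimiter(min_interval=0.05)
    first = await limiter.acquire("chan-a")   # first send on chan-a: no wait
    other = await limiter.acquire("chan-b")   # different channel: no wait
    second = await limiter.acquire("chan-a")  # same channel again: must wait
    return first, other, second


waits = asyncio.run(demo())
```

If `channel_id` were never passed, every send would land in the same `None` bucket, which is the behavior the nitpick describes. The sketch is not safe for concurrent callers on the same channel; a real limiter would likely need a lock.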
📒 Files selected for processing (3)
src/praisonai/praisonai/bots/_delivery.py
src/praisonai/praisonai/gateway/server.py
src/praisonai/tests/unit/gateway/test_gateway_persistence_fixes.py
        # Show typing indicator if supported
        if typing and caps.supports_typing:
            asyncio.create_task(self._show_typing(channel_id))
Store a reference to the typing indicator task.
The fire-and-forget task created here has no stored reference, which risks garbage collection before completion and swallows any exceptions. While typing indicator failure is non-critical, storing the reference enables proper lifecycle management and debugging.
🛠️ Suggested fix
Add a task set to track background tasks and ensure proper cleanup:
 class UnifiedDelivery:
     def __init__(self, bot: "BotProtocol"):
         self.bot = bot
         self._capabilities: Optional["PlatformCapabilities"] = None
         self._rate_limiter: Optional[RateLimiter] = None
         self._streamer: Optional[DraftStreamer] = None
+        self._background_tasks: set[asyncio.Task] = set()
@@
         # Show typing indicator if supported
         if typing and caps.supports_typing:
-            asyncio.create_task(self._show_typing(channel_id))
+            task = asyncio.create_task(self._show_typing(channel_id))
+            self._background_tasks.add(task)
+            task.add_done_callback(self._background_tasks.discard)
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 89-89: Store a reference to the return value of asyncio.create_task
(RUF006)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/bots/_delivery.py` around lines 87 - 89, The
fire-and-forget task created by
asyncio.create_task(self._show_typing(channel_id)) lacks a stored reference,
risking garbage collection before completion and swallowing exceptions. Store
the task reference returned by asyncio.create_task in a task set or collection
maintained by the class to track background tasks, ensuring proper lifecycle
management and debugging capability. Make sure to add appropriate cleanup logic
to remove completed tasks from the tracking collection to prevent memory leaks.
Source: Linters/SAST tools
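The task-tracking pattern suggested above can be tried in isolation. The following is a self-contained sketch; `TypingNotifier` and `start_typing` are hypothetical stand-ins rather than the `UnifiedDelivery` class, and `asyncio.sleep(0)` stands in for a platform API call.

```python
import asyncio
from typing import List, Set, Tuple


class TypingNotifier:
    """Hypothetical class that keeps strong references to its background tasks."""

    def __init__(self) -> None:
        self._background_tasks: Set[asyncio.Task] = set()
        self.shown: List[str] = []

    async def _show_typing(self, channel_id: str) -> None:
        await asyncio.sleep(0)  # stand-in for the real typing-indicator call
        self.shown.append(channel_id)

    def start_typing(self, channel_id: str) -> asyncio.Task:
        task = asyncio.create_task(self._show_typing(channel_id))
        # Hold a reference so the task cannot be garbage collected mid-flight.
        self._background_tasks.add(task)
        # Drop the reference once the task finishes, so the set does not grow.
        task.add_done_callback(self._background_tasks.discard)
        return task


async def demo() -> Tuple[int, int, List[str]]:
    notifier = TypingNotifier()
    task = notifier.start_typing("chan-a")
    tracked_while_running = len(notifier._background_tasks)
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    return tracked_while_running, len(notifier._background_tasks), notifier.shown


result = asyncio.run(demo())
```

The set holds the task only while it is pending, which addresses both the garbage-collection risk flagged by RUF006 and the cleanup concern raised in the prompt.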
                await self.bot.edit_message(
                    channel_id,
                    message.message_id,
                    display_text,
                )

                if next_pos >= len(text):
                    break  # Complete text has been sent

                current_pos = next_pos
Redundant final edit when streaming completes successfully.
When the streaming loop completes with next_pos >= len(text) (line 188), the message has already been edited with the complete text. However, if the text fits within max_message_length, lines 220-230 perform another edit with the same content. This wastes an API call and could unnecessarily trigger rate limits.
♻️ Suggested fix
Track whether the loop completed with the full content and skip the redundant edit:
+        streaming_completed = False
         # Stream partial content
         while current_pos < len(text):
             # ... existing loop code ...
             if next_pos >= len(text):
+                streaming_completed = True
                 break  # Complete text has been sent
             current_pos = next_pos
         # If text is too long and couldn't fit in one message, chunk it
         if _calculate_length(text, caps.length_unit) > caps.max_message_length:
             # ... existing chunking code ...
-        else:
+        elif not streaming_completed:
             # Final edit with complete content (fits in limit)
             await asyncio.sleep(caps.edit_interval_ms / 1000.0)
             # ... rest of else block ...
Also applies to: 220-230
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/bots/_delivery.py` around lines 182 - 191, The
message editing logic performs a redundant final edit when the streaming loop
completes successfully with the full text content. To fix this, introduce a
boolean flag (such as is_complete) that tracks whether the streaming loop in the
edit_message flow successfully sent all content (when next_pos >= len(text)
evaluates to true). Then, before the final edit operation that occurs around
lines 220-230 (after the main streaming loop ends), check this flag and only
proceed with the edit if the content was not already fully sent during the loop.
This prevents wasting an API call when the complete text has already been
delivered to the Discord message.
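As a rough illustration of the flag-based fix, the sketch below streams a string through a recording callback and counts the edits. `stream_text`, `step`, and the `edit` callback are hypothetical simplifications; the real method also handles chunking, rate limits, and platform capabilities, none of which are modeled here.

```python
import asyncio
from typing import Awaitable, Callable, List


async def stream_text(
    text: str,
    step: int,
    edit: Callable[[str], Awaitable[None]],
) -> None:
    """Progressively edit a message; skip the final edit if the loop sent it all."""
    streaming_completed = False
    current_pos = 0
    while current_pos < len(text):
        next_pos = min(current_pos + step, len(text))
        await edit(text[:next_pos])
        if next_pos >= len(text):
            streaming_completed = True  # the full text is already on the message
            break
        current_pos = next_pos
    if not streaming_completed:
        # Fallback edit, issued only when the loop did not deliver everything.
        await edit(text)


async def demo() -> List[str]:
    edits: List[str] = []

    async def record(content: str) -> None:
        edits.append(content)

    await stream_text("hello world", 4, record)
    return edits


edits = asyncio.run(demo())
```

Without the flag, the final branch would send "hello world" a second time, which is the wasted API call the comment points out.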
    async def test_stop_persists_active_sessions(self, tmp_path):
        gw = _make_gateway(tmp_path)
        session_id = "sess-stop-1"

        session = gw.create_session("agent-1", "client-1", session_id)
        session.add_message(
            GatewayMessage(
                content="Shutdown test message",
                sender_id="user",
                session_id=session_id,
            )
        )

        await gw.stop()

        store = gw._session_store
        assert store is not None
        assert store.session_exists(session_id)

        resumed = gw.create_session("agent-1", "client-2", session_id)
        messages = resumed.get_messages()

        assert len(messages) == 1
        assert messages[0].content == "Shutdown test message"
test_stop_persists_active_sessions can pass without exercising stop-time persistence
Line 62 calls await gw.stop() while _is_running is still False, so stop() returns early and never runs session close persistence. The test then reuses the same gateway instance, so in-memory state can hide persistence regressions.
Suggested test hardening
@@
     async def test_stop_persists_active_sessions(self, tmp_path):
         gw = _make_gateway(tmp_path)
         session_id = "sess-stop-1"
@@
-        await gw.stop()
+        # Exercise the actual shutdown branch in stop()
+        gw._is_running = True
+        await gw.stop()
@@
-        resumed = gw.create_session("agent-1", "client-2", session_id)
+        # Rehydrate from storage, not in-memory state
+        gw2 = _make_gateway(tmp_path)
+        resumed = gw2.create_session("agent-1", "client-2", session_id)
         messages = resumed.get_messages()
📝 Committable suggestion
    async def test_stop_persists_active_sessions(self, tmp_path):
        gw = _make_gateway(tmp_path)
        session_id = "sess-stop-1"
        session = gw.create_session("agent-1", "client-1", session_id)
        session.add_message(
            GatewayMessage(
                content="Shutdown test message",
                sender_id="user",
                session_id=session_id,
            )
        )
        # Exercise the actual shutdown branch in stop()
        gw._is_running = True
        await gw.stop()
        store = gw._session_store
        assert store is not None
        assert store.session_exists(session_id)
        # Rehydrate from storage, not in-memory state
        gw2 = _make_gateway(tmp_path)
        resumed = gw2.create_session("agent-1", "client-2", session_id)
        messages = resumed.get_messages()
        assert len(messages) == 1
        assert messages[0].content == "Shutdown test message"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/tests/unit/gateway/test_gateway_persistence_fixes.py` around
lines 49 - 72, The test_stop_persists_active_sessions test calls await gw.stop()
while _is_running is False, causing the stop method to return early without
executing session close persistence logic. Additionally, the test reuses the
same gateway instance after stop, so in-memory state masks whether persistence
actually worked. Fix this by ensuring the gateway is marked as running before
calling stop (set _is_running to True), and then create a completely new gateway
instance that loads from the persisted session store to verify the data was
truly persisted and not just held in memory.
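The two weaknesses described in this comment can be reproduced with a toy model. `ToyGateway` below is a hypothetical, synchronous stand-in (the real `stop()` is async and the real store is not a directory of JSON files); it only mirrors the early return in `stop()` and the difference between in-memory and persisted state.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


class ToyGateway:
    """Hypothetical gateway: sessions live in memory, snapshots live on disk."""

    def __init__(self, store_dir: Path) -> None:
        self._store_dir = store_dir
        self._is_running = False
        self._sessions: Dict[str, List[str]] = {}

    def create_session(self, session_id: str) -> List[str]:
        if session_id not in self._sessions:
            path = self._store_dir / f"{session_id}.json"
            stored = json.loads(path.read_text()) if path.exists() else []
            self._sessions[session_id] = stored
        return self._sessions[session_id]

    def close_session(self, session_id: str) -> None:
        messages = self._sessions.pop(session_id)
        (self._store_dir / f"{session_id}.json").write_text(json.dumps(messages))

    def stop(self) -> None:
        if not self._is_running:
            return  # early return: no session is persisted on this path
        for session_id in list(self._sessions):
            self.close_session(session_id)
        self._is_running = False


def messages_after_restart(mark_running: bool) -> List[str]:
    with tempfile.TemporaryDirectory() as tmp:
        gw = ToyGateway(Path(tmp))
        gw.create_session("sess-stop-1").append("Shutdown test message")
        if mark_running:
            gw._is_running = True
        gw.stop()
        # A fresh instance can only see what actually reached the store.
        return ToyGateway(Path(tmp)).create_session("sess-stop-1")


weak = messages_after_restart(mark_running=False)
hardened = messages_after_restart(mark_running=True)
```

With `mark_running=False` the fresh instance finds nothing, yet a test that reused the original `gw` would still see the message in memory and pass. That is why the suggestion both sets `_is_running` and rehydrates through a second gateway.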
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task
🔍 Final Architecture Review
Todo List:

Phase 1: Architecture Review
✅ Protocol-driven design: The implementation follows the correct pattern - the _delivery module provides heavy implementation in the wrapper layer (praisonai/), not in the core SDK.
✅ Backward compatibility: The restored _delivery module maintains all previously exported functions, ensuring no regressions.
✅ Performance: No hot-path regressions detected. The gateway fixes improve correctness without adding overhead.

Phase 2: Fixed Valid Issues
✅ Fixed all valid issues from reviewers:

Phase 3: Final Verdict
✅ APPROVED - Ready to Merge
This PR correctly fixes three critical bugs:
All architectural principles are followed, backward compatibility is maintained, and reviewer feedback has been addressed. The changes are minimal, focused, and include appropriate tests.
Files Modified:
Skipped Changes:
|
- Store reference to typing indicator background task to prevent GC
- Fix redundant final edit when streaming completes successfully
- Fix test_stop_persists_active_sessions to properly test persistence
- Add channel_id parameter to rate limiter for per-channel throttling

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
Greptile Summary
This PR fixes three regressions: session history was silently discarded on reconnect due to a

Confidence Score: 4/5
Safe to merge: all three fixes are minimal and targeted, and none of the changes widens any code path in a way that could introduce new failures.
The session-resume and shutdown-persistence fixes in server.py are correct: iterating without break picks the last snapshot as intended, and routing through close_session() in stop() properly persists state. The restored _delivery.py satisfies all existing imports. The only open items are a missing deliver_chunked entry in the TYPE_CHECKING block (affects static analysis only, not runtime) and a missing trailing newline; neither affects correctness or runtime behavior.
No files require special attention; the two minor style issues are in bots/__init__.py and bots/_delivery.py.

Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant C as Client
participant GW as WebSocketGateway
participant SS as SessionStore
Note over GW: Session Create
GW->>SS: "add_message(role=system, session_data=empty_snapshot)"
Note over C,GW: Client sends messages
C->>GW: message
GW->>GW: session.add_message()
Note over GW: Session Close
GW->>GW: close_session(session_id)
GW->>SS: "add_message(role=system, session_data=full_snapshot)"
Note over C,GW: Client reconnects
C->>GW: join with session_id
GW->>SS: get_session(session_id)
SS-->>GW: messages [system(empty), ..., system(full)]
Note over GW: OLD: break on first - empty snapshot
Note over GW: NEW: iterate all - last snapshot wins
GW->>GW: GatewaySession.from_dict(full_snapshot)
GW-->>C: "joined (resumed=true, history intact)"
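The OLD and NEW notes in the diagram come down to which matching system message the resume logic keeps. Here is a minimal sketch of that difference, assuming stored messages are plain dicts with `role` and `session_data` keys; that shape and both function names are illustrative assumptions, not the actual SessionStore schema.

```python
from typing import Any, Dict, List, Optional

Message = Dict[str, Any]


def first_snapshot(messages: List[Message]) -> Optional[Dict[str, Any]]:
    """Old behavior: stop at the first system message carrying session_data."""
    for msg in messages:
        if msg.get("role") == "system" and "session_data" in msg:
            return msg["session_data"]  # equivalent to the old `break`
    return None


def latest_snapshot(messages: List[Message]) -> Optional[Dict[str, Any]]:
    """New behavior: scan every message so the last snapshot wins."""
    snapshot: Optional[Dict[str, Any]] = None
    for msg in messages:
        if msg.get("role") == "system" and "session_data" in msg:
            snapshot = msg["session_data"]
    return snapshot


# Hypothetical stored history: empty snapshot at create, full snapshot at close.
history: List[Message] = [
    {"role": "system", "session_data": {"messages": []}},
    {"role": "user", "content": "hello"},
    {"role": "system", "session_data": {"messages": ["hello"]}},
]

old = first_snapshot(history)
new = latest_snapshot(history)
```

Iterating in reverse and returning on the first match would give the same result with less work; the forward scan is shown because it matches the "iterate all" wording in the diagram.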
Reviews (1): Last reviewed commit: "fix: address code review feedback from r..." | Re-trigger Greptile |
    "DurableDelivery",
]
No newline at end of file
The file is missing a trailing newline. Most Python linters (flake8 W292, ruff) and many editors flag this, and it can cause noisy diffs when content is appended later.
…2102)

* fix: gateway session data loss and restore deleted _delivery module

- Resume gateway sessions from the latest session_data snapshot instead of the initial empty one written at session creation
- Persist active sessions on gateway stop() via close_session()
- Restore bots/_delivery.py accidentally removed in #2054; imports for DurableDelivery and DurableAdapterMixin were still live
- Add regression tests for resume, shutdown persist, and delivery imports

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>

* fix: address code review feedback from reviewers

- Store reference to typing indicator background task to prevent GC
- Fix redundant final edit when streaming completes successfully
- Fix test_stop_persists_active_sessions to properly test persistence
- Add channel_id parameter to rate limiter for per-channel throttling

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com>
Summary
Critical bug scan found three high-severity regressions in recent commits. This PR applies minimal fixes with regression tests.
Bugs fixed
1. Gateway session resume drops conversation history (data loss)
Impact: With session persistence enabled, every disconnect → reconnect cycle restored an empty session instead of the latest conversation state.
Root cause: create_session() wrote an initial empty session_data system message on create and a full snapshot on close. Resume logic used break on the first matching system message, always selecting the empty initial snapshot.
Fix: Iterate all system messages and keep the latest session_data snapshot.
2. Gateway stop() loses in-flight sessions (data loss)
Impact: Graceful shutdown (SIGTERM / stop()) closed sessions in memory without persisting them, unlike WebSocket disconnect, which calls close_session().
Root cause: stop() called session.close() directly instead of close_session(session_id).
Fix: Call close_session() for each active session during shutdown.
3. Missing bots/_delivery.py causes import crashes
Impact: from praisonai.bots import DurableDelivery, DurableAdapterMixin, or deliver_chunked raised ModuleNotFoundError.
Root cause: PR #2054 removed _delivery.py as "dead code", but _durable_adapter.py and bots/__init__.py still import from it.
Fix: Restore _delivery.py from pre-removal history.
Validation
tests/unit/gateway/test_gateway_persistence_fixes.py
Summary by CodeRabbit
New Features
Bug Fixes
Tests