-
Notifications
You must be signed in to change notification settings - Fork 429
fix(server): ensure DefaultRequestHandler cancels producer on blocking failure #645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
7cc5d03
a13055f
25e2100
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -312,6 +312,7 @@ async def on_message_send( | |||||||||||||
| blocking = False | ||||||||||||||
|
|
||||||||||||||
| interrupted_or_non_blocking = False | ||||||||||||||
| success = False | ||||||||||||||
| try: | ||||||||||||||
| # Create async callback for push notifications | ||||||||||||||
| async def push_notification_callback() -> None: | ||||||||||||||
|
|
@@ -327,6 +328,7 @@ async def push_notification_callback() -> None: | |||||||||||||
| blocking=blocking, | ||||||||||||||
| event_callback=push_notification_callback, | ||||||||||||||
| ) | ||||||||||||||
| success = True | ||||||||||||||
|
|
||||||||||||||
| except Exception: | ||||||||||||||
| logger.exception('Agent execution failed') | ||||||||||||||
|
|
@@ -339,7 +341,14 @@ async def push_notification_callback() -> None: | |||||||||||||
| cleanup_task.set_name(f'cleanup_producer:{task_id}') | ||||||||||||||
| self._track_background_task(cleanup_task) | ||||||||||||||
| else: | ||||||||||||||
| await self._cleanup_producer(producer_task, task_id) | ||||||||||||||
| # If we are blocking and not interrupted, but the result is not set | ||||||||||||||
| # (meaning exception or other failure), we should cancel the producer. | ||||||||||||||
| # 'result' (local var) is bound before this block if success. | ||||||||||||||
| # However, to be safe, we can check if successful using a flag. | ||||||||||||||
| cancel_producer = not success | ||||||||||||||
| await self._cleanup_producer( | ||||||||||||||
| producer_task, task_id, cancel=cancel_producer | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| if not result: | ||||||||||||||
| raise ServerError(error=InternalError()) | ||||||||||||||
|
|
@@ -433,8 +442,11 @@ async def _cleanup_producer( | |||||||||||||
| self, | ||||||||||||||
| producer_task: asyncio.Task, | ||||||||||||||
| task_id: str, | ||||||||||||||
| cancel: bool = False, | ||||||||||||||
| ) -> None: | ||||||||||||||
| """Cleans up the agent execution task and queue manager entry.""" | ||||||||||||||
| if cancel: | ||||||||||||||
| producer_task.cancel() | ||||||||||||||
| await producer_task | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Suggested change
|
||||||||||||||
| await self._queue_manager.close(task_id) | ||||||||||||||
| async with self._running_agents_lock: | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here is correct, but the comment is quite verbose and an intermediate variable is used. For improved readability and conciseness, you could combine the variable assignment with the function call and shorten the comment.