Skip to content

Commit 462eb3c

Browse files
authored
feat: Implementation of DefaultRequestHandlerV2 (#933)
This pull request introduces a significant refactoring of the agent execution layer, implementing an ActiveTask system and a DefaultRequestHandlerV2 to better manage task lifecycles, concurrency, and event streaming. Fixes #869
1 parent 15fb9b7 commit 462eb3c

14 files changed

Lines changed: 4932 additions & 21 deletions

src/a2a/server/agent_execution/active_task.py

Lines changed: 629 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import logging
5+
6+
from typing import TYPE_CHECKING
7+
8+
9+
if TYPE_CHECKING:
10+
from a2a.server.agent_execution.agent_executor import AgentExecutor
11+
from a2a.server.context import ServerCallContext
12+
from a2a.server.tasks.push_notification_sender import PushNotificationSender
13+
from a2a.server.tasks.task_store import TaskStore
14+
15+
from a2a.server.agent_execution.active_task import ActiveTask
16+
from a2a.server.tasks.task_manager import TaskManager
17+
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class ActiveTaskRegistry:
23+
"""A registry for active ActiveTask instances."""
24+
25+
def __init__(
26+
self,
27+
agent_executor: AgentExecutor,
28+
task_store: TaskStore,
29+
push_sender: PushNotificationSender | None = None,
30+
):
31+
self._agent_executor = agent_executor
32+
self._task_store = task_store
33+
self._push_sender = push_sender
34+
self._active_tasks: dict[str, ActiveTask] = {}
35+
self._lock = asyncio.Lock()
36+
self._cleanup_tasks: set[asyncio.Task[None]] = set()
37+
38+
async def get_or_create(
39+
self,
40+
task_id: str,
41+
call_context: ServerCallContext,
42+
context_id: str | None = None,
43+
create_task_if_missing: bool = False,
44+
) -> ActiveTask:
45+
"""Retrieves an existing ActiveTask or creates a new one."""
46+
async with self._lock:
47+
if task_id in self._active_tasks:
48+
return self._active_tasks[task_id]
49+
50+
task_manager = TaskManager(
51+
task_id=task_id,
52+
context_id=context_id,
53+
task_store=self._task_store,
54+
initial_message=None,
55+
context=call_context,
56+
)
57+
58+
active_task = ActiveTask(
59+
agent_executor=self._agent_executor,
60+
task_id=task_id,
61+
task_manager=task_manager,
62+
push_sender=self._push_sender,
63+
on_cleanup=self._on_active_task_cleanup,
64+
)
65+
self._active_tasks[task_id] = active_task
66+
67+
await active_task.start(
68+
call_context=call_context,
69+
create_task_if_missing=create_task_if_missing,
70+
)
71+
return active_task
72+
73+
def _on_active_task_cleanup(self, active_task: ActiveTask) -> None:
74+
"""Called by ActiveTask when it's finished and has no subscribers."""
75+
logger.debug('Active task %s cleanup scheduled', active_task.task_id)
76+
task = asyncio.create_task(self._remove_task(active_task.task_id))
77+
self._cleanup_tasks.add(task)
78+
task.add_done_callback(self._cleanup_tasks.discard)
79+
80+
async def _remove_task(self, task_id: str) -> None:
81+
async with self._lock:
82+
self._active_tasks.pop(task_id, None)
83+
logger.debug('Removed active task for %s from registry', task_id)
84+
85+
async def get(self, task_id: str) -> ActiveTask | None:
86+
"""Retrieves an existing task."""
87+
async with self._lock:
88+
return self._active_tasks.get(task_id)

src/a2a/server/agent_execution/agent_executor.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from abc import ABC, abstractmethod
22

33
from a2a.server.agent_execution.context import RequestContext
4-
from a2a.server.events.event_queue import EventQueue
4+
from a2a.server.events.event_queue_v2 import EventQueue
55

66

77
class AgentExecutor(ABC):
@@ -23,6 +23,18 @@ async def execute(
2323
return once the agent's execution for this request is complete or
2424
yields control (e.g., enters an input-required state).
2525
26+
TODO: Document request lifecycle and AgentExecutor responsibilities:
27+
- Should not close the event_queue.
28+
- Guarantee single execution per request (no concurrent execution).
29+
- Throwing exception will result in TaskState.TASK_STATE_ERROR (CHECK!)
30+
- Once call is completed it should not access context or event_queue
31+
- Before completing the call it SHOULD update task status to terminal or interrupted state.
32+
- Explain AUTH_REQUIRED workflow.
33+
- Explain INPUT_REQUIRED workflow.
34+
- Explain how cancelation work (executor task will be canceled, cancel() is called, order of calls, etc)
35+
- Explain if execute can wait for cancel and if cancel can wait for execute.
36+
- Explain behaviour of streaming / not-immediate when execute() returns in active state.
37+
2638
Args:
2739
context: The request context containing the message, task ID, etc.
2840
event_queue: The queue to publish events to.
@@ -38,6 +50,10 @@ async def cancel(
3850
in the context and publish a `TaskStatusUpdateEvent` with state
3951
`TaskState.TASK_STATE_CANCELED` to the `event_queue`.
4052
53+
TODO: Document cancelation workflow.
54+
- What if TaskState.TASK_STATE_CANCELED is not set by cancel() ?
55+
- How it can interact with execute() ?
56+
4157
Args:
4258
context: The request context containing the task ID to cancel.
4359
event_queue: The queue to publish the cancellation status update to.

src/a2a/server/agent_execution/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def current_task(self) -> Task | None:
120120
return self._current_task
121121

122122
@current_task.setter
123-
def current_task(self, task: Task) -> None:
123+
def current_task(self, task: Task | None) -> None:
124124
"""Sets the current task object."""
125125
self._current_task = task
126126

src/a2a/server/events/event_queue_v2.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ class EventQueueSource(EventQueue):
2828
in `_incoming_queue` and distributed to all child Sinks by a background dispatcher task.
2929
"""
3030

31-
def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
31+
def __init__(
32+
self,
33+
max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE,
34+
create_default_sink: bool = True,
35+
) -> None:
3236
"""Initializes the EventQueueSource."""
3337
if max_queue_size <= 0:
3438
raise ValueError('max_queue_size must be greater than 0')
@@ -41,10 +45,15 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
4145
self._is_closed = False
4246

4347
# Internal sink for backward compatibility
44-
self._default_sink = EventQueueSink(
45-
parent=self, max_queue_size=max_queue_size
46-
)
47-
self._sinks.add(self._default_sink)
48+
self._default_sink: EventQueueSink | None
49+
if create_default_sink:
50+
self._default_sink = EventQueueSink(
51+
parent=self, max_queue_size=max_queue_size
52+
)
53+
self._sinks.add(self._default_sink)
54+
else:
55+
self._default_sink = None
56+
4857
self._dispatcher_task = asyncio.create_task(self._dispatch_loop())
4958

5059
self._dispatcher_task_expected_to_cancel = False
@@ -54,6 +63,8 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
5463
@property
5564
def queue(self) -> AsyncQueue[Event]:
5665
"""Returns the underlying asyncio.Queue of the default sink."""
66+
if self._default_sink is None:
67+
raise ValueError('No default sink available.')
5768
return self._default_sink.queue
5869

5970
async def _dispatch_loop(self) -> None:
@@ -183,10 +194,14 @@ async def enqueue_event(self, event: Event) -> None:
183194

184195
async def dequeue_event(self) -> Event:
185196
"""Dequeues an event from the default internal sink queue."""
197+
if self._default_sink is None:
198+
raise ValueError('No default sink available.')
186199
return await self._default_sink.dequeue_event()
187200

188201
def task_done(self) -> None:
189202
"""Signals that a formerly enqueued task is complete via the default internal sink queue."""
203+
if self._default_sink is None:
204+
raise ValueError('No default sink available.')
190205
self._default_sink.task_done()
191206

192207
async def close(self, immediate: bool = False) -> None:

src/a2a/server/request_handlers/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import logging
44

55
from a2a.server.request_handlers.default_request_handler import (
6-
DefaultRequestHandler,
6+
LegacyRequestHandler,
7+
)
8+
from a2a.server.request_handlers.default_request_handler_v2 import (
9+
DefaultRequestHandlerV2,
710
)
811
from a2a.server.request_handlers.request_handler import (
912
RequestHandler,
@@ -40,11 +43,15 @@ def __init__(self, *args, **kwargs):
4043
) from _original_error
4144

4245

46+
DefaultRequestHandler = DefaultRequestHandlerV2
47+
4348
__all__ = [
4449
'DefaultGrpcServerCallContextBuilder',
4550
'DefaultRequestHandler',
51+
'DefaultRequestHandlerV2',
4652
'GrpcHandler',
4753
'GrpcServerCallContextBuilder',
54+
'LegacyRequestHandler',
4855
'RequestHandler',
4956
'build_error_response',
5057
'prepare_response_object',

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474

7575

7676
@trace_class(kind=SpanKind.SERVER)
77-
class DefaultRequestHandler(RequestHandler):
77+
class LegacyRequestHandler(RequestHandler):
7878
"""Default request handler for all incoming requests.
7979
8080
This handler provides default implementations for all A2A JSON-RPC methods,

0 commit comments

Comments
 (0)