-
Notifications
You must be signed in to change notification settings - Fork 428
Expand file tree
/
Copy pathagent_executor.py
More file actions
82 lines (68 loc) · 4.08 KB
/
agent_executor.py
File metadata and controls
82 lines (68 loc) · 4.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from abc import ABC, abstractmethod
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue_v2 import EventQueue
class AgentExecutor(ABC):
"""Agent Executor interface.
Implementations of this interface contain the core logic of the agent,
executing tasks based on requests and publishing updates to an event queue.
"""
@abstractmethod
async def execute(
self, context: RequestContext, event_queue: EventQueue
) -> None:
"""Execute the agent's logic for a given request context.
The agent should read necessary information from the `context` and
publish `Task` or `Message` events, or `TaskStatusUpdateEvent` /
`TaskArtifactUpdateEvent` to the `event_queue`. This method should
return once the agent's execution for this request is complete or
yields control (e.g., enters an input-required state).
Request Lifecycle & AgentExecutor Responsibilities:
- **Concurrency**: The framework guarantees single execution per request;
`execute()` will not be called concurrently for the same request context.
- **Exception Handling**: Unhandled exceptions raised by `execute()` will be
caught by the framework and result in the task transitioning to
`TaskState.TASK_STATE_ERROR`.
- **Post-Completion**: Once `execute()` completes (returns or raises), the
executor must not access the `context` or `event_queue` anymore.
- **Terminal States**: Before completing the call normally, the executor
SHOULD publish a `TaskStatusUpdateEvent` to transition the task to a
terminal state (e.g., `TASK_STATE_COMPLETED`) or an interrupted state
(`TASK_STATE_INPUT_REQUIRED` or `TASK_STATE_AUTH_REQUIRED`).
- **Interrupted Workflows**:
- `TASK_STATE_INPUT_REQUIRED`: The executor publishes a `TaskStatusUpdateEvent` with
`TaskState.TASK_STATE_INPUT_REQUIRED` and returns to yield control.
The request will resume once user input is provided.
- `TASK_STATE_AUTH_REQUIRED`: There are in-bound and out-of-bound auth models.
In both scenarios, the agent publishes a `TaskStatusUpdateEvent` with
`TaskState.TASK_STATE_AUTH_REQUIRED`.
- In-bound: The agent should return from `execute()`. The framework will
call `execute()` again once the user response is received.
- Out-of-bound: The agent should not return from `execute()`. It should wait
for the out-of-band auth provider to complete the authentication and then
continue execution.
- **Cancellation Workflow**: When a cancellation request is received, the
async task running `execute()` is cancelled (raising an `asyncio.CancelledError`),
and `cancel()` is explicitly called by the framework.
Allowed Workflows:
- Immediate response: Enqueue a SINGLE `Message` object.
- Asynchronous/Long-running: Enqueue a `Task` object, perform work, and emit
multiple `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` objects over time.
Note that the framework waits with response to the send_message request with
`return_immediately=True` parameter until the first event (Message or Task)
is enqueued by AgentExecutor.
Args:
context: The request context containing the message, task ID, etc.
event_queue: The queue to publish events to.
"""
@abstractmethod
async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
"""Request the agent to cancel an ongoing task.
The agent should attempt to stop the task identified by the task_id
in the context and publish a `TaskStatusUpdateEvent` with state
`TaskState.TASK_STATE_CANCELED` to the `event_queue`.
Args:
context: The request context containing the task ID to cancel.
event_queue: The queue to publish the cancellation status update to.
"""