Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 5 additions & 28 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,14 @@
"name": "Debug HelloWorld Agent",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/examples/helloworld/__main__.py",
"program": "${workspaceFolder}/samples/hello_world_agent.py",
"console": "integratedTerminal",
"justMyCode": false,
"python": "${workspaceFolder}/.venv/bin/python3",
"cwd": "${workspaceFolder}",
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
"cwd": "${workspaceFolder}/examples/helloworld",
"args": [
"--host",
"localhost",
"--port",
"9999"
]
},
{
"name": "Debug Currency Agent",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/examples/langgraph/__main__.py",
"console": "integratedTerminal",
"justMyCode": false,
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
"cwd": "${workspaceFolder}/examples/langgraph",
"args": [
"--host",
"localhost",
"--port",
"10000"
]
"PYTHONPATH": "${workspaceFolder}/src"
}
},
{
"name": "Pytest All",
Expand Down
60 changes: 60 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# A2A Python SDK — Samples

This directory contains runnable examples demonstrating how to build and interact with an A2A-compliant agent using the Python SDK.

## Contents

| File | Role | Description |
|---|---|---|
| `hello_world_agent.py` | **Server** | A2A agent server |
| `cli.py` | **Client** | Interactive terminal client |

The samples are designed to work together out of the box: the agent listens on `http://127.0.0.1:41241`, which is the default URL used by the client.
Comment thread
sokoliva marked this conversation as resolved.
---

## `hello_world_agent.py` — Agent Server

Implements an A2A agent that responds to simple greeting messages (e.g., "hello", "how are you", "bye") with text replies, simulating a 1-second processing delay.

Demonstrates:
- Subclassing `AgentExecutor` and implementing `execute()` / `cancel()`
- Publishing streaming status updates and artifacts via `TaskUpdater`
- Exposing all three transports in both protocol versions (v1.0 and v0.3 compat) simultaneously:
- **JSON-RPC** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/jsonrpc`
- **HTTP+JSON (REST)** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/rest`
- **gRPC v1.0** on port `50051`
- **gRPC v0.3 (compat)** on port `50052`
- Serving the agent card at `http://127.0.0.1:41241/.well-known/agent-card.json`

**Run:**

```bash
uv run python samples/hello_world_agent.py
```

---

## `cli.py` — Client

An interactive terminal client with full visibility into the streaming event flow. Each `TaskStatusUpdate` and `TaskArtifactUpdate` event is printed as it arrives.

Features:
- Transport selection via `--transport` flag (`JSONRPC`, `HTTP+JSON`, `GRPC`)
- Session management (`context_id` persisted across messages, `task_id` per task)
- Graceful error handling for HTTP and gRPC failures

**Run:**

```bash
# Connect to the local hello_world_agent (default):
uv run python samples/cli.py

# Connect to a different URL, using gRPC:
uv run python samples/cli.py --url http://192.168.1.10:41241 --transport GRPC
```

Then type a message like `hello` and press Enter.

Type `/quit` or `/exit` to stop, or press `Ctrl+C`.


67 changes: 39 additions & 28 deletions samples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,52 @@

from a2a.client import A2ACardResolver, ClientConfig, create_client
from a2a.types import Message, Part, Role, SendMessageRequest, TaskState
from a2a.utils import get_artifact_text, get_message_text


async def _handle_stream(
stream: Any, current_task_id: str | None
) -> str | None:
async for event, task in stream:
if not task:
async for event in stream:
if event.HasField('message'):
print(f'Message: {get_message_text(event.message, delimiter=" ")}')
continue
if not current_task_id:
current_task_id = task.id

if event:
if event.HasField('status_update'):
state_name = TaskState.Name(event.status_update.status.state)
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
if event.status_update.status.HasField('message'):
for part in event.status_update.status.message.parts:
if part.text:
print(part.text, end=' ')
print()

if (
event.status_update.status.state
== TaskState.TASK_STATE_COMPLETED
):
current_task_id = None
print('--- Task Completed ---')

if not current_task_id:
# V2 handler emits Task or Message first.
if event.HasField('task'):
current_task_id = event.task.id
state_name = TaskState.Name(event.task.status.state)
print(f'Task [state={state_name}]')
# Legacy handler might not send a leading Task event.
elif event.HasField('status_update'):
current_task_id = event.status_update.task_id
elif event.HasField('artifact_update'):
print(
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
end=' ',
)
for part in event.artifact_update.artifact.parts:
if part.text:
print(part.text, end=' ')
current_task_id = event.artifact_update.artifact.task_id

if event.HasField('status_update'):
state_name = TaskState.Name(event.status_update.status.state)
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
if event.status_update.status.HasField('message'):
message = event.status_update.status.message
print(get_message_text(message, delimiter=' '))
else:
print()
if (
event.status_update.status.state
== TaskState.TASK_STATE_COMPLETED
):
current_task_id = None
print('--- Task Completed ---')

elif event.HasField('artifact_update'):
print(
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
end=' ',
)
print(
get_artifact_text(event.artifact_update.artifact, delimiter=' ')
)

return current_task_id

Expand All @@ -68,6 +77,8 @@ async def main() -> None:
config = ClientConfig()
if args.transport:
config.supported_protocol_bindings = [args.transport]
if args.transport == 'GRPC':
config.grpc_channel_factory = grpc.aio.insecure_channel

print(
f'Connecting to {args.url} (preferred transport: {args.transport or "Any"})'
Expand Down
23 changes: 22 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
SubscribeToTaskRequest,
Task,
TaskPushNotificationConfig,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
from a2a.utils.errors import (
Expand Down Expand Up @@ -302,16 +304,35 @@ async def on_message_send_stream( # noqa: D102
params: SendMessageRequest,
context: ServerCallContext,
) -> AsyncGenerator[Event, None]:
is_new_task = not params.message.task_id

active_task, request_context = await self._setup_active_task(
params, context
)

task_id = cast('str', request_context.task_id)
context_id = cast('str', request_context.context_id)
first_event = True

async for event in active_task.subscribe(
request=request_context,
include_initial_task=False,
):
if (
first_event
and is_new_task
and not isinstance(event, (Task, Message))
):
# Agent didn't emit a Task/Message first.
# The stream MUST begin with a Task or Message.
submitted_task = Task(
id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
history=[params.message],
)
Comment on lines +327 to +332
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task_manager is already covering this use case, we can use the task created and saved on the task store

async def ensure_task_id(self, task_id: str, context_id: str) -> Task:

yield apply_history_length(submitted_task, params.configuration)
first_event = False

if isinstance(event, Task):
self._validate_task_id_match(task_id, event.id)
yield apply_history_length(event, params.configuration)
Expand Down
Loading
Loading