Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# A2A Python SDK — Samples

This directory contains runnable examples demonstrating how to build and interact with an A2A-compliant agent using the Python SDK.

## Contents

| File | Role | Description |
|---|---|---|
| `hello_world_agent.py` | **Server** | A2A agent server |
| `cli.py` | **Client** | Interactive terminal client |

The samples are designed to work together out of the box: the agent listens on `http://127.0.0.1:41241`, which is the default URL used by the client.
---

## `hello_world_agent.py` — Agent Server

Implements an A2A agent that responds to simple greeting messages (e.g., "hello", "how are you", "bye") with text replies, simulating a 1-second processing delay.

Demonstrates:
- Subclassing `AgentExecutor` and implementing `execute()` / `cancel()`
- Publishing streaming status updates and artifacts via `TaskUpdater`
- Exposing all three transports in both protocol versions (v1.0 and v0.3 compat) simultaneously:
- **JSON-RPC** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/jsonrpc`
- **HTTP+JSON (REST)** (v1.0 and v0.3) at `http://127.0.0.1:41241/a2a/rest`
- **gRPC v1.0** on port `50051`
- **gRPC v0.3 (compat)** on port `50052`
- Serving the agent card at `http://127.0.0.1:41241/.well-known/agent-card.json`

**Run:**

```bash
uv run python samples/hello_world_agent.py
```

---

## `cli.py` — Client

An interactive terminal client with full visibility into the streaming event flow. Each `TaskStatusUpdate` and `TaskArtifactUpdate` event is printed as it arrives.

Features:
- Transport selection via `--transport` flag (`JSONRPC`, `HTTP+JSON`, `GRPC`)
- Session management (`context_id` persisted across messages, `task_id` per task)
- Graceful error handling for HTTP and gRPC failures

**Run:**

```bash
# Connect to the local hello_world_agent (default):
uv run python samples/cli.py

# Connect to a different URL, using gRPC:
uv run python samples/cli.py --url http://192.168.1.10:41241 --transport GRPC
```

Then type a message like `hello` and press Enter.

Type `/quit` or `/exit` to stop, or press `Ctrl+C`.
71 changes: 40 additions & 31 deletions samples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,51 @@
from a2a.types import Message, Part, Role, SendMessageRequest, TaskState


async def _handle_stream(
async def _handle_stream( # noqa: PLR0912
stream: Any, current_task_id: str | None
) -> str | None:
async for event, task in stream:
if not task:
continue
async for event in stream:
if event.HasField('message'):
print('Message:', end=' ')
for part in event.message.parts:
if part.text:
print(part.text, end=' ')
print()
return None

if not current_task_id:
current_task_id = task.id

if event:
if event.HasField('status_update'):
state_name = TaskState.Name(event.status_update.status.state)
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
if event.status_update.status.HasField('message'):
for part in event.status_update.status.message.parts:
if part.text:
print(part.text, end=' ')
print()

if (
event.status_update.status.state
== TaskState.TASK_STATE_COMPLETED
):
current_task_id = None
print('--- Task Completed ---')

elif event.HasField('artifact_update'):
print(
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
end=' ',
)
for part in event.artifact_update.artifact.parts:
if event.HasField('task'):
current_task_id = event.task.id
print('--- Task Started ---')
print(f'Task [state={TaskState.Name(event.task.status.state)}]')
else:
raise ValueError(f'Unexpected first event: {event}')

if event.HasField('status_update'):
state_name = TaskState.Name(event.status_update.status.state)
print(f'TaskStatusUpdate [state={state_name}]:', end=' ')
if event.status_update.status.HasField('message'):
for part in event.status_update.status.message.parts:
if part.text:
print(part.text, end=' ')
print()

print()
if state_name in (
'TASK_STATE_COMPLETED',
'TASK_STATE_FAILED',
'TASK_STATE_CANCELED',
'TASK_STATE_REJECTED',
):
current_task_id = None
print('--- Task Finished ---')
elif event.HasField('artifact_update'):
print(
f'TaskArtifactUpdate [name={event.artifact_update.artifact.name}]:',
end=' ',
)
for part in event.artifact_update.artifact.parts:
if part.text:
print(part.text, end=' ')
print()
return current_task_id


Expand Down
12 changes: 12 additions & 0 deletions samples/hello_world_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
AgentProvider,
AgentSkill,
Part,
Task,
TaskState,
TaskStatus,
a2a_pb2_grpc,
)

Expand Down Expand Up @@ -75,6 +78,15 @@ async def execute(
context_id,
)

await event_queue.enqueue_event(
Task(
id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
history=[user_message],
)
)

updater = TaskUpdater(
event_queue=event_queue,
task_id=task_id,
Expand Down
Loading