Skip to content

Commit 3f1c842

Browse files
Merge branch '1.0-dev' into guglielmoc/refactor_utils_and_helpers
2 parents d31107d + 1863359 commit 3f1c842

7 files changed

Lines changed: 182 additions & 22 deletions

File tree

.github/workflows/itk.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ jobs:
2828
run: bash run_itk.sh
2929
working-directory: itk
3030
env:
31-
A2A_SAMPLES_REVISION: itk-v.0.11-alpha
31+
A2A_SAMPLES_REVISION: itk-v.015-alpha

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ docs/ai/ai_learnings.md
1818
itk/a2a-samples/
1919
itk/pyproto/
2020
itk/instruction.proto
21+
itk/logs/

itk/README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ You must set the `A2A_SAMPLES_REVISION` environment variable to specify which re
3636

3737
Example:
3838
```bash
39-
export A2A_SAMPLES_REVISION=itk-v.0.11-alpha
39+
export A2A_SAMPLES_REVISION=itk-v.015-alpha
4040
```
4141

4242
### 2. Execute Tests
@@ -52,3 +52,22 @@ The script will:
5252
- Checkout the specified revision.
5353
- Build the ITK service Docker image.
5454
- Run the tests and output results.
55+
56+
## Debugging
57+
58+
To enable debug logging and persist logs for inspection:
59+
60+
1. Set the `ITK_LOG_LEVEL` environment variable to `DEBUG`:
61+
```bash
62+
export ITK_LOG_LEVEL=DEBUG
63+
```
64+
2. Run the test script:
65+
```bash
66+
./run_itk.sh
67+
```
68+
69+
When run in `DEBUG` mode:
70+
- The `logs/` directory will be created in this directory (if it doesn't exist).
71+
- The `logs/` directory will be mounted to the container.
72+
- The test execution will produce detailed logs in `logs/` (e.g., `agent_current.log`).
73+
- The `logs/` directory will **not** be removed during cleanup.

itk/main.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import base64
44
import logging
5+
import os
56
import uuid
67

78
import grpc
@@ -36,7 +37,8 @@
3637
from a2a.utils import TransportProtocol
3738

3839

39-
logging.basicConfig(level=logging.INFO)
40+
log_level = os.environ.get('ITK_LOG_LEVEL', 'INFO').upper()
41+
logging.basicConfig(level=log_level)
4042
logger = logging.getLogger(__name__)
4143

4244

@@ -352,8 +354,9 @@ async def main_async(http_port: int, grpc_port: int) -> None:
352354
grpc_port,
353355
)
354356

357+
uvicorn_log_level = os.environ.get('ITK_LOG_LEVEL', 'INFO').lower()
355358
config = uvicorn.Config(
356-
app, host='127.0.0.1', port=http_port, log_level='info'
359+
app, host='127.0.0.1', port=http_port, log_level=uvicorn_log_level
357360
)
358361
uvicorn_server = uvicorn.Server(config)
359362

itk/run_itk.sh

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#!/bin/bash
22
set -ex
33

4+
# Set default log level
5+
export ITK_LOG_LEVEL="${ITK_LOG_LEVEL:-INFO}"
6+
47
# Initialize default exit code
58
RESULT=1
69

@@ -63,9 +66,21 @@ ITK_DIR=$(pwd)
6366
# Stop existing container if any
6467
docker rm -f itk-service || true
6568

69+
# Create logs directory if debug
70+
if [ "${ITK_LOG_LEVEL^^}" = "DEBUG" ]; then
71+
mkdir -p "$ITK_DIR/logs"
72+
fi
73+
74+
DOCKER_MOUNT_LOGS=""
75+
if [ "${ITK_LOG_LEVEL^^}" = "DEBUG" ]; then
76+
DOCKER_MOUNT_LOGS="-v $ITK_DIR/logs:/app/logs"
77+
fi
78+
6679
docker run -d --name itk-service \
6780
-v "$A2A_PYTHON_ROOT:/app/agents/repo" \
6881
-v "$ITK_DIR:/app/agents/repo/itk" \
82+
$DOCKER_MOUNT_LOGS \
83+
-e ITK_LOG_LEVEL="$ITK_LOG_LEVEL" \
6984
-p 8000:8000 \
7085
itk_service
7186

src/a2a/server/agent_execution/agent_executor.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,43 @@ async def execute(
2323
return once the agent's execution for this request is complete or
2424
yields control (e.g., enters an input-required state).
2525
26-
TODO: Document request lifecycle and AgentExecutor responsibilities:
27-
- Should not close the event_queue.
28-
- Guarantee single execution per request (no concurrent execution).
29-
- Throwing exception will result in TaskState.TASK_STATE_ERROR (CHECK!)
30-
- Once call is completed it should not access context or event_queue
31-
- Before completing the call it SHOULD update task status to terminal or interrupted state.
32-
- Explain AUTH_REQUIRED workflow.
33-
- Explain INPUT_REQUIRED workflow.
34-
- Explain how cancelation work (executor task will be canceled, cancel() is called, order of calls, etc)
35-
- Explain if execute can wait for cancel and if cancel can wait for execute.
36-
- Explain behaviour of streaming / not-immediate when execute() returns in active state.
37-
- Possible workflows:
38-
- Enqueue a SINGLE Message object
39-
- Enqueue TaskStatusUpdateEvent (TASK_STATE_SUBMITTED or TASK_STATE_REJECTED) and continue with TaskStatusUpdateEvent / TaskArtifactUpdateEvent.
26+
Request Lifecycle & AgentExecutor Responsibilities:
27+
- **Concurrency**: The framework guarantees single execution per request;
28+
`execute()` will not be called concurrently for the same request context.
29+
- **Exception Handling**: Unhandled exceptions raised by `execute()` will be
30+
caught by the framework and result in the task transitioning to
31+
`TaskState.TASK_STATE_FAILED`.
32+
- **Post-Completion**: Once `execute()` completes (returns or raises), the
33+
executor must not access the `context` or `event_queue` anymore.
34+
- **Terminal States**: Before completing the call normally, the executor
35+
SHOULD publish a `TaskStatusUpdateEvent` to transition the task to a
36+
terminal state (e.g., `TASK_STATE_COMPLETED`) or an interrupted state
37+
(`TASK_STATE_INPUT_REQUIRED` or `TASK_STATE_AUTH_REQUIRED`).
38+
- **Interrupted Workflows**:
39+
- `TASK_STATE_INPUT_REQUIRED`: The executor publishes a `TaskStatusUpdateEvent` with
40+
`TaskState.TASK_STATE_INPUT_REQUIRED` and returns to yield control.
41+
The request will resume once user input is provided.
42+
- `TASK_STATE_AUTH_REQUIRED`: There are in-band and out-of-band auth models.
43+
In both scenarios, the agent publishes a `TaskStatusUpdateEvent` with
44+
`TaskState.TASK_STATE_AUTH_REQUIRED`.
45+
- In-band: The agent should return from `execute()`. The framework will
46+
call `execute()` again once the user response is received.
47+
- Out-of-band: The agent should not return from `execute()`. It should wait
48+
for the out-of-band auth provider to complete the authentication and then
49+
continue execution.
50+
51+
- **Cancellation Workflow**: When a cancellation request is received, the
52+
async task running `execute()` is cancelled (raising an `asyncio.CancelledError`),
53+
and `cancel()` is explicitly called by the framework.
54+
55+
Allowed Workflows:
56+
- Immediate response: Enqueue a SINGLE `Message` object.
57+
- Asynchronous/Long-running: Enqueue a `Task` object, perform work, and emit
58+
multiple `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` objects over time.
59+
60+
Note that for a send_message request with the `return_immediately=True`
61+
parameter, the framework delays its response until the first event (Message or
62+
Task) is enqueued by the AgentExecutor.
4063
4164
Args:
4265
context: The request context containing the message, task ID, etc.
@@ -53,10 +76,6 @@ async def cancel(
5376
in the context and publish a `TaskStatusUpdateEvent` with state
5477
`TaskState.TASK_STATE_CANCELED` to the `event_queue`.
5578
56-
TODO: Document cancelation workflow.
57-
- What if TaskState.TASK_STATE_CANCELED is not set by cancel() ?
58-
- How it can interact with execute() ?
59-
6079
Args:
6180
context: The request context containing the task ID to cancel.
6281
event_queue: The queue to publish the cancellation status update to.

tests/integration/test_scenarios.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,22 @@ def agent_card():
113113
)
114114

115115

116+
def get_task_id(event):
117+
if event.HasField('task'):
118+
return event.task.id
119+
if event.HasField('status_update'):
120+
return event.status_update.task_id
121+
assert False, f'Event {event} has no task_id'
122+
123+
124+
def get_task_context_id(event):
125+
if event.HasField('task'):
126+
return event.task.context_id
127+
if event.HasField('status_update'):
128+
return event.status_update.context_id
129+
assert False, f'Event {event} has no context_id'
130+
131+
116132
def get_state(event):
117133
if event.HasField('task'):
118134
return event.task.status.state
@@ -1265,6 +1281,93 @@ async def cancel(
12651281
)
12661282

12671283

1284+
# Scenario: Auth required and in channel unblocking
1285+
@pytest.mark.timeout(2.0)
1286+
@pytest.mark.asyncio
1287+
@pytest.mark.parametrize('use_legacy', [False, True], ids=['v2', 'legacy'])
1288+
@pytest.mark.parametrize(
1289+
'streaming', [False, True], ids=['blocking', 'streaming']
1290+
)
1291+
async def test_scenario_auth_required_in_channel(use_legacy, streaming):
1292+
class AuthAgent(AgentExecutor):
1293+
async def execute(
1294+
self, context: RequestContext, event_queue: EventQueue
1295+
):
1296+
message = context.message
1297+
if message and message.parts and message.parts[0].text == 'start':
1298+
await event_queue.enqueue_event(
1299+
TaskStatusUpdateEvent(
1300+
task_id=context.task_id,
1301+
context_id=context.context_id,
1302+
status=TaskStatus(
1303+
state=TaskState.TASK_STATE_AUTH_REQUIRED
1304+
),
1305+
)
1306+
)
1307+
elif (
1308+
message
1309+
and message.parts
1310+
and message.parts[0].text == 'credentials'
1311+
):
1312+
await event_queue.enqueue_event(
1313+
TaskStatusUpdateEvent(
1314+
task_id=context.task_id,
1315+
context_id=context.context_id,
1316+
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED),
1317+
)
1318+
)
1319+
else:
1320+
raise ValueError(f'Unexpected message {message}')
1321+
1322+
async def cancel(
1323+
self, context: RequestContext, event_queue: EventQueue
1324+
):
1325+
pass
1326+
1327+
handler = create_handler(AuthAgent(), use_legacy)
1328+
client = await create_client(
1329+
handler, agent_card=agent_card(), streaming=streaming
1330+
)
1331+
1332+
msg1 = Message(
1333+
message_id='msg-start', role=Role.ROLE_USER, parts=[Part(text='start')]
1334+
)
1335+
1336+
it = client.send_message(
1337+
SendMessageRequest(
1338+
message=msg1,
1339+
configuration=SendMessageConfiguration(return_immediately=False),
1340+
)
1341+
)
1342+
1343+
events1 = [event async for event in it]
1344+
assert [get_state(event) for event in events1] == [
1345+
TaskState.TASK_STATE_AUTH_REQUIRED,
1346+
]
1347+
task_id = get_task_id(events1[0])
1348+
context_id = get_task_context_id(events1[0])
1349+
1350+
# Now send another message with credentials
1351+
msg2 = Message(
1352+
task_id=task_id,
1353+
context_id=context_id,
1354+
message_id='msg-creds',
1355+
role=Role.ROLE_USER,
1356+
parts=[Part(text='credentials')],
1357+
)
1358+
1359+
it2 = client.send_message(
1360+
SendMessageRequest(
1361+
message=msg2,
1362+
configuration=SendMessageConfiguration(return_immediately=False),
1363+
)
1364+
)
1365+
1366+
assert [get_state(event) async for event in it2] == [
1367+
TaskState.TASK_STATE_COMPLETED,
1368+
]
1369+
1370+
12681371
# Scenario: Parallel subscribe attach detach
12691372
# Migrated from: test_parallel_subscribe_attach_detach in test_handler_comparison
12701373
@pytest.mark.timeout(5.0)

0 commit comments

Comments
 (0)