From d83a7fc7dec2ac719e9d5f557a940f8c32a25ead Mon Sep 17 00:00:00 2001 From: Lukasz Kawka Date: Tue, 30 Sep 2025 09:56:22 +0000 Subject: [PATCH 1/7] feat: push notifications e2e tests --- tests/e2e/push_notifications/agent_app.py | 145 ++++++++++++ .../push_notifications/notifications_app.py | 47 ++++ .../test_default_push_notification_support.py | 220 ++++++++++++++++++ tests/e2e/push_notifications/utils.py | 46 ++++ 4 files changed, 458 insertions(+) create mode 100644 tests/e2e/push_notifications/agent_app.py create mode 100644 tests/e2e/push_notifications/notifications_app.py create mode 100644 tests/e2e/push_notifications/test_default_push_notification_support.py create mode 100644 tests/e2e/push_notifications/utils.py diff --git a/tests/e2e/push_notifications/agent_app.py b/tests/e2e/push_notifications/agent_app.py new file mode 100644 index 000000000..10fae2d98 --- /dev/null +++ b/tests/e2e/push_notifications/agent_app.py @@ -0,0 +1,145 @@ +import httpx + +from fastapi import FastAPI + +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.apps import A2ARESTFastAPIApplication +from a2a.server.events import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import ( + BasePushNotificationSender, + InMemoryPushNotificationConfigStore, + InMemoryTaskStore, + TaskUpdater, +) +from a2a.types import ( + AgentCapabilities, + AgentCard, + AgentSkill, + InvalidParamsError, + Message, + Task, +) +from a2a.utils import ( + new_agent_text_message, + new_task, +) +from a2a.utils.errors import ServerError + + +def test_agent_card(url: str) -> AgentCard: + """Returns an agent card for the test agent.""" + return AgentCard( + name='Test Agent', + description='Just a test agent', + url=url, + version='1.0.0', + default_input_modes=['text'], + default_output_modes=['text'], + capabilities=AgentCapabilities(streaming=True, push_notifications=True), + skills=[ + AgentSkill( + id='greeting', + name='Greeting Agent', + description='just greats the user', + tags=['greeting'], + examples=['Hello Agent!', 'How are you?'], + ) + ], + supports_authenticated_extended_card=True, + ) + + +class TestAgent: + """Agent for push notification testing.""" + + async def invoke( + self, updater: TaskUpdater, msg: Message, task: Task + ) -> None: + # Fail for unsupported messages. + if ( + not msg.parts + or len(msg.parts) != 1 + or msg.parts[0].root.kind != 'text' + ): + await updater.failed( + new_agent_text_message( + 'Unsupported mesesage.', task.context_id, task.id + ) + ) + return + text_message = msg.parts[0].root.text + + # Simple request-response flow. + if text_message == 'Hello Agent!': + await updater.complete( + new_agent_text_message('Hello User!', task.context_id, task.id) + ) + + # Flow with user input required: "How are you?" -> "Good! How are you?" -> "Good" -> "Amazing". + elif text_message == 'How are you?': + await updater.requires_input( + new_agent_text_message( + 'Good! How are you?', task.context_id, task.id + ) + ) + elif text_message == 'Good': + await updater.complete( + new_agent_text_message('Amazing', task.context_id, task.id) + ) + + # Fail for unsupported messages. + else: + await updater.failed( + new_agent_text_message( + 'Unsupported message.', task.context_id, task.id + ) + ) + + +class TestAgentExecutor(AgentExecutor): + """Test AgentExecutor implementation.""" + + def __init__(self) -> None: + self.agent = TestAgent() + + async def execute( + self, + context: RequestContext, + event_queue: EventQueue, + ) -> None: + if not context.message: + raise ServerError(error=InvalidParamsError(message='No message')) + + task = context.current_task + if not task: + task = new_task(context.message) + await event_queue.enqueue_event(task) + updater = TaskUpdater(event_queue, task.id, task.context_id) + + await self.agent.invoke(updater, context.message, task) + + async def cancel( + self, context: RequestContext, event_queue: EventQueue + ) -> None: + raise Exception('cancel not supported') + + +def create_agent_app( + url: str, notification_client: httpx.AsyncClient +) -> FastAPI: + """Creates a new HTTP+REST FastAPI application for the test agent.""" + push_config_store = InMemoryPushNotificationConfigStore() + app = A2ARESTFastAPIApplication( + agent_card=test_agent_card(url), + http_handler=DefaultRequestHandler( + agent_executor=TestAgentExecutor(), + task_store=InMemoryTaskStore(), + push_config_store=push_config_store, + push_sender=BasePushNotificationSender( + httpx_client=notification_client, + config_store=push_config_store, + ), + ), + ) + return app.build() diff --git a/tests/e2e/push_notifications/notifications_app.py b/tests/e2e/push_notifications/notifications_app.py new file mode 100644 index 000000000..3a25153b1 --- /dev/null +++ b/tests/e2e/push_notifications/notifications_app.py @@ -0,0 +1,47 @@ +import asyncio + +from typing import Annotated + +from fastapi import FastAPI, Path, Request + + +def create_notifications_app() -> FastAPI: + """Creates a simple push notification injesting HTTP+REST application.""" + app = FastAPI() + store_lock = asyncio.Lock() + store = {} + + @app.post('/notifications') + async def add_notification(request: Request): + """Endpoint for injesting notifications from agents. It receives a JSON + payload and stores it in-memory. + """ + if not request.headers.get('x-a2a-notification-token'): + raise ValueError('Missing x-a2a-notification-token header.') + payload = await request.json() + task_id = payload['id'] + async with store_lock: + if task_id not in store: + store[task_id] = [] + store[task_id].append(payload) + return { + 'status': 'received', + } + + @app.get('/tasks/{task_id}/notifications') + async def list_notifications_by_task( + task_id: Annotated[ + str, Path(title='The ID of the task to list the notifications for.') + ], + ): + """Helper endpoint for retrieving injested notifications for a given task.""" + async with store_lock: + notifications = store.get(task_id, []) + return {'notifications': notifications} + + @app.get('/health') + def health_check(): + """Helper endpoint for checking if the server is up.""" + return {'status': 'ok'} + + return app diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py new file mode 100644 index 000000000..0215552af --- /dev/null +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -0,0 +1,220 @@ +import asyncio +import time +import uuid + +from multiprocessing import Lock + +import httpx +import pytest +import pytest_asyncio + +from agent_app import create_agent_app +from notifications_app import create_notifications_app +from utils import ( + create_app_process, + find_free_port, + wait_for_server_ready, +) + + +@pytest.fixture(scope='session') +def port_lock(): + """Multiprocessing lock for acquiring available ephemeral ports.""" + return Lock() + + +@pytest.fixture(scope='module') +def notifications_server(port_lock): + """ + Starts a simple push notifications injesting server and yields its URL. + """ + with port_lock: + host = '127.0.0.1' + port = find_free_port() + url = f'http://{host}:{port}' + + process = create_app_process(create_notifications_app(), host, port) + process.start() + try: + wait_for_server_ready(f'{url}/health') + except TimeoutError as e: + process.terminate() + raise e + + yield url + + process.terminate() + process.join() + + +@pytest_asyncio.fixture(scope='module') +async def notifications_client(): + """An async client fixture for calling the notifications server.""" + async with httpx.AsyncClient() as client: + yield client + + +@pytest.fixture(scope='module') +def agent_server( + port_lock, + notifications_client: httpx.AsyncClient, +): + """Starts a test agent server and yields its URL.""" + with port_lock: + host = '127.0.0.1' + port = find_free_port() + url = f'http://{host}:{port}' + + process = create_app_process( + create_agent_app(url, notifications_client), host, port + ) + process.start() + try: + wait_for_server_ready(f'{url}/v1/card') + except TimeoutError as e: + process.terminate() + raise e + + yield url + + process.terminate() + process.join() + + +@pytest_asyncio.fixture(scope='function') +async def http_client(): + """An async client fixture for test functions.""" + async with httpx.AsyncClient() as client: + yield client + + +@pytest.mark.asyncio +async def test_notification_triggering_with_in_message_config_e2e( + notifications_server: str, agent_server: str, http_client: httpx.AsyncClient +): + """ + Tests push notification triggering for in-message push notification config. + """ + # Send a message with a push notification config. + response = await http_client.post( + f'{agent_server}/v1/message:send', + json={ + 'configuration': { + 'pushNotification': { + 'id': 'n-1', + 'url': f'{notifications_server}/notifications', + 'token': uuid.uuid4().hex, + }, + }, + 'request': { + 'messageId': 'r-1', + 'role': 'ROLE_USER', + 'content': [{'text': 'Hello Agent!'}], + }, + }, + ) + assert response.status_code == 200 + task_id = response.json()['task']['id'] + assert task_id is not None + + # Retrive and check notifcations. + notifications = await wait_for_n_notifications( + http_client, + f'{notifications_server}/tasks/{task_id}/notifications', + n=2, + ) + assert notifications[0]['status']['state'] == 'submitted' + assert notifications[1]['status']['state'] == 'completed' + + +@pytest.mark.asyncio +async def test_notification_triggering_after_config_change_e2e( + notifications_server: str, agent_server: str, http_client: httpx.AsyncClient +): + """ + Tests notification triggering after setting the push notificaiton config in a seperate call. + """ + # Send an initial message without the push notification config. + response = await http_client.post( + f'{agent_server}/v1/message:send', + json={ + 'request': { + 'messageId': 'r-1', + 'role': 'ROLE_USER', + 'content': [{'text': 'How are you?'}], + }, + }, + ) + assert response.status_code == 200 + assert response.json()['task']['id'] is not None + task_id = response.json()['task']['id'] + + # Get the task to make sure that further input is required. + response = await http_client.get(f'{agent_server}/v1/tasks/{task_id}') + assert response.status_code == 200 + assert response.json()['status']['state'] == 'TASK_STATE_INPUT_REQUIRED' + + # Set a push notification config. + response = await http_client.post( + f'{agent_server}/v1/tasks/{task_id}/pushNotificationConfigs', + json={ + 'parent': f'tasks/{task_id}', + 'configId': uuid.uuid4().hex, + 'config': { + 'name': 'test-config', + 'pushNotificationConfig': { + 'id': 'n-2', + 'url': f'{notifications_server}/notifications', + 'token': uuid.uuid4().hex, + }, + }, + }, + ) + assert response.status_code == 200 + + # Send a follow-up message that should trigger a push notification. + response = await http_client.post( + f'{agent_server}/v1/message:send', + json={ + 'request': { + 'taskId': task_id, + 'messageId': 'r-2', + 'role': 'ROLE_USER', + 'content': [{'text': 'Good'}], + }, + }, + ) + assert response.status_code == 200 + + # Retrive and check the notification. + notifications = await wait_for_n_notifications( + http_client, + f'{notifications_server}/tasks/{task_id}/notifications', + n=1, + ) + assert notifications[0]['status']['state'] == 'completed' + + +async def wait_for_n_notifications( + http_client: httpx.AsyncClient, + url: str, + n: int, + timeout: int = 3, +): + """ + Queries the notification URL until the desired number of notifications + is received or the timeout is reached. + """ + start_time = time.time() + notifications = [] + while True: + response = await http_client.get(url) + assert response.status_code == 200 + notifications = response.json()['notifications'] + if len(notifications) == n: + return notifications + if time.time() - start_time > timeout: + raise TimeoutError( + f'Notification retrival timed out. Got {len(notifications)} notifications, want {n}.' + ) + await asyncio.sleep(0.1) diff --git a/tests/e2e/push_notifications/utils.py b/tests/e2e/push_notifications/utils.py new file mode 100644 index 000000000..79271b71a --- /dev/null +++ b/tests/e2e/push_notifications/utils.py @@ -0,0 +1,46 @@ +import socket +import time + +from multiprocessing import Process + +import httpx +import uvicorn + + +def find_free_port(): + """Finds and returns an available ephemeral localhost port.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + return s.getsockname()[1] + + +def run_server(app, host, port) -> None: + """Runs a uvicorn server.""" + uvicorn.run(app, host=host, port=port, log_level='warning') + + +def wait_for_server_ready(url: str, timeout: int = 10) -> None: + """Polls the provided URL endpoint until the server is up.""" + start_time = time.time() + while True: + try: + with httpx.Client() as client: + response = client.get(url) + if response.status_code == 200: + return + except httpx.ConnectError: + pass + if time.time() - start_time > timeout: + raise TimeoutError( + f'Server at {url} failed to start after {timeout}s' + ) + time.sleep(0.1) + + +def create_app_process(app, host, port) -> Process: + """Creates a separate process for a given application.""" + return Process( + target=run_server, + args=(app, host, port), + daemon=True, + ) From 30a10c75ba3b92f667c66b95508f75e88257d6d1 Mon Sep 17 00:00:00 2001 From: Lukasz Kawka Date: Wed, 1 Oct 2025 12:21:39 +0000 Subject: [PATCH 2/7] fix: issues flaged by gemini --- tests/e2e/push_notifications/agent_app.py | 6 +++--- tests/e2e/push_notifications/notifications_app.py | 15 +++++++++++---- .../test_default_push_notification_support.py | 2 +- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/e2e/push_notifications/agent_app.py b/tests/e2e/push_notifications/agent_app.py index 10fae2d98..1fa9bc546 100644 --- a/tests/e2e/push_notifications/agent_app.py +++ b/tests/e2e/push_notifications/agent_app.py @@ -41,7 +41,7 @@ def test_agent_card(url: str) -> AgentCard: AgentSkill( id='greeting', name='Greeting Agent', - description='just greats the user', + description='just greets the user', tags=['greeting'], examples=['Hello Agent!', 'How are you?'], ) @@ -64,7 +64,7 @@ async def invoke( ): await updater.failed( new_agent_text_message( - 'Unsupported mesesage.', task.context_id, task.id + 'Unsupported message.', task.context_id, task.id ) ) return @@ -122,7 +122,7 @@ async def execute( async def cancel( self, context: RequestContext, event_queue: EventQueue ) -> None: - raise Exception('cancel not supported') + raise NotImplementedError('cancel not supported') def create_agent_app( diff --git a/tests/e2e/push_notifications/notifications_app.py b/tests/e2e/push_notifications/notifications_app.py index 3a25153b1..0d9d796bd 100644 --- a/tests/e2e/push_notifications/notifications_app.py +++ b/tests/e2e/push_notifications/notifications_app.py @@ -2,14 +2,14 @@ from typing import Annotated -from fastapi import FastAPI, Path, Request +from fastapi import FastAPI, Path, Request, HTTPException def create_notifications_app() -> FastAPI: """Creates a simple push notification injesting HTTP+REST application.""" app = FastAPI() store_lock = asyncio.Lock() - store = {} + store: dict[str, list] = {} @app.post('/notifications') async def add_notification(request: Request): @@ -17,9 +17,16 @@ async def add_notification(request: Request): payload and stores it in-memory. """ if not request.headers.get('x-a2a-notification-token'): - raise ValueError('Missing x-a2a-notification-token header.') + raise HTTPException( + status_code=400, + detail='Missing "x-a2a-notification-token" header.', + ) payload = await request.json() - task_id = payload['id'] + task_id = payload.get('id') + if not task_id: + raise HTTPException( + status_code=400, detail='Missing "id" in notification payload.' + ) async with store_lock: if task_id not in store: store[task_id] = [] diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py index 0215552af..226136fe1 100644 --- a/tests/e2e/push_notifications/test_default_push_notification_support.py +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -215,6 +215,6 @@ async def wait_for_n_notifications( return notifications if time.time() - start_time > timeout: raise TimeoutError( - f'Notification retrival timed out. Got {len(notifications)} notifications, want {n}.' + f'Notification retrieval timed out. Got {len(notifications)} notifications, want {n}.' ) await asyncio.sleep(0.1) From 6b6cc9c4ac7e607dcdbf62fc1059b9c4a594fbb1 Mon Sep 17 00:00:00 2001 From: Lukasz Kawka Date: Wed, 1 Oct 2025 12:42:15 +0000 Subject: [PATCH 3/7] fix: add uvicorn to dev dependecies This is required for e2e testing. --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 0d2cb75a5..192e2151e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,7 @@ dev = [ "autoflake", "no_implicit_optional", "trio", + "uvicorn>=0.35.0", ] [[tool.uv.index]] From 852ef93bfd88fbe153128b6d93f37ecb65e28d06 Mon Sep 17 00:00:00 2001 From: Lukasz Kawka Date: Wed, 1 Oct 2025 13:10:54 +0000 Subject: [PATCH 4/7] fix: avoid reliance on notification order --- tests/e2e/push_notifications/notifications_app.py | 2 +- .../test_default_push_notification_support.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/e2e/push_notifications/notifications_app.py b/tests/e2e/push_notifications/notifications_app.py index 0d9d796bd..7d504d2f8 100644 --- a/tests/e2e/push_notifications/notifications_app.py +++ b/tests/e2e/push_notifications/notifications_app.py @@ -2,7 +2,7 @@ from typing import Annotated -from fastapi import FastAPI, Path, Request, HTTPException +from fastapi import FastAPI, HTTPException, Path, Request def create_notifications_app() -> FastAPI: diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py index 226136fe1..b43b371d4 100644 --- a/tests/e2e/push_notifications/test_default_push_notification_support.py +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -123,8 +123,9 @@ async def test_notification_triggering_with_in_message_config_e2e( f'{notifications_server}/tasks/{task_id}/notifications', n=2, ) - assert notifications[0]['status']['state'] == 'submitted' - assert notifications[1]['status']['state'] == 'completed' + states = [notification['status']['state'] for notification in notifications] + assert 'completed' in states + assert 'submitted' in states @pytest.mark.asyncio From a6a8c6a7887307d96295253c71650434626edc35 Mon Sep 17 00:00:00 2001 From: lkawka Date: Thu, 2 Oct 2025 14:09:43 +0000 Subject: [PATCH 5/7] Resolve Mike's comments. --- .../push_notifications/notifications_app.py | 2 +- .../test_default_push_notification_support.py | 192 ++++++++++-------- tests/e2e/push_notifications/utils.py | 5 +- 3 files changed, 114 insertions(+), 85 deletions(-) diff --git a/tests/e2e/push_notifications/notifications_app.py b/tests/e2e/push_notifications/notifications_app.py index 7d504d2f8..f8a60e10e 100644 --- a/tests/e2e/push_notifications/notifications_app.py +++ b/tests/e2e/push_notifications/notifications_app.py @@ -6,7 +6,7 @@ def create_notifications_app() -> FastAPI: - """Creates a simple push notification injesting HTTP+REST application.""" + """Creates a simple push notification ingesting HTTP+REST application.""" app = FastAPI() store_lock = asyncio.Lock() store: dict[str, list] = {} diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py index b43b371d4..c134485ab 100644 --- a/tests/e2e/push_notifications/test_default_push_notification_support.py +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -16,6 +16,23 @@ wait_for_server_ready, ) +from a2a.client import ( + ClientConfig, + ClientFactory, + minimal_agent_card, +) +from a2a.types import ( + Message, + Part, + PushNotificationConfig, + Role, + Task, + TaskPushNotificationConfig, + TaskState, + TextPart, + TransportProtocol, +) + @pytest.fixture(scope='session') def port_lock(): @@ -90,42 +107,50 @@ async def http_client(): @pytest.mark.asyncio async def test_notification_triggering_with_in_message_config_e2e( - notifications_server: str, agent_server: str, http_client: httpx.AsyncClient + notifications_server: str, + agent_server: str, + http_client: httpx.AsyncClient, ): """ Tests push notification triggering for in-message push notification config. """ - # Send a message with a push notification config. - response = await http_client.post( - f'{agent_server}/v1/message:send', - json={ - 'configuration': { - 'pushNotification': { - 'id': 'n-1', - 'url': f'{notifications_server}/notifications', - 'token': uuid.uuid4().hex, - }, - }, - 'request': { - 'messageId': 'r-1', - 'role': 'ROLE_USER', - 'content': [{'text': 'Hello Agent!'}], - }, - }, - ) - assert response.status_code == 200 - task_id = response.json()['task']['id'] - assert task_id is not None + # Create an A2A client with a push notification config. + a2a_client = ClientFactory( + ClientConfig( + supported_transports=[TransportProtocol.http_json], + push_notification_configs=[ + PushNotificationConfig( + id='in-message-config', + url=f'{notifications_server}/notifications', + token=uuid.uuid4().hex, + ) + ], + ) + ).create(minimal_agent_card(agent_server, [TransportProtocol.http_json])) + + # Send a message and extract the returned task. + responses = [ + response + async for response in a2a_client.send_message( + Message( + message_id='hello-agent', + parts=[Part(root=TextPart(text='Hello Agent!'))], + role=Role.user, + ) + ) + ] + assert len(responses) == 1 + assert isinstance(responses[0], tuple) + assert isinstance(responses[0][0], Task) + task = responses[0][0] - # Retrive and check notifcations. + # Verify a single notification was sent. notifications = await wait_for_n_notifications( http_client, - f'{notifications_server}/tasks/{task_id}/notifications', - n=2, + f'{notifications_server}/tasks/{task.id}/notifications', + n=1, ) - states = [notification['status']['state'] for notification in notifications] - assert 'completed' in states - assert 'submitted' in states + assert notifications[0].status.state == 'completed' @pytest.mark.asyncio @@ -135,65 +160,70 @@ async def test_notification_triggering_after_config_change_e2e( """ Tests notification triggering after setting the push notificaiton config in a seperate call. """ - # Send an initial message without the push notification config. - response = await http_client.post( - f'{agent_server}/v1/message:send', - json={ - 'request': { - 'messageId': 'r-1', - 'role': 'ROLE_USER', - 'content': [{'text': 'How are you?'}], - }, - }, + # Configure an A2A client without a push notification config. + a2a_client = ClientFactory( + ClientConfig( + supported_transports=[TransportProtocol.http_json], + ) + ).create(minimal_agent_card(agent_server, [TransportProtocol.http_json])) + + # Send a message and extract the returned task. + responses = [ + response + async for response in a2a_client.send_message( + Message( + message_id='how-are-you', + parts=[Part(root=TextPart(text='How are you?'))], + role=Role.user, + ) + ) + ] + assert len(responses) == 1 + assert isinstance(responses[0], tuple) + assert isinstance(responses[0][0], Task) + task = responses[0][0] + assert task.status.state == TaskState.input_required + + # Verify that no notification has been sent yet. + response = await http_client.get( + f'{notifications_server}/tasks/{task.id}/notifications' ) assert response.status_code == 200 - assert response.json()['task']['id'] is not None - task_id = response.json()['task']['id'] - - # Get the task to make sure that further input is required. - response = await http_client.get(f'{agent_server}/v1/tasks/{task_id}') - assert response.status_code == 200 - assert response.json()['status']['state'] == 'TASK_STATE_INPUT_REQUIRED' - - # Set a push notification config. - response = await http_client.post( - f'{agent_server}/v1/tasks/{task_id}/pushNotificationConfigs', - json={ - 'parent': f'tasks/{task_id}', - 'configId': uuid.uuid4().hex, - 'config': { - 'name': 'test-config', - 'pushNotificationConfig': { - 'id': 'n-2', - 'url': f'{notifications_server}/notifications', - 'token': uuid.uuid4().hex, - }, - }, - }, + assert len(response.json().get('notifications', [])) == 0 + + # Set the push notification config. + await a2a_client.set_task_callback( + TaskPushNotificationConfig( + task_id=task.id, + push_notification_config=PushNotificationConfig( + id='after-config-change', + url=f'{notifications_server}/notifications', + token=uuid.uuid4().hex, + ), + ) ) - assert response.status_code == 200 - # Send a follow-up message that should trigger a push notification. - response = await http_client.post( - f'{agent_server}/v1/message:send', - json={ - 'request': { - 'taskId': task_id, - 'messageId': 'r-2', - 'role': 'ROLE_USER', - 'content': [{'text': 'Good'}], - }, - }, - ) - assert response.status_code == 200 + # Send another message that should trigger a push notification. + responses = [ + response + async for response in a2a_client.send_message( + Message( + task_id=task.id, + message_id='good', + parts=[Part(root=TextPart(text='Good'))], + role=Role.user, + ) + ) + ] + assert len(responses) == 1 - # Retrive and check the notification. + # Verify that the push notification was sent. notifications = await wait_for_n_notifications( http_client, - f'{notifications_server}/tasks/{task_id}/notifications', + f'{notifications_server}/tasks/{task.id}/notifications', n=1, ) - assert notifications[0]['status']['state'] == 'completed' + assert notifications[0].status.state == 'completed' async def wait_for_n_notifications( @@ -201,7 +231,7 @@ async def wait_for_n_notifications( url: str, n: int, timeout: int = 3, -): +) -> list[Task]: """ Queries the notification URL until the desired number of notifications is received or the timeout is reached. @@ -213,9 +243,9 @@ async def wait_for_n_notifications( assert response.status_code == 200 notifications = response.json()['notifications'] if len(notifications) == n: - return notifications + return [Task.model_validate(n) for n in notifications] if time.time() - start_time > timeout: raise TimeoutError( - f'Notification retrieval timed out. Got {len(notifications)} notifications, want {n}.' + f'Notification retrieval timed out. Got {len(notifications)} notification(s), want {n}. Retrieved notifications: {notifications}.' ) await asyncio.sleep(0.1) diff --git a/tests/e2e/push_notifications/utils.py b/tests/e2e/push_notifications/utils.py index 79271b71a..01d84a30f 100644 --- a/tests/e2e/push_notifications/utils.py +++ b/tests/e2e/push_notifications/utils.py @@ -1,3 +1,4 @@ +import contextlib import socket import time @@ -23,13 +24,11 @@ def wait_for_server_ready(url: str, timeout: int = 10) -> None: """Polls the provided URL endpoint until the server is up.""" start_time = time.time() while True: - try: + with contextlib.suppress(httpx.ConnectError): with httpx.Client() as client: response = client.get(url) if response.status_code == 200: return - except httpx.ConnectError: - pass if time.time() - start_time > timeout: raise TimeoutError( f'Server at {url} failed to start after {timeout}s' From 78a58cf9fa394edb5ae4eb8e844026351c50d1ff Mon Sep 17 00:00:00 2001 From: lkawka Date: Fri, 3 Oct 2025 08:09:12 +0000 Subject: [PATCH 6/7] Verify that notification tokens were properly passed around --- .../push_notifications/notifications_app.py | 37 +++++++++++++------ .../test_default_push_notification_support.py | 20 ++++++---- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/tests/e2e/push_notifications/notifications_app.py b/tests/e2e/push_notifications/notifications_app.py index f8a60e10e..ed032dcb5 100644 --- a/tests/e2e/push_notifications/notifications_app.py +++ b/tests/e2e/push_notifications/notifications_app.py @@ -3,34 +3,49 @@ from typing import Annotated from fastapi import FastAPI, HTTPException, Path, Request +from pydantic import BaseModel, ValidationError + +from a2a.types import Task + + +class Notification(BaseModel): + """Encapsulates default push notification data.""" + + task: Task + token: str def create_notifications_app() -> FastAPI: """Creates a simple push notification ingesting HTTP+REST application.""" app = FastAPI() store_lock = asyncio.Lock() - store: dict[str, list] = {} + store: dict[str, list[Notification]] = {} @app.post('/notifications') async def add_notification(request: Request): """Endpoint for injesting notifications from agents. It receives a JSON payload and stores it in-memory. """ - if not request.headers.get('x-a2a-notification-token'): + token = request.headers.get('x-a2a-notification-token') + if not token: raise HTTPException( status_code=400, detail='Missing "x-a2a-notification-token" header.', ) - payload = await request.json() - task_id = payload.get('id') - if not task_id: - raise HTTPException( - status_code=400, detail='Missing "id" in notification payload.' - ) + try: + task = Task.model_validate(await request.json()) + except ValidationError as e: + raise HTTPException(status_code=400, detail=str(e)) + async with store_lock: - if task_id not in store: - store[task_id] = [] - store[task_id].append(payload) + if task.id not in store: + store[task.id] = [] + store[task.id].append( + Notification( + task=task, + token=token, + ) + ) return { 'status': 'received', } diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py index c134485ab..7ac41b5d2 100644 --- a/tests/e2e/push_notifications/test_default_push_notification_support.py +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -9,7 +9,7 @@ import pytest_asyncio from agent_app import create_agent_app -from notifications_app import create_notifications_app +from notifications_app import Notification, create_notifications_app from utils import ( create_app_process, find_free_port, @@ -115,6 +115,7 @@ async def test_notification_triggering_with_in_message_config_e2e( Tests push notification triggering for in-message push notification config. """ # Create an A2A client with a push notification config. + token = uuid.uuid4().hex a2a_client = ClientFactory( ClientConfig( supported_transports=[TransportProtocol.http_json], @@ -122,7 +123,7 @@ async def test_notification_triggering_with_in_message_config_e2e( PushNotificationConfig( id='in-message-config', url=f'{notifications_server}/notifications', - token=uuid.uuid4().hex, + token=token, ) ], ) @@ -150,7 +151,9 @@ async def test_notification_triggering_with_in_message_config_e2e( f'{notifications_server}/tasks/{task.id}/notifications', n=1, ) - assert notifications[0].status.state == 'completed' + assert notifications[0].token == token + assert notifications[0].task.id == task.id + assert notifications[0].task.status.state == 'completed' @pytest.mark.asyncio @@ -192,13 +195,14 @@ async def test_notification_triggering_after_config_change_e2e( assert len(response.json().get('notifications', [])) == 0 # Set the push notification config. + token = uuid.uuid4().hex await a2a_client.set_task_callback( TaskPushNotificationConfig( task_id=task.id, push_notification_config=PushNotificationConfig( id='after-config-change', url=f'{notifications_server}/notifications', - token=uuid.uuid4().hex, + token=token, ), ) ) @@ -223,7 +227,9 @@ async def test_notification_triggering_after_config_change_e2e( f'{notifications_server}/tasks/{task.id}/notifications', n=1, ) - assert notifications[0].status.state == 'completed' + assert notifications[0].task.id == task.id + assert notifications[0].task.status.state == 'completed' + assert notifications[0].token == token async def wait_for_n_notifications( @@ -231,7 +237,7 @@ async def wait_for_n_notifications( url: str, n: int, timeout: int = 3, -) -> list[Task]: +) -> list[Notification]: """ Queries the notification URL until the desired number of notifications is received or the timeout is reached. @@ -243,7 +249,7 @@ async def wait_for_n_notifications( assert response.status_code == 200 notifications = response.json()['notifications'] if len(notifications) == n: - return [Task.model_validate(n) for n in notifications] + return [Notification.model_validate(n) for n in notifications] if time.time() - start_time > timeout: raise TimeoutError( f'Notification retrieval timed out. Got {len(notifications)} notification(s), want {n}. Retrieved notifications: {notifications}.' From 4a596b5a30cfb1f739480d7f9d00bf6e5655af9c Mon Sep 17 00:00:00 2001 From: lkawka Date: Fri, 3 Oct 2025 08:20:43 +0000 Subject: [PATCH 7/7] Remove port lock Since Python is single-threaded and the fixtures turning-up servers aren't async, we don't need the lock. --- .../test_default_push_notification_support.py | 63 ++++++++----------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/tests/e2e/push_notifications/test_default_push_notification_support.py b/tests/e2e/push_notifications/test_default_push_notification_support.py index 7ac41b5d2..775bd7fb8 100644 --- a/tests/e2e/push_notifications/test_default_push_notification_support.py +++ b/tests/e2e/push_notifications/test_default_push_notification_support.py @@ -2,8 +2,6 @@ import time import uuid -from multiprocessing import Lock - import httpx import pytest import pytest_asyncio @@ -34,29 +32,22 @@ ) -@pytest.fixture(scope='session') -def port_lock(): - """Multiprocessing lock for acquiring available ephemeral ports.""" - return Lock() - - @pytest.fixture(scope='module') -def notifications_server(port_lock): +def notifications_server(): """ Starts a simple push notifications injesting server and yields its URL. """ - with port_lock: - host = '127.0.0.1' - port = find_free_port() - url = f'http://{host}:{port}' - - process = create_app_process(create_notifications_app(), host, port) - process.start() - try: - wait_for_server_ready(f'{url}/health') - except TimeoutError as e: - process.terminate() - raise e + host = '127.0.0.1' + port = find_free_port() + url = f'http://{host}:{port}' + + process = create_app_process(create_notifications_app(), host, port) + process.start() + try: + wait_for_server_ready(f'{url}/health') + except TimeoutError as e: + process.terminate() + raise e yield url @@ -72,25 +63,21 @@ async def notifications_client(): @pytest.fixture(scope='module') -def agent_server( - port_lock, - notifications_client: httpx.AsyncClient, -): +def agent_server(notifications_client: httpx.AsyncClient): """Starts a test agent server and yields its URL.""" - with port_lock: - host = '127.0.0.1' - port = find_free_port() - url = f'http://{host}:{port}' + host = '127.0.0.1' + port = find_free_port() + url = f'http://{host}:{port}' - process = create_app_process( - create_agent_app(url, notifications_client), host, port - ) - process.start() - try: - wait_for_server_ready(f'{url}/v1/card') - except TimeoutError as e: - process.terminate() - raise e + process = create_app_process( + create_agent_app(url, notifications_client), host, port + ) + process.start() + try: + wait_for_server_ready(f'{url}/v1/card') + except TimeoutError as e: + process.terminate() + raise e yield url