Skip to content

Commit c24ae05

Browse files
sokolivaCopilot
andauthored
fix(server): deliver push notifications across all owners (#1016)
# Description Fix a silent multi-tenant bug where push notifications were dropped for any deployment using authenticated users, and split the `PushNotificationConfigStore` read API into a user-callable owner-scoped read (`get_info`) and an internal cross-owner read for dispatch (`get_info_for_dispatch`). ## The bug `BasePushNotificationSender` accepted a `ServerCallContext` at construction time and called `config_store.get_info(task_id, self._call_context)` at dispatch. Because the sender is a process-wide singleton, callers passed a dummy `ServerCallContext()` (e.g. `itk/main.py`). The default `OwnerResolver`then resolved the dummy to the empty-string owner, which never matched any real registrar's partition. Result: `get_info` returned `[]` and every notification was silently dropped in any deployment with real authentication. ## The fix - **New non-abstract method** `PushNotificationConfigStore.get_info_for_dispatch(task_id)` returns every config for the task across all owners. Implemented in the in-memory and database stores. Custom 1.0 subclasses inherit a default implementation that forwards to `get_info(task_id, ServerCallContext())` preserving their 1.0 behavior exactly and emits a `DeprecationWarning` - **`BasePushNotificationSender`** no longer takes `context` in `__init__` and now calls `get_info_for_dispatch`. Identity is no longer held on the sender. - **`get_info(task_id, context)` is unchanged** and remains owner-scoped. Used by the user-callable read endpoints. The split encodes the asymmetry in the type system: the user-callable method requires a context, the dispatch-only method does not. Authorization (check if the user can create a config for a specific task) happens at registration (`set_info`), not at dispatch. Fixes #1015 🦕 --------- Co-authored-by: Copilot <copilot@github.com>
1 parent a470bae commit c24ae05

12 files changed

Lines changed: 1013 additions & 79 deletions

itk/main.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
InMemoryPushNotificationConfigStore,
3232
)
3333
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
34-
from a2a.server.context import ServerCallContext
3534
from a2a.types import a2a_pb2_grpc
3635
from a2a.types.a2a_pb2 import (
3736
AgentCapabilities,
@@ -339,7 +338,6 @@ async def main_async(http_port: int, grpc_port: int) -> None:
339338
push_sender = BasePushNotificationSender(
340339
httpx_client=httpx.AsyncClient(),
341340
config_store=push_config_store,
342-
context=ServerCallContext(),
343341
)
344342

345343
handler = DefaultRequestHandler(

src/a2a/server/tasks/base_push_notification_sender.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,39 @@ def __init__(
2727
self,
2828
httpx_client: httpx.AsyncClient,
2929
config_store: PushNotificationConfigStore,
30-
context: ServerCallContext,
30+
context: ServerCallContext | None = None,
3131
) -> None:
3232
"""Initializes the BasePushNotificationSender.
3333
3434
Args:
3535
httpx_client: An async HTTP client instance to send notifications.
36-
config_store: A PushNotificationConfigStore instance to retrieve configurations.
37-
context: The `ServerCallContext` that this push notification is produced under.
36+
config_store: A PushNotificationConfigStore instance to
37+
retrieve configurations.
38+
context: Deprecated and ignored. Accepted only for
39+
backward compatibility with 1.0 callers that constructed
40+
the sender with a (typically dummy) ServerCallContext.
41+
Pass None (the default) in new code. A non-None
42+
value logs a deprecation warning and is otherwise
43+
ignored.
3844
"""
45+
if context is not None:
46+
logger.warning(
47+
'BasePushNotificationSender no longer uses the context '
48+
'parameter; it is accepted only for backward compatibility '
49+
'with 1.0 and will be removed in a future major version. '
50+
'Push notifications now fan out across all owners via '
51+
'PushNotificationConfigStore.get_info_for_dispatch; the '
52+
'caller identity is not carried into dispatch. Drop the '
53+
'context argument from the constructor call.'
54+
)
3955
self._client = httpx_client
4056
self._config_store = config_store
41-
self._call_context: ServerCallContext = context
4257

4358
async def send_notification(
4459
self, task_id: str, event: PushNotificationEvent
4560
) -> None:
4661
"""Sends a push notification for an event if configuration exists."""
47-
push_configs = await self._config_store.get_info(
48-
task_id, self._call_context
49-
)
62+
push_configs = await self._config_store.get_info_for_dispatch(task_id)
5063
if not push_configs:
5164
return
5265

src/a2a/server/tasks/database_push_notification_config_store.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88

99
try:
10-
from sqlalchemy import Table, and_, delete, select
10+
from sqlalchemy import ColumnElement, Table, and_, delete, select
1111
from sqlalchemy.ext.asyncio import (
1212
AsyncEngine,
1313
AsyncSession,
@@ -304,21 +304,14 @@ async def set_info(
304304
owner,
305305
)
306306

307-
async def get_info(
307+
async def _select_configs(
308308
self,
309-
task_id: str,
310-
context: ServerCallContext,
309+
*predicates: 'ColumnElement[bool]',
311310
) -> list[TaskPushNotificationConfig]:
312-
"""Retrieves all push notification configurations for a task, for the given owner."""
311+
"""Loads configs matching the given predicates and decodes them."""
313312
await self._ensure_initialized()
314-
owner = self.owner_resolver(context)
315313
async with self.async_session_maker() as session:
316-
stmt = select(self.config_model).where(
317-
and_(
318-
self.config_model.task_id == task_id,
319-
self.config_model.owner == owner,
320-
)
321-
)
314+
stmt = select(self.config_model).where(and_(*predicates))
322315
result = await session.execute(stmt)
323316
models = result.scalars().all()
324317

@@ -331,10 +324,37 @@ async def get_info(
331324
'Could not deserialize push notification config for task %s, config %s, owner %s',
332325
model.task_id,
333326
model.config_id,
334-
owner,
327+
model.owner,
335328
)
336329
return configs
337330

331+
async def get_info(
332+
self,
333+
task_id: str,
334+
context: ServerCallContext,
335+
) -> list[TaskPushNotificationConfig]:
336+
"""Retrieves all push notification configurations for a task, for the given owner.
337+
338+
Used by the user-callable read endpoints.
339+
"""
340+
owner = self.owner_resolver(context)
341+
return await self._select_configs(
342+
self.config_model.task_id == task_id,
343+
self.config_model.owner == owner,
344+
)
345+
346+
async def get_info_for_dispatch(
347+
self,
348+
task_id: str,
349+
) -> list[TaskPushNotificationConfig]:
350+
"""Retrieves all push notification configurations for a task, across all owners.
351+
352+
Used by the push-notification dispatch path.
353+
"""
354+
return await self._select_configs(
355+
self.config_model.task_id == task_id,
356+
)
357+
338358
async def delete_info(
339359
self,
340360
task_id: str,

src/a2a/server/tasks/inmemory_push_notification_config_store.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,29 @@ async def get_info(
7272
task_id: str,
7373
context: ServerCallContext,
7474
) -> list[TaskPushNotificationConfig]:
75-
"""Retrieves all push notification configurations for a task from memory, for the given owner."""
75+
"""Retrieves all push notification configurations for a task from memory, for the given owner.
76+
77+
Used by the user-callable read endpoints.
78+
"""
7679
owner = self.owner_resolver(context)
7780
async with self.lock:
7881
owner_infos = self._get_owner_push_notification_infos(owner)
7982
return list(owner_infos.get(task_id, []))
8083

84+
async def get_info_for_dispatch(
85+
self,
86+
task_id: str,
87+
) -> list[TaskPushNotificationConfig]:
88+
"""Retrieves all push notification configurations for a task across all owners.
89+
90+
Used by the push-notification dispatch path.
91+
"""
92+
async with self.lock:
93+
results: list[TaskPushNotificationConfig] = []
94+
for all_configs in self._push_notification_infos.values():
95+
results.extend(all_configs.get(task_id, []))
96+
return results
97+
8198
async def delete_info(
8299
self,
83100
task_id: str,

src/a2a/server/tasks/push_notification_config_store.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
import logging
2+
13
from abc import ABC, abstractmethod
24

35
from a2a.server.context import ServerCallContext
46
from a2a.types.a2a_pb2 import TaskPushNotificationConfig
57

68

9+
logger = logging.getLogger(__name__)
10+
11+
712
class PushNotificationConfigStore(ABC):
813
"""Interface for storing and retrieving push notification configurations for tasks."""
914

@@ -22,7 +27,46 @@ async def get_info(
2227
task_id: str,
2328
context: ServerCallContext,
2429
) -> list[TaskPushNotificationConfig]:
25-
"""Retrieves the push notification configuration for a task."""
30+
"""Retrieves push notification configurations for a task, scoped to the caller.
31+
32+
This is the user-callable read path. Implementations MUST return
33+
only configurations owned by the caller (as resolved from
34+
context).
35+
"""
36+
37+
async def get_info_for_dispatch(
38+
self,
39+
task_id: str,
40+
) -> list[TaskPushNotificationConfig]:
41+
"""Retrieves all push notification configurations for a task, across all owners.
42+
43+
This is the internal read path used by the push-notification
44+
dispatch loop. Implementations SHOULD override this method to
45+
return every configuration registered for task_id regardless of
46+
which user registered it. Authorization already happened at
47+
registration time and the dispatch path fires every registered
48+
webhook for the task.
49+
50+
The default implementation falls back to calling get_info with
51+
a synthetic empty ServerCallContext. This preserves 1.0
52+
behavior for subclasses that have not implemented the override
53+
but is INCORRECT for any deployment with multiple owners: the
54+
empty context resolves to the empty-string owner partition and
55+
returns no configs (silently dropping every notification). A
56+
warning is logged on every call to flag the misconfiguration.
57+
Custom subclasses MUST override this method to deliver
58+
notifications correctly in multi-owner deployments.
59+
"""
60+
logger.warning(
61+
'%s does not override '
62+
'PushNotificationConfigStore.get_info_for_dispatch; falling back '
63+
'to a context-less get_info call which silently drops '
64+
'notifications in any deployment with multiple owners. Override '
65+
'get_info_for_dispatch to return all configs for task_id across '
66+
'every owner.',
67+
type(self).__name__,
68+
)
69+
return await self.get_info(task_id, ServerCallContext())
2670

2771
@abstractmethod
2872
async def delete_info(

tests/e2e/push_notifications/agent_app.py

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import httpx
22

33
from fastapi import FastAPI
4+
from starlette.applications import Starlette
5+
from starlette.requests import Request
46

7+
from a2a.auth.user import UnauthenticatedUser, User
58
from a2a.server.agent_execution import AgentExecutor, RequestContext
69
from a2a.server.context import ServerCallContext
710
from a2a.server.events import EventQueue
8-
from starlette.applications import Starlette
9-
from a2a.server.routes.rest_routes import create_rest_routes
10-
from a2a.server.routes import create_agent_card_routes
1111
from a2a.server.request_handlers import DefaultRequestHandler
12+
from a2a.server.routes import create_agent_card_routes
13+
from a2a.server.routes.common import DefaultServerCallContextBuilder
14+
from a2a.server.routes.rest_routes import create_rest_routes
1215
from a2a.server.tasks import (
1316
BasePushNotificationSender,
1417
InMemoryPushNotificationConfigStore,
@@ -30,6 +33,9 @@
3033
)
3134

3235

36+
_TEST_USER_HEADER = 'x-test-user'
37+
38+
3339
def test_agent_card(url: str) -> AgentCard:
3440
"""Returns an agent card for the test agent."""
3541
return AgentCard(
@@ -151,11 +157,85 @@ def create_agent_app(
151157
push_sender=BasePushNotificationSender(
152158
httpx_client=notification_client,
153159
config_store=push_config_store,
154-
context=ServerCallContext(),
155160
),
156161
)
157162
rest_routes = create_rest_routes(request_handler=handler)
158163
agent_card_routes = create_agent_card_routes(
159164
agent_card=card, card_url='/.well-known/agent-card.json'
160165
)
161166
return Starlette(routes=[*rest_routes, *agent_card_routes])
167+
168+
169+
class _NamedTestUser(User):
170+
"""Authenticated test user identified by ``user_name``."""
171+
172+
def __init__(self, user_name: str) -> None:
173+
self._user_name = user_name
174+
175+
@property
176+
def is_authenticated(self) -> bool:
177+
return True
178+
179+
@property
180+
def user_name(self) -> str:
181+
return self._user_name
182+
183+
184+
class _HeaderUserContextBuilder(DefaultServerCallContextBuilder):
185+
"""Builds a ServerCallContext whose user is read from a request header."""
186+
187+
def build_user(self, request: Request) -> User:
188+
user_name = request.headers.get(_TEST_USER_HEADER)
189+
if user_name:
190+
return _NamedTestUser(user_name)
191+
return UnauthenticatedUser()
192+
193+
194+
def create_multi_user_agent_app(
195+
url: str, notification_client: httpx.AsyncClient
196+
) -> Starlette:
197+
"""Creates a multi-user variant of the test agent app.
198+
199+
Differences from create_agent_app:
200+
201+
- Identity is read from the x-test-user header on each request
202+
via _HeaderUserContextBuilder. Multiple authenticated
203+
users (e.g. alice, bob) can therefore call the same
204+
server.
205+
- The InMemoryTaskStore uses a constant owner resolver, so
206+
every authenticated user has access to every task.
207+
- The InMemoryPushNotificationConfigStore keeps the default
208+
per-user owner resolver, so each registrar's configs live in their
209+
own owner partition; this exercises cross-owner aggregation in
210+
get_info_for_dispatch.
211+
"""
212+
# Shared task visibility: any authenticated user can see any task.
213+
task_store = InMemoryTaskStore(owner_resolver=lambda _ctx: 'shared')
214+
215+
# Per-user push-config partitioning (the default).
216+
push_config_store = InMemoryPushNotificationConfigStore()
217+
218+
card = test_agent_card(url)
219+
extended_card = test_agent_card(url)
220+
extended_card.name = 'Test Agent Extended'
221+
222+
handler = DefaultRequestHandler(
223+
agent_executor=TestAgentExecutor(),
224+
task_store=task_store,
225+
agent_card=card,
226+
extended_agent_card=extended_card,
227+
push_config_store=push_config_store,
228+
push_sender=BasePushNotificationSender(
229+
httpx_client=notification_client,
230+
config_store=push_config_store,
231+
),
232+
)
233+
234+
rest_routes = create_rest_routes(
235+
request_handler=handler,
236+
context_builder=_HeaderUserContextBuilder(),
237+
)
238+
agent_card_routes = create_agent_card_routes(
239+
agent_card=card, card_url='/.well-known/agent-card.json'
240+
)
241+
return Starlette(routes=[*rest_routes, *agent_card_routes])

0 commit comments

Comments
 (0)