Skip to content

Commit 9a35d24

Browse files
committed
chore: merge main into 1.0-dev
2 parents 1c4838f + 697ab8e commit 9a35d24

11 files changed

Lines changed: 427 additions & 358 deletions

File tree

.github/workflows/python-publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
run: uv build
2727

2828
- name: Upload distributions
29-
uses: actions/upload-artifact@v6
29+
uses: actions/upload-artifact@v7
3030
with:
3131
name: release-dists
3232
path: dist/
@@ -40,7 +40,7 @@ jobs:
4040

4141
steps:
4242
- name: Retrieve release distributions
43-
uses: actions/download-artifact@v7
43+
uses: actions/download-artifact@v8
4444
with:
4545
name: release-dists
4646
path: dist/

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
## [0.3.25](https://github.com/a2aproject/a2a-python/compare/v0.3.24...v0.3.25) (2026-03-10)
4+
5+
6+
### Features
7+
8+
* Implement a vertex based task store ([#752](https://github.com/a2aproject/a2a-python/issues/752)) ([fa14dbf](https://github.com/a2aproject/a2a-python/commit/fa14dbf46b603f288a1f1c474401483bf53950e4))
9+
10+
11+
### Bug Fixes
12+
13+
* return background task from consume_and_break_on_interrupt to prevent GC ([#775](https://github.com/a2aproject/a2a-python/issues/775)) ([a236d4d](https://github.com/a2aproject/a2a-python/commit/a236d4df8dceb2db1e1170e0b57599f3837ebd71))
14+
* use default_factory for mutable field defaults in ServerCallContext ([#744](https://github.com/a2aproject/a2a-python/issues/744)) ([22b25d6](https://github.com/a2aproject/a2a-python/commit/22b25d653e57e2d1453bbc282052e51dbd904ac6))
15+
316
## [0.3.24](https://github.com/a2aproject/a2a-python/compare/v0.3.23...v0.3.24) (2026-02-20)
417

518

src/a2a/contrib/__init__.py

Whitespace-only changes.

src/a2a/server/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ class ServerCallContext(BaseModel):
1919

2020
model_config = ConfigDict(arbitrary_types_allowed=True)
2121

22-
state: State = Field(default={})
23-
user: User = Field(default=UnauthenticatedUser())
22+
state: State = Field(default_factory=dict)
23+
user: User = Field(default_factory=UnauthenticatedUser)
2424
tenant: str = Field(default='')
2525
requested_extensions: set[str] = Field(default_factory=set)
2626
activated_extensions: set[str] = Field(default_factory=set)

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,17 @@ async def push_notification_callback(event: Event) -> None:
351351
(
352352
result,
353353
interrupted_or_non_blocking,
354+
bg_consume_task,
354355
) = await result_aggregator.consume_and_break_on_interrupt(
355356
consumer,
356357
blocking=blocking,
357358
event_callback=push_notification_callback,
358359
)
359360

361+
if bg_consume_task is not None:
362+
bg_consume_task.set_name(f'continue_consuming:{task_id}')
363+
self._track_background_task(bg_consume_task)
364+
360365
except Exception:
361366
logger.exception('Agent execution failed')
362367
producer_task.cancel()

src/a2a/server/tasks/result_aggregator.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ async def consume_and_break_on_interrupt(
9999
consumer: EventConsumer,
100100
blocking: bool = True,
101101
event_callback: Callable[[Event], Awaitable[None]] | None = None,
102-
) -> tuple[Task | Message | None, bool]:
102+
) -> tuple[Task | Message | None, bool, asyncio.Task | None]:
103103
"""Processes the event stream until completion or an interruptible state is encountered.
104104
105105
If `blocking` is False, it returns after the first event that creates a Task or Message.
@@ -119,16 +119,23 @@ async def consume_and_break_on_interrupt(
119119
A tuple containing:
120120
- The current aggregated result (`Task` or `Message`) at the point of completion or interruption.
121121
- A boolean indicating whether the consumption was interrupted (`True`) or completed naturally (`False`).
122+
- The background ``asyncio.Task`` that continues consuming events
123+
after an interruption, or ``None`` when no background work was
124+
spawned. **Callers must hold a strong reference** to this task
125+
(e.g. in a ``set``) to prevent the garbage collector from
126+
collecting it before it finishes — the event loop only keeps
127+
weak references to tasks.
122128
123129
Raises:
124130
BaseException: If the `EventConsumer` raises an exception during consumption.
125131
"""
126132
event_stream = consumer.consume_all()
127133
interrupted = False
134+
bg_task: asyncio.Task | None = None
128135
async for event in event_stream:
129136
if isinstance(event, Message):
130137
self._message = event
131-
return event, False
138+
return event, False, None
132139
await self.task_manager.process(event)
133140

134141
if event_callback:
@@ -161,13 +168,13 @@ async def consume_and_break_on_interrupt(
161168

162169
if should_interrupt:
163170
# Continue consuming the rest of the events in the background.
164-
# TODO: We should track all outstanding tasks to ensure they eventually complete.
165-
asyncio.create_task( # noqa: RUF006
171+
# The caller is responsible for tracking this task to prevent GC.
172+
bg_task = asyncio.create_task(
166173
self._continue_consuming(event_stream, event_callback)
167174
)
168175
interrupted = True
169176
break
170-
return await self.task_manager.get_task(), interrupted
177+
return await self.task_manager.get_task(), interrupted, bg_task
171178

172179
async def _continue_consuming(
173180
self,

tck/sut_agent.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
DefaultRequestHandler,
1616
)
1717
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
18+
from a2a.server.tasks.task_store import TaskStore
1819
from a2a.types import (
1920
AgentCapabilities,
2021
AgentCard,
@@ -128,8 +129,8 @@ async def execute(
128129
await event_queue.enqueue_event(final_update)
129130

130131

131-
def main() -> None:
132-
"""Main entrypoint."""
132+
def serve(task_store: TaskStore) -> None:
133+
"""Sets up the A2A service and starts the HTTP server."""
133134
http_port = int(os.environ.get('HTTP_PORT', '41241'))
134135

135136
agent_card = AgentCard(
@@ -168,7 +169,7 @@ def main() -> None:
168169

169170
request_handler = DefaultRequestHandler(
170171
agent_executor=SUTAgentExecutor(),
171-
task_store=InMemoryTaskStore(),
172+
task_store=task_store,
172173
)
173174

174175
server = A2AStarletteApplication(
@@ -182,5 +183,10 @@ def main() -> None:
182183
uvicorn.run(app, host='127.0.0.1', port=http_port, log_level='info')
183184

184185

186+
def main() -> None:
187+
"""Main entrypoint."""
188+
serve(InMemoryTaskStore())
189+
190+
185191
if __name__ == '__main__':
186192
main()

tests/contrib/__init__.py

Whitespace-only changes.

tests/server/request_handlers/test_default_request_handler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ async def test_on_message_send_with_push_notification():
541541
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
542542
final_task_result,
543543
False,
544+
None,
544545
)
545546

546547
# Mock the current_result async property to return the final task result
@@ -643,6 +644,7 @@ async def test_on_message_send_with_push_notification_in_non_blocking_request():
643644
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
644645
initial_task,
645646
True, # interrupted = True for non-blocking
647+
MagicMock(spec=asyncio.Task), # background task
646648
)
647649

648650
# Mock the current_result async property to return the final task
@@ -666,7 +668,11 @@ async def mock_consume_and_break_on_interrupt(
666668
event_callback_received = event_callback
667669
if event_callback_received:
668670
await event_callback_received(final_task)
669-
return initial_task, True # interrupted = True for non-blocking
671+
return (
672+
initial_task,
673+
True,
674+
MagicMock(spec=asyncio.Task),
675+
) # interrupted = True for non-blocking
670676

671677
mock_result_aggregator_instance.consume_and_break_on_interrupt = (
672678
mock_consume_and_break_on_interrupt
@@ -758,6 +764,7 @@ async def test_on_message_send_with_push_notification_no_existing_Task():
758764
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
759765
final_task_result,
760766
False,
767+
None,
761768
)
762769

763770
# Mock the current_result async property to return the final task result
@@ -815,6 +822,7 @@ async def test_on_message_send_no_result_from_aggregator():
815822
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
816823
None,
817824
False,
825+
None,
818826
)
819827

820828
with (
@@ -864,6 +872,7 @@ async def test_on_message_send_task_id_mismatch():
864872
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
865873
mismatched_task,
866874
False,
875+
None,
867876
)
868877

869878
with (
@@ -1069,6 +1078,7 @@ async def test_on_message_send_interrupted_flow():
10691078
mock_result_aggregator_instance.consume_and_break_on_interrupt.return_value = (
10701079
interrupt_task_result,
10711080
True,
1081+
MagicMock(spec=asyncio.Task), # background task
10721082
) # Interrupted = True
10731083

10741084
# Collect coroutines passed to create_task so we can close them

tests/server/tasks/test_result_aggregator.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,14 @@ async def mock_consume_generator():
231231
(
232232
result,
233233
interrupted,
234+
bg_task,
234235
) = await self.aggregator.consume_and_break_on_interrupt(
235236
self.mock_event_consumer
236237
)
237238

238239
self.assertEqual(result, sample_message)
239240
self.assertFalse(interrupted)
241+
self.assertIsNone(bg_task)
240242
self.mock_task_manager.process.assert_not_called() # Process is not called for the Message if returned directly
241243
# _continue_consuming should not be called if it's a message interrupt
242244
# and no auth_required state.
@@ -268,12 +270,14 @@ async def mock_consume_generator():
268270
(
269271
result,
270272
interrupted,
273+
bg_task,
271274
) = await self.aggregator.consume_and_break_on_interrupt(
272275
self.mock_event_consumer
273276
)
274277

275278
self.assertEqual(result, auth_task)
276279
self.assertTrue(interrupted)
280+
self.assertIsNotNone(bg_task)
277281
self.mock_task_manager.process.assert_called_once_with(auth_task)
278282
mock_create_task.assert_called_once() # Check that create_task was called
279283
# self.aggregator._continue_consuming is an AsyncMock.
@@ -322,12 +326,14 @@ async def mock_consume_generator():
322326
(
323327
result,
324328
interrupted,
329+
bg_task,
325330
) = await self.aggregator.consume_and_break_on_interrupt(
326331
self.mock_event_consumer
327332
)
328333

329334
self.assertEqual(result, current_task_state_after_update)
330335
self.assertTrue(interrupted)
336+
self.assertIsNotNone(bg_task)
331337
self.mock_task_manager.process.assert_called_once_with(
332338
auth_status_update
333339
)
@@ -358,13 +364,15 @@ async def mock_consume_generator():
358364
(
359365
result,
360366
interrupted,
367+
bg_task,
361368
) = await self.aggregator.consume_and_break_on_interrupt(
362369
self.mock_event_consumer
363370
)
364371

365372
# If the first event is a Message, it's returned directly.
366373
self.assertEqual(result, event1)
367374
self.assertFalse(interrupted)
375+
self.assertIsNone(bg_task)
368376
# process() is NOT called for the Message if it's the one causing the return
369377
self.mock_task_manager.process.assert_not_called()
370378
self.mock_task_manager.get_task.assert_not_called()
@@ -420,12 +428,14 @@ async def mock_consume_generator():
420428
(
421429
result,
422430
interrupted,
431+
bg_task,
423432
) = await self.aggregator.consume_and_break_on_interrupt(
424433
self.mock_event_consumer, blocking=False
425434
)
426435

427436
self.assertEqual(result, first_event)
428437
self.assertTrue(interrupted)
438+
self.assertIsNotNone(bg_task)
429439
self.mock_task_manager.process.assert_called_once_with(first_event)
430440
mock_create_task.assert_called_once()
431441
# The background task should be created with the remaining stream
@@ -474,7 +484,7 @@ async def initial_consume_generator():
474484
mock_create_task.side_effect = lambda coro: asyncio.ensure_future(coro)
475485

476486
# Call the main method that triggers _continue_consuming via create_task
477-
_, _ = await self.aggregator.consume_and_break_on_interrupt(
487+
_, _, _ = await self.aggregator.consume_and_break_on_interrupt(
478488
self.mock_event_consumer
479489
)
480490

0 commit comments

Comments
 (0)