Skip to content

Commit 8eae2a3

Browse files
committed
fix: do not crash on SSE comment line
1 parent fdbf22f commit 8eae2a3

4 files changed

Lines changed: 137 additions & 0 deletions

File tree

src/a2a/client/transports/jsonrpc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ async def send_message_streaming(
176176
try:
177177
event_source.response.raise_for_status()
178178
async for sse in event_source.aiter_sse():
179+
if not sse.data:
180+
continue
179181
response = SendStreamingMessageResponse.model_validate(
180182
json.loads(sse.data)
181183
)

src/a2a/client/transports/rest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ async def send_message_streaming(
154154
try:
155155
event_source.response.raise_for_status()
156156
async for sse in event_source.aiter_sse():
157+
if not sse.data:
158+
continue
157159
event = a2a_pb2.StreamResponse()
158160
Parse(sse.data, event)
159161
yield proto_utils.FromProto.stream_response(event)

tests/client/transports/test_jsonrpc_client.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import httpx
88
import pytest
9+
import respx
910

1011
from httpx_sse import EventSource, SSEError, ServerSentEvent
1112

@@ -466,6 +467,71 @@ async def test_send_message_streaming_success(
466467
== mock_stream_response_2.result.model_dump()
467468
)
468469

470+
# Repro of https://github.com/a2aproject/a2a-python/issues/540
471+
@pytest.mark.asyncio
472+
@respx.mock
473+
async def test_send_message_streaming_comment_success(
474+
self,
475+
mock_agent_card: MagicMock,
476+
):
477+
async with httpx.AsyncClient() as client:
478+
transport = JsonRpcTransport(
479+
httpx_client=client, agent_card=mock_agent_card
480+
)
481+
params = MessageSendParams(
482+
message=create_text_message_object(content='Hello stream')
483+
)
484+
mock_stream_response_1 = SendMessageSuccessResponse(
485+
id='stream_id_123',
486+
jsonrpc='2.0',
487+
result=create_text_message_object(
488+
content='First part ', role=Role.agent
489+
),
490+
)
491+
mock_stream_response_2 = SendMessageSuccessResponse(
492+
id='stream_id_123',
493+
jsonrpc='2.0',
494+
result=create_text_message_object(
495+
content='second part ', role=Role.agent
496+
),
497+
)
498+
499+
sse_content = (
500+
'id: stream_id_1\n'
501+
f'data: {mock_stream_response_1.model_dump_json()}\n\n'
502+
': keep-alive\n\n'
503+
'id: stream_id_2\n'
504+
f'data: {mock_stream_response_2.model_dump_json()}\n\n'
505+
': keep-alive\n\n'
506+
)
507+
508+
respx.post(mock_agent_card.url).mock(
509+
return_value=httpx.Response(
510+
200,
511+
headers={'Content-Type': 'text/event-stream'},
512+
content=sse_content,
513+
)
514+
)
515+
516+
results = [
517+
item
518+
async for item in transport.send_message_streaming(
519+
request=params
520+
)
521+
]
522+
523+
assert len(results) == 2
524+
assert isinstance(results[0], Message)
525+
assert (
526+
results[0].model_dump()
527+
== mock_stream_response_1.result.model_dump()
528+
)
529+
assert isinstance(results[1], Message)
530+
assert (
531+
results[1].model_dump()
532+
== mock_stream_response_2.result.model_dump()
533+
)
534+
469535
@pytest.mark.asyncio
470536
async def test_send_request_http_status_error(
471537
self, mock_httpx_client: AsyncMock, mock_agent_card: MagicMock

tests/client/transports/test_rest_client.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,23 @@
33

44
import httpx
55
import pytest
6+
import respx
67

8+
from google.protobuf.json_format import MessageToJson
79
from httpx_sse import EventSource, ServerSentEvent
810

911
from a2a.client import create_text_message_object
1012
from a2a.client.errors import A2AClientHTTPError
1113
from a2a.client.transports.rest import RestTransport
1214
from a2a.extensions.common import HTTP_EXTENSION_HEADER
15+
from a2a.grpc import a2a_pb2
1316
from a2a.types import (
1417
AgentCapabilities,
1518
AgentCard,
1619
MessageSendParams,
20+
Role,
1721
)
22+
from a2a.utils import proto_utils
1823

1924

2025
@pytest.fixture
@@ -88,6 +93,68 @@ async def test_send_message_with_default_extensions(
8893
},
8994
)
9095

96+
# Repro of https://github.com/a2aproject/a2a-python/issues/540
97+
@pytest.mark.asyncio
98+
@respx.mock
99+
async def test_send_message_streaming_comment_success(
100+
self,
101+
mock_agent_card: MagicMock,
102+
):
103+
"""Test successful streaming in RestTransport."""
104+
async with httpx.AsyncClient() as client:
105+
transport = RestTransport(
106+
httpx_client=client, agent_card=mock_agent_card
107+
)
108+
params = MessageSendParams(
109+
message=create_text_message_object(content='Hello stream')
110+
)
111+
112+
mock_stream_response_1 = a2a_pb2.StreamResponse(
113+
msg=proto_utils.ToProto.message(
114+
create_text_message_object(
115+
content='First part', role=Role.agent
116+
)
117+
)
118+
)
119+
mock_stream_response_2 = a2a_pb2.StreamResponse(
120+
msg=proto_utils.ToProto.message(
121+
create_text_message_object(
122+
content='Second part', role=Role.agent
123+
)
124+
)
125+
)
126+
127+
sse_content = (
128+
'id: stream_id_1\n'
129+
f'data: {MessageToJson(mock_stream_response_1, indent=None)}\n\n'
130+
': keep-alive\n\n'
131+
'id: stream_id_2\n'
132+
f'data: {MessageToJson(mock_stream_response_2, indent=None)}\n\n'
133+
': keep-alive\n\n'
134+
)
135+
print(sse_content)
136+
137+
respx.post(
138+
f'{mock_agent_card.url.rstrip("/")}/v1/message:stream'
139+
).mock(
140+
return_value=httpx.Response(
141+
200,
142+
headers={'Content-Type': 'text/event-stream'},
143+
content=sse_content,
144+
)
145+
)
146+
147+
results = [
148+
item
149+
async for item in transport.send_message_streaming(
150+
request=params
151+
)
152+
]
153+
154+
assert len(results) == 2
155+
assert results[0].parts[0].root.text == 'First part'
156+
assert results[1].parts[0].root.text == 'Second part'
157+
91158
@pytest.mark.asyncio
92159
@patch('a2a.client.transports.rest.aconnect_sse')
93160
async def test_send_message_streaming_with_new_extensions(

0 commit comments

Comments
 (0)