Skip to content

Commit a84f89e

Browse files
committed
WIP
1 parent e6462ca commit a84f89e

4 files changed

Lines changed: 21 additions & 49 deletions

File tree

src/a2a/client/transports/http_helpers.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ async def send_http_stream_request(
9090
continue
9191
yield sse.data
9292
except SSEError as e:
93-
if 'application/json' in event_source.response.headers.get('content-type', ''):
93+
if 'application/json' in event_source.response.headers.get(
94+
'content-type', ''
95+
):
9496
content = await event_source.response.aread()
9597
yield content.decode('utf-8')
9698
else:

src/a2a/server/apps/jsonrpc/jsonrpc_app.py

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ async def _process_streaming_request(
482482
request_obj, context
483483
)
484484

485-
return await self._create_response(context, handler_result)
485+
return self._create_response(context, handler_result)
486486

487487
async def _process_non_streaming_request(
488488
self,
@@ -562,9 +562,9 @@ async def _process_non_streaming_request(
562562
)
563563
return self._generate_error_response(request_id, error)
564564

565-
return await self._create_response(context, handler_result)
565+
return self._create_response(context, handler_result)
566566

567-
async def _create_response(
567+
def _create_response(
568568
self,
569569
context: ServerCallContext,
570570
handler_result: AsyncGenerator[dict[str, Any]] | dict[str, Any],
@@ -587,35 +587,15 @@ async def _create_response(
587587
if exts := context.activated_extensions:
588588
headers[HTTP_EXTENSION_HEADER] = ', '.join(sorted(exts))
589589
if isinstance(handler_result, AsyncGenerator):
590-
try:
591-
# Prime to see if it fails upfront
592-
first_item = await handler_result.__anext__()
593-
except StopAsyncIteration:
594-
595-
async def empty_generator() -> AsyncGenerator[dict[str, str], None]:
596-
if False:
597-
yield {}
598-
599-
return EventSourceResponse(empty_generator(), headers=headers)
600-
except Exception as e:
601-
logger.debug('Upfront exception in streaming handler: %s', e)
602-
if not isinstance(e, A2AError | JSONRPCError):
603-
e = InternalError(message=str(e))
604-
request_id = context.state.get('request_id')
605-
error_payload = build_error_response(request_id, e)
606-
return JSONResponse(error_payload, headers=headers)
607-
608590
# Result is a stream of dict objects
609591
async def event_generator(
610592
stream: AsyncGenerator[dict[str, Any]],
611-
first_item: dict[str, Any],
612-
) -> AsyncGenerator[dict[str, str], None]:
613-
yield {'data': json.dumps(first_item)}
593+
) -> AsyncGenerator[dict[str, str]]:
614594
async for item in stream:
615595
yield {'data': json.dumps(item)}
616596

617597
return EventSourceResponse(
618-
event_generator(handler_result, first_item), headers=headers
598+
event_generator(handler_result), headers=headers
619599
)
620600

621601
# handler_result is a dict (JSON-RPC response)

src/a2a/utils/error_handlers.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -165,26 +165,17 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
165165
):
166166
original_iterator = response.body_iterator
167167

168-
try:
169-
# Prime the stream to catch upfront errors
170-
first_item = await original_iterator.__anext__()
171-
except StopAsyncIteration:
172-
# Stream is empty
173-
pass
174-
except Exception as e: # noqa: BLE001
175-
return _create_error_response(e)
176-
else:
177-
178-
async def error_catching_iterator() -> AsyncGenerator[Any, None]:
179-
yield first_item
180-
try:
181-
async for item in original_iterator:
182-
yield item
183-
except Exception as stream_error:
184-
_log_error(stream_error)
185-
raise stream_error
186-
187-
response.body_iterator = error_catching_iterator()
168+
async def error_catching_iterator() -> AsyncGenerator[
169+
Any, None
170+
]:
171+
try:
172+
async for item in original_iterator:
173+
yield item
174+
except Exception as stream_error:
175+
_log_error(stream_error)
176+
raise stream_error
177+
178+
response.body_iterator = error_catching_iterator()
188179

189180
except Exception as e: # noqa: BLE001
190181
return _create_error_response(e)

src/a2a/utils/helpers.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,12 @@ def validate_async_generator(
304304

305305
def decorator(function):
306306
@functools.wraps(function)
307-
async def wrapper(self, *args, **kwargs):
307+
def wrapper(self, *args, **kwargs):
308308
if not expression(self):
309309
final_message = error_message or str(expression)
310310
logger.error('Unsupported Operation: %s', final_message)
311311
raise UnsupportedOperationError(message=final_message)
312-
async for i in function(self, *args, **kwargs):
313-
yield i
312+
return function(self, *args, **kwargs)
314313

315314
return wrapper
316315

0 commit comments

Comments
 (0)