Skip to content
Merged
79 changes: 78 additions & 1 deletion src/a2a/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,40 @@ def validate(
and returns a boolean.
error_message: An optional custom error message for the `UnsupportedOperationError`.
If None, the string representation of the expression will be used.

Examples:
Basic usage with capability check:
>>> class MyAgent:
... def __init__(self, streaming_enabled: bool):
... self.streaming_enabled = streaming_enabled
...
... @validate(lambda self: self.streaming_enabled)
... async def stream_response(self, message: str):
... return f'Streaming: {message}'

With custom error message:
>>> class SecureAgent:
... def __init__(self):
... self.auth_enabled = False
...
... @validate(
... lambda self: self.auth_enabled,
... 'Authentication must be enabled for this operation',
... )
... def secure_operation(self, data: str):
... return f'Processing secure data: {data}'

Error case example:
>>> from a2a.utils.errors import ServerError
>>> agent = SecureAgent()
>>> try:
... agent.secure_operation('secret')
... except ServerError as e:
... print(e.error.message)
Authentication must be enabled for this operation
Comment thread
ahoblitz marked this conversation as resolved.

Note:
This decorator works with both sync and async methods automatically.
"""

def decorator(function: Callable) -> Callable:
Expand Down Expand Up @@ -174,7 +208,7 @@ def sync_wrapper(self: Any, *args, **kwargs) -> Any:
def validate_async_generator(
expression: Callable[[Any], bool], error_message: str | None = None
):
"""Decorator that validates if a given expression evaluates to True.
"""Decorator that validates if a given expression evaluates to True for async generators.

Typically used on class methods to check capabilities or configuration
before executing the method's logic. If the expression is False,
Expand All @@ -185,6 +219,49 @@ def validate_async_generator(
and returns a boolean.
error_message: An optional custom error message for the `UnsupportedOperationError`.
If None, the string representation of the expression will be used.

Examples:
Streaming capability validation:
>>> class StreamingAgent:
... def __init__(self, streaming_enabled: bool):
... self.streaming_enabled = streaming_enabled
...
... @validate_async_generator(
... lambda self: self.streaming_enabled,
... 'Streaming is not supported by this agent',
... )
... async def stream_messages(self, count: int):
... for i in range(count):
... yield f'Message {i}'
Comment thread
ahoblitz marked this conversation as resolved.
Outdated

Error case - validation fails:
>>> from a2a.utils.errors import ServerError
>>> import asyncio
>>>
>>> class FeatureAgent:
... def __init__(self):
... self.features = {'real_time': False}
...
... @validate_async_generator(
... lambda self: self.features.get('real_time', False),
... 'Real-time feature must be enabled to stream updates',
... )
... async def real_time_updates(self):
... yield 'This should not be yielded'
>>> async def run_test():
... agent = FeatureAgent()
... try:
... async for _ in agent.real_time_updates():
... pass
... except ServerError as e:
... print(e.error.message)
>>>
>>> asyncio.run(run_test())
Real-time feature must be enabled to stream updates

Note:
This decorator is specifically for async generator methods (async def with yield).
The validation happens before the generator starts yielding values.
"""

def decorator(function):
Expand Down
Loading