-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy patherror_handlers.py
More file actions
127 lines (109 loc) · 4.04 KB
/
error_handlers.py
File metadata and controls
127 lines (109 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import functools
import logging
from collections.abc import Awaitable, Callable, Coroutine
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from starlette.responses import JSONResponse, Response
else:
try:
from starlette.responses import JSONResponse, Response
except ImportError:
JSONResponse = Any
Response = Any
from a2a._base import A2ABaseModel
from a2a.types import (
AuthenticatedExtendedCardNotConfiguredError,
ContentTypeNotSupportedError,
InternalError,
InvalidAgentResponseError,
InvalidParamsError,
InvalidRequestError,
JSONParseError,
MethodNotFoundError,
PushNotificationNotSupportedError,
TaskNotCancelableError,
TaskNotFoundError,
UnsupportedOperationError,
)
from a2a.utils.errors import ServerError
logger = logging.getLogger(__name__)
# Maps each A2A error model type to the HTTP status code used when that error
# is reported over REST. The lookup in rest_error_handler is keyed on the
# exact type of the error and falls back to 500 for types not listed here.
A2AErrorToHttpStatus: dict[type[A2ABaseModel], int] = {
JSONParseError: 400,  # Bad Request
InvalidRequestError: 400,  # Bad Request
MethodNotFoundError: 404,  # Not Found
InvalidParamsError: 422,  # Unprocessable Content
InternalError: 500,  # Internal Server Error
TaskNotFoundError: 404,  # Not Found
TaskNotCancelableError: 409,  # Conflict
PushNotificationNotSupportedError: 501,  # Not Implemented
UnsupportedOperationError: 501,  # Not Implemented
ContentTypeNotSupportedError: 415,  # Unsupported Media Type
InvalidAgentResponseError: 502,  # Bad Gateway
AuthenticatedExtendedCardNotConfiguredError: 404,  # Not Found
}
def rest_error_handler(
    func: Callable[..., Awaitable[Response]],
) -> Callable[..., Awaitable[Response]]:
    """Decorator that converts exceptions from a REST handler into JSON errors.

    A ``ServerError`` is logged and answered with ``{'message': ...}`` using
    the status code registered for the error's exact type in
    ``A2AErrorToHttpStatus`` (500 when the type is not registered). Any other
    ``Exception`` is logged together with its traceback and answered with a
    generic 500 response, so nothing propagates out of the wrapper.

    Args:
        func: The async request handler to wrap.

    Returns:
        An async wrapper that returns ``func``'s response on success, or a
        ``JSONResponse`` describing the failure.
    """

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Response:
        try:
            return await func(*args, **kwargs)
        except ServerError as e:
            # A ServerError without a payload is reported as an internal error.
            error = e.error or InternalError(
                message='Internal error due to unknown reason'
            )
            http_code = A2AErrorToHttpStatus.get(type(error), 500)
            # Internal errors are logged at ERROR, every other A2A error at
            # WARNING.
            log_level = (
                logging.ERROR
                if isinstance(error, InternalError)
                else logging.WARNING
            )
            data_suffix = ', Data=' + str(error.data) if error.data else ''
            # Lazy %-style arguments: the message is only rendered when the
            # record is actually emitted.
            logger.log(
                log_level,
                "Request error: Code=%s, Message='%s'%s",
                error.code,
                error.message,
                data_suffix,
            )
            return JSONResponse(
                content={'message': error.message}, status_code=http_code
            )
        except Exception as e:  # noqa: BLE001
            # logger.exception logs at ERROR and attaches the traceback;
            # without it the origin of an unexpected failure is lost, because
            # the client only ever sees the generic message below.
            logger.exception('Unknown error occurred %s', e)
            return JSONResponse(
                content={'message': 'unknown exception'}, status_code=500
            )

    return wrapper
def rest_stream_error_handler(
    func: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Coroutine[Any, Any, Any]]:
    """Decorator that logs errors from a streaming handler, then re-raises.

    Unlike ``rest_error_handler`` this wrapper never builds a response: every
    exception is logged and then propagated unchanged so the server framework
    can deal with it.

    Args:
        func: The async streaming handler to wrap.

    Returns:
        An async wrapper that returns whatever ``func`` returns.

    Raises:
        ServerError: Re-raised after being logged (ERROR for internal errors,
            WARNING otherwise).
        Exception: Any other exception, re-raised after being logged with its
            traceback.
    """

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return await func(*args, **kwargs)
        except ServerError as e:
            # A ServerError without a payload is reported as an internal error.
            error = e.error or InternalError(
                message='Internal error due to unknown reason'
            )
            log_level = (
                logging.ERROR
                if isinstance(error, InternalError)
                else logging.WARNING
            )
            data_suffix = ', Data=' + str(error.data) if error.data else ''
            logger.log(
                log_level,
                "Request error: Code=%s, Message='%s'%s",
                error.code,
                error.message,
                data_suffix,
            )
            # Since the stream has started, we can't return a JSONResponse.
            # Instead, we run the error handling logic (provides logging)
            # and re-raise the error and let the server framework manage it.
            raise
        except Exception as e:
            # Same reasoning as above: no JSONResponse can be returned here,
            # so log the unexpected failure (with traceback) before handing
            # it to the framework. Previously this branch re-raised silently.
            logger.exception('Unknown error occurred %s', e)
            raise

    return wrapper