forked from a2aproject/a2a-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherror_handlers.py
More file actions
129 lines (111 loc) · 4.03 KB
/
error_handlers.py
File metadata and controls
129 lines (111 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import functools
import logging
from collections.abc import Awaitable, Callable, Coroutine
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from starlette.responses import JSONResponse, Response
else:
try:
from starlette.responses import JSONResponse, Response
except ImportError:
JSONResponse = Any
Response = Any
from a2a._base import A2ABaseModel
from a2a.types import (
AuthenticatedExtendedCardNotConfiguredError,
ContentTypeNotSupportedError,
InternalError,
InvalidAgentResponseError,
InvalidParamsError,
InvalidRequestError,
JSONParseError,
MethodNotFoundError,
PushNotificationNotSupportedError,
TaskNotCancelableError,
TaskNotFoundError,
UnsupportedOperationError,
)
from a2a.utils.errors import ServerError
logger = logging.getLogger(__name__)
A2AErrorToHttpStatus: dict[type[A2ABaseModel], int] = {
JSONParseError: 400,
InvalidRequestError: 400,
MethodNotFoundError: 404,
InvalidParamsError: 422,
InternalError: 500,
TaskNotFoundError: 404,
TaskNotCancelableError: 409,
PushNotificationNotSupportedError: 501,
UnsupportedOperationError: 501,
ContentTypeNotSupportedError: 415,
InvalidAgentResponseError: 502,
AuthenticatedExtendedCardNotConfiguredError: 404,
}
def rest_error_handler(
func: Callable[..., Awaitable[Response]],
) -> Callable[..., Awaitable[Response]]:
"""Decorator to catch ServerError and map it to an appropriate JSONResponse."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Response:
try:
return await func(*args, **kwargs)
except ServerError as e:
error = e.error or InternalError(
message='Internal error due to unknown reason'
)
http_code = A2AErrorToHttpStatus.get(type(error), 500)
log_level = (
logging.ERROR
if isinstance(error, InternalError)
else logging.WARNING
)
logger.log(
log_level,
"Request error: Code=%s, Message='%s'%s",
error.code,
error.message,
', Data=' + str(error.data) if error.data else '',
)
return JSONResponse(
content={'message': error.message}, status_code=http_code
)
except Exception:
logger.exception('Unknown error occurred')
return JSONResponse(
content={'message': 'unknown exception'}, status_code=500
)
return wrapper
def rest_stream_error_handler(
func: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Coroutine[Any, Any, Any]]:
"""Decorator to catch ServerError for a streaming method,log it and then rethrow it to be handled by framework."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return await func(*args, **kwargs)
except ServerError as e:
error = e.error or InternalError(
message='Internal error due to unknown reason'
)
log_level = (
logging.ERROR
if isinstance(error, InternalError)
else logging.WARNING
)
logger.log(
log_level,
"Request error: Code=%s, Message='%s'%s",
error.code,
error.message,
', Data=' + str(error.data) if error.data else '',
)
# Since the stream has started, we can't return a JSONResponse.
# Instead, we runt the error handling logic (provides logging)
# and reraise the error and let server framework manage
raise e
except Exception as e:
# Since the stream has started, we can't return a JSONResponse.
# Instead, we runt the error handling logic (provides logging)
# and reraise the error and let server framework manage
raise e
return wrapper