-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy pathhelpers.py
More file actions
112 lines (94 loc) · 4.01 KB
/
helpers.py
File metadata and controls
112 lines (94 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Helper functions for the A2A client."""
from typing import Any
from google.protobuf.json_format import ParseDict
from a2a.types.a2a_pb2 import AgentCard
def parse_agent_card(agent_card_data: dict[str, Any]) -> AgentCard:
"""Parse AgentCard JSON dictionary and handle backward compatibility."""
_handle_extended_card_compatibility(agent_card_data)
_handle_connection_fields_compatibility(agent_card_data)
_handle_security_compatibility(agent_card_data)
return ParseDict(agent_card_data, AgentCard(), ignore_unknown_fields=True)
def _handle_extended_card_compatibility(
agent_card_data: dict[str, Any],
) -> None:
"""Map legacy supportsAuthenticatedExtendedCard to capabilities."""
if agent_card_data.pop('supportsAuthenticatedExtendedCard', None):
capabilities = agent_card_data.setdefault('capabilities', {})
if 'extendedAgentCard' not in capabilities:
capabilities['extendedAgentCard'] = True
def _handle_connection_fields_compatibility(
agent_card_data: dict[str, Any],
) -> None:
"""Map legacy connection and transport fields to supportedInterfaces."""
main_url = agent_card_data.pop('url', None)
main_transport = agent_card_data.pop('preferredTransport', 'JSONRPC')
version = agent_card_data.pop('protocolVersion', '0.3.0')
additional_interfaces = (
agent_card_data.pop('additionalInterfaces', None) or []
)
if 'supportedInterfaces' not in agent_card_data and main_url:
supported_interfaces = []
supported_interfaces.append(
{
'url': main_url,
'protocolBinding': main_transport,
'protocolVersion': version,
}
)
supported_interfaces.extend(
{
'url': iface.get('url'),
'protocolBinding': iface.get('transport'),
'protocolVersion': version,
}
for iface in additional_interfaces
)
agent_card_data['supportedInterfaces'] = supported_interfaces
def _map_legacy_security(
sec_list: list[dict[str, list[str]]],
) -> list[dict[str, Any]]:
"""Convert a legacy security requirement list into the 1.0.0 Protobuf format."""
return [
{
'schemes': {
scheme_name: {'list': scopes}
for scheme_name, scopes in sec_dict.items()
}
}
for sec_dict in sec_list
]
def _handle_security_compatibility(agent_card_data: dict[str, Any]) -> None:
"""Map legacy security requirements and schemas to their 1.0.0 Protobuf equivalents."""
legacy_security = agent_card_data.pop('security', None)
if (
'securityRequirements' not in agent_card_data
and legacy_security is not None
):
agent_card_data['securityRequirements'] = _map_legacy_security(
legacy_security
)
for skill in agent_card_data.get('skills', []):
legacy_skill_sec = skill.pop('security', None)
if 'securityRequirements' not in skill and legacy_skill_sec is not None:
skill['securityRequirements'] = _map_legacy_security(
legacy_skill_sec
)
security_schemes = agent_card_data.get('securitySchemes', {})
if security_schemes:
type_mapping = {
'apiKey': 'apiKeySecurityScheme',
'http': 'httpAuthSecurityScheme',
'oauth2': 'oauth2SecurityScheme',
'openIdConnect': 'openIdConnectSecurityScheme',
'mutualTLS': 'mtlsSecurityScheme',
}
for scheme in security_schemes.values():
scheme_type = scheme.pop('type', None)
if scheme_type in type_mapping:
# Map legacy 'in' to modern 'location'
if scheme_type == 'apiKey' and 'in' in scheme:
scheme['location'] = scheme.pop('in')
mapped_name = type_mapping[scheme_type]
new_scheme_wrapper = {mapped_name: scheme.copy()}
scheme.clear()
scheme.update(new_scheme_wrapper)