Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 117 additions & 12 deletions tests/a2a/test_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
create_task,
poll_task,
search_agent_cards,
truncate_utf8_bytes,
)
from veadk.tools.builtin_tools.a2a_registry import (
build_a2a_registry_tools,
build_remote_a2a_agent_tools,
)
from veadk.tools.builtin_tools.a2a_registry import build_a2a_registry_tools


def _mock_response(payload: dict, status_code: int = 200) -> Mock:
Expand Down Expand Up @@ -106,6 +110,37 @@ def test_search_agent_cards_sanitizes_and_signs_request(post: Mock):
assert "https://example.test/a2a" not in serialized


@patch.dict(
"os.environ",
{
"AGENTKIT_ACCESS_KEY": "ak-test",
"AGENTKIT_SECRET_KEY": "sk-test",
},
clear=False,
)
@patch("veadk.a2a.registry_client.requests.post")
def test_search_agent_cards_truncates_prompt_to_2048_utf8_bytes(post: Mock):
card = _agent_card()
post.return_value = _mock_response(
{
"ResponseMetadata": {"RequestId": "req-1"},
"Result": {"AgentCards": [json.dumps(card)], "TotalCount": 1},
}
)
prompt = "番茄炒蛋" * 300

search_agent_cards(
prompt,
3,
AgentKitA2ARegistryConfig(space_id="space-test"),
)

request_body = json.loads(post.call_args.kwargs["data"].decode("utf-8"))
request_prompt = request_body["Prompt"]
assert len(request_prompt.encode("utf-8")) <= 2048
assert request_prompt == truncate_utf8_bytes(prompt, 2048)


@patch.dict(
"os.environ",
{
Expand Down Expand Up @@ -236,9 +271,7 @@ def test_poll_task_returns_terminal_without_sleep(post: Mock, sleep: Mock):
"result": {
"id": "task-1",
"status": {"state": "completed"},
"artifacts": [
{"parts": [{"kind": "text", "text": "任务完成。"}]}
],
"artifacts": [{"parts": [{"kind": "text", "text": "任务完成。"}]}],
}
}
),
Expand Down Expand Up @@ -291,6 +324,77 @@ def test_a2a_registry_tool_descriptions_guide_model_flow():
assert "rejected" in poll_doc


@patch.dict(
"os.environ",
{
"AGENTKIT_ACCESS_KEY": "ak-test",
"AGENTKIT_SECRET_KEY": "sk-test",
},
clear=False,
)
@patch("veadk.a2a.registry_client.requests.post")
def test_build_remote_a2a_agent_tools_searches_gets_and_sends(post: Mock):
card = _agent_card()
post.side_effect = [
_mock_response(
{
"ResponseMetadata": {"RequestId": "search-req"},
"Result": {"AgentCards": [json.dumps(card)], "TotalCount": 1},
}
),
_mock_response(
{
"ResponseMetadata": {"RequestId": "get-req"},
"Result": {
"Id": "agent-id",
"Status": "running",
"AgentCard": json.dumps(card),
},
}
),
_mock_response(
{
"result": {
"kind": "message",
"parts": [{"kind": "text", "text": "今天北京晴。"}],
}
}
),
]
config = AgentKitA2ARegistryConfig(space_id="space-test", top_k=7)

tools = build_remote_a2a_agent_tools("北京天气", config)
assert [tool.__name__ for tool in tools] == ["remote_a2a_weather_a2a_agent"]
assert post.call_count == 1
assert post.call_args_list[0].kwargs["params"]["Action"] == "SearchAgentCards"

result = tools[0](input="北京天气")
tool_doc = " ".join((tools[0].__doc__ or "").split())

search_body = json.loads(post.call_args_list[0].kwargs["data"].decode("utf-8"))
assert search_body["TopK"] == 7
assert "Weather agent" in tool_doc
assert "a2a_registry_task_poll" in tool_doc
assert "Weather-A2A-Agent" in tool_doc
assert result["outcome"] == "success"
assert result["selected_agent"]["name"] == "Weather-A2A-Agent"
assert result["response"]["text"] == "今天北京晴。"
assert post.call_args_list[0].kwargs["params"]["Action"] == "SearchAgentCards"
assert post.call_args_list[1].kwargs["params"]["Action"] == "GetA2aAgent"
assert post.call_args_list[2].args[0] == "https://example.test/a2a"


@patch("veadk.tools.builtin_tools.a2a_registry.search_agent_cards")
def test_build_remote_a2a_agent_tools_returns_empty_on_search_failure(search: Mock):
search.side_effect = RegistryError("AGENT_NOT_FOUND", "no agents")

tools = build_remote_a2a_agent_tools(
"unknown task", AgentKitA2ARegistryConfig(space_id="space-test")
)

assert tools == []


@patch("veadk.tools.builtin_tools.a2a_registry.search_agent_cards")
def test_search_tool_accepts_prompt(search: Mock):
config = AgentKitA2ARegistryConfig(space_id="space-test")
Expand Down Expand Up @@ -363,11 +467,14 @@ def test_agentkit_http_error_uses_safe_diagnostics():
"401 Client Error", response=response
)

with patch.dict(
"os.environ",
{"AGENTKIT_ACCESS_KEY": "ak-test", "AGENTKIT_SECRET_KEY": "sk-test"},
clear=False,
), patch("veadk.a2a.registry_client.requests.post", return_value=response):
with (
patch.dict(
"os.environ",
{"AGENTKIT_ACCESS_KEY": "ak-test", "AGENTKIT_SECRET_KEY": "sk-test"},
clear=False,
),
patch("veadk.a2a.registry_client.requests.post", return_value=response),
):
with pytest.raises(RegistryError) as ctx:
search_agent_cards(
"weather",
Expand All @@ -378,9 +485,7 @@ def test_agentkit_http_error_uses_safe_diagnostics():
assert ctx.value.code == "AGENTKIT_OPENAPI_FAILED"
assert ctx.value.diagnostics["status_code"] == 401
assert ctx.value.diagnostics["request_id"] == "req-401"
assert ctx.value.diagnostics["response_error"]["Code"] == (
"SignatureDoesNotMatch"
)
assert ctx.value.diagnostics["response_error"]["Code"] == ("SignatureDoesNotMatch")
serialized = json.dumps(ctx.value.diagnostics, ensure_ascii=False)
assert "Authorization" not in serialized
assert "ak-test" not in serialized
Expand Down
9 changes: 9 additions & 0 deletions tests/cloud/test_harness_app_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ def test_registry_overrides_remount_registry_tools(self):
assert "_remove_a2a_registry_tools(" in source
assert "build_a2a_registry_tools(overridden_config)" in source

def test_registry_dynamic_tools_are_added_per_run(self):
utils_source = Path("veadk/cloud/harness_app/utils.py").read_text()
app_source = Path("veadk/cloud/harness_app/app.py").read_text()

assert "build_remote_a2a_agent_tools(prompt, registry_config)" in utils_source
assert "def spawn_harness_run_agent(" in utils_source
assert "has_a2a_registry_config(self.agent)" in app_source
assert "spawn_harness_run_agent(" in app_source


class TestRequestResponseSchemas:
def test_run_agent_request_fields(self):
Expand Down
19 changes: 17 additions & 2 deletions veadk/a2a/registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
DEFAULT_TOP_K = 3
DEFAULT_TIMEOUT_MS = 60000
DEFAULT_POLL_INTERVAL_MS = 5000
SEARCH_PROMPT_MAX_BYTES = 2048
TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}


Expand Down Expand Up @@ -99,6 +100,8 @@ def search_agent_cards(
prompt: str,
top_k: int | None = None,
config: AgentKitA2ARegistryConfig | None = None,
*,
strip_prompt: bool = True,
) -> dict[str, Any]:
"""Search AgentKit A2A registry by prompt and return sanitized AgentCards."""

Expand All @@ -109,10 +112,12 @@ def search_agent_cards(
_require_space_id(config)

safe_top_k = max(1, min(int(top_k or config.top_k or DEFAULT_TOP_K), 20))
request_prompt = prompt.strip() if strip_prompt else prompt
request_prompt = truncate_utf8_bytes(request_prompt, SEARCH_PROMPT_MAX_BYTES)
response, request_duration_ms = _agentkit_post(
config,
"SearchAgentCards",
{"SpaceId": config.space_id, "Prompt": prompt.strip(), "TopK": safe_top_k},
{"SpaceId": config.space_id, "Prompt": request_prompt, "TopK": safe_top_k},
)
result = response.get("Result") or {}
raw_cards = result.get("AgentCards") or []
Expand Down Expand Up @@ -145,6 +150,15 @@ def search_agent_cards(
)


def truncate_utf8_bytes(text: str, max_bytes: int = SEARCH_PROMPT_MAX_BYTES) -> str:
"""Return ``text`` truncated to at most ``max_bytes`` UTF-8 bytes."""

encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
return encoded[:max_bytes].decode("utf-8", errors="ignore")


def create_task(
agent_name: str,
input_text: str,
Expand Down Expand Up @@ -216,7 +230,8 @@ def _resolve_config(
space_id=config.space_id or env_config.space_id,
endpoint=config.endpoint or env_config.endpoint or DEFAULT_ENDPOINT,
version=config.version or env_config.version or DEFAULT_VERSION,
service_name=config.service_name or env_config.service_name
service_name=config.service_name
or env_config.service_name
or DEFAULT_SERVICE_NAME,
region=config.region or env_config.region or DEFAULT_REGION,
top_k=max(1, min(int(config.top_k or env_config.top_k or DEFAULT_TOP_K), 20)),
Expand Down
63 changes: 51 additions & 12 deletions veadk/cloud/harness_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
* ``POST /harness/invoke`` — the harness entry point, with once-time ``harness``
overrides (clone the base agent, apply the override, run a throwaway runner).
* The Google ADK web/api routes (``/run``, ``/run_sse``, ``/list-apps``, session
management, …), served by an ``AdkWebServer`` over the single in-memory agent.
management, …), served by an ``AdkWebServer`` over the single in-memory agent;
``/run_sse`` is wrapped so harness overrides and per-turn registry tools can
be applied before the model call.
* The A2A protocol routes (agent card at ``/.well-known/agent-card.json`` plus the
JSON-RPC endpoint), mounted at ``/`` for the base agent.

The ADK and A2A surfaces serve the base agent only; once-time overrides are a
``/harness/invoke`` feature.
The A2A protocol surface serves the base agent only. Harness once-time overrides
are available through ``/harness/invoke`` and the wrapped ``/run_sse`` route.

Run with either:
python app.py
Expand Down Expand Up @@ -69,7 +71,8 @@
from veadk.cloud.harness_app.utils import (
SkillLoadError,
ToolLoadError,
spawn_harness_agent,
has_a2a_registry_config,
spawn_harness_run_agent,
)
from veadk.memory.short_term_memory import ShortTermMemory
from veadk.runner import Runner
Expand All @@ -85,6 +88,16 @@
)


def _content_text(content: Any) -> str:
parts = getattr(content, "parts", None) or []
texts = []
for part in parts:
text = getattr(part, "text", None)
if text:
texts.append(text)
return "\n".join(texts)


class _HarnessAgentLoader(BaseAgentLoader):
"""Serve the single env-built harness agent to the ADK web server.

Expand Down Expand Up @@ -122,7 +135,8 @@ class HarnessRunAgentRequest(RunAgentRequest):
"""ADK ``/run_sse`` request plus an optional once-time harness override.

When ``harness`` is set, the streaming run uses a spawned agent (base agent
cloned with the override applied); otherwise it uses the base agent.
cloned with the override applied). A registry-enabled base agent is also
cloned per turn so dynamic remote A2A tools stay scoped to that turn.
"""

harness: HarnessOverrides | None = None
Expand Down Expand Up @@ -206,8 +220,11 @@ async def invoke_harness(
with tempfile.TemporaryDirectory(
prefix="harness_invoke_"
) as work_dir:
agent = spawn_harness_agent(
self.agent, request.harness, download_dir=Path(work_dir)
agent = spawn_harness_run_agent(
self.agent,
request.prompt,
request.harness,
download_dir=Path(work_dir),
)
runner = Runner(
agent=agent,
Expand All @@ -220,6 +237,19 @@ async def invoke_harness(
session_id=request.run_agent_request.session_id,
run_config=run_config,
)
elif has_a2a_registry_config(self.agent):
agent = spawn_harness_run_agent(self.agent, request.prompt)
runner = Runner(
agent=agent,
short_term_memory=self.short_term_memory,
app_name=self.harness_name,
)
output = await runner.run(
messages=[request.prompt],
user_id=request.run_agent_request.user_id,
session_id=request.run_agent_request.session_id,
run_config=run_config,
)
else:
output = await self.runner.run(
messages=[request.prompt],
Expand Down Expand Up @@ -258,9 +288,10 @@ def _mount_run_sse_override(self):
"""Override ADK's ``/run_sse`` so it honors once-time harness overrides.

ADK's default ``/run_sse`` always runs the served (base) agent. We wrap it:
when the request carries a ``harness`` override, stream a *spawned* agent
(base cloned + override applied); otherwise **delegate to ADK's original
handler unchanged** — so the no-override path is identical to stock run_sse.
when the request carries a ``harness`` override or the base agent has an
A2A registry, stream a *spawned* agent (base cloned + override and/or
dynamic remote A2A tools applied); otherwise delegate to ADK's original
handler unchanged.
"""
# Capture ADK's default /run_sse handler to delegate to when there is no
# override (keeps the base path bit-for-bit ADK behavior).
Expand All @@ -274,7 +305,11 @@ def _mount_run_sse_override(self):

@self.app.post("/run_sse")
async def run_sse(req: HarnessRunAgentRequest):
if req.harness is None and adk_run_sse is not None:
if (
req.harness is None
and not has_a2a_registry_config(self.agent)
and adk_run_sse is not None
):
# No override -> exactly ADK's default /run_sse.
return await adk_run_sse(req)
return StreamingResponse(
Expand All @@ -297,22 +332,26 @@ async def _run_sse_events(self, req: "HarnessRunAgentRequest"):
streaming_mode=StreamingMode.SSE if req.streaming else StreamingMode.NONE
)
work_dir_ctx = None
prompt = _content_text(req.new_message)
try:
if req.harness is not None:
logger.info(f"run_sse once-time override: {req.harness}")
# Skills may download into a temp dir read from disk during the
# run, so keep it alive for the whole stream.
work_dir_ctx = tempfile.TemporaryDirectory(prefix="harness_run_sse_")
try:
agent = spawn_harness_agent(
agent = spawn_harness_run_agent(
self.agent,
prompt,
req.harness,
download_dir=Path(work_dir_ctx.name),
)
except (SkillLoadError, ToolLoadError) as e:
logger.error(f"Once-time override failed to load: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return
elif has_a2a_registry_config(self.agent):
agent = spawn_harness_run_agent(self.agent, prompt)
else:
agent = self.agent

Expand Down
Loading
Loading