diff --git a/tests/a2a/test_registry_client.py b/tests/a2a/test_registry_client.py index 6cabaddb..8fd3315e 100644 --- a/tests/a2a/test_registry_client.py +++ b/tests/a2a/test_registry_client.py @@ -27,8 +27,12 @@ create_task, poll_task, search_agent_cards, + truncate_utf8_bytes, +) +from veadk.tools.builtin_tools.a2a_registry import ( + build_a2a_registry_tools, + build_remote_a2a_agent_tools, ) -from veadk.tools.builtin_tools.a2a_registry import build_a2a_registry_tools def _mock_response(payload: dict, status_code: int = 200) -> Mock: @@ -106,6 +110,37 @@ def test_search_agent_cards_sanitizes_and_signs_request(post: Mock): assert "https://example.test/a2a" not in serialized +@patch.dict( + "os.environ", + { + "AGENTKIT_ACCESS_KEY": "ak-test", + "AGENTKIT_SECRET_KEY": "sk-test", + }, + clear=False, +) +@patch("veadk.a2a.registry_client.requests.post") +def test_search_agent_cards_truncates_prompt_to_2048_utf8_bytes(post: Mock): + card = _agent_card() + post.return_value = _mock_response( + { + "ResponseMetadata": {"RequestId": "req-1"}, + "Result": {"AgentCards": [json.dumps(card)], "TotalCount": 1}, + } + ) + prompt = "番茄炒蛋" * 300 + + search_agent_cards( + prompt, + 3, + AgentKitA2ARegistryConfig(space_id="space-test"), + ) + + request_body = json.loads(post.call_args.kwargs["data"].decode("utf-8")) + request_prompt = request_body["Prompt"] + assert len(request_prompt.encode("utf-8")) <= 2048 + assert request_prompt == truncate_utf8_bytes(prompt, 2048) + + @patch.dict( "os.environ", { @@ -236,9 +271,7 @@ def test_poll_task_returns_terminal_without_sleep(post: Mock, sleep: Mock): "result": { "id": "task-1", "status": {"state": "completed"}, - "artifacts": [ - {"parts": [{"kind": "text", "text": "任务完成。"}]} - ], + "artifacts": [{"parts": [{"kind": "text", "text": "任务完成。"}]}], } } ), @@ -291,6 +324,77 @@ def test_a2a_registry_tool_descriptions_guide_model_flow(): assert "rejected" in poll_doc +@patch.dict( + "os.environ", + { + "AGENTKIT_ACCESS_KEY": "ak-test", + "AGENTKIT_SECRET_KEY": "sk-test", + }, + clear=False, +) +@patch("veadk.a2a.registry_client.requests.post") +def test_build_remote_a2a_agent_tools_searches_gets_and_sends(post: Mock): + card = _agent_card() + post.side_effect = [ + _mock_response( + { + "ResponseMetadata": {"RequestId": "search-req"}, + "Result": {"AgentCards": [json.dumps(card)], "TotalCount": 1}, + } + ), + _mock_response( + { + "ResponseMetadata": {"RequestId": "get-req"}, + "Result": { + "Id": "agent-id", + "Status": "running", + "AgentCard": json.dumps(card), + }, + } + ), + _mock_response( + { + "result": { + "kind": "message", + "parts": [{"kind": "text", "text": "今天北京晴。"}], + } + } + ), + ] + config = AgentKitA2ARegistryConfig(space_id="space-test", top_k=7) + + tools = build_remote_a2a_agent_tools("北京天气", config) + assert [tool.__name__ for tool in tools] == ["remote_a2a_weather_a2a_agent"] + assert post.call_count == 1 + assert post.call_args_list[0].kwargs["params"]["Action"] == "SearchAgentCards" + + result = tools[0](input="北京天气") + tool_doc = " ".join((tools[0].__doc__ or "").split()) + + search_body = json.loads(post.call_args_list[0].kwargs["data"].decode("utf-8")) + assert search_body["TopK"] == 7 + assert "Weather agent" in tool_doc + assert "a2a_registry_task_poll" in tool_doc + assert "Weather-A2A-Agent" in tool_doc + assert result["outcome"] == "success" + assert result["selected_agent"]["name"] == "Weather-A2A-Agent" + assert result["response"]["text"] == "今天北京晴。" + assert post.call_args_list[0].kwargs["params"]["Action"] == "SearchAgentCards" + assert post.call_args_list[1].kwargs["params"]["Action"] == "GetA2aAgent" + assert post.call_args_list[2].args[0] == "https://example.test/a2a" + + +@patch("veadk.tools.builtin_tools.a2a_registry.search_agent_cards") +def test_build_remote_a2a_agent_tools_returns_empty_on_search_failure(search: Mock): + search.side_effect = RegistryError("AGENT_NOT_FOUND", "no agents") + + tools = build_remote_a2a_agent_tools( + "unknown task", AgentKitA2ARegistryConfig(space_id="space-test") + ) + + assert tools == [] + + @patch("veadk.tools.builtin_tools.a2a_registry.search_agent_cards") def test_search_tool_accepts_prompt(search: Mock): config = AgentKitA2ARegistryConfig(space_id="space-test") @@ -363,11 +467,14 @@ def test_agentkit_http_error_uses_safe_diagnostics(): "401 Client Error", response=response ) - with patch.dict( - "os.environ", - {"AGENTKIT_ACCESS_KEY": "ak-test", "AGENTKIT_SECRET_KEY": "sk-test"}, - clear=False, - ), patch("veadk.a2a.registry_client.requests.post", return_value=response): + with ( + patch.dict( + "os.environ", + {"AGENTKIT_ACCESS_KEY": "ak-test", "AGENTKIT_SECRET_KEY": "sk-test"}, + clear=False, + ), + patch("veadk.a2a.registry_client.requests.post", return_value=response), + ): with pytest.raises(RegistryError) as ctx: search_agent_cards( "weather", @@ -378,9 +485,7 @@ def test_agentkit_http_error_uses_safe_diagnostics(): assert ctx.value.code == "AGENTKIT_OPENAPI_FAILED" assert ctx.value.diagnostics["status_code"] == 401 assert ctx.value.diagnostics["request_id"] == "req-401" - assert ctx.value.diagnostics["response_error"]["Code"] == ( - "SignatureDoesNotMatch" - ) + assert ctx.value.diagnostics["response_error"]["Code"] == ("SignatureDoesNotMatch") serialized = json.dumps(ctx.value.diagnostics, ensure_ascii=False) assert "Authorization" not in serialized assert "ak-test" not in serialized diff --git a/tests/cloud/test_harness_app_contract.py b/tests/cloud/test_harness_app_contract.py index 5383d812..6719c5cd 100644 --- a/tests/cloud/test_harness_app_contract.py +++ b/tests/cloud/test_harness_app_contract.py @@ -182,6 +182,15 @@ def test_registry_overrides_remount_registry_tools(self): assert "_remove_a2a_registry_tools(" in source assert "build_a2a_registry_tools(overridden_config)" in source + def test_registry_dynamic_tools_are_added_per_run(self): + utils_source = Path("veadk/cloud/harness_app/utils.py").read_text() + app_source = Path("veadk/cloud/harness_app/app.py").read_text() + + assert "build_remote_a2a_agent_tools(prompt, registry_config)" in utils_source + assert "def spawn_harness_run_agent(" in utils_source + assert "has_a2a_registry_config(self.agent)" in app_source + assert "spawn_harness_run_agent(" in app_source + class TestRequestResponseSchemas: def test_run_agent_request_fields(self): diff --git a/veadk/a2a/registry_client.py b/veadk/a2a/registry_client.py index dcaf46b5..b058984c 100644 --- a/veadk/a2a/registry_client.py +++ b/veadk/a2a/registry_client.py @@ -36,6 +36,7 @@ DEFAULT_TOP_K = 3 DEFAULT_TIMEOUT_MS = 60000 DEFAULT_POLL_INTERVAL_MS = 5000 +SEARCH_PROMPT_MAX_BYTES = 2048 TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"} @@ -99,6 +100,8 @@ def search_agent_cards( prompt: str, top_k: int | None = None, config: AgentKitA2ARegistryConfig | None = None, + *, + strip_prompt: bool = True, ) -> dict[str, Any]: """Search AgentKit A2A registry by prompt and return sanitized AgentCards.""" @@ -109,10 +112,12 @@ def search_agent_cards( _require_space_id(config) safe_top_k = max(1, min(int(top_k or config.top_k or DEFAULT_TOP_K), 20)) + request_prompt = prompt.strip() if strip_prompt else prompt + request_prompt = truncate_utf8_bytes(request_prompt, SEARCH_PROMPT_MAX_BYTES) response, request_duration_ms = _agentkit_post( config, "SearchAgentCards", - {"SpaceId": config.space_id, "Prompt": prompt.strip(), "TopK": safe_top_k}, + {"SpaceId": config.space_id, "Prompt": request_prompt, "TopK": safe_top_k}, ) result = response.get("Result") or {} raw_cards = result.get("AgentCards") or [] @@ -145,6 +150,15 @@ def search_agent_cards( ) +def truncate_utf8_bytes(text: str, max_bytes: int = SEARCH_PROMPT_MAX_BYTES) -> str: + """Return ``text`` truncated to at most ``max_bytes`` UTF-8 bytes.""" + + encoded = text.encode("utf-8") + if len(encoded) <= max_bytes: + return text + return encoded[:max_bytes].decode("utf-8", errors="ignore") + + def create_task( agent_name: str, input_text: str, @@ -216,7 +230,8 @@ def _resolve_config( space_id=config.space_id or env_config.space_id, endpoint=config.endpoint or env_config.endpoint or DEFAULT_ENDPOINT, version=config.version or env_config.version or DEFAULT_VERSION, - service_name=config.service_name or env_config.service_name + service_name=config.service_name + or env_config.service_name or DEFAULT_SERVICE_NAME, region=config.region or env_config.region or DEFAULT_REGION, top_k=max(1, min(int(config.top_k or env_config.top_k or DEFAULT_TOP_K), 20)), diff --git a/veadk/cloud/harness_app/app.py b/veadk/cloud/harness_app/app.py index 9262cd9e..097d80d7 100644 --- a/veadk/cloud/harness_app/app.py +++ b/veadk/cloud/harness_app/app.py @@ -20,12 +20,14 @@ * ``POST /harness/invoke`` — the harness entry point, with once-time ``harness`` overrides (clone the base agent, apply the override, run a throwaway runner). * The Google ADK web/api routes (``/run``, ``/run_sse``, ``/list-apps``, session - management, …), served by an ``AdkWebServer`` over the single in-memory agent. + management, …), served by an ``AdkWebServer`` over the single in-memory agent; + ``/run_sse`` is wrapped so harness overrides and per-turn registry tools can + be applied before the model call. * The A2A protocol routes (agent card at ``/.well-known/agent-card.json`` plus the JSON-RPC endpoint), mounted at ``/`` for the base agent. -The ADK and A2A surfaces serve the base agent only; once-time overrides are a -``/harness/invoke`` feature. +The A2A protocol surface serves the base agent only. Harness once-time overrides +are available through ``/harness/invoke`` and the wrapped ``/run_sse`` route. Run with either: python app.py @@ -69,7 +71,8 @@ from veadk.cloud.harness_app.utils import ( SkillLoadError, ToolLoadError, - spawn_harness_agent, + has_a2a_registry_config, + spawn_harness_run_agent, ) from veadk.memory.short_term_memory import ShortTermMemory from veadk.runner import Runner @@ -85,6 +88,16 @@ ) +def _content_text(content: Any) -> str: + parts = getattr(content, "parts", None) or [] + texts = [] + for part in parts: + text = getattr(part, "text", None) + if text: + texts.append(text) + return "\n".join(texts) + + class _HarnessAgentLoader(BaseAgentLoader): """Serve the single env-built harness agent to the ADK web server. @@ -122,7 +135,8 @@ class HarnessRunAgentRequest(RunAgentRequest): """ADK ``/run_sse`` request plus an optional once-time harness override. When ``harness`` is set, the streaming run uses a spawned agent (base agent - cloned with the override applied); otherwise it uses the base agent. + cloned with the override applied). A registry-enabled base agent is also + cloned per turn so dynamic remote A2A tools stay scoped to that turn. """ harness: HarnessOverrides | None = None @@ -206,8 +220,11 @@ async def invoke_harness( with tempfile.TemporaryDirectory( prefix="harness_invoke_" ) as work_dir: - agent = spawn_harness_agent( - self.agent, request.harness, download_dir=Path(work_dir) + agent = spawn_harness_run_agent( + self.agent, + request.prompt, + request.harness, + download_dir=Path(work_dir), ) runner = Runner( agent=agent, @@ -220,6 +237,19 @@ async def invoke_harness( session_id=request.run_agent_request.session_id, run_config=run_config, ) + elif has_a2a_registry_config(self.agent): + agent = spawn_harness_run_agent(self.agent, request.prompt) + runner = Runner( + agent=agent, + short_term_memory=self.short_term_memory, + app_name=self.harness_name, + ) + output = await runner.run( + messages=[request.prompt], + user_id=request.run_agent_request.user_id, + session_id=request.run_agent_request.session_id, + run_config=run_config, + ) else: output = await self.runner.run( messages=[request.prompt], @@ -258,9 +288,10 @@ def _mount_run_sse_override(self): """Override ADK's ``/run_sse`` so it honors once-time harness overrides. ADK's default ``/run_sse`` always runs the served (base) agent. We wrap it: - when the request carries a ``harness`` override, stream a *spawned* agent - (base cloned + override applied); otherwise **delegate to ADK's original - handler unchanged** — so the no-override path is identical to stock run_sse. + when the request carries a ``harness`` override or the base agent has an + A2A registry, stream a *spawned* agent (base cloned + override and/or + dynamic remote A2A tools applied); otherwise delegate to ADK's original + handler unchanged. """ # Capture ADK's default /run_sse handler to delegate to when there is no # override (keeps the base path bit-for-bit ADK behavior). @@ -274,7 +305,11 @@ def _mount_run_sse_override(self): @self.app.post("/run_sse") async def run_sse(req: HarnessRunAgentRequest): - if req.harness is None and adk_run_sse is not None: + if ( + req.harness is None + and not has_a2a_registry_config(self.agent) + and adk_run_sse is not None + ): # No override -> exactly ADK's default /run_sse. return await adk_run_sse(req) return StreamingResponse( @@ -297,6 +332,7 @@ async def _run_sse_events(self, req: "HarnessRunAgentRequest"): streaming_mode=StreamingMode.SSE if req.streaming else StreamingMode.NONE ) work_dir_ctx = None + prompt = _content_text(req.new_message) try: if req.harness is not None: logger.info(f"run_sse once-time override: {req.harness}") @@ -304,8 +340,9 @@ async def _run_sse_events(self, req: "HarnessRunAgentRequest"): # run, so keep it alive for the whole stream. work_dir_ctx = tempfile.TemporaryDirectory(prefix="harness_run_sse_") try: - agent = spawn_harness_agent( + agent = spawn_harness_run_agent( self.agent, + prompt, req.harness, download_dir=Path(work_dir_ctx.name), ) @@ -313,6 +350,8 @@ async def _run_sse_events(self, req: "HarnessRunAgentRequest"): logger.error(f"Once-time override failed to load: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" return + elif has_a2a_registry_config(self.agent): + agent = spawn_harness_run_agent(self.agent, prompt) else: agent = self.agent diff --git a/veadk/cloud/harness_app/utils.py b/veadk/cloud/harness_app/utils.py index cdef4175..e4674e30 100644 --- a/veadk/cloud/harness_app/utils.py +++ b/veadk/cloud/harness_app/utils.py @@ -21,6 +21,8 @@ from the skill hub and mounting them as an ADK skill toolset. * :func:`spawn_harness_agent` — temporary, one-off creation that clones the base agent and applies a per-request override (incremental tools/skills on top). +* :func:`spawn_harness_run_agent` — per-turn clone that also attaches dynamic + registry-discovered remote A2A tools for the current user message. """ import io @@ -70,6 +72,8 @@ "config_from_env", "init_harness_agent", "spawn_harness_agent", + "spawn_harness_run_agent", + "has_a2a_registry_config", ] @@ -422,6 +426,36 @@ def _apply_registry_overrides( setattr(agent, _REGISTRY_CONFIG_ATTR, overridden_config) +def has_a2a_registry_config(agent: Agent) -> bool: + """Return whether ``agent`` has an AgentKit A2A registry configured.""" + + return getattr(agent, _REGISTRY_CONFIG_ATTR, None) is not None + + +def _add_dynamic_a2a_agent_tools(agent: Agent, prompt: str) -> None: + registry_config = getattr(agent, _REGISTRY_CONFIG_ATTR, None) + if registry_config is None or not prompt or not prompt.strip(): + return + + from veadk.tools.builtin_tools.a2a_registry import build_remote_a2a_agent_tools + + dynamic_tools = build_remote_a2a_agent_tools(prompt, registry_config) + if not dynamic_tools: + return + + existing = {name for tool in agent.tools if (name := _tool_name(tool))} + attached = 0 + for tool in dynamic_tools: + name = _tool_name(tool) + if not name or name in existing: + continue + agent.tools.append(tool) + existing.add(name) + attached += 1 + if attached: + logger.info(f"Attached {attached} dynamic A2A agent tools for this turn.") + + def spawn_harness_agent( base_agent: Agent, overrides: HarnessOverrides, download_dir: Path | None = None ) -> Agent: @@ -462,3 +496,20 @@ def spawn_harness_agent( ) return cloned + + +def spawn_harness_run_agent( + base_agent: Agent, + prompt: str, + overrides: HarnessOverrides | None = None, + download_dir: Path | None = None, +) -> Agent: + """Clone a harness agent for one run and attach per-turn dynamic tools.""" + + if overrides is not None: + cloned = spawn_harness_agent(base_agent, overrides, download_dir=download_dir) + else: + cloned = base_agent.clone(update={}) + + _add_dynamic_a2a_agent_tools(cloned, prompt) + return cloned diff --git a/veadk/tools/builtin_tools/a2a_registry.py b/veadk/tools/builtin_tools/a2a_registry.py index 3d29c502..b46178b7 100644 --- a/veadk/tools/builtin_tools/a2a_registry.py +++ b/veadk/tools/builtin_tools/a2a_registry.py @@ -14,6 +14,7 @@ from __future__ import annotations +import re from collections.abc import Callable from typing import Any @@ -26,6 +27,9 @@ registry_config_from_env, search_agent_cards, ) +from veadk.utils.logger import get_logger + +logger = get_logger(__name__) def build_a2a_registry_tools( @@ -85,8 +89,9 @@ def a2a_registry_task_poll( final response. This tool calls A2A `tasks/get` once with the same `agent_name` and `task_id`. If `is_terminal` is false, do not create a new task; call this tool again with the same `task_id` until the task - reaches a terminal state. When the task is terminal, return the A2A - task's query result to the user. + reaches a terminal state such as `completed`, `failed`, `canceled`, or + `rejected`. When the task is terminal, return the A2A task's query + result to the user. """ try: @@ -101,3 +106,103 @@ def a2a_registry_task_poll( a2a_registry_task_create, a2a_registry_task_poll, ] + + +def build_remote_a2a_agent_tools( + prompt: str, + config: AgentKitA2ARegistryConfig | None = None, +) -> list[Callable[..., dict[str, Any]]]: + """Build one-turn remote A2A agent tools from a registry semantic search.""" + + resolved_config = config or registry_config_from_env() + try: + search_result = search_agent_cards( + prompt, + None, + resolved_config, + strip_prompt=False, + ) + except RegistryError as exc: + logger.warning(f"Skipping dynamic A2A agent tools: {exc.code}: {exc.message}") + return [] + + agents = search_result.get("agents") or [] + tools: list[Callable[..., dict[str, Any]]] = [] + seen_agent_names: set[str] = set() + seen_tool_names: set[str] = set() + for index, agent in enumerate(agents): + agent_name = agent.get("name") if isinstance(agent, dict) else "" + if not agent_name or agent_name in seen_agent_names: + continue + seen_agent_names.add(agent_name) + + tool_name = _unique_remote_a2a_tool_name(agent_name, index, seen_tool_names) + tools.append( + _make_remote_a2a_agent_tool( + tool_name=tool_name, + agent=agent, + config=resolved_config, + ) + ) + + return tools + + +def _unique_remote_a2a_tool_name( + agent_name: str, index: int, seen_names: set[str] +) -> str: + normalized = re.sub(r"[^0-9A-Za-z_]+", "_", agent_name).strip("_").lower() + if not normalized: + normalized = f"agent_{index + 1}" + if normalized[0].isdigit(): + normalized = f"agent_{normalized}" + + base_name = f"remote_a2a_{normalized}" + name = base_name + suffix = 2 + while name in seen_names: + name = f"{base_name}_{suffix}" + suffix += 1 + seen_names.add(name) + return name + + +def _make_remote_a2a_agent_tool( + *, + tool_name: str, + agent: dict[str, Any], + config: AgentKitA2ARegistryConfig, +) -> Callable[..., dict[str, Any]]: + agent_name = agent.get("name") or tool_name + agent_description = agent.get("description") or "" + skill_descriptions = "; ".join( + skill.get("description", "") + for skill in agent.get("skills", []) + if isinstance(skill, dict) and skill.get("description") + ) + + def remote_a2a_agent_tool(input: str, task_id: str | None = None) -> dict[str, Any]: + try: + if not input or not input.strip(): + return failure("INVALID_ARGUMENT", "input is required") + + return create_task(agent_name, input, task_id, config) + except RegistryError as exc: + return failure(exc.code, exc.message, exc.diagnostics) + except Exception as exc: # noqa: BLE001 + return failure("INTERNAL_ERROR", str(exc)) + + remote_a2a_agent_tool.__name__ = tool_name + remote_a2a_agent_tool.__qualname__ = tool_name + remote_a2a_agent_tool.__doc__ = ( + f"Remote A2A agent `{agent_name}`. " + f"{agent_description or 'Use this tool when the request matches this remote agent.'} " + f"Skills: {skill_descriptions or 'No skills listed.'} " + "Put the full user request in `input`. If continuing an existing remote " + "task, pass its `task_id`; otherwise leave `task_id` empty. The tool " + "returns either `response.text` or a remote `task.id`. If it returns a " + f"`task.id` without `response.text`, call `a2a_registry_task_poll` with " + f"`agent_name` set to `{agent_name}` and the returned `task_id` until " + "the task reaches a terminal state." + ) + return remote_a2a_agent_tool