diff --git a/python/packages/kagent-adk/src/kagent/adk/models/_bedrock.py b/python/packages/kagent-adk/src/kagent/adk/models/_bedrock.py index e1ebdbfb6..2518eb7f1 100644 --- a/python/packages/kagent-adk/src/kagent/adk/models/_bedrock.py +++ b/python/packages/kagent-adk/src/kagent/adk/models/_bedrock.py @@ -2,7 +2,10 @@ Uses boto3's Converse API which provides a consistent interface across all Bedrock-supported models (Anthropic, Meta, Mistral, Amazon, Cohere, etc.). -Authenticates via the standard AWS credential chain (env vars, IAM role, etc.). + +Supports two authentication methods: +- **Bearer token**: Set ``AWS_BEARER_TOKEN_BEDROCK`` env var (API key auth). +- **IAM credentials**: Standard AWS credential chain (env vars, IAM role, etc.). """ from __future__ import annotations @@ -16,6 +19,8 @@ from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional import boto3 +from botocore import UNSIGNED +from botocore.config import Config from google.adk.models import BaseLlm from google.adk.models.llm_response import LlmResponse from google.genai import types @@ -54,12 +59,33 @@ def _sanitize_tool_id(tool_id: str, id_map: dict[str, str], counter: list[int]) return sanitized +def _inject_bearer_token(token: str, request, **kwargs): + """Event handler that injects a Bearer token into the Authorization header. + + Registered on the ``before-sign`` event so it runs after boto3 builds the + request but before SigV4 signing (which is disabled via UNSIGNED). + """ + request.headers["Authorization"] = f"Bearer {token}" + + def _get_bedrock_client(extra_headers: Optional[dict[str, str]] = None): region = os.environ.get("AWS_DEFAULT_REGION") or os.environ.get("AWS_REGION") or "us-east-1" kwargs: dict[str, Any] = {"region_name": region} if extra_headers: # boto3 doesn't support custom headers natively; log and ignore logger.warning("extra_headers are not supported for Bedrock models and will be ignored.") + + bearer_token = os.environ.get("AWS_BEARER_TOKEN_BEDROCK", "").strip() + if bearer_token: + logger.info("Using bearer token authentication for Bedrock") + kwargs["config"] = Config(signature_version=UNSIGNED) + client = boto3.client("bedrock-runtime", **kwargs) + client.meta.events.register( + "before-sign.bedrock-runtime.*", + lambda request, **kw: _inject_bearer_token(bearer_token, request, **kw), + ) + return client + return boto3.client("bedrock-runtime", **kwargs) diff --git a/python/packages/kagent-adk/tests/unittests/models/test_bedrock.py b/python/packages/kagent-adk/tests/unittests/models/test_bedrock.py index fab018520..ce447b21d 100644 --- a/python/packages/kagent-adk/tests/unittests/models/test_bedrock.py +++ b/python/packages/kagent-adk/tests/unittests/models/test_bedrock.py @@ -5,7 +5,7 @@ import pytest -from kagent.adk.models._bedrock import KAgentBedrockLlm, _get_bedrock_client +from kagent.adk.models._bedrock import KAgentBedrockLlm, _get_bedrock_client, _inject_bearer_token class TestGetBedrockClient: @@ -30,6 +30,53 @@ def test_defaults_to_us_east_1(self): _get_bedrock_client() assert mock_boto.call_args.kwargs["region_name"] == "us-east-1" + def test_bearer_token_uses_unsigned_config(self): + """When AWS_BEARER_TOKEN_BEDROCK is set, client uses UNSIGNED signature.""" + from botocore import UNSIGNED + + env = {k: v for k, v in __import__("os").environ.items() if k not in ("AWS_DEFAULT_REGION", "AWS_REGION")} + env["AWS_BEARER_TOKEN_BEDROCK"] = "test-token-123" + with mock.patch.dict("os.environ", env, clear=True): + with mock.patch("kagent.adk.models._bedrock.boto3.client") as mock_boto: + mock_client = mock.MagicMock() + mock_boto.return_value = mock_client + _get_bedrock_client() + config = mock_boto.call_args.kwargs["config"] + assert config.signature_version == UNSIGNED + + def test_bearer_token_registers_event_handler(self): + """When AWS_BEARER_TOKEN_BEDROCK is set, a before-sign handler is registered.""" + env = {k: v for k, v in __import__("os").environ.items() if k not in ("AWS_DEFAULT_REGION", "AWS_REGION")} + env["AWS_BEARER_TOKEN_BEDROCK"] = "test-token-123" + with mock.patch.dict("os.environ", env, clear=True): + with mock.patch("kagent.adk.models._bedrock.boto3.client") as mock_boto: + mock_client = mock.MagicMock() + mock_boto.return_value = mock_client + _get_bedrock_client() + mock_client.meta.events.register.assert_called_once() + call_args = mock_client.meta.events.register.call_args + assert call_args[0][0] == "before-sign.bedrock-runtime.*" + + def test_no_bearer_token_uses_standard_auth(self): + """When AWS_BEARER_TOKEN_BEDROCK is not set, standard credential chain is used.""" + env = {k: v for k, v in __import__("os").environ.items() if k not in ("AWS_DEFAULT_REGION", "AWS_REGION", "AWS_BEARER_TOKEN_BEDROCK")} + with mock.patch.dict("os.environ", env, clear=True): + with mock.patch("kagent.adk.models._bedrock.boto3.client") as mock_boto: + mock_client = mock.MagicMock() + mock_boto.return_value = mock_client + _get_bedrock_client() + assert "config" not in mock_boto.call_args.kwargs + mock_client.meta.events.register.assert_not_called() + + +class TestInjectBearerToken: + def test_injects_authorization_header(self): + """_inject_bearer_token sets the correct Authorization header.""" + mock_request = mock.MagicMock() + mock_request.headers = {} + _inject_bearer_token("my-secret-token", mock_request) + assert mock_request.headers["Authorization"] == "Bearer my-secret-token" + class TestKAgentBedrockLlm: def test_default_construction(self):