diff --git a/packages/apps/src/microsoft_teams/apps/auth/__init__.py b/packages/apps/src/microsoft_teams/apps/auth/__init__.py index f64ff6335..bf7b38ffe 100644 --- a/packages/apps/src/microsoft_teams/apps/auth/__init__.py +++ b/packages/apps/src/microsoft_teams/apps/auth/__init__.py @@ -4,6 +4,6 @@ """ from .remote_function_jwt_middleware import validate_remote_function_request -from .token_validator import TokenValidator +from .token_validator import InboundActivityTokenValidator, TokenValidator -__all__ = ["TokenValidator", "validate_remote_function_request"] +__all__ = ["InboundActivityTokenValidator", "TokenValidator", "validate_remote_function_request"] diff --git a/packages/apps/src/microsoft_teams/apps/auth/token_validator.py b/packages/apps/src/microsoft_teams/apps/auth/token_validator.py index a4f1df1d4..437500600 100644 --- a/packages/apps/src/microsoft_teams/apps/auth/token_validator.py +++ b/packages/apps/src/microsoft_teams/apps/auth/token_validator.py @@ -14,6 +14,8 @@ from microsoft_teams.api.auth.cloud_environment import PUBLIC, CloudEnvironment JWT_LEEWAY_SECONDS = 300 # Allowable clock skew when validating JWTs +_MAX_ENTRA_VALIDATOR_CACHE_SIZE = 100 +ENTRA_V1_ISSUER_PREFIX = "https://sts.windows.net/" logger = logging.getLogger(__name__) @@ -113,7 +115,7 @@ def for_entra( # are still issued with the v1 issuer. # See: https://learn.microsoft.com/en-us/entra/identity-platform/access-tokens valid_issuers.append(f"{env.login_endpoint}/{tenant_id}/v2.0") - valid_issuers.append(f"https://sts.windows.net/{tenant_id}/") + valid_issuers.append(f"{ENTRA_V1_ISSUER_PREFIX}{tenant_id}/") else: logger.warning( "No tenant_id provided for Entra token validation. " @@ -222,3 +224,61 @@ def _validate_scope(self, payload: Dict[str, Any], required_scope: str) -> None: if required_scope not in scope_set: logger.error(f"Token missing required scope: {required_scope}") raise jwt.InvalidTokenError(f"Token missing required scope: {required_scope}") + + +class InboundActivityTokenValidator: + """Validator for inbound Teams activities. + + Classic bot activities use Bot Framework connector tokens. Agent ID activities use + Entra tokens whose audience is the agent identity blueprint app ID. + """ + + def __init__(self, app_id: str, cloud: Optional[CloudEnvironment] = None): + self._app_id = app_id + self._cloud = cloud or PUBLIC + self._service_validator = TokenValidator.for_service(app_id, cloud=self._cloud) + self._entra_validators_by_tenant: dict[str, TokenValidator] = {} + + async def validate_token(self, raw_token: str, service_url: Optional[str] = None) -> Dict[str, Any]: + if not raw_token: + logger.error("No token provided") + raise jwt.InvalidTokenError("No token provided") + + unverified_payload = jwt.decode(raw_token, algorithms=["RS256"], options={"verify_signature": False}) + issuer = unverified_payload.get("iss", "") + if self._is_entra_issuer(issuer): + return await self._validate_entra_token(raw_token, unverified_payload) + + return await self._service_validator.validate_token(raw_token, service_url) + + def _is_entra_issuer(self, issuer: Any) -> bool: + if not isinstance(issuer, str): + return False + + return issuer.startswith(self._cloud.login_endpoint) or issuer.startswith(ENTRA_V1_ISSUER_PREFIX) + + async def _validate_entra_token(self, raw_token: str, unverified_payload: Dict[str, Any]) -> Dict[str, Any]: + tenant_id = unverified_payload.get("tid") + if not tenant_id or not isinstance(tenant_id, str): + raise jwt.InvalidTokenError("Entra inbound token is missing tid") + + validator = self._get_entra_validator(tenant_id) + # TODO: Agent ID inbound Entra tokens currently do not include serviceurl. Revisit service URL + # validation for this path once the platform defines a signed service URL claim or equivalent. + return await validator.validate_token(raw_token) + + def _get_entra_validator(self, tenant_id: str) -> TokenValidator: + cached_validator = self._entra_validators_by_tenant.get(tenant_id) + if cached_validator: + return cached_validator + + validator = TokenValidator.for_entra( + self._app_id, + tenant_id, + cloud=self._cloud, + ) + self._entra_validators_by_tenant[tenant_id] = validator + if len(self._entra_validators_by_tenant) > _MAX_ENTRA_VALIDATOR_CACHE_SIZE: + oldest_tenant_id = next(iter(self._entra_validators_by_tenant)) + self._entra_validators_by_tenant.pop(oldest_tenant_id) + return validator diff --git a/packages/apps/src/microsoft_teams/apps/http/http_server.py b/packages/apps/src/microsoft_teams/apps/http/http_server.py index e11c411a9..eb7a5129a 100644 --- a/packages/apps/src/microsoft_teams/apps/http/http_server.py +++ b/packages/apps/src/microsoft_teams/apps/http/http_server.py @@ -13,7 +13,7 @@ from microsoft_teams.api.auth.json_web_token import JsonWebToken from pydantic import BaseModel -from ..auth import TokenValidator +from ..auth import InboundActivityTokenValidator from ..events import ActivityEvent, CoreActivity from .adapter import HttpRequest, HttpResponse, HttpServerAdapter @@ -43,7 +43,7 @@ def __init__(self, adapter: HttpServerAdapter, messaging_endpoint: str = "/api/m raise ValueError("messaging_endpoint must be a non-empty path starting with '/'.") self._messaging_endpoint = normalized_endpoint self._on_request: Optional[Callable[[ActivityEvent], Awaitable[InvokeResponse[Any]]]] = None - self._token_validator: Optional[TokenValidator] = None + self._token_validator: Optional[InboundActivityTokenValidator] = None self._skip_auth: bool = False self._cloud: CloudEnvironment = PUBLIC self._initialized: bool = False @@ -89,7 +89,7 @@ def initialize( app_id = getattr(credentials, "client_id", None) if credentials else None if app_id and not skip_auth: - self._token_validator = TokenValidator.for_service( + self._token_validator = InboundActivityTokenValidator( app_id, cloud=self._cloud, ) diff --git a/packages/apps/tests/test_token_validator.py b/packages/apps/tests/test_token_validator.py index eaa1bab60..f949be367 100644 --- a/packages/apps/tests/test_token_validator.py +++ b/packages/apps/tests/test_token_validator.py @@ -3,11 +3,12 @@ Licensed under the MIT License. """ -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import jwt import pytest -from microsoft_teams.apps.auth.token_validator import TokenValidator +from microsoft_teams.apps.auth import token_validator as token_validator_module +from microsoft_teams.apps.auth.token_validator import InboundActivityTokenValidator, TokenValidator # pyright: basic @@ -407,3 +408,94 @@ async def test_validate_entra_token_v1_sts_issuer(self, mock_jwks_client): result = await validator.validate_token("v1.entra.token") assert result["iss"] == "https://sts.windows.net/test-tenant-id/" assert result["ver"] == "1.0" + + +class TestInboundActivityTokenValidator: + @pytest.mark.asyncio + async def test_validate_token_uses_service_validator_for_bot_framework_tokens(self): + validator = InboundActivityTokenValidator("test-app-id") + validator._service_validator.validate_token = AsyncMock(return_value={"iss": "https://api.botframework.com"}) + + with patch("jwt.decode", return_value={"iss": "https://api.botframework.com"}) as decode: + result = await validator.validate_token("bot-token", "https://service.example") + + assert result == {"iss": "https://api.botframework.com"} + decode.assert_called_once_with("bot-token", algorithms=["RS256"], options={"verify_signature": False}) + validator._service_validator.validate_token.assert_called_once_with("bot-token", "https://service.example") + + @pytest.mark.asyncio + async def test_validate_token_uses_entra_validator_for_v2_issuer(self): + validator = InboundActivityTokenValidator("test-app-id") + validator._service_validator.validate_token = AsyncMock() + entra_validator = MagicMock() + entra_validator.validate_token = AsyncMock(return_value={"tid": "tenant-id"}) + + with patch.object(validator, "_get_entra_validator", return_value=entra_validator) as get_validator: + with patch( + "jwt.decode", + return_value={"iss": "https://login.microsoftonline.com/tenant-id/v2.0", "tid": "tenant-id"}, + ): + result = await validator.validate_token("entra-token", "https://service.example") + + assert result == {"tid": "tenant-id"} + get_validator.assert_called_once_with("tenant-id") + entra_validator.validate_token.assert_called_once_with("entra-token") + validator._service_validator.validate_token.assert_not_called() + + @pytest.mark.asyncio + async def test_validate_token_uses_entra_validator_for_v1_sts_issuer(self): + validator = InboundActivityTokenValidator("test-app-id") + entra_validator = MagicMock() + entra_validator.validate_token = AsyncMock(return_value={"tid": "tenant-id"}) + + with patch.object(validator, "_get_entra_validator", return_value=entra_validator) as get_validator: + with patch("jwt.decode", return_value={"iss": "https://sts.windows.net/tenant-id/", "tid": "tenant-id"}): + result = await validator.validate_token("entra-v1-token") + + assert result == {"tid": "tenant-id"} + get_validator.assert_called_once_with("tenant-id") + entra_validator.validate_token.assert_called_once_with("entra-v1-token") + + @pytest.mark.asyncio + async def test_validate_token_rejects_entra_token_without_tid(self): + validator = InboundActivityTokenValidator("test-app-id") + + with patch("jwt.decode", return_value={"iss": "https://login.microsoftonline.com/tenant-id/v2.0"}): + with pytest.raises(jwt.InvalidTokenError, match="missing tid"): + await validator.validate_token("entra-token") + + @pytest.mark.asyncio + async def test_validate_token_rejects_empty_token_before_routing_decode(self): + validator = InboundActivityTokenValidator("test-app-id") + + with patch("jwt.decode") as decode: + with pytest.raises(jwt.InvalidTokenError, match="No token provided"): + await validator.validate_token("") + + decode.assert_not_called() + + def test_get_entra_validator_caches_by_tenant(self): + validator = InboundActivityTokenValidator("test-app-id") + + with patch("microsoft_teams.apps.auth.token_validator.TokenValidator.for_entra") as for_entra: + for_entra.return_value = MagicMock() + + first = validator._get_entra_validator("tenant-id") + second = validator._get_entra_validator("tenant-id") + + assert first is second + for_entra.assert_called_once_with("test-app-id", "tenant-id", cloud=validator._cloud) + + def test_get_entra_validator_cache_is_bounded(self): + validator = InboundActivityTokenValidator("test-app-id") + + with patch("microsoft_teams.apps.auth.token_validator.TokenValidator.for_entra") as for_entra: + for_entra.side_effect = lambda _app_id, tenant_id, **_kwargs: MagicMock(name=tenant_id) + + for index in range(token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE + 1): + validator._get_entra_validator(f"tenant-{index}") + + assert len(validator._entra_validators_by_tenant) == token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE + assert "tenant-0" not in validator._entra_validators_by_tenant + last_tenant_id = f"tenant-{token_validator_module._MAX_ENTRA_VALIDATOR_CACHE_SIZE}" + assert last_tenant_id in validator._entra_validators_by_tenant