diff --git a/agent_assembly/client/gateway.py b/agent_assembly/client/gateway.py index 01a1bce..c75f7d9 100644 --- a/agent_assembly/client/gateway.py +++ b/agent_assembly/client/gateway.py @@ -22,6 +22,7 @@ def __init__( delegation_reason: str | None = None, spawned_by_tool: str | None = None, depth: int | None = None, + enforcement_mode: str | None = None, ) -> None: """ Initialize the GatewayClient. @@ -36,6 +37,10 @@ def __init__( delegation_reason: Human-readable reason for delegation spawned_by_tool: Name of the tool that spawned this agent depth: Spawn depth in the agent lineage tree + enforcement_mode: Per-agent governance posture sent to the gateway + at registration. ``None`` (the default) omits the field from + the request body so a legacy gateway sees the same wire shape + as before; the gateway then defaults to live enforcement. """ self.gateway_url = gateway_url.rstrip("/") self.agent_id = agent_id @@ -46,6 +51,7 @@ def __init__( self.delegation_reason = delegation_reason self.spawned_by_tool = spawned_by_tool self.depth = depth + self.enforcement_mode = enforcement_mode self._client: httpx.Client | None = None @property @@ -97,6 +103,8 @@ async def register_agent(self) -> dict: body["spawned_by_tool"] = self.spawned_by_tool if self.depth is not None: body["depth"] = self.depth + if self.enforcement_mode is not None: + body["enforcement_mode"] = self.enforcement_mode try: response = self.client.post( f"/agents/{self.agent_id}/register", diff --git a/agent_assembly/core/assembly.py b/agent_assembly/core/assembly.py index 5af1210..1ad5913 100644 --- a/agent_assembly/core/assembly.py +++ b/agent_assembly/core/assembly.py @@ -19,8 +19,20 @@ RuntimeMode = Literal["auto", "ebpf", "proxy", "sdk-only"] NetworkMode = Literal["ebpf", "proxy", "sdk-only"] +EnforcementMode = Literal["enforce", "observe", "disabled"] +"""Posture the governance gateway should apply to this agent's actions. + +* ``"enforce"`` — default; deny blocks the action, redact strips secrets. +* ``"observe"`` — dry-run; the gateway records what *would* have happened + but lets every action through. Surfaced by ``aa audit list --dry-run-only``. +* ``"disabled"`` — policy evaluation skipped entirely. Hermetic test only. + +Mirrors ``aa_core::EnforcementMode`` on the wire; uses the same snake_case +tokens the gateway expects in the registration body.""" + _DEFAULT_AGENT_ID = "agent-assembly-default" _VALID_RUNTIME_MODES = {"auto", "ebpf", "proxy", "sdk-only"} +_VALID_ENFORCEMENT_MODES: frozenset[EnforcementMode] = frozenset({"enforce", "observe", "disabled"}) _INIT_LOCK = Lock() _ACTIVE_CONTEXT: AssemblyContext | None = None @@ -115,6 +127,7 @@ def init_assembly( delegation_reason: str | None = None, spawned_by_tool: str | None = None, depth: int | None = None, + enforcement_mode: EnforcementMode | None = None, ) -> AssemblyContext: """Initialize the Agent Assembly SDK runtime for this process. @@ -125,10 +138,18 @@ def init_assembly( through the resolver chain (env → config file → local default with optional auto-start) per Epic 17 S-G — see ``agent_assembly.core.gateway_resolver``. + + :param enforcement_mode: Per-agent governance posture sent to the gateway + at registration (see :data:`EnforcementMode`). Defaults to ``None``, + which omits the field from the registration body — the gateway then + applies its server-side default (live ``enforce``). Pass ``"observe"`` + to register the agent in dry-run / sandbox mode: every action + proceeds and the gateway records would-be violations as shadow audit + events. """ gateway_url = resolve_gateway_url(gateway_url) api_key = resolve_api_key(api_key) - _validate_inputs(gateway_url=gateway_url, mode=mode) + _validate_inputs(gateway_url=gateway_url, mode=mode, enforcement_mode=enforcement_mode) if delegation_reason is not None and len(delegation_reason) > 256: raise ValueError("delegation_reason must be <= 256 characters") @@ -164,6 +185,7 @@ def init_assembly( delegation_reason=delegation_reason, spawned_by_tool=spawned_by_tool, depth=depth, + enforcement_mode=enforcement_mode, ) registered_adapters: list[FrameworkAdapter] = [] @@ -190,11 +212,17 @@ def init_assembly( return context -def _validate_inputs(*, gateway_url: str, mode: RuntimeMode) -> None: +def _validate_inputs( + *, gateway_url: str, mode: RuntimeMode, enforcement_mode: EnforcementMode | None = None +) -> None: if not gateway_url: raise ConfigurationError("gateway_url is required") if mode not in _VALID_RUNTIME_MODES: raise ConfigurationError("mode must be one of: auto, ebpf, proxy, sdk-only") + if enforcement_mode is not None and enforcement_mode not in _VALID_ENFORCEMENT_MODES: + raise ConfigurationError( + f"enforcement_mode must be one of: enforce, observe, disabled (got: {enforcement_mode!r})" + ) def _register_adapters( diff --git a/test/unit/client/test_gateway_topology.py b/test/unit/client/test_gateway_topology.py index 7b938c6..837edaa 100644 --- a/test/unit/client/test_gateway_topology.py +++ b/test/unit/client/test_gateway_topology.py @@ -106,3 +106,59 @@ async def test_register_agent_includes_depth_when_set() -> None: _, call_kwargs = mock_post.call_args body = call_kwargs.get("json") or {} assert body["depth"] == 3 + + +# ── enforcement_mode wire-shape (AAASM-1560) ───────────────────────────────── + + +@pytest.mark.asyncio +async def test_register_agent_sends_enforcement_mode_when_set() -> None: + """Registering an agent with enforcement_mode=observe puts the snake_case + string on the wire so the gateway's REST → gRPC bridge can map it to + RegisterRequest.enforcement_mode (proto enum) per AAASM-1555/1557. + """ + client = GatewayClient( + gateway_url="http://gw.test", + agent_id="experimental-agent", + api_key="key", + enforcement_mode="observe", + ) + mock_post = MagicMock(return_value=_make_ok_response()) + with patch.object( + type(client), + "client", + new_callable=lambda: property(lambda self: MagicMock(post=mock_post)), + ): + await client.register_agent() + + _, kwargs = mock_post.call_args + body = kwargs.get("json") or {} + assert body["enforcement_mode"] == "observe" + + +@pytest.mark.asyncio +async def test_register_agent_omits_enforcement_mode_when_none() -> None: + """A client constructed without enforcement_mode (the pre-feature path) + must NOT include the key in the body — keeps the wire shape identical + to before so a legacy gateway that doesn't know about the field still + accepts the registration cleanly. + """ + client = GatewayClient( + gateway_url="http://gw.test", + agent_id="legacy-agent", + api_key="key", + # no enforcement_mode kwarg + ) + mock_post = MagicMock(return_value=_make_ok_response()) + with patch.object( + type(client), + "client", + new_callable=lambda: property(lambda self: MagicMock(post=mock_post)), + ): + await client.register_agent() + + _, kwargs = mock_post.call_args + body = kwargs.get("json") + # body may be None (no fields set) or a dict that omits the key. + if body is not None: + assert "enforcement_mode" not in body diff --git a/test/unit/test_assembly.py b/test/unit/test_assembly.py index 8bf0c1a..b9dbb89 100644 --- a/test/unit/test_assembly.py +++ b/test/unit/test_assembly.py @@ -431,3 +431,86 @@ def test_init_assembly_delegation_reason_too_long_raises( api_key="test-api-key", delegation_reason="x" * 257, ) + + +# ── enforcement_mode parameter (AAASM-1560) ────────────────────────────────── + + +@pytest.mark.parametrize("mode", ["enforce", "observe", "disabled"]) +def test_init_assembly_enforcement_mode_forwarded_to_client( + monkeypatch: pytest.MonkeyPatch, + mode: str, +) -> None: + """All three valid EnforcementMode values land on the GatewayClient.""" + monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: []) + monkeypatch.setattr( + core_assembly, + "_start_network_layer", + lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown), + ) + + context = init_assembly( + gateway_url="http://localhost:8080", + api_key="test-api-key", + agent_id=f"agent-{mode}", + enforcement_mode=mode, # type: ignore[arg-type] + ) + + try: + assert context.client.enforcement_mode == mode + finally: + context.shutdown() + + +def test_init_assembly_enforcement_mode_invalid_raises_configuration_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Unknown enforcement_mode string fails fast at init time. + + Catches typos like ``"obesrve"`` before the agent silently registers + under live enforcement — the security-conscious default. + """ + from agent_assembly.exceptions import ConfigurationError + + monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: []) + monkeypatch.setattr( + core_assembly, + "_start_network_layer", + lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown), + ) + + with pytest.raises(ConfigurationError, match="enforcement_mode must be one of"): + init_assembly( + gateway_url="http://localhost:8080", + api_key="test-api-key", + enforcement_mode="invalid-mode", # type: ignore[arg-type] + ) + + +def test_init_assembly_enforcement_mode_defaults_to_none_to_preserve_wire_shape( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Omitting enforcement_mode keeps the pre-feature registration body shape. + + The default is `None` (not `"enforce"`) so a pre-feature SDK caller's + `register_agent()` request emits a body that omits the field entirely. + The gateway then applies its server-side default of live enforcement, + so semantic behaviour is identical to before. + """ + monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: []) + monkeypatch.setattr( + core_assembly, + "_start_network_layer", + lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown), + ) + + context = init_assembly( + gateway_url="http://localhost:8080", + api_key="test-api-key", + agent_id="default-mode-agent", + ) + + try: + assert context.client.enforcement_mode is None + finally: + context.shutdown()