Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions agent_assembly/client/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(
delegation_reason: str | None = None,
spawned_by_tool: str | None = None,
depth: int | None = None,
enforcement_mode: str | None = None,
) -> None:
"""
Initialize the GatewayClient.
Expand All @@ -36,6 +37,10 @@ def __init__(
delegation_reason: Human-readable reason for delegation
spawned_by_tool: Name of the tool that spawned this agent
depth: Spawn depth in the agent lineage tree
enforcement_mode: Per-agent governance posture sent to the gateway
at registration. ``None`` (the default) omits the field from
the request body so a legacy gateway sees the same wire shape
as before; the gateway then defaults to live enforcement.
"""
self.gateway_url = gateway_url.rstrip("/")
self.agent_id = agent_id
Expand All @@ -46,6 +51,7 @@ def __init__(
self.delegation_reason = delegation_reason
self.spawned_by_tool = spawned_by_tool
self.depth = depth
self.enforcement_mode = enforcement_mode
self._client: httpx.Client | None = None

@property
Expand Down Expand Up @@ -97,6 +103,8 @@ async def register_agent(self) -> dict:
body["spawned_by_tool"] = self.spawned_by_tool
if self.depth is not None:
body["depth"] = self.depth
if self.enforcement_mode is not None:
body["enforcement_mode"] = self.enforcement_mode
try:
response = self.client.post(
f"/agents/{self.agent_id}/register",
Expand Down
32 changes: 30 additions & 2 deletions agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@
RuntimeMode = Literal["auto", "ebpf", "proxy", "sdk-only"]
NetworkMode = Literal["ebpf", "proxy", "sdk-only"]

EnforcementMode = Literal["enforce", "observe", "disabled"]
"""Posture the governance gateway should apply to this agent's actions.

* ``"enforce"`` — default; deny blocks the action, redact strips secrets.
* ``"observe"`` — dry-run; the gateway records what *would* have happened
but lets every action through. Surfaced by ``aa audit list --dry-run-only``.
* ``"disabled"`` — policy evaluation skipped entirely. Hermetic test only.

Mirrors ``aa_core::EnforcementMode`` on the wire; uses the same snake_case
tokens the gateway expects in the registration body."""

_DEFAULT_AGENT_ID = "agent-assembly-default"
_VALID_RUNTIME_MODES = {"auto", "ebpf", "proxy", "sdk-only"}
_VALID_ENFORCEMENT_MODES: frozenset[EnforcementMode] = frozenset({"enforce", "observe", "disabled"})
_INIT_LOCK = Lock()
_ACTIVE_CONTEXT: AssemblyContext | None = None

Expand Down Expand Up @@ -115,6 +127,7 @@ def init_assembly(
delegation_reason: str | None = None,
spawned_by_tool: str | None = None,
depth: int | None = None,
enforcement_mode: EnforcementMode | None = None,
) -> AssemblyContext:
"""Initialize the Agent Assembly SDK runtime for this process.

Expand All @@ -125,10 +138,18 @@ def init_assembly(
through the resolver chain (env → config file → local default with
optional auto-start) per Epic 17 S-G — see
``agent_assembly.core.gateway_resolver``.

:param enforcement_mode: Per-agent governance posture sent to the gateway
at registration (see :data:`EnforcementMode`). Defaults to ``None``,
which omits the field from the registration body — the gateway then
applies its server-side default (live ``enforce``). Pass ``"observe"``
to register the agent in dry-run / sandbox mode: every action
proceeds and the gateway records would-be violations as shadow audit
events.
"""
gateway_url = resolve_gateway_url(gateway_url)
api_key = resolve_api_key(api_key)
_validate_inputs(gateway_url=gateway_url, mode=mode)
_validate_inputs(gateway_url=gateway_url, mode=mode, enforcement_mode=enforcement_mode)
if delegation_reason is not None and len(delegation_reason) > 256:
raise ValueError("delegation_reason must be <= 256 characters")

Expand Down Expand Up @@ -164,6 +185,7 @@ def init_assembly(
delegation_reason=delegation_reason,
spawned_by_tool=spawned_by_tool,
depth=depth,
enforcement_mode=enforcement_mode,
)

registered_adapters: list[FrameworkAdapter] = []
Expand All @@ -190,11 +212,17 @@ def init_assembly(
return context


def _validate_inputs(*, gateway_url: str, mode: RuntimeMode) -> None:
def _validate_inputs(
*, gateway_url: str, mode: RuntimeMode, enforcement_mode: EnforcementMode | None = None
) -> None:
if not gateway_url:
raise ConfigurationError("gateway_url is required")
if mode not in _VALID_RUNTIME_MODES:
raise ConfigurationError("mode must be one of: auto, ebpf, proxy, sdk-only")
if enforcement_mode is not None and enforcement_mode not in _VALID_ENFORCEMENT_MODES:
raise ConfigurationError(
f"enforcement_mode must be one of: enforce, observe, disabled (got: {enforcement_mode!r})"
)


def _register_adapters(
Expand Down
56 changes: 56 additions & 0 deletions test/unit/client/test_gateway_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,59 @@ async def test_register_agent_includes_depth_when_set() -> None:
_, call_kwargs = mock_post.call_args
body = call_kwargs.get("json") or {}
assert body["depth"] == 3


# ── enforcement_mode wire-shape (AAASM-1560) ─────────────────────────────────


@pytest.mark.asyncio
async def test_register_agent_sends_enforcement_mode_when_set() -> None:
"""Registering an agent with enforcement_mode=observe puts the snake_case
string on the wire so the gateway's REST → gRPC bridge can map it to
RegisterRequest.enforcement_mode (proto enum) per AAASM-1555/1557.
"""
client = GatewayClient(
gateway_url="http://gw.test",
agent_id="experimental-agent",
api_key="key",
enforcement_mode="observe",
)
mock_post = MagicMock(return_value=_make_ok_response())
with patch.object(
type(client),
"client",
new_callable=lambda: property(lambda self: MagicMock(post=mock_post)),
):
await client.register_agent()

_, kwargs = mock_post.call_args
body = kwargs.get("json") or {}
assert body["enforcement_mode"] == "observe"


@pytest.mark.asyncio
async def test_register_agent_omits_enforcement_mode_when_none() -> None:
"""A client constructed without enforcement_mode (the pre-feature path)
must NOT include the key in the body — keeps the wire shape identical
to before so a legacy gateway that doesn't know about the field still
accepts the registration cleanly.
"""
client = GatewayClient(
gateway_url="http://gw.test",
agent_id="legacy-agent",
api_key="key",
# no enforcement_mode kwarg
)
mock_post = MagicMock(return_value=_make_ok_response())
with patch.object(
type(client),
"client",
new_callable=lambda: property(lambda self: MagicMock(post=mock_post)),
):
await client.register_agent()

_, kwargs = mock_post.call_args
body = kwargs.get("json")
# body may be None (no fields set) or a dict that omits the key.
if body is not None:
assert "enforcement_mode" not in body
83 changes: 83 additions & 0 deletions test/unit/test_assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,86 @@ def test_init_assembly_delegation_reason_too_long_raises(
api_key="test-api-key",
delegation_reason="x" * 257,
)


# ── enforcement_mode parameter (AAASM-1560) ──────────────────────────────────


@pytest.mark.parametrize("mode", ["enforce", "observe", "disabled"])
def test_init_assembly_enforcement_mode_forwarded_to_client(
monkeypatch: pytest.MonkeyPatch,
mode: str,
) -> None:
"""All three valid EnforcementMode values land on the GatewayClient."""
monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: [])
monkeypatch.setattr(
core_assembly,
"_start_network_layer",
lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown),
)

context = init_assembly(
gateway_url="http://localhost:8080",
api_key="test-api-key",
agent_id=f"agent-{mode}",
enforcement_mode=mode, # type: ignore[arg-type]
)

try:
assert context.client.enforcement_mode == mode
finally:
context.shutdown()


def test_init_assembly_enforcement_mode_invalid_raises_configuration_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Unknown enforcement_mode string fails fast at init time.

Catches typos like ``"obesrve"`` before the agent silently registers
under live enforcement — the security-conscious default.
"""
from agent_assembly.exceptions import ConfigurationError

monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: [])
monkeypatch.setattr(
core_assembly,
"_start_network_layer",
lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown),
)

with pytest.raises(ConfigurationError, match="enforcement_mode must be one of"):
init_assembly(
gateway_url="http://localhost:8080",
api_key="test-api-key",
enforcement_mode="invalid-mode", # type: ignore[arg-type]
)


def test_init_assembly_enforcement_mode_defaults_to_none_to_preserve_wire_shape(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Omitting enforcement_mode keeps the pre-feature registration body shape.

The default is `None` (not `"enforce"`) so a pre-feature SDK caller's
`register_agent()` request emits a body that omits the field entirely.
The gateway then applies its server-side default of live enforcement,
so semantic behaviour is identical to before.
"""
monkeypatch.setattr(core_assembly, "_register_adapters", lambda **kwargs: [])
monkeypatch.setattr(
core_assembly,
"_start_network_layer",
lambda **kwargs: ("sdk-only", core_assembly._noop_shutdown),
)

context = init_assembly(
gateway_url="http://localhost:8080",
api_key="test-api-key",
agent_id="default-mode-agent",
)

try:
assert context.client.enforcement_mode is None
finally:
context.shutdown()