From 508e2e17425e0d2ef189f32e64680cb2899d8b9b Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Wed, 20 May 2026 17:32:22 -0700 Subject: [PATCH 01/44] feat(registry): paginate discovery/listDevices to bound NATS payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At fleet scale (~1400 devices) the JSON-encoded reply from `discovery/listDevices` crosses NATS's default 1 MB max_payload, breaking every agent-tools call that resolves a selector — `discover()`, `discover_labels()`, and `broadcast()` all fail because the registry loads the full fleet before any filter and tries to publish it in a single message. This change makes the reply size independent of fleet size: - New optional `offset` / `limit` params on `discovery/listDevices`; new reply fields `next_offset` and `total_matched`. Omitting `limit` preserves the legacy single-shot reply for older clients. - `DeviceRegistry.list_devices_page(...)` returns a slice plus metadata alongside the existing `list_devices(...)` (untouched). - Both `RegistryClient` implementations (edge + server-internal) loop internally over 100-device pages — externally unchanged, but no single reply ever carries the whole fleet. Page size is tunable via `DEVICE_CONNECT_LIST_PAGE_SIZE`. A new `list_devices_page()` exposes metadata to callers that want explicit paging. - Client backward compat: a server that doesn't return `next_offset` causes the page loop to exit after one request. - Defense-in-depth: `setup_deployment.sh` now bakes `max_payload: 8MB` into the NATS config it generates, so operators who rerun it preserve the ceiling. Tests: 6 new in test_registry_service.py (including a full 1400-device walk), 5 new in test_registry_client.py covering paging, legacy-server fallback, filter forwarding, and explicit-page metadata. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../device_connect_edge/registry_client.py | 103 ++++++++++++-- .../tests/test_registry_client.py | 118 ++++++++++++++++ .../device_connect_server/registry/client.py | 78 ++++++++-- .../registry/service/main.py | 59 ++++++-- .../registry/service/registry.py | 58 +++++++- .../security_infra/setup_deployment.sh | 3 + .../test_registry_service.py | 133 ++++++++++++++++++ 7 files changed, 515 insertions(+), 37 deletions(-) diff --git a/packages/device-connect-edge/device_connect_edge/registry_client.py b/packages/device-connect-edge/device_connect_edge/registry_client.py index 91dad47..6e4bc4f 100644 --- a/packages/device-connect-edge/device_connect_edge/registry_client.py +++ b/packages/device-connect-edge/device_connect_edge/registry_client.py @@ -30,15 +30,24 @@ import asyncio import json import logging +import os import time import uuid -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from device_connect_edge.messaging.base import MessagingClient from device_connect_edge.messaging.exceptions import RequestTimeoutError logger = logging.getLogger(__name__) +# Per-page chunk size when the client transparently iterates the full fleet. +# Sized to keep one JSON-RPC reply under the default NATS max_payload of +# 1 MB for typical device records (~6 KB each): 100 * ~6 KB = ~600 KB. +# Records with rich function schemas (~10 KB each in the worst case +# observed) put a page at ~1 MB, i.e. right at the limit, so operators on +# unusually rich schemas should lower this via DEVICE_CONNECT_LIST_PAGE_SIZE. +_DEFAULT_LIST_PAGE_SIZE = int(os.getenv("DEVICE_CONNECT_LIST_PAGE_SIZE", "100")) + class RegistryClient: """JSON-RPC client for the device registry service. 
@@ -157,8 +166,82 @@ async def list_devices( self._cache, device_type, location, capabilities, ) + # Page through the registry transparently so the wire never carries + # a fleet-sized reply (NATS default max_payload is 1 MB and was + # being exceeded at ~1400 devices). Older servers that don't + # understand ``limit`` just return everything in one reply with + # ``next_offset`` absent, so the loop exits after a single + # iteration — fully backward compatible. + devices: List[Dict[str, Any]] = [] + offset = 0 + while True: + page, next_offset, _total = await self._list_devices_page( + device_type=device_type, + location=location, + capabilities=capabilities, + offset=offset, + limit=_DEFAULT_LIST_PAGE_SIZE, + timeout=timeout, + ) + devices.extend(page) + if next_offset is None: + break + offset = next_offset + logger.debug("Discovered %d devices from registry", len(devices)) + + # Update cache (store unfiltered if we fetched without filters) + if ( + self._cache_ttl > 0 + and device_type is None + and location is None + and not capabilities + ): + self._cache = devices + self._cache_time = time.time() + + return devices + + async def list_devices_page( + self, + *, + offset: int = 0, + limit: int = _DEFAULT_LIST_PAGE_SIZE, + device_type: Optional[str] = None, + location: Optional[str] = None, + capabilities: Optional[List[str]] = None, + timeout: Optional[float] = None, + ) -> Tuple[List[Dict[str, Any]], Optional[int], int]: + """Fetch a single page of devices with pagination metadata. + + Use this when you want to display a paged UI or stream results; + most callers should stick with :meth:`list_devices`, which loops + internally and returns the full fleet. + + Returns: + ``(devices, next_offset, total_matched)`` where ``next_offset`` + is ``None`` on the final page. 
+ """ + return await self._list_devices_page( + device_type=device_type, + location=location, + capabilities=capabilities, + offset=offset, + limit=limit, + timeout=timeout, + ) + + async def _list_devices_page( + self, + *, + device_type: Optional[str], + location: Optional[str], + capabilities: Optional[List[str]], + offset: int, + limit: int, + timeout: Optional[float], + ) -> Tuple[List[Dict[str, Any]], Optional[int], int]: subject = f"device-connect.{self._tenant}.discovery" - params: Dict[str, Any] = {} + params: Dict[str, Any] = {"offset": int(offset), "limit": int(limit)} if device_type: params["device_type"] = device_type if location: @@ -167,20 +250,12 @@ async def list_devices( params["capabilities"] = capabilities result = await self._request( - subject, - "discovery/listDevices", - params if params else None, - timeout, + subject, "discovery/listDevices", params, timeout, ) devices = result.get("devices", []) - logger.debug("Discovered %d devices from registry", len(devices)) - - # Update cache (store unfiltered if we fetched without filters) - if self._cache_ttl > 0 and not params: - self._cache = devices - self._cache_time = time.time() - - return devices + next_offset = result.get("next_offset") + total = result.get("total_matched", len(devices)) + return devices, next_offset, total async def get_device( self, diff --git a/packages/device-connect-edge/tests/test_registry_client.py b/packages/device-connect-edge/tests/test_registry_client.py index b295924..28708f7 100644 --- a/packages/device-connect-edge/tests/test_registry_client.py +++ b/packages/device-connect-edge/tests/test_registry_client.py @@ -99,3 +99,121 @@ async def test_request_raises_after_all_retries_exhausted(self, mock_sleep): assert messaging.request.call_count == 3 + +class TestListDevicesPagination: + """Verify list_devices transparently pages through the registry.""" + + @staticmethod + def _paged_responses(total: int, page_size: int): + """Build the sequence of NATS reply bytes 
the server would emit.""" + devices = [{"device_id": f"dev-{i:04d}"} for i in range(total)] + responses = [] + for start in range(0, total, page_size): + page = devices[start:start + page_size] + end = start + page_size + next_offset = end if end < total else None + responses.append(json.dumps({ + "jsonrpc": "2.0", + "id": "rpc-test", + "result": { + "devices": page, + "next_offset": next_offset, + "total_matched": total, + }, + }).encode()) + if not responses: + # Empty fleet: still need one round-trip + responses.append(json.dumps({ + "jsonrpc": "2.0", + "id": "rpc-test", + "result": {"devices": [], "next_offset": None, "total_matched": 0}, + }).encode()) + return responses + + @pytest.mark.asyncio + async def test_list_devices_pages_through_full_fleet(self): + """1400 devices should arrive across multiple round-trips.""" + client, messaging = _make_client() + messaging.request = AsyncMock(side_effect=self._paged_responses(1400, 100)) + + devices = await client.list_devices() + + assert len(devices) == 1400 + assert [d["device_id"] for d in devices] == [ + f"dev-{i:04d}" for i in range(1400) + ] + # 1400 / 100 = 14 round-trips + assert messaging.request.call_count == 14 + + @pytest.mark.asyncio + async def test_list_devices_passes_offset_and_limit_in_params(self): + """Each request must carry the pagination params on the wire.""" + client, messaging = _make_client() + messaging.request = AsyncMock(side_effect=self._paged_responses(250, 100)) + + await client.list_devices() + + offsets = [] + limits = [] + for call_args in messaging.request.call_args_list: + payload = json.loads(call_args.args[1]) + offsets.append(payload["params"]["offset"]) + limits.append(payload["params"]["limit"]) + + assert offsets == [0, 100, 200] + assert all(lim == 100 for lim in limits) + + @pytest.mark.asyncio + async def test_list_devices_legacy_server_single_reply(self): + """Server without pagination (no next_offset) terminates after 1 call.""" + client, messaging = _make_client() + 
# Legacy reply shape: devices only, no pagination metadata. + legacy = json.dumps({ + "jsonrpc": "2.0", + "id": "rpc-test", + "result": {"devices": [{"device_id": "a"}, {"device_id": "b"}]}, + }).encode() + messaging.request = AsyncMock(return_value=legacy) + + devices = await client.list_devices() + + assert len(devices) == 2 + # next_offset absent => loop exits after one request + assert messaging.request.call_count == 1 + + @pytest.mark.asyncio + async def test_list_devices_page_returns_metadata(self): + """list_devices_page exposes next_offset and total_matched to caller.""" + client, messaging = _make_client() + reply = json.dumps({ + "jsonrpc": "2.0", + "id": "rpc-test", + "result": { + "devices": [{"device_id": "a"}, {"device_id": "b"}], + "next_offset": 2, + "total_matched": 10, + }, + }).encode() + messaging.request = AsyncMock(return_value=reply) + + page, next_offset, total = await client.list_devices_page( + offset=0, limit=2, + ) + + assert len(page) == 2 + assert next_offset == 2 + assert total == 10 + + @pytest.mark.asyncio + async def test_list_devices_forwards_filters(self): + """device_type / location filters must accompany pagination params.""" + client, messaging = _make_client() + messaging.request = AsyncMock(side_effect=self._paged_responses(0, 100)) + + await client.list_devices(device_type="camera", location="lab-A") + + payload = json.loads(messaging.request.call_args.args[1]) + assert payload["params"]["device_type"] == "camera" + assert payload["params"]["location"] == "lab-A" + assert payload["params"]["offset"] == 0 + assert payload["params"]["limit"] == 100 diff --git a/packages/device-connect-server/device_connect_server/registry/client.py b/packages/device-connect-server/device_connect_server/registry/client.py index 1ba12bd..796315b 100644 --- a/packages/device-connect-server/device_connect_server/registry/client.py +++ b/packages/device-connect-server/device_connect_server/registry/client.py @@ -29,8 +29,14 @@ import json import 
logging +import os import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple + +# Per-page chunk size when ``list_devices`` transparently iterates the +# fleet. Matches the edge client default; see device_connect_edge's +# registry_client._DEFAULT_LIST_PAGE_SIZE for the rationale. +_DEFAULT_LIST_PAGE_SIZE = int(os.getenv("DEVICE_CONNECT_LIST_PAGE_SIZE", "100")) from device_connect_edge.messaging import MessagingClient from device_connect_edge.messaging.config import MessagingConfig @@ -195,9 +201,64 @@ async def list_devices( # Get only cameras cameras = await registry.list_devices(device_type="camera") """ - subject = f"device-connect.{self._tenant}.discovery" + # Page through the registry so the reply never exceeds NATS's + # max_payload limit at fleet scale. Older servers that ignore + # ``limit`` still work — they return everything in one reply with + # ``next_offset`` absent and the loop exits immediately. + devices: List[Dict[str, Any]] = [] + offset = 0 + while True: + page, next_offset, _total = await self._list_devices_page( + device_type=device_type, + location=location, + capabilities=capabilities, + offset=offset, + limit=_DEFAULT_LIST_PAGE_SIZE, + timeout=timeout, + ) + devices.extend(page) + if next_offset is None: + break + offset = next_offset + self._logger.debug("Listed %d devices", len(devices)) + return devices + + async def list_devices_page( + self, + *, + offset: int = 0, + limit: int = _DEFAULT_LIST_PAGE_SIZE, + device_type: Optional[str] = None, + location: Optional[str] = None, + capabilities: Optional[List[str]] = None, + timeout: Optional[float] = None, + ) -> Tuple[List[Dict[str, Any]], Optional[int], int]: + """Fetch one page of devices with pagination metadata. - params: Dict[str, Any] = {} + Returns ``(devices, next_offset, total_matched)``; ``next_offset`` + is ``None`` on the final page. 
+ """ + return await self._list_devices_page( + device_type=device_type, + location=location, + capabilities=capabilities, + offset=offset, + limit=limit, + timeout=timeout, + ) + + async def _list_devices_page( + self, + *, + device_type: Optional[str], + location: Optional[str], + capabilities: Optional[List[str]], + offset: int, + limit: int, + timeout: Optional[float], + ) -> Tuple[List[Dict[str, Any]], Optional[int], int]: + subject = f"device-connect.{self._tenant}.discovery" + params: Dict[str, Any] = {"offset": int(offset), "limit": int(limit)} if device_type: params["device_type"] = device_type if location: @@ -206,15 +267,12 @@ async def list_devices( params["capabilities"] = capabilities result = await self._request( - subject, - "discovery/listDevices", - params if params else None, - timeout, + subject, "discovery/listDevices", params, timeout, ) - devices = result.get("devices", []) - self._logger.debug("Listed %d devices", len(devices)) - return devices + next_offset = result.get("next_offset") + total = result.get("total_matched", len(devices)) + return devices, next_offset, total async def get_device( self, diff --git a/packages/device-connect-server/device_connect_server/registry/service/main.py b/packages/device-connect-server/device_connect_server/registry/service/main.py index 9661d6e..b9e7a10 100644 --- a/packages/device-connect-server/device_connect_server/registry/service/main.py +++ b/packages/device-connect-server/device_connect_server/registry/service/main.py @@ -378,19 +378,54 @@ async def rpc_discovery(data: bytes, reply: Optional[str]): if method == "discovery/listDevices": device_type = params.get("device_type") location = params.get("location") - devs = await asyncio.to_thread( - registry.list_devices, tenant, - device_type=device_type, location=location, - ) - if acl_manager: - requester_id = params.get("requester_id", "") - devs = acl_manager.filter_visible_devices( - requester_id, devs, tenant=tenant + # Pagination: ``offset`` and 
``limit`` are optional. When + # ``limit`` is omitted the legacy single-shot reply is + # returned for backward compat with older clients. New + # clients pass ``limit`` to keep each reply under the NATS + # max_payload regardless of fleet size. + paged = "limit" in params + if paged: + page, next_offset, total = await asyncio.to_thread( + registry.list_devices_page, tenant, + device_type=device_type, + location=location, + offset=int(params.get("offset", 0) or 0), + limit=int(params["limit"]), + ) + if acl_manager: + requester_id = params.get("requester_id", "") + # ACL filtering runs after pagination — devices the + # caller is not allowed to see are dropped from the + # page rather than from the unsliced fleet, so the + # totals reported here may be larger than what + # eventually reaches the requester. That's + # acceptable: ACL is opt-in and primarily a server- + # side hint, not a strict cardinality contract. + page = acl_manager.filter_visible_devices( + requester_id, page, tenant=tenant + ) + await messaging.publish( + reply, + build_rpc_response(payload.get("id"), { + "devices": page, + "next_offset": next_offset, + "total_matched": total, + }) + ) + else: + devs = await asyncio.to_thread( + registry.list_devices, tenant, + device_type=device_type, location=location, + ) + if acl_manager: + requester_id = params.get("requester_id", "") + devs = acl_manager.filter_visible_devices( + requester_id, devs, tenant=tenant + ) + await messaging.publish( + reply, + build_rpc_response(payload.get("id"), {"devices": devs}) ) - await messaging.publish( - reply, - build_rpc_response(payload.get("id"), {"devices": devs}) - ) elif method == "discovery/getDevice": device_id = params.get("device_id") if not device_id: diff --git a/packages/device-connect-server/device_connect_server/registry/service/registry.py b/packages/device-connect-server/device_connect_server/registry/service/registry.py index ff759d7..2f525a7 100644 --- 
a/packages/device-connect-server/device_connect_server/registry/service/registry.py +++ b/packages/device-connect-server/device_connect_server/registry/service/registry.py @@ -20,7 +20,7 @@ import logging import os from dataclasses import dataclass, field -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple import etcd3gw @@ -171,6 +171,44 @@ def list_devices( ] return devices + def list_devices_page( + self, + tenant: str, + *, + device_type: str | None = None, + location: str | None = None, + offset: int = 0, + limit: int | None = None, + ) -> Tuple[List[dict], int | None, int]: + """Return one page of registered device payloads plus pagination metadata. + + Slices the filtered fleet by ``offset`` and ``limit``. Existing etcd + load behavior is unchanged — we still scan the tenant prefix — but the + reply carries only the requested page, keeping NATS payloads bounded + regardless of fleet size. Stability: device order follows etcd key + order, which is deterministic for a steady-state fleet; concurrent + registrations/expirations can shift records across pages. + + Returns: + (devices_page, next_offset, total_matched). + ``next_offset`` is None when the page reaches the end of the + filtered list. ``total_matched`` is the size after filtering and + before pagination. + """ + all_devices = self.list_devices( + tenant, device_type=device_type, location=location, + ) + total = len(all_devices) + safe_offset = max(0, int(offset or 0)) + if limit is None or limit <= 0: + page = all_devices[safe_offset:] + next_offset: int | None = None + else: + end = safe_offset + int(limit) + page = all_devices[safe_offset:end] + next_offset = end if end < total else None + return page, next_offset, total + def get_device(self, tenant: str, device_id: str) -> dict | None: """Return a single device payload by direct key lookup (O(1)). 
@@ -254,6 +292,24 @@ def list_devices( return _REGISTRY.list_devices(tenant, device_type=device_type, location=location) +def list_devices_page( + tenant: str, + *, + device_type: str | None = None, + location: str | None = None, + offset: int = 0, + limit: int | None = None, +) -> Tuple[List[dict], int | None, int]: + """Module-level wrapper for :meth:`DeviceRegistry.list_devices_page`.""" + return _REGISTRY.list_devices_page( + tenant, + device_type=device_type, + location=location, + offset=offset, + limit=limit, + ) + + def get_device(tenant: str, device_id: str) -> dict | None: """Return a single device by direct key lookup.""" return _REGISTRY.get_device(tenant, device_id) diff --git a/packages/device-connect-server/security_infra/setup_deployment.sh b/packages/device-connect-server/security_infra/setup_deployment.sh index c93d058..b6ee409 100755 --- a/packages/device-connect-server/security_infra/setup_deployment.sh +++ b/packages/device-connect-server/security_infra/setup_deployment.sh @@ -107,6 +107,9 @@ cat >> "${OUTPUT_CONF}" < Date: Wed, 20 May 2026 18:00:53 -0700 Subject: [PATCH 02/44] lint: move _DEFAULT_LIST_PAGE_SIZE below imports (ruff E402) The constant was sitting between the stdlib and device_connect_edge import groups, which trips ruff's "module level import not at top of file" rule. Move it after the imports to match the edge client layout. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../device_connect_server/registry/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/registry/client.py b/packages/device-connect-server/device_connect_server/registry/client.py index 796315b..e116fd7 100644 --- a/packages/device-connect-server/device_connect_server/registry/client.py +++ b/packages/device-connect-server/device_connect_server/registry/client.py @@ -33,14 +33,14 @@ import time from typing import Any, Dict, List, Optional, Tuple +from device_connect_edge.messaging import MessagingClient +from device_connect_edge.messaging.config import MessagingConfig + # Per-page chunk size when ``list_devices`` transparently iterates the # fleet. Matches the edge client default; see device_connect_edge's # registry_client._DEFAULT_LIST_PAGE_SIZE for the rationale. _DEFAULT_LIST_PAGE_SIZE = int(os.getenv("DEVICE_CONNECT_LIST_PAGE_SIZE", "100")) -from device_connect_edge.messaging import MessagingClient -from device_connect_edge.messaging.config import MessagingConfig - class RegistryClient: """Client for querying the device registry. From 2349130b6c0b0014af603fec252b41cf6b273e2c Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 03:04:38 +0000 Subject: [PATCH 03/44] feat(registry): cap listDevices replies server-side to prevent NATS payload overflow At fleet scale (1400+ devices, ~13KB per record) the legacy unpaginated listDevices path would emit a single multi-megabyte reply that exceeded the NATS broker's max_payload, killing every concurrent caller. The new _LIST_DEVICES_MAX_LIMIT (default 200, env DC_LIST_DEVICES_MAX_LIMIT) clamps every reply regardless of what the caller passed, so an unpaginated or misconfigured client now gets the first page plus next_offset instead of an overflow error. The dead "legacy single-shot" else-branch is removed. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../registry/service/main.py | 102 ++++++++++-------- 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/registry/service/main.py b/packages/device-connect-server/device_connect_server/registry/service/main.py index b9e7a10..c6c4bc3 100644 --- a/packages/device-connect-server/device_connect_server/registry/service/main.py +++ b/packages/device-connect-server/device_connect_server/registry/service/main.py @@ -53,6 +53,16 @@ _DEFAULT_TTL = 15 # Fallback TTL when device doesn't report one _PULL_REGISTRATION_TIMEOUT = 5 # Timeout for requestRegistration RPC +# Server-side cap for discovery/listDevices page sizes. NATS rejects any +# single publish larger than the broker's max_payload, so the registry +# must guarantee no reply ever exceeds it regardless of what `limit` the +# caller asked for. Empirically a flashlight-auditorium phone record is +# ~13KB serialized, so 200 records ~= 2.6MB, which fits comfortably under +# the 32MB broker ceiling while keeping per-page round-trip small. Old +# clients that omit `limit` entirely also get this cap applied so they +# can't trigger an unpaginated fleet snapshot at scale. +_LIST_DEVICES_MAX_LIMIT = int(os.getenv("DC_LIST_DEVICES_MAX_LIMIT", "200")) + def _resolve_tenants() -> List[str]: """Resolve the list of tenants to handle. @@ -378,54 +388,56 @@ async def rpc_discovery(data: bytes, reply: Optional[str]): if method == "discovery/listDevices": device_type = params.get("device_type") location = params.get("location") - # Pagination: ``offset`` and ``limit`` are optional. When - # ``limit`` is omitted the legacy single-shot reply is - # returned for backward compat with older clients. New - # clients pass ``limit`` to keep each reply under the NATS - # max_payload regardless of fleet size. 
- paged = "limit" in params - if paged: - page, next_offset, total = await asyncio.to_thread( - registry.list_devices_page, tenant, - device_type=device_type, - location=location, - offset=int(params.get("offset", 0) or 0), - limit=int(params["limit"]), - ) - if acl_manager: - requester_id = params.get("requester_id", "") - # ACL filtering runs after pagination — devices the - # caller is not allowed to see are dropped from the - # page rather than from the unsliced fleet, so the - # totals reported here may be larger than what - # eventually reaches the requester. That's - # acceptable: ACL is opt-in and primarily a server- - # side hint, not a strict cardinality contract. - page = acl_manager.filter_visible_devices( - requester_id, page, tenant=tenant - ) - await messaging.publish( - reply, - build_rpc_response(payload.get("id"), { - "devices": page, - "next_offset": next_offset, - "total_matched": total, - }) - ) + # Pagination: ``offset`` and ``limit`` are optional. The + # registry always paginates server-side and clamps the + # page size to ``_LIST_DEVICES_MAX_LIMIT`` so a single + # reply can never exceed the broker's max_payload, even + # for old clients that don't pass ``limit``. Callers see + # ``next_offset`` and must loop through pages — the edge + # ``RegistryClient.list_devices`` already does this + # transparently for callers that want the whole fleet. 
+ requested_limit = params.get("limit") + if requested_limit is None: + effective_limit = _LIST_DEVICES_MAX_LIMIT else: - devs = await asyncio.to_thread( - registry.list_devices, tenant, - device_type=device_type, location=location, - ) - if acl_manager: - requester_id = params.get("requester_id", "") - devs = acl_manager.filter_visible_devices( - requester_id, devs, tenant=tenant + try: + requested_limit = int(requested_limit) + except (TypeError, ValueError): + requested_limit = _LIST_DEVICES_MAX_LIMIT + if requested_limit <= 0: + effective_limit = _LIST_DEVICES_MAX_LIMIT + else: + effective_limit = min( + requested_limit, _LIST_DEVICES_MAX_LIMIT, ) - await messaging.publish( - reply, - build_rpc_response(payload.get("id"), {"devices": devs}) + page, next_offset, total = await asyncio.to_thread( + registry.list_devices_page, tenant, + device_type=device_type, + location=location, + offset=int(params.get("offset", 0) or 0), + limit=effective_limit, + ) + if acl_manager: + requester_id = params.get("requester_id", "") + # ACL filtering runs after pagination — devices the + # caller is not allowed to see are dropped from the + # page rather than from the unsliced fleet, so the + # totals reported here may be larger than what + # eventually reaches the requester. That's + # acceptable: ACL is opt-in and primarily a + # server-side hint, not a strict cardinality + # contract. 
+ page = acl_manager.filter_visible_devices( + requester_id, page, tenant=tenant ) + await messaging.publish( + reply, + build_rpc_response(payload.get("id"), { + "devices": page, + "next_offset": next_offset, + "total_matched": total, + }) + ) elif method == "discovery/getDevice": device_id = params.get("device_id") if not device_id: From 6193c3eac6518b827ea2978653a2be57f787dd5c Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 03:04:58 +0000 Subject: [PATCH 04/44] feat(portal): lazy-load device detail rows on the live dashboard Previously the live-devices fragment rendered hidden detail markup (functions, events, identity, raw-JSON pre-block) for every device in every poll. At 1400 phones each poll returned ~18 MB on a 3-second interval, blocking the portal event loop and saturating client traffic. The fragment now ships only summary rows (~500 KB at 1400 devices) plus an htmx placeholder per row; the detail markup is fetched from a new GET /api/devices/{device_id}/live-detail endpoint when a row is expanded. The etcd prefix-scan is wrapped in asyncio.to_thread so it no longer blocks other portal requests. Poll cadence is relaxed from 3 s to 10 s. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/templates/dashboard.html | 2 +- .../templates/devices/_live_detail.html | 111 +++++++++++++++++ .../portal/templates/devices/_live_table.html | 116 +----------------- .../portal/views/dashboard.py | 42 ++++++- 4 files changed, 158 insertions(+), 113 deletions(-) create mode 100644 packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 368d9a9..39a8a52 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -47,7 +47,7 @@

Live Devices

Loading devices...
diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html new file mode 100644 index 0000000..d2d2b77 --- /dev/null +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html @@ -0,0 +1,111 @@ +
+ + {# Capabilities: functions #} + {% set caps = device.capabilities or {} %} +
+

Functions (RPCs)

+ {% if caps.functions %} +
+ {% for fn in caps.functions %} +
+ {{ fn.name }}({% for pname, pspec in (fn.parameters.properties or {}).items() %}{{ pname }}{% if pspec.default is defined %}={{ pspec.default | tojson }}{% endif %}{% if not loop.last %}, {% endif %}{% endfor %}) + {% if fn.description %} +

{{ fn.description }}

+ {% endif %} + +
+ {% endfor %} +
+ {% else %} +

No RPC functions registered

+ {% endif %} +
+ + {# Capabilities: events #} +
+

Events

+ {% if caps.events %} +
+ {% for ev in caps.events %} +
+ {{ ev.name }} + {% if ev.description %} +

{{ ev.description }}

+ {% endif %} + +
+ {% endfor %} +
+ {% else %} +

No events registered

+ {% endif %} +
+ + {# Identity #} + {% set ident = device._raw.identity or {} if device._raw else {} %} +
+

Identity

+
+
+ {% for key in ['manufacturer', 'model', 'firmware_version', 'serial_number', 'arch'] %} + {% if ident.get(key) %} +
+
{{ key | replace('_', ' ') | title }}
+
{{ ident[key] }}
+
+ {% endif %} + {% endfor %} +
+
+
+ +
+ +{# Invoke panel (shown by Try button) #} + + +{# Event log panel (shown by Live log button) #} + + +{# Collapsible raw JSON #} +
+ Raw registration data +
{{ device._raw | tojson_pretty }}
+
diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_table.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_table.html index 9eb4901..a5ac6c6 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_table.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_table.html @@ -41,117 +41,13 @@ -
- - {# Capabilities: functions #} - {% set caps = device.capabilities or {} %} -
-

Functions (RPCs)

- {% if caps.functions %} -
- {% for fn in caps.functions %} -
- {{ fn.name }}({% for pname, pspec in (fn.parameters.properties or {}).items() %}{{ pname }}{% if pspec.default is defined %}={{ pspec.default | tojson }}{% endif %}{% if not loop.last %}, {% endif %}{% endfor %}) - {% if fn.description %} -

{{ fn.description }}

- {% endif %} - -
- {% endfor %} -
- {% else %} -

No RPC functions registered

- {% endif %} -
- - {# Capabilities: events #} -
-

Events

- {% if caps.events %} -
- {% for ev in caps.events %} -
- {{ ev.name }} - {% if ev.description %} -

{{ ev.description }}

- {% endif %} - -
- {% endfor %} -
- {% else %} -

No events registered

- {% endif %} -
- - {# Identity #} - {% set ident = device._raw.identity or {} if device._raw else {} %} -
-

Identity

-
-
- {% for key in ['manufacturer', 'model', 'firmware_version', 'serial_number', 'arch'] %} - {% if ident.get(key) %} -
-
{{ key | replace('_', ' ') | title }}
-
{{ ident[key] }}
-
- {% endif %} - {% endfor %} -
-
-
- + {# Per-device detail is lazy-loaded on first reveal — keeps the + table-poll response O(summary) rather than O(N * detail). #} +
+

Loading details…

- - {# Invoke panel (shown by Try button) #} - - - {# Event log panel (shown by Live log button) #} - - - {# Collapsible raw JSON #} -
- Raw registration data -
{{ device._raw | tojson_pretty }}
-
{% endfor %} diff --git a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py index bf702e4..0d12f5d 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py +++ b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py @@ -18,6 +18,7 @@ def setup_routes(app: web.Application): app.router.add_get("/dashboard", dashboard_page) app.router.add_get("/api/devices/live", live_devices_fragment) + app.router.add_get("/api/devices/{device_id}/live-detail", live_device_detail_fragment) app.router.add_post("/api/devices/{device_id}/invoke", invoke_device_rpc) app.router.add_get("/api/devices/{device_id}/events/{event_name}/stream", event_stream) @@ -47,21 +48,58 @@ async def dashboard_page(request: web.Request): async def live_devices_fragment(request: web.Request): - """Return the live devices table as an HTML fragment for htmx polling.""" + """Return the live devices table as an HTML fragment for htmx polling. + + Only summary rows are rendered here; the per-device detail markup is + lazy-loaded from ``/api/devices/{device_id}/live-detail`` when a row + is expanded. At fleet scale the detail blocks dominate response size, + so deferring them keeps each poll cheap regardless of fleet size. + """ tenant = _resolve_tenant(request) + # etcd get_prefix + JSON-decode scales with fleet size; run it off + # the event loop so other portal requests aren't blocked on the poll. 
devices = [] try: - devices = registry_client.list_live_devices(tenant) + devices = await asyncio.to_thread( + registry_client.list_live_devices, tenant, + ) except Exception: pass return aiohttp_jinja2.render_template("devices/_live_table.html", request, { "devices": devices, + "tenant": tenant, "user": request.get("user", {}), }) +async def live_device_detail_fragment(request: web.Request): + """Return the per-device detail fragment (functions, events, raw JSON). + + Loaded lazily when a row is expanded — keeps the main polling + response O(summary) rather than O(summary + every-device-detail). + """ + tenant = _resolve_tenant(request) + device_id = request.match_info["device_id"] + + raw = await asyncio.to_thread(registry_client.get_device, tenant, device_id) + if not raw: + return web.Response( + text='

Device not found.

', + content_type="text/html", + ) + + device = { + "device_id": raw.get("device_id", device_id), + "capabilities": raw.get("capabilities") or {}, + "_raw": raw, + } + return aiohttp_jinja2.render_template( + "devices/_live_detail.html", request, {"device": device}, + ) + + def _resolve_tenant(request: web.Request) -> str: """Get tenant from query param (admin override) or session.""" from ..services.backend import validate_name From a0700617d24c45d62343553c41beb481e0698628 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 03:05:13 +0000 Subject: [PATCH 05/44] feat(edge,registry): break the registration herd at fleet scale MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At 1400+ phones spinning up in lockstep, every phone's _register hit the registry inside the same microsecond window. The 2 s request timeout guaranteed most callers timed out before the registry's etcd writes returned, every timeout fired a retry, and the retries re-entered the queue — classic congestion collapse, which left ~1200 phones invisible to the registry while still alive on the fabric. Edge SDK (device.py): - DEVICE_CONNECT_REGISTER_TIMEOUT (default 15 s, was hardcoded 2 s) lets the registry process a queued request before any retry fires. - DEVICE_CONNECT_REGISTER_JITTER (default 5 s) sleeps a uniform-random 0..jitter before the first request, decorrelating arrivals so the registry never sees a synchronized burst. Skipped when set to 0. Registry (registry.py): - DC_ETCD_POOL_SIZE (default 64) hands etcd3gw a requests.Session with an oversized urllib3 HTTPAdapter. The default pool of 10 was the actual ceiling on concurrent lease+put round-trips. End-to-end at 1400 phones: 0 registration failures, 0 publish-overflow errors, all phones in etcd within ~30 s of launch. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../device_connect_edge/device.py | 38 ++++++++++++++++++- .../registry/service/registry.py | 25 +++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/packages/device-connect-edge/device_connect_edge/device.py b/packages/device-connect-edge/device_connect_edge/device.py index c776d1a..62ba6f9 100644 --- a/packages/device-connect-edge/device_connect_edge/device.py +++ b/packages/device-connect-edge/device_connect_edge/device.py @@ -61,6 +61,7 @@ async def capture_image(self, resolution: str = "1080p") -> dict: import json import logging import os +import random import re import time import uuid @@ -90,6 +91,28 @@ async def capture_image(self, resolution: str = "1080p") -> dict: logger = logging.getLogger(__name__) +def _env_float(name: str, default: float) -> float: + """Best-effort float env-var parser; falls back to default on garbage.""" + raw = os.getenv(name) + if raw is None or raw == "": + return default + try: + return float(raw) + except ValueError: + return default + + +# Registration knobs. At fleet scale a 2s request timeout combined with N +# phones starting in lockstep produces congestion collapse on the +# registry: every queued-but-late reply triggers a retry that re-enters +# the queue. A larger timeout lets the registry catch up before any +# retry fires, and an up-front jitter spreads the initial herd so the +# registry never sees a synchronized burst in the first place. Both are +# env-tunable (and the jitter can be disabled by setting it to 0). 
+_REGISTER_REQUEST_TIMEOUT = _env_float("DEVICE_CONNECT_REGISTER_TIMEOUT", 15.0) +_REGISTER_STARTUP_JITTER = _env_float("DEVICE_CONNECT_REGISTER_JITTER", 5.0) + + def build_rpc_response(id_: str, result: Any) -> bytes: return json.dumps({"jsonrpc": "2.0", "id": id_, "result": result}).encode() @@ -974,6 +997,19 @@ async def _register(self, force: bool = False) -> None: self._logger.debug("Registration completed by another task, skipping") return + # Spread the herd. With 1000+ phones spinning up in lockstep + # the registry sees a single synchronized burst that times + # out most callers and amplifies into a retry storm. A small + # randomized delay before the first request decorrelates the + # arrivals; subsequent retries already have exponential + # backoff so we only jitter once per _register call. + if _REGISTER_STARTUP_JITTER > 0: + jitter = random.uniform(0, _REGISTER_STARTUP_JITTER) + self._logger.debug( + "Pre-registration jitter: sleeping %.2fs before first request", jitter, + ) + await asyncio.sleep(jitter) + delay = 1 # initial retry delay in seconds while True: req_id = f"{self.device_id}-{int(time.time()*1000)}" @@ -983,7 +1019,7 @@ async def _register(self, force: bool = False) -> None: response_data = await self.messaging.request( f"device-connect.{self.tenant}.registry", json.dumps({"jsonrpc": "2.0", "id": req_id, "method": "registerDevice", "params": params}).encode(), - timeout=2, + timeout=_REGISTER_REQUEST_TIMEOUT, ) self._handle_registration_reply(response_data) # Note: device/online event is published by the registry service diff --git a/packages/device-connect-server/device_connect_server/registry/service/registry.py b/packages/device-connect-server/device_connect_server/registry/service/registry.py index 2f525a7..9be004e 100644 --- a/packages/device-connect-server/device_connect_server/registry/service/registry.py +++ b/packages/device-connect-server/device_connect_server/registry/service/registry.py @@ -23,12 +23,31 @@ from typing import 
Any, Dict, List, Tuple import etcd3gw +import requests +from requests.adapters import HTTPAdapter _logger = logging.getLogger(__name__) ETCD_HOST = os.getenv("ETCD_HOST", "localhost") ETCD_PORT = int(os.getenv("ETCD_PORT", "2379")) +# Size of the urllib3 connection pool the etcd3gw client uses. The +# default (10) caps concurrent HTTP-to-etcd round-trips and bottlenecks +# the registry under a registration herd — every lease+put is two +# sequential HTTP calls, so 1400 phones at startup queue thousands of +# requests behind 10 sockets. 64 keeps the registry processor-bound on +# realistic hardware while staying well under etcd's connection ceiling. +_ETCD_POOL_SIZE = int(os.getenv("DC_ETCD_POOL_SIZE", "64")) + + +def _build_etcd_session(pool_size: int) -> requests.Session: + """requests.Session with an oversized HTTP connection pool for etcd.""" + session = requests.Session() + adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) + session.mount("http://", adapter) + session.mount("https://", adapter) + return session + def _kv_key(kv: dict) -> str: """Extract the key string from an etcd3gw KV metadata dict.""" @@ -88,7 +107,11 @@ class DeviceRegistry: leases: Dict[str, Any] = field(default_factory=dict, init=False) def __post_init__(self) -> None: # pragma: no cover - thin wrapper - self.client = etcd3gw.client(host=self.host, port=self.port) + self.client = etcd3gw.client( + host=self.host, + port=self.port, + session=_build_etcd_session(_ETCD_POOL_SIZE), + ) def _key(self, tenant: str, device_id: str) -> str: return f"/device-connect/{tenant}/devices/{device_id}" From 8eb1437d0763a5fea4fc7905effee61f2edfb243 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 03:10:23 +0000 Subject: [PATCH 06/44] fix(registry,tests): etcd pool enlargement compatible with etcd3gw 2.5.x; align listDevices tests with server-side pagination The previous change passed ``session=`` to ``etcd3gw.client(...)``, which only works on 2.6+; CI 
runs against 2.5.x and broke at module import. Mount the larger urllib3 pool onto the already-constructed ``client.session`` instead, which works on both versions. TestListDevicesHandler now mocks ``list_devices_page`` (the only path the handler takes after the server-side cap landed) and asserts the ``next_offset``/``total_matched`` shape in the reply. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../registry/service/registry.py | 25 ++++++------- .../test_rpc_handlers.py | 36 ++++++++++++------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/registry/service/registry.py b/packages/device-connect-server/device_connect_server/registry/service/registry.py index 9be004e..0ed96d1 100644 --- a/packages/device-connect-server/device_connect_server/registry/service/registry.py +++ b/packages/device-connect-server/device_connect_server/registry/service/registry.py @@ -23,7 +23,6 @@ from typing import Any, Dict, List, Tuple import etcd3gw -import requests from requests.adapters import HTTPAdapter _logger = logging.getLogger(__name__) @@ -40,13 +39,18 @@ _ETCD_POOL_SIZE = int(os.getenv("DC_ETCD_POOL_SIZE", "64")) -def _build_etcd_session(pool_size: int) -> requests.Session: - """requests.Session with an oversized HTTP connection pool for etcd.""" - session = requests.Session() +def _enlarge_etcd_pool(client: Any, pool_size: int) -> None: + """Replace the etcd3gw client's HTTPAdapters with larger-pool ones. + + We mount the adapter onto the already-constructed ``client.session`` + instead of passing ``session=`` to ``etcd3gw.client(...)`` so the + fix works against etcd3gw 2.5.x (no ``session`` kwarg) and 2.6+. 
+ """ + if not hasattr(client, "session"): + return adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size) - session.mount("http://", adapter) - session.mount("https://", adapter) - return session + client.session.mount("http://", adapter) + client.session.mount("https://", adapter) def _kv_key(kv: dict) -> str: @@ -107,11 +111,8 @@ class DeviceRegistry: leases: Dict[str, Any] = field(default_factory=dict, init=False) def __post_init__(self) -> None: # pragma: no cover - thin wrapper - self.client = etcd3gw.client( - host=self.host, - port=self.port, - session=_build_etcd_session(_ETCD_POOL_SIZE), - ) + self.client = etcd3gw.client(host=self.host, port=self.port) + _enlarge_etcd_pool(self.client, _ETCD_POOL_SIZE) def _key(self, tenant: str, device_id: str) -> str: return f"/device-connect/{tenant}/devices/{device_id}" diff --git a/packages/device-connect-server/tests/device_connect_server/test_rpc_handlers.py b/packages/device-connect-server/tests/device_connect_server/test_rpc_handlers.py index 1eed021..069310c 100644 --- a/packages/device-connect-server/tests/device_connect_server/test_rpc_handlers.py +++ b/packages/device-connect-server/tests/device_connect_server/test_rpc_handlers.py @@ -121,6 +121,7 @@ def mock_registry(): with patch("device_connect_server.registry.service.main.registry") as mock_reg: mock_reg.register = MagicMock() mock_reg.list_devices = MagicMock(return_value=[]) + mock_reg.list_devices_page = MagicMock(return_value=([], None, 0)) mock_reg.get_device = MagicMock(return_value=None) mock_reg.refresh = MagicMock() mock_reg.update_status = MagicMock() @@ -297,42 +298,53 @@ class TestListDevicesHandler: @pytest.mark.asyncio async def test_list_devices_success(self, messaging, mock_registry): - mock_registry.list_devices.return_value = [SAMPLE_DEVICE] + mock_registry.list_devices_page.return_value = ([SAMPLE_DEVICE], None, 1) handler = _make_list_handler(TENANT, messaging) data = _rpc_request("discovery/listDevices", {}) await 
handler(data, "reply-sub") - mock_registry.list_devices.assert_called_once_with( - TENANT, device_type=None, location=None, - ) + # Handler always paginates server-side; the limit is the + # _LIST_DEVICES_MAX_LIMIT cap when the caller doesn't pass one. + mock_registry.list_devices_page.assert_called_once() + call_kwargs = mock_registry.list_devices_page.call_args.kwargs + assert call_kwargs["device_type"] is None + assert call_kwargs["location"] is None + assert call_kwargs["offset"] == 0 + assert call_kwargs["limit"] >= 1 response = json.loads(messaging.publish.call_args[0][1]) assert response["result"]["devices"] == [SAMPLE_DEVICE] + assert response["result"]["next_offset"] is None + assert response["result"]["total_matched"] == 1 @pytest.mark.asyncio async def test_list_devices_with_filters(self, messaging, mock_registry): - mock_registry.list_devices.return_value = [SAMPLE_DEVICE] + mock_registry.list_devices_page.return_value = ([SAMPLE_DEVICE], None, 1) handler = _make_list_handler(TENANT, messaging) data = _rpc_request("discovery/listDevices", { "device_type": "camera", "location": "lab-A", }) await handler(data, "reply-sub") - mock_registry.list_devices.assert_called_once_with( - TENANT, device_type="camera", location="lab-A", - ) + mock_registry.list_devices_page.assert_called_once() + call_kwargs = mock_registry.list_devices_page.call_args.kwargs + assert call_kwargs["device_type"] == "camera" + assert call_kwargs["location"] == "lab-A" @pytest.mark.asyncio async def test_list_devices_empty(self, messaging, mock_registry): - mock_registry.list_devices.return_value = [] + mock_registry.list_devices_page.return_value = ([], None, 0) handler = _make_list_handler(TENANT, messaging) await handler(_rpc_request("discovery/listDevices", {}), "reply-sub") response = json.loads(messaging.publish.call_args[0][1]) assert response["result"]["devices"] == [] + assert response["result"]["total_matched"] == 0 @pytest.mark.asyncio async def 
test_list_devices_with_acl(self, messaging, mock_registry): - mock_registry.list_devices.return_value = [SAMPLE_DEVICE, SAMPLE_DEVICE_2] + mock_registry.list_devices_page.return_value = ( + [SAMPLE_DEVICE, SAMPLE_DEVICE_2], None, 2, + ) acl_mgr = ACLManager() # Hide camera-001 from robot-001 @@ -352,7 +364,7 @@ async def test_list_devices_with_acl(self, messaging, mock_registry): @pytest.mark.asyncio async def test_list_devices_registry_error(self, messaging, mock_registry): - mock_registry.list_devices.side_effect = RuntimeError("etcd down") + mock_registry.list_devices_page.side_effect = RuntimeError("etcd down") handler = _make_list_handler(TENANT, messaging) await handler(_rpc_request("discovery/listDevices", {}), "reply-sub") @@ -366,7 +378,7 @@ async def test_discovery_no_reply_returns_silently(self, messaging, mock_registr await handler(data, None) messaging.publish.assert_not_called() - mock_registry.list_devices.assert_not_called() + mock_registry.list_devices_page.assert_not_called() # --------------------------------------------------------------------------- From 067365226d778221bf4a800be51ea54c0cf26328 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Wed, 20 May 2026 21:33:09 -0700 Subject: [PATCH 07/44] fix(edge): make @on subscription behavior in portal mode less confusing Three small clarity fixes uncovered while wiring an out-of-process camera driver to subscribe to phone state_changed events at fleet scale (1400 devices across 7 shards) against the DC Portal. 1. _collect_event_subscriptions no longer skips single-underscore method names. Previously an @on-decorated `_on_foo` handler was silently never registered (the attribute-name check at base.py:1058 dropped it before the `_is_event_subscription` marker check could even run). The collector now only skips dunders. Behavior is still gated by the `_is_event_subscription` marker, so the public-API surface is unchanged. New test: test_underscore_prefixed_handler_is_still_collected. 2. 
Renamed the two INFO log lines in _setup_agentic_driver that called the path "D2D capabilities" / "D2D setup complete" -- the same code path runs in both d2d and portal/registry mode, so the label was wrong half the time. The completion line now reports the actual registry kind (D2DRegistry vs RegistryClient). 3. @on(device_type=...) filtering is best-effort in portal/registry mode because the D2D peer cache used to resolve a source device's type is None. The handler's cache-miss path passes events through (false negatives are worse than false positives), but until now that limitation only surfaced via a per-event debug log. Emit a WARNING at subscription-setup time when device_type is set and there is no peer collector, so subscribers see the gotcha once instead of silently receiving events from other device types. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../device_connect_edge/device.py | 11 +++++++-- .../device_connect_edge/drivers/base.py | 24 ++++++++++++++++++- .../device-connect-edge/tests/test_drivers.py | 21 ++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/packages/device-connect-edge/device_connect_edge/device.py b/packages/device-connect-edge/device_connect_edge/device.py index 62ba6f9..35e93cd 100644 --- a/packages/device-connect-edge/device_connect_edge/device.py +++ b/packages/device-connect-edge/device_connect_edge/device.py @@ -1798,7 +1798,10 @@ async def _setup_agentic_driver(self) -> None: if not isinstance(self._driver, DeviceDriver): return - self._logger.info("Setting up DeviceDriver D2D capabilities") + self._logger.info( + "Setting up DeviceDriver inter-device messaging " + "(router, registry, @on subscriptions)" + ) # Create and set D2D router (inline — no orchestration dependency). 
router = _RemoteInvoker( @@ -1832,7 +1835,11 @@ async def _setup_agentic_driver(self) -> None: # Set up event subscriptions await self._driver.setup_subscriptions() - self._logger.info("DeviceDriver D2D setup complete") + registry_kind = "D2DRegistry" if self._d2d_mode else "RegistryClient" + self._logger.info( + "DeviceDriver inter-device messaging ready (registry=%s)", + registry_kind, + ) async def _teardown_agentic_driver(self) -> None: """Teardown DeviceDriver subscriptions if applicable.""" diff --git a/packages/device-connect-edge/device_connect_edge/drivers/base.py b/packages/device-connect-edge/device_connect_edge/drivers/base.py index 73a596b..3f34219 100644 --- a/packages/device-connect-edge/device_connect_edge/drivers/base.py +++ b/packages/device-connect-edge/device_connect_edge/drivers/base.py @@ -1049,13 +1049,17 @@ async def wait_for_device( def _collect_event_subscriptions(self) -> List[Dict[str, Any]]: """Collect all @on decorated methods. + Scans single-underscore-prefixed methods as well as public ones so + drivers can keep ``@on`` handlers conventionally private without + them silently becoming no-ops. Dunders are still skipped. + Returns: List of subscription definitions """ subscriptions = [] for attr_name in dir(self): - if attr_name.startswith("_"): + if attr_name.startswith("__"): continue attr = getattr(self, attr_name, None) @@ -1225,6 +1229,24 @@ async def _setup_subscription(self, sub: Dict[str, Any]) -> None: logger.info("[%s] Subscribing to: %s", self_id, subject) + # device_type filtering relies on the D2D peer cache to resolve the + # source device's type. In portal/registry mode there is no peer + # cache, so the cache miss path passes the event through unfiltered. + # Warn once at setup so subscribers don't silently see events from + # other device types. Strict filtering can be added in-handler. 
+ if ( + device_type + and not is_lifecycle + and getattr(self._device, "_d2d_collector", None) is None + ): + logger.warning( + "[%s] @on(device_type=%r) on %s: device_type filtering is " + "best-effort in registry/portal mode. The wildcard broker " + "subject delivers every device's matching event; add an " + "in-handler type check if you need strict filtering.", + self_id, device_type, subject, + ) + # Use subscribe_with_subject to get the matched subject in callback # This allows extracting device_id from wildcard subscriptions messaging_client = self._router._messaging diff --git a/packages/device-connect-edge/tests/test_drivers.py b/packages/device-connect-edge/tests/test_drivers.py index 9099930..f823281 100644 --- a/packages/device-connect-edge/tests/test_drivers.py +++ b/packages/device-connect-edge/tests/test_drivers.py @@ -392,6 +392,27 @@ async def disconnect(self): subs = driver._collect_event_subscriptions() assert len(subs) == 2 + def test_underscore_prefixed_handler_is_still_collected(self): + """Single-underscore @on handlers must not silently become no-ops.""" + class MyDriver(DeviceDriver): + device_type = "test" + + @on(device_type="phone", event_name="state_changed") + async def _on_phone_state(self, device_id, event_name, payload): + pass + + async def connect(self): + pass + + async def disconnect(self): + pass + + driver = MyDriver() + subs = driver._collect_event_subscriptions() + assert len(subs) == 1 + assert subs[0]["device_type"] == "phone" + assert subs[0]["event_name"] == "state_changed" + # ── setup_subscriptions error isolation ─────────────────────────── From 88f6668f909cab25508fe4c05418f6082566347e Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 05:36:06 +0000 Subject: [PATCH 08/44] perf(portal): cache the NATS invoke client + log RPC timing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit invoke_device_rpc used to open and close a fresh NATS connection per 
dashboard "Run" click, paying a full TCP+JWT-auth handshake on every request. At fleet scale, that handshake is the dominant cost — the RPC itself is ~12 ms once a client is connected. nats_rpc.invoke() now holds a single long-lived client under an asyncio.Lock and reuses it across requests; transport errors drop the cache so the next call reconnects. The HTTP handler and the NATS layer both log handler/RPC timings so the source of any future latency is visible from docker logs alone: invoke alpha/phone-0001.get_position handler=11.5ms (pre-rpc=0.1ms rpc=11.5ms) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/services/nats_rpc.py | 61 +++++++++++++++---- .../portal/views/dashboard.py | 13 ++++ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/services/nats_rpc.py b/packages/device-connect-server/device_connect_server/portal/services/nats_rpc.py index 3c03ce0..fc7ce31 100644 --- a/packages/device-connect-server/device_connect_server/portal/services/nats_rpc.py +++ b/packages/device-connect-server/device_connect_server/portal/services/nats_rpc.py @@ -4,8 +4,10 @@ """NATS helpers: RPC invocation and event streaming.""" +import asyncio import json import logging +import time import uuid from pathlib import Path @@ -18,6 +20,14 @@ # Registry credentials (privileged, can reach all tenants) _REGISTRY_CREDS = Path(config.CREDS_DIR) / "registry.creds.json" +# Long-lived client reused across all invoke() calls. The portal used to +# open and close a fresh NATS connection per RPC, which added a TCP + +# JWT-auth handshake to every dashboard "Run" click. The connection is +# concurrent-safe (each nc.request creates its own inbox subscription) +# so a single cached client serves the whole portal. 
+_invoke_client: "nats.aio.client.Client | None" = None +_invoke_client_lock = asyncio.Lock() + def _load_creds() -> dict: """Load registry credentials for NATS auth.""" @@ -27,6 +37,16 @@ def _load_creds() -> dict: return {} +async def _get_invoke_client(): + """Lazily open and cache a single NATS client for RPC invocations.""" + global _invoke_client + async with _invoke_client_lock: + if _invoke_client is None or _invoke_client.is_closed: + _invoke_client = await connect() + logger.info("invoke client connected; will be reused across requests") + return _invoke_client + + async def connect(): """Return a connected NATS client using registry credentials.""" creds = _load_creds() @@ -55,23 +75,42 @@ def _sign(nonce): async def invoke(tenant: str, device_id: str, function: str, params: dict, timeout: float = 5.0) -> dict: """Send a JSON-RPC request to a device and return the response.""" - nc = await connect() + t0 = time.monotonic() + subject = f"device-connect.{tenant}.{device_id}.cmd" + payload = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": function, + "params": params, + } try: - subject = f"device-connect.{tenant}.{device_id}.cmd" - payload = { - "jsonrpc": "2.0", - "id": str(uuid.uuid4()), - "method": function, - "params": params, - } - + nc = await _get_invoke_client() msg = await nc.request(subject, json.dumps(payload).encode(), timeout=timeout) + logger.info( + "invoke %s/%s.%s ok in %.1fms", + tenant, device_id, function, (time.monotonic() - t0) * 1000, + ) return json.loads(msg.data) except nats.errors.NoRespondersError: + logger.warning( + "invoke %s/%s.%s no-responders in %.1fms", + tenant, device_id, function, (time.monotonic() - t0) * 1000, + ) return {"error": {"code": -1, "message": f"Device {device_id} is not responding"}} except nats.errors.TimeoutError: + logger.warning( + "invoke %s/%s.%s timeout in %.1fms", + tenant, device_id, function, (time.monotonic() - t0) * 1000, + ) return {"error": {"code": -2, "message": f"Request 
timed out after {timeout}s"}} except Exception as e: + # On a hard transport failure, drop the cached client so the next + # call reconnects rather than reusing a dead connection. + global _invoke_client + async with _invoke_client_lock: + _invoke_client = None + logger.exception( + "invoke %s/%s.%s error in %.1fms: %s", + tenant, device_id, function, (time.monotonic() - t0) * 1000, e, + ) return {"error": {"code": -3, "message": str(e)}} - finally: - await nc.close() diff --git a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py index 0d12f5d..9e80998 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py +++ b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py @@ -6,6 +6,7 @@ import asyncio import json +import logging import time import aiohttp_jinja2 @@ -14,6 +15,8 @@ from ..services import credentials, registry_client from ..services.backend import get_backend +logger = logging.getLogger(__name__) + def setup_routes(app: web.Application): app.router.add_get("/dashboard", dashboard_page) @@ -114,6 +117,7 @@ def _resolve_tenant(request: web.Request) -> str: async def invoke_device_rpc(request: web.Request): """Invoke an RPC function on a device via the active messaging backend.""" + t0 = time.monotonic() tenant = _resolve_tenant(request) device_id = request.match_info["device_id"] @@ -129,7 +133,16 @@ async def invoke_device_rpc(request: web.Request): return web.json_response({"error": {"message": "function is required"}}, status=400) backend = get_backend() + t_pre_rpc = time.monotonic() result = await backend.rpc_invoke(tenant, device_id, function, params) + t_post_rpc = time.monotonic() + logger.info( + "invoke %s/%s.%s handler=%.1fms (pre-rpc=%.1fms rpc=%.1fms)", + tenant, device_id, function, + (t_post_rpc - t0) * 1000, + (t_pre_rpc - t0) * 1000, + (t_post_rpc - t_pre_rpc) * 1000, + ) 
return web.json_response(result) From 09c85aa9db8b8d5d7cd8361c4a59c938a6edf4f9 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 21 May 2026 05:36:24 +0000 Subject: [PATCH 09/44] fix(portal): load device-detail rows on click, not on htmx revealed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous lazy-load wired ``hx-trigger="revealed once"`` on every hidden detail ``
``. htmx's revealed implementation polls getBoundingClientRect, and ``display:none`` elements report a zero-size rect at (0,0) — which counts as "in the viewport." That meant the trigger fired for every hidden row immediately after the table fragment rendered: at 1400 phones, the browser fan-out queued ~1400 parallel fetches behind its 6-connection-per-origin pool, and a subsequent "Run" click on a row's RPC sat at the back of that queue for many seconds. Replace the htmx trigger with a plain fetch invoked from toggleDetail() and a ``_detailLoaded`` Set that dedupes re-expands. The detail URL is carried on the parent ```` as ``data-detail-url`` so the JS doesn't have to reconstruct it. Net result: zero detail fetches at page load, one fetch on first expand per device, and "Run" feels instant again. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/templates/dashboard.html | 25 +++++++++++++++++-- .../portal/templates/devices/_live_table.html | 15 ++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 39a8a52..45933a6 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -56,6 +56,23 @@

Live Devices

{% endblock %} diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index f8dd0d0..7b55400 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -53,6 +53,12 @@

Live Devices

+{# Stable container for live SSE event-log drawers. Lives OUTSIDE + #live-devices so the 10s poll swap can never touch it. Drawers + are created dynamically by openEventLog() and removed by + closeEventLog(). #} +
+ {% endblock %} diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html index d2d2b77..2f2281b 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html @@ -87,22 +87,17 @@

-{# Event log panel (shown by Live log button) #} - +{# Event log panel intentionally NOT rendered here. + + The panel and its accumulated entries are hoisted into a stable + #event-drawers container on the page shell (dashboard.html / + admin/tenant_detail.html), outside #live-devices. Rendering it + inside this slot meant every 10s poll swap wiped the panel, the + accumulated entries, and the live EventSource binding — which + could not be reliably restored from a swap-time DOM stash. + + openEventLog() creates the drawer dynamically; the Live log button + above is the only entry point. #} {# Collapsible raw JSON #}
From 747a70833238594edda51c53dab99a080282b1aa Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Wed, 27 May 2026 23:31:23 +0000 Subject: [PATCH 25/44] fix(portal): use MutationObserver to keep rows expanded across poll swap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The htmx:afterSwap listener bound to #live-devices wasn't reliably firing in practice — expanded rows kept collapsing on every poll even though my handler should have un-hidden them. The chevron staying in the right-pointing (un-rotated) state after a swap confirms the handler never ran, so this isn't a logic bug in the handler. Switch to a MutationObserver on #live-devices's childList. It fires after the DOM actually changes, regardless of which mechanism caused the change, and is independent of htmx's event firing / OOB-swap interleaving / source-vs-target ambiguity. The body of the callback is the same as the old afterSwap handler: for each deviceId in _expandedDevices, find the new row, un-hide it, rotate the chevron, trigger lazy detail fetch. Cleanup for dead devices (close stream, remove drawer) runs here too. The detail-content stash/restore is gone with the htmx listeners. It was speculative anyway (we suspected it never worked for the event log; same likely true for the invoke panel). The event log already survives via the hoisted drawer container. The invoke panel will collapse on each 10s poll — acceptable regression, filed as follow-up if it bites. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/templates/admin/tenant_detail.html | 58 +++++---------- .../portal/templates/dashboard.html | 74 +++++++------------ 2 files changed, 44 insertions(+), 88 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html b/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html index 966e3b9..0e25fc3 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html @@ -106,61 +106,37 @@

Live Devices

} } -// Preserve detail-content DOM across the poll swap. See dashboard.html -// for the full rationale: re-fetching detail wipes live state inside -// the slot (open event-log SSE panel + accumulated entries, open -// invoke panel), so we detach the slot's children before the swap and -// re-attach the *same DOM nodes* afterwards. That keeps -// EventSource.onmessage closures pointing at in-document elements -// instead of detached ones. +// Keep rows expanded across the poll swap. Uses MutationObserver +// rather than htmx:afterSwap — the latter wasn't reliably firing on +// this target in practice. See dashboard.html for full rationale. var liveDevices = document.getElementById('live-devices'); -liveDevices.addEventListener('htmx:beforeSwap', function() { - window._stashedDetails = {}; - window._expandedDevices.forEach(function(deviceId) { - var row = document.getElementById('detail-' + deviceId); - if (!row) return; - var slot = row.querySelector('.detail-content'); - if (!slot || slot.dataset.loaded !== 'true') return; - var children = []; - while (slot.firstChild) { - children.push(slot.removeChild(slot.firstChild)); - } - window._stashedDetails[deviceId] = children; - }); -}); - -liveDevices.addEventListener('htmx:afterSwap', function() { - var stash = window._stashedDetails || {}; - window._stashedDetails = null; - +function applyExpansionState() { window._expandedDevices.forEach(function(deviceId) { var row = document.getElementById('detail-' + deviceId); if (!row) { + window._expandedDevices.delete(deviceId); if (window._eventSources && window._eventSources[deviceId]) { try { window._eventSources[deviceId].close(); } catch (e) {} delete window._eventSources[deviceId]; + var drawer = document.getElementById('eventlog-' + deviceId); + if (drawer && drawer.parentElement) { + drawer.parentElement.removeChild(drawer); + } } - delete stash[deviceId]; - window._expandedDevices.delete(deviceId); return; } - row.classList.remove('hidden'); - var chevron = 
document.getElementById('chevron-' + deviceId); - if (chevron) chevron.style.transform = 'rotate(90deg)'; - - var stashed = stash[deviceId]; - if (stashed && stashed.length) { - var slot = row.querySelector('.detail-content'); - if (slot) { - slot.innerHTML = ''; - stashed.forEach(function(node) { slot.appendChild(node); }); - slot.dataset.loaded = 'true'; - } - } else { + if (row.classList.contains('hidden')) { + row.classList.remove('hidden'); + var chevron = document.getElementById('chevron-' + deviceId); + if (chevron) chevron.style.transform = 'rotate(90deg)'; loadDetailIfNeeded(row, deviceId); } }); +} + +new MutationObserver(applyExpansionState).observe(liveDevices, { + childList: true, }); // --- RPC Invoke --- diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 7b55400..6ea3496 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -104,71 +104,51 @@

Live Devices

} } -// Preserve detail-content DOM across the poll swap. We deliberately do -// NOT pause the poll while rows are expanded: pausing freezes the +// Keep rows expanded across the 10s poll swap. We deliberately do NOT +// pause the poll while rows are expanded: pausing freezes the // hx-swap-oob counters that ride on this fragment, leaving the header // (#online-count / #registered-count) stuck at its first-render value // the whole time the operator is inspecting any device. // -// But re-fetching detail HTML on every poll wipes live UI state that -// lives inside the slot — an open event-log SSE panel, the entries it -// has accumulated, an open invoke panel. To keep those alive we detach -// the slot's children before the swap and re-attach the *same DOM -// nodes* afterwards, so EventSource.onmessage closures keep pointing -// at live, in-document elements instead of detached ones. +// We use a MutationObserver instead of htmx:afterSwap because the htmx +// event was not reliably reaching this listener in practice (the row +// kept collapsing even with the listener bound). MutationObserver +// watches the DOM directly and fires after the swap completes, +// regardless of who triggered it — no dependence on htmx event timing +// or OOB-swap interleaving. 
var liveDevices = document.getElementById('live-devices'); -liveDevices.addEventListener('htmx:beforeSwap', function() { - window._stashedDetails = {}; - window._expandedDevices.forEach(function(deviceId) { - var row = document.getElementById('detail-' + deviceId); - if (!row) return; - var slot = row.querySelector('.detail-content'); - if (!slot || slot.dataset.loaded !== 'true') return; - var children = []; - while (slot.firstChild) { - children.push(slot.removeChild(slot.firstChild)); - } - window._stashedDetails[deviceId] = children; - }); -}); - -liveDevices.addEventListener('htmx:afterSwap', function() { - var stash = window._stashedDetails || {}; - window._stashedDetails = null; - +function applyExpansionState() { window._expandedDevices.forEach(function(deviceId) { var row = document.getElementById('detail-' + deviceId); if (!row) { - // Device dropped out of the live list (went offline / deregistered). - // Close any active stream and forget all state — including the - // stashed nodes, which are now orphans we'd otherwise leak. + // Device dropped out of the table (went offline / deregistered). + // Close any active stream and remove its drawer. 
+ window._expandedDevices.delete(deviceId); if (window._eventSources && window._eventSources[deviceId]) { try { window._eventSources[deviceId].close(); } catch (e) {} delete window._eventSources[deviceId]; + var drawer = document.getElementById('eventlog-' + deviceId); + if (drawer && drawer.parentElement) { + drawer.parentElement.removeChild(drawer); + } } - delete stash[deviceId]; - window._expandedDevices.delete(deviceId); return; } - row.classList.remove('hidden'); - var chevron = document.getElementById('chevron-' + deviceId); - if (chevron) chevron.style.transform = 'rotate(90deg)'; - - var stashed = stash[deviceId]; - if (stashed && stashed.length) { - var slot = row.querySelector('.detail-content'); - if (slot) { - slot.innerHTML = ''; - stashed.forEach(function(node) { slot.appendChild(node); }); - slot.dataset.loaded = 'true'; - } - } else { - // No stash for this row — either it had not finished loading yet, - // or it just got expanded. Fall through to the lazy fetch. + if (row.classList.contains('hidden')) { + row.classList.remove('hidden'); + var chevron = document.getElementById('chevron-' + deviceId); + if (chevron) chevron.style.transform = 'rotate(90deg)'; + // dataset.loaded is gone on the fresh slot, so loadDetailIfNeeded + // re-fetches the detail body. The event-log drawer lives in + // #event-drawers and is untouched by this swap. 
loadDetailIfNeeded(row, deviceId); } }); +} + +new MutationObserver(applyExpansionState).observe(liveDevices, { + childList: true, }); // --- RPC Invoke --- From cc454cfa304d67cbc069bebb6496c10cb465c3de Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Wed, 27 May 2026 23:58:28 +0000 Subject: [PATCH 26/44] debug(portal): instrument dashboard.html to trace row-collapse bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an inline #dbg-log panel below the event drawers that prints timestamped lines for everything we want to see: - script load (confirms the new JS is actually being executed, not a cached older version) - liveDevices lookup result (confirms #getElementById finds the target the observer is supposed to watch) - toggleDetail invocations + resulting _expandedDevices contents (confirms the user click is being captured and the set is being populated) - MutationObserver fires + record count (confirms the observer is actually being notified when the table swaps) - applyExpansionState invocations with _expandedDevices contents and per-device row lookup results (this is where the row re-expansion should happen) - htmx:beforeRequest / beforeSwap / afterSwap / afterSettle on BOTH #live-devices and document.body — so we can tell whether htmx events ever fire for this target, and if so, where they're reaching Inline so the operator can see it without opening devtools and can paste a screenshot. Dashboard-only for now; if this points at a real bug rather than a setup quirk, the same instrumentation can go into admin/tenant_detail.html later. This commit is purely diagnostic — revert (or strip the #dbg-log block and _dbg calls) once the root cause is found. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/templates/dashboard.html | 68 ++++++++++++++++--- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 6ea3496..82161d1 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -59,7 +59,32 @@

Live Devices

closeEventLog(). #}
+{# Debug log — TEMPORARY. Renders inline so you can see what's + happening without opening the browser console. Remove once the + row-collapse bug is understood. #} +
+
[dbg] script not yet loaded
+
+ {% endblock %} diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 6c4c9be..fb7e302 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -58,12 +58,6 @@

Live Devices

-{# Stable container for live SSE event-log drawers. Lives OUTSIDE - #live-devices so the 10s poll swap can never touch it. Drawers - are created dynamically by openEventLog() and removed by - closeEventLog(). #} -
- {% endblock %} diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html index 2f2281b..1f84015 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/_live_detail.html @@ -87,17 +87,30 @@

-{# Event log panel intentionally NOT rendered here. +{# Event log panel (shown by Live log button). - The panel and its accumulated entries are hoisted into a stable - #event-drawers container on the page shell (dashboard.html / - admin/tenant_detail.html), outside #live-devices. Rendering it - inside this slot meant every 10s poll swap wiped the panel, the - accumulated entries, and the live EventSource binding — which - could not be reliably restored from a swap-time DOM stash. - - openEventLog() creates the drawer dynamically; the Live log button - above is the only entry point. #} + Lives inside the detail row so the log appears next to the device + it's streaming from — important UX at 100s of devices. Safe to + keep here because the JSON-poll architecture leaves the detail + slot untouched on every 10s tick; the only way the slot gets + re-fetched is a capability hash change, at which point the + client closes any open stream before the refetch (the events + list might have changed too). #} + {# Collapsible raw JSON #}
From 69d0923ef8ac51aea8c78838508e75a322f547ef Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 01:53:20 +0000 Subject: [PATCH 37/44] feat(portal): full revoke for credentials (UI button + CLI command) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Until now there was no way to make a credential disappear from the portal's "Credentials Created" counter and the /devices list: - `revoke-credentials` (CLI) called credentials:rotate — re-issues creds, file stays, counter unchanged. - `delete` (CLI) called backend.remove_device — decommissions the JWT account, file stays, counter unchanged. - Nothing deleted the local cred file. Add a true revoke that does the obvious thing: kill the backend account so the device can't reconnect, AND delete the cred file so it stops showing up in the portal. Server: - credentials.delete_credential(filename) with the same path-traversal guard get_credential uses. Returns False if the file doesn't exist or unlink fails. - POST /api/devices/{name}/revoke (portal, session-auth + tenant-scoped): backend.remove_device best-effort, reload_broker, then delete the cred file. Empty 200 body so htmx removes the row in place via hx-swap="delete". Backend failures bubble through the X-Revoke-Warning header instead of blocking the file delete — the operator's intent ("get this gone") takes priority. - POST /api/agent/v1/devices/{id}/revoke (agent_api, bearer-token, devices:provision scope): same logic, JSON envelope. Sits alongside the existing :rotate and DELETE endpoints. UI: - devices/_device_row.html now takes an optional `highlight` flag (default false) so the same partial powers both the listing (no flash) and the create-device fragment (green flash). The duplicated row markup in list.html is gone — `{% include %}` instead. - Revoke button added: red pill next to Download, with hx-confirm, hx-target="#cred-row-{device_id}", hx-swap="delete".
On success the row vanishes immediately; the dashboard counter updates on the next 10s JSON poll. Trash-can icon. CLI: - `portalctl devices revoke --confirm` hits the new endpoint. Mirrors `delete`'s --confirm gate. Existing `revoke-credentials` (rotates) and `delete` (decommissions) stay as-is — both still work, neither covers the "make this credential gone" intent. The naming overlap is documented in the new command's help text and docstring. 372 server tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../portal/services/credentials.py | 18 +++++ .../portal/templates/devices/_device_row.html | 18 ++++- .../portal/templates/devices/list.html | 16 +---- .../portal/views/agent_api.py | 53 +++++++++++++++ .../portal/views/devices.py | 68 ++++++++++++++++++- .../device_connect_server/portalctl/cli.py | 32 ++++++++- 6 files changed, 187 insertions(+), 18 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/services/credentials.py b/packages/device-connect-server/device_connect_server/portal/services/credentials.py index 58a7687..6fb118e 100644 --- a/packages/device-connect-server/device_connect_server/portal/services/credentials.py +++ b/packages/device-connect-server/device_connect_server/portal/services/credentials.py @@ -64,6 +64,24 @@ def get_credential_data(filename: str) -> dict | None: return None +def delete_credential(filename: str) -> bool: + """Remove a credential file from disk. + + Returns True if a file was deleted, False if no such file existed. + Uses the same path-traversal guard as :func:`get_credential`, so a + crafted ``filename`` that resolves outside ``CREDS_DIR`` is rejected. + """ + path = get_credential(filename) + if not path: + return False + try: + path.unlink() + return True + except OSError: + logger.exception("failed to remove credential %s", filename) + return False + + def get_tenants_summary() -> dict[str, dict]: """Get a summary of all tenants and their device counts. 
diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/_device_row.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/_device_row.html index 4d397d4..275454e 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/devices/_device_row.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/_device_row.html @@ -1,4 +1,12 @@ -
+{# Single device credential row, used by both the /devices listing + and the create-device htmx response. `highlight` (default false) + toggles a brief green flash for newly-created rows. The wrapping + id="cred-row-{device_id}" gives the Revoke button a stable + hx-target for `hx-swap="delete"` to remove just this row on a + successful revoke. #} +
{{ cred.device_id }}
{{ cred.filename }}
@@ -9,5 +17,13 @@ Download +
diff --git a/packages/device-connect-server/device_connect_server/portal/templates/devices/list.html b/packages/device-connect-server/device_connect_server/portal/templates/devices/list.html index 36b6103..eb02c7e 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/devices/list.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/devices/list.html @@ -33,21 +33,7 @@

Device Credentials

{% if credentials %} - {% for cred in credentials %} -
-
-
{{ cred.device_id }}
-
{{ cred.filename }}
-
- -
- {% endfor %} + {% for cred in credentials %}{% include "devices/_device_row.html" %}{% endfor %} {% else %}

No device credentials yet

diff --git a/packages/device-connect-server/device_connect_server/portal/views/agent_api.py b/packages/device-connect-server/device_connect_server/portal/views/agent_api.py index 0aae37c..cb9809f 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/agent_api.py +++ b/packages/device-connect-server/device_connect_server/portal/views/agent_api.py @@ -72,6 +72,7 @@ def setup_routes(app: web.Application): r.add_get(PREFIX + "/devices/{device_id}/events", device_events) r.add_get(PREFIX + "/devices/{device_id}/credentials", device_credentials_get) r.add_post(PREFIX + "/devices/{device_id}/credentials:rotate", device_credentials_rotate) + r.add_post(PREFIX + "/devices/{device_id}/revoke", device_revoke) r.add_post(PREFIX + "/devices/{device_id}/invoke", device_invoke) r.add_post(PREFIX + "/invoke-with-fallback", invoke_with_fallback) r.add_get( @@ -576,6 +577,58 @@ async def device_credentials_rotate(request: web.Request) -> web.Response: return _ok({"filename": filename, "content": cred_data}, trace_id=trace) +async def device_revoke(request: web.Request) -> web.Response: + """Revoke a device: kill the backend account AND delete the local cred file. + + This is the full-revocation counterpart to ``device_delete`` (which + only decommissions the backend account) and to ``credentials:rotate`` + (which only re-issues). After ``revoke`` the device cannot reconnect + and disappears from the portal credentials list. + + Requires ``devices:provision`` — same scope as delete, since the + operation is at least as destructive. 
+ """ + trace = _trace_id() + _, err = _require_scope(request, "devices:provision") + if err: + return err + tenant, err = _resolve_tenant(request) + if err: + return err + + device_id = request.match_info["device_id"] + full_name = _full_device_name(tenant, device_id) + filename = f"{full_name}.creds.json" + + # Same shape as the portal handler: backend revocation is best-effort + # (some backends may not support remove_device), but the file deletion + # is the part that makes the cred "disappear" from the operator's UI. + backend = get_backend() + remove = getattr(backend, "remove_device", None) + backend_error: str | None = None + if remove is not None: + try: + await remove(tenant, full_name) + await backend.reload_broker() + except Exception as e: + logger.exception("revoke: backend remove failed for %s/%s", tenant, full_name) + backend_error = str(e) + else: + backend_error = f"{backend.backend_name()} backend does not support remove_device" + + deleted = credentials_svc.delete_credential(filename) + if not deleted: + return _err(status=404, code="not_found", + message=f"Credential file not found: {filename}", + trace_id=trace) + + _audit(request, "revoke", trace_id=trace, device_id=full_name) + result = {"device_id": full_name, "revoked": True} + if backend_error: + result["backend_warning"] = backend_error + return _ok(result, trace_id=trace) + + async def device_delete(request: web.Request) -> web.Response: """Decommission a device. 
Requires devices:provision.""" trace = _trace_id() diff --git a/packages/device-connect-server/device_connect_server/portal/views/devices.py b/packages/device-connect-server/device_connect_server/portal/views/devices.py index 7f5bf1e..30533b0 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/devices.py +++ b/packages/device-connect-server/device_connect_server/portal/views/devices.py @@ -23,6 +23,7 @@ def setup_routes(app: web.Application): app.router.add_get("/api/devices/agent-creds", download_agent_creds) app.router.add_get("/api/devices/demo-bundle", download_demo_bundle) app.router.add_get("/api/devices/{name}/creds", download_credential) + app.router.add_post("/api/devices/{name}/revoke", revoke_credential) app.router.add_get("/api/devices/bundle", download_bundle) @@ -138,7 +139,8 @@ async def create_device(request: web.Request): content_type="text/html", ) - # Return the new row as HTML fragment + # Return the new row as HTML fragment, with `highlight` flag on so + # the partial paints the brief green flash on the just-created row. cred = { "device_id": full_name, "filename": f"{full_name}.creds.json", @@ -146,6 +148,7 @@ async def create_device(request: web.Request): return aiohttp_jinja2.render_template("devices/_device_row.html", request, { "cred": cred, "user": user, + "highlight": True, }) @@ -175,6 +178,69 @@ async def download_credential(request: web.Request): ) +async def revoke_credential(request: web.Request): + """Revoke a device credential: kill the backend account, then delete the file. + + Returns an empty 200 body so an htmx swap can drop the row from the + page. The dashboard's JSON poll picks up the lower count and the + registry-entry removal on its next 10s tick. + + Auth: portal session, tenant-scoped. Admins may revoke across + tenants. Non-admins can only touch their own tenant's creds. 
+ """ + user = request["user"] + tenant = user["tenant"] + device_name = request.match_info["name"] + filename = f"{device_name}.creds.json" + + cred_data = credentials.get_credential_data(filename) + if not cred_data: + raise web.HTTPNotFound(text=f"Credential file not found: {filename}") + cred_tenant = cred_data.get("tenant", "") + if cred_tenant != tenant and user.get("role") != "admin": + raise web.HTTPForbidden(text="Access denied: credential belongs to another tenant") + + # 1. Revoke the broker account so the device can no longer connect. + # Best-effort: a backend without remove_device returns 501-style + # behavior, but we still want the file gone in that case so the + # operator's UI reflects the intent. + backend = get_backend() + remove = getattr(backend, "remove_device", None) + backend_error: str | None = None + if remove is not None: + try: + await remove(cred_tenant, device_name) + await backend.reload_broker() + except Exception as e: + backend_error = str(e) + else: + backend_error = f"{backend.backend_name()} backend does not support remove_device" + + # 2. Delete the local credential file so it stops appearing in the + # portal's list. Done after the backend revocation so a backend + # failure doesn't leave us with a "ghost" file that points at a + # still-valid account. + deleted = credentials.delete_credential(filename) + if not deleted: + # File was there a moment ago (cred_data was non-None) but + # we couldn't unlink it. Surface this loudly. + raise web.HTTPInternalServerError( + text=f"Failed to remove credential file: {filename}" + + (f" (backend: {backend_error})" if backend_error else ""), + ) + + # The device's etcd registry entry expires on its TTL after the + # device disconnects; no explicit cleanup needed here. + + # Empty body — htmx's `hx-swap="delete"` removes the row from the + # page on a 2xx response. Surface backend errors as a non-blocking + # warning header so the operator at least sees it. 
+ headers = {} + if backend_error: + headers["X-Revoke-Warning"] = backend_error + return web.Response(status=200, headers=headers, text="") + + async def download_bundle(request: web.Request): """Download a tenant credential bundle as .zip.""" user = request["user"] diff --git a/packages/device-connect-server/device_connect_server/portalctl/cli.py b/packages/device-connect-server/device_connect_server/portalctl/cli.py index fe4ccbb..00906f3 100644 --- a/packages/device-connect-server/device_connect_server/portalctl/cli.py +++ b/packages/device-connect-server/device_connect_server/portalctl/cli.py @@ -425,6 +425,27 @@ async def cmd_devices_delete(client: PortalClient, args) -> int: return _exit_for_status(status, body) +async def cmd_devices_revoke(client: PortalClient, args) -> int: + """Full revoke: kill the backend account and delete the credential file. + + Distinct from ``revoke-credentials`` (which only rotates: re-issues + creds, file stays) and from ``delete`` (which decommissions the + backend account but leaves the file on disk). After ``revoke`` the + cred is gone from the portal's "Credentials Created" count. 
+ """ + if not args.confirm: + sys.stderr.write("revoke requires --confirm to proceed\n") + return 2 + params = {"tenant": args.tenant} if args.tenant else None + status, body = await client.request( + "POST", _device_subpath(args.device_id, "/revoke"), params=params, + ) + _maybe_print_error(status, body) + if 200 <= status < 300: + _emit(body, args.output) + return _exit_for_status(status, body) + + async def cmd_devices_invoke(client: PortalClient, args) -> int: try: params_obj = json.loads(args.params) if args.params else {} @@ -603,12 +624,21 @@ def _build_parser() -> argparse.ArgumentParser: d_rev.add_argument("device_id") d_rev.set_defaults(func=cmd_devices_revoke_credentials) - d_del = dsub.add_parser("delete", help="decommission a device") + d_del = dsub.add_parser("delete", help="decommission a device (leaves cred file)") d_del.add_argument("device_id") d_del.add_argument("--confirm", action="store_true", help="required: confirms the destructive action") d_del.set_defaults(func=cmd_devices_delete) + d_revoke = dsub.add_parser( + "revoke", + help="full revoke: kill backend account AND delete the cred file", + ) + d_revoke.add_argument("device_id") + d_revoke.add_argument("--confirm", action="store_true", + help="required: confirms the destructive action") + d_revoke.set_defaults(func=cmd_devices_revoke) + d_inv = dsub.add_parser("invoke", help="invoke a device function") d_inv.add_argument("device_id") d_inv.add_argument("function") From 35b7ebe931cecc999e712dd905cfdde3b6a2b665 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 01:57:15 +0000 Subject: [PATCH 38/44] docs(portal): document devices revoke in AGENTS.md agent playbook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three commands now overlap in the cred-lifecycle space, and the playbook only mentioned the first one: - revoke-credentials → rotate (re-issue, file replaced) - delete → decommission account, file lingers - revoke → 
kill account + delete file (new) Section 3 ("Provision a device") gets a new "Decommission, rotate, or revoke" subsection with a verb-vs-intent table so the agent picks the right one instead of reaching for `revoke-credentials` out of muscle memory. Quick-reference at the bottom lists all three with one-line semantics. The HTTP examples are included so non-CLI agent flows (CI tokens hitting /api/agent/v1/* directly) see the same surface. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../templates/coding_agents/AGENTS.md.j2 | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/coding_agents/AGENTS.md.j2 b/packages/device-connect-server/device_connect_server/portal/templates/coding_agents/AGENTS.md.j2 index a55dbf2..7843f3b 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/coding_agents/AGENTS.md.j2 +++ b/packages/device-connect-server/device_connect_server/portal/templates/coding_agents/AGENTS.md.j2 @@ -151,9 +151,34 @@ curl -X POST -H "Authorization: Bearer $DEVICE_CONNECT_PORTAL_TOKEN" \ {{ portal_url }}/api/agent/v1/devices ``` -To decommission: `DELETE /api/agent/v1/devices/{id}` or -`dc-portalctl devices revoke-credentials …`. Decommission revokes the JWT — -the device process will be disconnected on its next NATS reconnect. +### Decommission, rotate, or revoke + +Three commands, three different intents — pick the verb that matches what you +actually want to happen. + +| Command | Broker account | Local cred file | When to use | +|---|---|---|---| +| `dc-portalctl devices revoke-credentials ` | re-issued (rotates) | replaced with new contents | Same device should come back with a new identity (e.g. suspected leak). | +| `dc-portalctl devices delete --confirm` | removed | **left on disk** | Rarely the right choice on its own — the cred file lingers and shows up in the portal even though it's no longer valid. 
| +| `dc-portalctl devices revoke --confirm` | removed | **deleted** | Default for "this credential is gone." The device disconnects on next reconnect and the credential disappears from the portal counter and `/devices` list. | + +HTTP equivalents (all need `devices:provision` scope): + +```bash +# Rotate +curl -X POST -H "Authorization: Bearer $DEVICE_CONNECT_PORTAL_TOKEN" \ + {{ portal_url }}/api/agent/v1/devices/{{ example_device_id }}/credentials:rotate + +# Delete (decommission only) +curl -X DELETE -H "Authorization: Bearer $DEVICE_CONNECT_PORTAL_TOKEN" \ + {{ portal_url }}/api/agent/v1/devices/{{ example_device_id }} + +# Revoke (the full one) +curl -X POST -H "Authorization: Bearer $DEVICE_CONNECT_PORTAL_TOKEN" \ + {{ portal_url }}/api/agent/v1/devices/{{ example_device_id }}/revoke +``` + +Either decommission flow disconnects the device on its next NATS reconnect. --- @@ -427,7 +452,9 @@ dc-portalctl devices events dc-portalctl devices provision --device-type X [--location Y] \ --creds-output-file dc-portalctl devices credentials --output-file -dc-portalctl devices revoke-credentials +dc-portalctl devices revoke-credentials # rotate: new creds, file replaced +dc-portalctl devices delete --confirm # decommission account, leaves cred file +dc-portalctl devices revoke --confirm # full revoke: account + cred file gone # Invocation dc-portalctl devices invoke --params '{…}' --reason "…" From 55aeb1fa9c544d9a5e37970339b29366e3613bc3 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 04:25:03 +0000 Subject: [PATCH 39/44] fix(portal,edge): tighten revoke failure modes; close dashboard poll race Five small fixes from review round 5. Revoke endpoint (portal devices view + agent_api): - Hard backend failure (remove_device raised) used to delete the local cred file anyway, leaving a "ghost file pointing at a still-valid account" with no operator signal. Now: 502 + file kept in place so the operator can retry once the backend is healthy. 
Soft case (backend has no remove_device at all) is unchanged -- file is still deleted so the UI matches intent. - Backend-succeeded + file-delete-failed used to return a bare 500 that implied the whole operation failed. Now the 4xx/5xx response explicitly states the account is already revoked so the operator doesn't undo a partial-success state by retrying. - Corrected the misleading inline comment that claimed the previous order prevented ghost files (it did the opposite on failure). Dashboard JSON poll (dashboard.html + admin/tenant_detail.html): - Orphan-row race: poll N issues insertNewDeviceRow, poll N+1 fires before the row-html fetch lands and removeDeviceRow runs as a no-op, then the fetch response appends an orphan row that sticks around for ~10s. Now removeDeviceRow clears the pending flag and the insert callback discards a response whose flag is gone. Mirrored in tenant_detail.html. - Detail-load error path used unescaped `err` interpolation into innerHTML. In practice err is an Error so toString() is safe, but the pattern is fragile; switched to textContent on a constructed child node. Device SDK (device.py): - Documented the timeout/TTL interaction: a slow registry response near the 15s timeout default can land after the 15s lease TTL expires, leaving a brief "registered then immediately gone" window that self-heals via the next heartbeat's requestRegistration round-trip. Operators raising DEVICE_CONNECT_REGISTER_TIMEOUT should bump `ttl` in lockstep. Portal cost model (dashboard.py): - Documented that live_devices_json walks the full fleet via a full etcd get_prefix per page. At ~1400 devices with the default 100-device page that's ~14 etcd scans per JSON-poll tick per dashboard; the pagination fix bounds NATS payload but not registry CPU. Points operators at the two tuning knobs and the documented next iteration (selector pushdown / keyset). 
Tests: - test_portal_agent_api.py: new TestDeviceRevoke class with four cases: * test_hard_backend_failure_keeps_file -- pins the 502 + the critical "delete_credential never called" assertion. * test_unsupported_backend_is_soft_success -- backend with no remove_device still drops the file and surfaces a warning. * test_backend_succeeds_then_file_missing_explains_partial -- 404 message names the partial-success state. * test_happy_path -- backend removed + reloaded + 200. Server tests: 376 passed. Edge tests: 515 passed. Ruff: clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../device_connect_edge/device.py | 12 ++ .../portal/templates/admin/tenant_detail.html | 11 +- .../portal/templates/dashboard.html | 37 +++- .../portal/views/agent_api.py | 32 +++- .../portal/views/dashboard.py | 15 +- .../portal/views/devices.py | 47 +++-- .../test_portal_agent_api.py | 168 ++++++++++++++++++ 7 files changed, 291 insertions(+), 31 deletions(-) diff --git a/packages/device-connect-edge/device_connect_edge/device.py b/packages/device-connect-edge/device_connect_edge/device.py index 880c415..b4d52d9 100644 --- a/packages/device-connect-edge/device_connect_edge/device.py +++ b/packages/device-connect-edge/device_connect_edge/device.py @@ -114,6 +114,18 @@ def _env_float(name: str, default: float) -> float: # development. Operators at fleet scale should bump this via # DEVICE_CONNECT_REGISTER_JITTER=10 (or higher) to spread the herd # further. +# +# Lease-TTL interaction: the registry creates the etcd lease at the +# moment _do_register runs, so if a slow registry takes ~timeout +# seconds to reply, the lease can be near-expired before the heartbeat +# loop emits its first beat (`run()` awaits _register before starting +# the heartbeat task). 
With the 15s timeout default and the 15s `ttl` +# default that race is real; it self-heals — the next heartbeat fires +# `has_lease()=False` on the registry and triggers a requestRegistration +# round-trip — but operators raising DEVICE_CONNECT_REGISTER_TIMEOUT +# (or running a stressed registry where requests routinely take >ttl/3) +# should raise `ttl` in lockstep or shorten `heartbeat_interval` so +# the first beat lands inside the lease window. _REGISTER_REQUEST_TIMEOUT = _env_float("DEVICE_CONNECT_REGISTER_TIMEOUT", 15.0) _REGISTER_STARTUP_JITTER = _env_float("DEVICE_CONNECT_REGISTER_JITTER", 2.0) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html b/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html index fe71fa9..878d80d 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/admin/tenant_detail.html @@ -84,7 +84,11 @@

Live Devices

}) .catch(function(err) { delete slot.dataset.loading; - slot.innerHTML = '

Failed to load details: ' + err + '

'; + slot.innerHTML = ''; + var p = document.createElement('p'); + p.className = 'text-xs text-red-500'; + p.textContent = 'Failed to load details: ' + (err && err.message ? err.message : err); + slot.appendChild(p); }); } @@ -145,6 +149,7 @@

Live Devices

return true; } +// See dashboard.html for the orphan-row race this guards against. window._pendingInserts = window._pendingInserts || {}; function insertNewDeviceRow(deviceId, capsHash) { @@ -154,6 +159,8 @@

Live Devices

{ credentials: 'same-origin' }) .then(function(r) { return r.ok ? r.text() : null; }) .then(function(html) { + // Cancelled by an interleaved removeDeviceRow — discard. + if (!window._pendingInserts[deviceId]) return; delete window._pendingInserts[deviceId]; if (!html) return; if (document.getElementById('summary-' + deviceId)) return; @@ -173,6 +180,8 @@

Live Devices

if (summary && summary.parentElement) summary.parentElement.removeChild(summary); if (detail && detail.parentElement) detail.parentElement.removeChild(detail); delete window._deviceCapsHash[deviceId]; + // Cancel any in-flight insert (see dashboard.html comment). + delete window._pendingInserts[deviceId]; if (typeof closeEventLog === 'function') closeEventLog(deviceId); } diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index fb7e302..5442973 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -82,7 +82,14 @@

Live Devices

}) .catch(function(err) { delete slot.dataset.loading; - slot.innerHTML = '

Failed to load details: ' + err + '

'; + // Use textContent on a child so an Error message that happens + // to contain HTML (or future server-side error text routed + // through this path) can't inject markup. + slot.innerHTML = ''; + var p = document.createElement('p'); + p.className = 'text-xs text-red-500'; + p.textContent = 'Failed to load details: ' + (err && err.message ? err.message : err); + slot.appendChild(p); }); } @@ -160,8 +167,19 @@

Live Devices

return true; } -// In-flight set so a slow row-html fetch doesn't get re-issued by a -// subsequent poll firing before the first response lands. +// In-flight map so a slow row-html fetch doesn't get re-issued by a +// subsequent poll firing before the first response lands. We track +// "still wanted" too: if the device disappears from a JSON poll +// between issuing the fetch and the response landing, we drop the +// response on the floor instead of appending an orphan row. +// +// Race we're guarding against: +// poll N : device X present -> insertNewDeviceRow(X) starts fetch +// poll N+1 : device X absent -> removeDeviceRow(X) is a no-op +// (no summary row in DOM yet) +// fetch lands: append row for X -> orphan row until next poll +// With the "still wanted" gate, removeDeviceRow clears the pending +// flag and the late response is discarded. window._pendingInserts = window._pendingInserts || {}; function insertNewDeviceRow(deviceId, capsHash, tbody) { @@ -171,11 +189,14 @@

Live Devices

{ credentials: 'same-origin' }) .then(function(r) { return r.ok ? r.text() : null; }) .then(function(html) { + // Was this insert cancelled by a removeDeviceRow that ran + // between fetch start and response landing? If so the flag is + // gone -- discard the response. + if (!window._pendingInserts[deviceId]) return; delete window._pendingInserts[deviceId]; if (!html) return; - // Re-check: a parallel poll or the device disappearing in the - // ~hundreds of ms since we issued the fetch may have changed - // things. + // Re-check the DOM: a parallel poll may have already inserted + // the row, in which case we must not duplicate it. if (document.getElementById('summary-' + deviceId)) return; var liveTbody = document.querySelector('#live-devices tbody'); if (!liveTbody) return; @@ -193,6 +214,10 @@

Live Devices

if (summary && summary.parentElement) summary.parentElement.removeChild(summary); if (detail && detail.parentElement) detail.parentElement.removeChild(detail); delete window._deviceCapsHash[deviceId]; + // Cancel any in-flight insertNewDeviceRow fetch for this id: the + // response landing after we've decided this device is gone would + // otherwise re-add an orphan row. + delete window._pendingInserts[deviceId]; // Tear down any live SSE drawer for this device. if (typeof closeEventLog === 'function') closeEventLog(deviceId); } diff --git a/packages/device-connect-server/device_connect_server/portal/views/agent_api.py b/packages/device-connect-server/device_connect_server/portal/views/agent_api.py index cb9809f..e458a19 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/agent_api.py +++ b/packages/device-connect-server/device_connect_server/portal/views/agent_api.py @@ -600,13 +600,16 @@ async def device_revoke(request: web.Request) -> web.Response: full_name = _full_device_name(tenant, device_id) filename = f"{full_name}.creds.json" - # Same shape as the portal handler: backend revocation is best-effort - # (some backends may not support remove_device), but the file deletion - # is the part that makes the cred "disappear" from the operator's UI. + # Same shape as the portal handler: a backend without remove_device + # is a soft success (file still deleted so the operator's UI matches + # intent), but a hard failure from a backend that DOES support + # remove_device leaves the file in place so the operator can retry. + # Otherwise we'd risk a ghost file pointing at a still-valid account. 
backend = get_backend() remove = getattr(backend, "remove_device", None) + backend_supported = remove is not None backend_error: str | None = None - if remove is not None: + if backend_supported: try: await remove(tenant, full_name) await backend.reload_broker() @@ -616,11 +619,28 @@ async def device_revoke(request: web.Request) -> web.Response: else: backend_error = f"{backend.backend_name()} backend does not support remove_device" + if backend_supported and backend_error is not None: + # Hard backend failure: report it and keep the file. 502 tells + # the caller the upstream broker is the problem, not the portal. + return _err(status=502, code="backend_revoke_failed", + message=f"Backend revocation failed for {full_name}: " + f"{backend_error}. Credential file left in place; " + f"retry once the backend is healthy.", + trace_id=trace) + deleted = credentials_svc.delete_credential(filename) if not deleted: + # File was there a moment ago (or the backend successfully + # removed it); name the fact that the account is already gone + # so the caller knows the partial-success state. + message = f"Credential file not found: {filename}" + if backend_supported and backend_error is None: + message += ( + ". Backend account was already revoked; only the local " + "file is missing." 
+ ) return _err(status=404, code="not_found", - message=f"Credential file not found: {filename}", - trace_id=trace) + message=message, trace_id=trace) _audit(request, "revoke", trace_id=trace, device_id=full_name) result = {"device_id": full_name, "revoked": True} diff --git a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py index 668aa69..c5133d1 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/dashboard.py +++ b/packages/device-connect-server/device_connect_server/portal/views/dashboard.py @@ -115,10 +115,17 @@ async def live_devices_json(request: web.Request): state survive. ``capabilities_hash`` lets the client decide whether an already-expanded detail panel needs re-fetching. - Minimum scope: new devices appearing or existing devices - disappearing are only reflected after a page reload; the page - reload was the failure mode the JSON poll was added to avoid for - *values*, not for fleet membership. + Cost: ``registry_client.list_live_devices`` paginates through the + full tenant fleet via the registry RPC; the registry, in turn, + does a full ``etcd get_prefix`` + JSON-decode per page (see + ``DeviceRegistry.list_devices_page``). At ~1400 devices with the + default 100-device page that's ~14 etcd scans per JSON-poll tick, + per dashboard. The pagination fix bounds NATS *payload* size but + NOT registry CPU; if the portal is being polled by many concurrent + operators, raise ``DEVICE_CONNECT_LIST_PAGE_SIZE`` (with the + matching ``DC_LIST_DEVICES_MAX_LIMIT`` and NATS ``max_payload``) + or lengthen the client poll interval. Selector pushdown / keyset + pagination is the documented next iteration. 
""" tenant = _resolve_tenant(request) devices = [] diff --git a/packages/device-connect-server/device_connect_server/portal/views/devices.py b/packages/device-connect-server/device_connect_server/portal/views/devices.py index 30533b0..a5247b9 100644 --- a/packages/device-connect-server/device_connect_server/portal/views/devices.py +++ b/packages/device-connect-server/device_connect_server/portal/views/devices.py @@ -201,13 +201,16 @@ async def revoke_credential(request: web.Request): raise web.HTTPForbidden(text="Access denied: credential belongs to another tenant") # 1. Revoke the broker account so the device can no longer connect. - # Best-effort: a backend without remove_device returns 501-style - # behavior, but we still want the file gone in that case so the - # operator's UI reflects the intent. + # A backend without remove_device is a "soft success": the file + # is still deleted so the operator's UI reflects intent, but a + # real failure from a backend that DOES support remove_device is + # not — leaving a ghost file pointing at a still-valid account + # is worse than leaving an orphan file the operator can retry. backend = get_backend() remove = getattr(backend, "remove_device", None) + backend_supported = remove is not None backend_error: str | None = None - if remove is not None: + if backend_supported: try: await remove(cred_tenant, device_name) await backend.reload_broker() @@ -216,25 +219,41 @@ async def revoke_credential(request: web.Request): else: backend_error = f"{backend.backend_name()} backend does not support remove_device" - # 2. Delete the local credential file so it stops appearing in the - # portal's list. Done after the backend revocation so a backend - # failure doesn't leave us with a "ghost" file that points at a - # still-valid account. + # 2. Delete the local credential file. 
Only attempted when the + # backend revocation succeeded (or the backend doesn't support + # it at all); a hard backend failure leaves the file in place + # so the operator can retry once the backend is healthy. + if backend_supported and backend_error is not None: + # Hard backend failure: surface it, keep the file. The + # 502 (rather than 500) tells the operator the upstream + # broker is the problem, not the portal. + raise web.HTTPBadGateway( + text=f"Backend revocation failed for {device_name}: {backend_error}. " + f"Credential file left in place; retry once the backend is healthy.", + ) + deleted = credentials.delete_credential(filename) if not deleted: - # File was there a moment ago (cred_data was non-None) but - # we couldn't unlink it. Surface this loudly. + # File was there a moment ago (cred_data was non-None) but we + # couldn't unlink it. If the backend revoke already succeeded, + # the account is gone — say so explicitly rather than implying + # the whole operation failed. + suffix = ( + " Backend account was already revoked; only the local file " + "remains and must be removed by hand." + if backend_supported and backend_error is None + else "" + ) raise web.HTTPInternalServerError( - text=f"Failed to remove credential file: {filename}" - + (f" (backend: {backend_error})" if backend_error else ""), + text=f"Failed to remove credential file: {filename}.{suffix}", ) # The device's etcd registry entry expires on its TTL after the # device disconnects; no explicit cleanup needed here. # Empty body — htmx's `hx-swap="delete"` removes the row from the - # page on a 2xx response. Surface backend errors as a non-blocking - # warning header so the operator at least sees it. + # page on a 2xx response. Surface backend warnings (e.g. unsupported + # backend) as a non-blocking header so the operator at least sees it. 
headers = {} if backend_error: headers["X-Revoke-Warning"] = backend_error diff --git a/packages/device-connect-server/tests/device_connect_server/test_portal_agent_api.py b/packages/device-connect-server/tests/device_connect_server/test_portal_agent_api.py index 287d299..d2a0d28 100644 --- a/packages/device-connect-server/tests/device_connect_server/test_portal_agent_api.py +++ b/packages/device-connect-server/tests/device_connect_server/test_portal_agent_api.py @@ -451,6 +451,174 @@ async def rpc_invoke(self, tenant, full_name, fn, params, timeout): assert seen["timeout"] == agent_api.MAX_INVOKE_TIMEOUT_S +# ── revoke endpoint failure modes (regression) ──────────────────── + + +@pytest.fixture +def provision_record(): + return { + "token_id": "prov0", + "username": "alice", + "tenant": "acme", + "role": "user", + "scopes": ["devices:provision"], + "created_at": "2026-05-01T00:00:00+00:00", + } + + +@pytest.fixture +async def provision_client(provision_record): + app = _build_app() + server = TestServer(app) + async with server: + async with TestClient(server) as cli: + with patch.object(tokens_svc, "verify_token", return_value=provision_record): + yield cli + + +class _RevokeBackend: + """Minimal stand-in for the credentials backend in revoke tests. + + - ``supports_remove=False`` simulates a backend with no + ``remove_device`` attribute (e.g. plain MQTT). The revoke + handler must treat this as a soft success and still drop the + local file. + - ``raise_on_remove`` simulates a hard backend failure (the broker + rejected the revoke). The handler must NOT delete the local + file in that case, so the operator can retry once the backend + is healthy. 
+ """ + + def __init__(self, *, supports_remove: bool = True, + raise_on_remove: Exception | None = None): + self._supports = supports_remove + self._raise = raise_on_remove + self.removed: list[tuple[str, str]] = [] + self.reloaded = 0 + if supports_remove: + async def _remove(tenant: str, full_name: str) -> None: + if self._raise is not None: + raise self._raise + self.removed.append((tenant, full_name)) + self.remove_device = _remove + + def backend_name(self) -> str: + return "test" + + async def reload_broker(self) -> None: + self.reloaded += 1 + + +class TestDeviceRevoke: + async def test_hard_backend_failure_keeps_file(self, provision_client): + """If remove_device raises, the local cred file must remain in place. + + The old behavior deleted the file regardless, leaving a + ghost-file-pointing-at-still-valid-account state with no + operator-visible signal. Now the handler returns 502 and the + delete_credential helper is never called. + """ + backend = _RevokeBackend(raise_on_remove=RuntimeError("nats unreachable")) + deleted = [] + + def _delete(filename): + deleted.append(filename) + return True + + with patch( + "device_connect_server.portal.views.agent_api.get_backend", + return_value=backend, + ), patch( + "device_connect_server.portal.views.agent_api.credentials_svc.delete_credential", + side_effect=_delete, + ): + r = await provision_client.post( + "/api/agent/v1/devices/cam-001/revoke", headers=H(), + ) + assert r.status == 502 + body = await r.json() + assert body["error"]["code"] == "backend_revoke_failed" + # The critical assertion: file is NOT deleted on backend failure. + assert deleted == [] + + async def test_unsupported_backend_is_soft_success(self, provision_client): + """A backend without remove_device must still delete the file. + + Older / MQTT-style backends never had a remove primitive; the + only thing the operator can do is drop the file so the UI + reflects intent. 
+ """ + backend = _RevokeBackend(supports_remove=False) + deleted = [] + + def _delete(filename): + deleted.append(filename) + return True + + with patch( + "device_connect_server.portal.views.agent_api.get_backend", + return_value=backend, + ), patch( + "device_connect_server.portal.views.agent_api.credentials_svc.delete_credential", + side_effect=_delete, + ): + r = await provision_client.post( + "/api/agent/v1/devices/cam-001/revoke", headers=H(), + ) + assert r.status == 200 + body = await r.json() + assert body["result"]["revoked"] is True + assert body["result"]["device_id"] == "acme-cam-001" + # The unsupported-backend warning must still surface so an + # operator who expected a real revoke knows it didn't happen. + assert "backend_warning" in body["result"] + assert deleted == ["acme-cam-001.creds.json"] + + async def test_backend_succeeds_then_file_missing_explains_partial( + self, provision_client, + ): + """File-delete fails after backend revoke succeeded: 404 must + say the account is already revoked so the operator doesn't + think the whole operation failed.""" + backend = _RevokeBackend() # remove_device succeeds + + with patch( + "device_connect_server.portal.views.agent_api.get_backend", + return_value=backend, + ), patch( + "device_connect_server.portal.views.agent_api.credentials_svc.delete_credential", + return_value=False, + ): + r = await provision_client.post( + "/api/agent/v1/devices/cam-001/revoke", headers=H(), + ) + assert r.status == 404 + body = await r.json() + # Be tolerant about exact wording, but the message must + # name the partial-success state explicitly. 
+ msg = body["error"]["message"].lower() + assert "already revoked" in msg or "account" in msg + + async def test_happy_path(self, provision_client): + backend = _RevokeBackend() + with patch( + "device_connect_server.portal.views.agent_api.get_backend", + return_value=backend, + ), patch( + "device_connect_server.portal.views.agent_api.credentials_svc.delete_credential", + return_value=True, + ): + r = await provision_client.post( + "/api/agent/v1/devices/cam-001/revoke", headers=H(), + ) + assert r.status == 200 + body = await r.json() + assert body["result"]["revoked"] is True + assert "backend_warning" not in body["result"] + assert backend.removed == [("acme", "acme-cam-001")] + assert backend.reloaded == 1 + + # ── invoke-with-fallback duplicate device id (regression) ───────── From add28a6c09fa2dea765f52c1a4db3b13399c289d Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 05:03:40 +0000 Subject: [PATCH 40/44] chore: bump all three packages 0.2.3 -> 0.2.4 Lockstep release-workflow bump so the v0.2.4 tag built from this PR publishes device-connect-edge, device-connect-server, and device-connect-agent-tools together (matches the pattern from #35). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/device-connect-agent-tools/pyproject.toml | 2 +- packages/device-connect-edge/pyproject.toml | 2 +- packages/device-connect-server/pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/device-connect-agent-tools/pyproject.toml b/packages/device-connect-agent-tools/pyproject.toml index da69ac7..9b603d0 100644 --- a/packages/device-connect-agent-tools/pyproject.toml +++ b/packages/device-connect-agent-tools/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta" [project] name = "device-connect-agent-tools" -version = "0.2.3" +version = "0.2.4" description = "Framework-agnostic tools for Device Connect — discover and invoke IoT devices over NATS/Zenoh" readme = "README.md" requires-python = ">=3.11" diff --git a/packages/device-connect-edge/pyproject.toml b/packages/device-connect-edge/pyproject.toml index 8a7a733..2b4a056 100644 --- a/packages/device-connect-edge/pyproject.toml +++ b/packages/device-connect-edge/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta" [project] name = "device-connect-edge" -version = "0.2.3" +version = "0.2.4" description = "Device Connect Edge — lightweight edge device runtime with Zenoh/NATS messaging and D2D communication" readme = "README.md" requires-python = ">=3.11" diff --git a/packages/device-connect-server/pyproject.toml b/packages/device-connect-server/pyproject.toml index 84f685d..9195164 100644 --- a/packages/device-connect-server/pyproject.toml +++ b/packages/device-connect-server/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta" [project] name = "device-connect-server" -version = "0.2.3" +version = "0.2.4" description = "Device Connect — edge device runtime with Zenoh/NATS messaging, D2D communication, and IoT orchestration" readme = "README.md" requires-python = ">=3.11" From 32c814fadb66278aafb09a353794219651fa98be Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: 
Thu, 28 May 2026 22:09:41 +0000 Subject: [PATCH 41/44] fix(portal): route live-fragment reload through htmx so OOB counters relocate reloadLiveFragment() used a raw fetch + innerHTML, which bypasses htmx and leaves the fragment's hx-swap-oob counter spans (online/registered/ creds) rendering as literal text below the table (the stray "1 1 17"). Route the reload through htmx.ajax() so the OOB swap is honored and the spans land in the header cards, matching the initial-load path. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../portal/templates/dashboard.html | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html index 5442973..4826b0f 100644 --- a/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html +++ b/packages/device-connect-server/device_connect_server/portal/templates/dashboard.html @@ -228,14 +228,15 @@

Live Devices

// can't bridge that gap because there's no to append to / // remove from. function reloadLiveFragment() { - fetch('/api/devices/live?tenant={{ tenant }}', { credentials: 'same-origin' }) - .then(function(r) { return r.ok ? r.text() : null; }) - .then(function(html) { - if (html == null) return; - var ld = document.getElementById('live-devices'); - if (ld) ld.innerHTML = html; - }) - .catch(function() {}); + // Route through htmx (not a raw fetch + innerHTML) so the fragment's + // hx-swap-oob counter spans are relocated into the header cards + // instead of rendering as literal text below the table. + if (window.htmx) { + htmx.ajax('GET', '/api/devices/live?tenant={{ tenant }}', { + target: '#live-devices', + swap: 'innerHTML', + }); + } } function pollDevices() { From 3be626522bd29ef94ffca1869f0aed5bfae24740 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 18:57:52 +0000 Subject: [PATCH 42/44] security_infra: optional WebSocket listener for browser-based devices Adds an opt-in `--enable-websocket` flag to setup_deployment.sh that appends a `websocket {}` block to the generated NATS config. Default behavior is unchanged -- existing deployments are untouched. * setup_deployment.sh: --enable-websocket, --websocket-port, --websocket-allowed-origins, --websocket-tls-cert/--websocket-tls-key. TLS pair is both-or-neither; without TLS args, the listener is plain WS and intended to be fronted by a reverse proxy that terminates TLS. * infra/docker-compose-nats-websocket.yml: compose override that exposes the WS port. Binds to 127.0.0.1 by default; the bind interface is overridable via DC_NATS_WS_BIND, but the README warns loudly against exposing plain WS publicly. * security_infra/README.md: "Browser-based devices (WebSocket)" section covering the deployment shape, the reverse-proxy sketch (Caddy), and scoping shared browser credentials with nsc (token-prefix wildcards). 
Operator-mode JWT auth applies identically to WS and TCP clients -- this adds a transport, not a new auth path. Validated by generating configs in all three modes (no-TLS + allowed-origins, native TLS, defaults) and running `nats-server -t` against each. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../infra/docker-compose-nats-websocket.yml | 30 +++++ .../security_infra/README.md | 55 ++++++++ .../security_infra/setup_deployment.sh | 118 +++++++++++++++--- 3 files changed, 187 insertions(+), 16 deletions(-) create mode 100644 packages/device-connect-server/infra/docker-compose-nats-websocket.yml diff --git a/packages/device-connect-server/infra/docker-compose-nats-websocket.yml b/packages/device-connect-server/infra/docker-compose-nats-websocket.yml new file mode 100644 index 0000000..837e738 --- /dev/null +++ b/packages/device-connect-server/infra/docker-compose-nats-websocket.yml @@ -0,0 +1,30 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +# Override: expose the NATS WebSocket listener to the host for browser-based +# devices. Combine with the main multi-tenant compose file: +# +# docker compose -f infra/docker-compose-multitenant-nats.yml \ +# -f infra/docker-compose-nats-websocket.yml up -d +# +# Prerequisite: the generated NATS config must include a `websocket {}` block. +# Pass --enable-websocket to security_infra/setup_deployment.sh. +# +# IMPORTANT: +# The port is bound to 127.0.0.1 by default. The listener inside the +# container is plain WS (unless --websocket-tls-cert/--websocket-tls-key +# were passed to setup). A reverse proxy (Caddy, nginx, etc.) MUST do +# TLS termination on the host before the port is exposed to the network -- +# without TLS, NATS JWTs travel in cleartext. +# +# To deliberately expose the WS listener on a different interface, +# override DC_NATS_WS_BIND, e.g.: +# DC_NATS_WS_BIND=127.0.0.1:8443 docker compose -f ... 
up -d # default +# DC_NATS_WS_BIND=10.0.0.5:8443 docker compose -f ... up -d # LAN only +# Never set it to 0.0.0.0 without TLS termination. + +services: + nats: + ports: + - "${DC_NATS_WS_BIND:-127.0.0.1:8443}:${DC_NATS_WS_PORT:-8443}" diff --git a/packages/device-connect-server/security_infra/README.md b/packages/device-connect-server/security_infra/README.md index 7608c0d..d0a45f6 100644 --- a/packages/device-connect-server/security_infra/README.md +++ b/packages/device-connect-server/security_infra/README.md @@ -25,6 +25,7 @@ Tools for setting up NATS JWT authentication and multi-tenant isolation for Devi - [2. Application-level tenant namespacing](#2-application-level-tenant-namespacing) - [What about Zenoh?](#what-about-zenoh) - [Connecting Devices](#connecting-devices) + - [Browser-based devices (WebSocket)](#browser-based-devices-websocket) - [Script Reference](#script-reference) - [gen\_creds.sh flags](#gen_credssh-flags) - [Environment variables](#environment-variables) @@ -304,6 +305,60 @@ asyncio.run(main()) The device will register in the `alpha` tenant, and only devices within `alpha` will discover it. +## Browser-based devices (WebSocket) + +Browsers can't speak NATS over raw TCP — they need WebSocket. `setup_deployment.sh` has an opt-in `--enable-websocket` flag that adds a `websocket {}` block to the generated NATS config. The same operator/account JWT auth applies to WS clients; it's a transport, not a new auth path. + +```bash +./setup_deployment.sh \ + --nats-host dc.example.com \ + --enable-websocket \ + --websocket-port 8443 \ + --websocket-allowed-origins https://lights.example.com +``` + +Then bring NATS up with the WebSocket compose override so the port is exposed on the host: + +```bash +docker compose -f infra/docker-compose-multitenant-nats.yml \ + -f infra/docker-compose-nats-websocket.yml up -d +``` + +The override binds the port to `127.0.0.1:8443` by default. 
**Plain WS on the listener is intentional**: the assumption is that a reverse proxy (Caddy, nginx, Cloudflare Tunnel, ...) terminates TLS and proxies to loopback. Without TLS in front, NATS JWTs travel in cleartext. + +To skip the reverse proxy and have NATS do TLS termination natively, pass `--websocket-tls-cert` and `--websocket-tls-key` to `setup_deployment.sh`. The resulting block uses `tls { ... }` instead of `no_tls: true`, and you can safely expose the port directly (override `DC_NATS_WS_BIND=0.0.0.0:8443` in the env when running compose). + +### Reverse-proxy sketch (Caddy) + +```caddy +app.example.com { + @nats path /nats /nats/* + reverse_proxy @nats 127.0.0.1:8443 + reverse_proxy 127.0.0.1:8000 # your page / app +} +``` + +The browser device library (`nats.ws` / `@nats-io/nats-core`) connects to `wss://app.example.com/nats` and authenticates with the JWT just like a TCP client would. From the broker's perspective every browser is an ordinary NATS client. + +### Subject scoping for browser credentials + +Browser JWTs are easier to exfiltrate than server-side ones — anyone who can open dev tools sees them. For shared-credential use cases (a swarm of read-only viewers, a kiosk, audience phones), narrow the credential's pub/sub scope with `nsc` before distributing it: + +```bash +# Example: an audience credential scoped to a single subject subtree. +# Device IDs must be dot-separated (NATS wildcards match whole tokens): +# device-connect..audience...event. +nsc edit user audience-shared --account DEVICE_CONNECT \ + --allow-pub 'device-connect.alpha.audience.>' \ + --allow-pub 'device-connect.alpha.registry' \ + --allow-pub '_INBOX.>' \ + --allow-sub 'device-connect.alpha.audience.>' \ + --allow-sub 'device-connect.alpha.broadcast' \ + --allow-sub '_INBOX.>' +``` + +A leaked credential can then only reach the audience subtree, never cameras, robots, or the orchestrator. 
+ ## Script Reference | Script | Purpose | diff --git a/packages/device-connect-server/security_infra/setup_deployment.sh b/packages/device-connect-server/security_infra/setup_deployment.sh index b6ee409..2b17cf2 100755 --- a/packages/device-connect-server/security_infra/setup_deployment.sh +++ b/packages/device-connect-server/security_infra/setup_deployment.sh @@ -22,33 +22,82 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" NATS_HOST="" NATS_PORT="4222" +ENABLE_WEBSOCKET=0 +WS_PORT="8443" +WS_ALLOWED_ORIGINS="" +WS_TLS_CERT="" +WS_TLS_KEY="" usage() { - echo "Usage: $0 --nats-host HOST [--nats-port PORT]" - echo "" - echo "One-time bootstrap for multi-tenant NATS JWT infrastructure." - echo "Creates the NATS operator/account and privileged credentials" - echo "(registry, facilitator)." - echo "" - echo "Options:" - echo " --nats-host HOST Public hostname or IP of the NATS server (required)" - echo " --nats-port PORT NATS port (default: 4222)" - echo " -h, --help Show this help" - echo "" - echo "After running this, use manage_tenants.sh to create tenants." + cat <<'USAGE' +Usage: setup_deployment.sh --nats-host HOST [options] + +One-time bootstrap for multi-tenant NATS JWT infrastructure. Creates the +NATS operator/account and privileged credentials (registry, facilitator). + +Required: + --nats-host HOST Public hostname or IP of the NATS server. + +Optional: + --nats-port PORT NATS TCP port (default: 4222). + +Browser-based devices (WebSocket): + --enable-websocket Add a `websocket {}` block to the generated + NATS config so browser-based devices can + connect over WS (nats.ws / @nats-io/nats-core). + OFF by default; existing deployments are + unaffected. + --websocket-port PORT WS listen port inside the container (default: 8443). + --websocket-allowed-origins LIST + Comma-separated list of allowed Origin headers. + Defaults to empty, which keeps nats-server's + same_origin=true behavior. 
Set this only when + a reverse proxy rewrites Host headers (e.g. + the page is at https://app.example.com and + the WS endpoint is wss://app.example.com/nats + proxied to a local NATS). + --websocket-tls-cert FILE Native TLS cert (path inside the NATS + --websocket-tls-key FILE container). When both are set, NATS does + TLS termination itself; otherwise the + listener is plain WS and MUST be fronted by + TLS (Caddy / nginx) before public exposure. + +After running this, use manage_tenants.sh to create tenants. + +Security notes for --enable-websocket: + * The compose port for the WS listener is provided via + infra/docker-compose-nats-websocket.yml (loopback-bound by default). + Combine it with the main compose file: + docker compose -f infra/docker-compose-multitenant-nats.yml \ + -f infra/docker-compose-nats-websocket.yml up -d + * Do NOT change the loopback binding without putting TLS in front; without + TLS, NATS JWTs travel in cleartext. +USAGE exit 1 } # Parse arguments while [[ $# -gt 0 ]]; do case "$1" in - --nats-host) NATS_HOST="$2"; shift 2 ;; - --nats-port) NATS_PORT="$2"; shift 2 ;; - -h|--help) usage ;; - *) echo "Unknown option: $1"; usage ;; + --nats-host) NATS_HOST="$2"; shift 2 ;; + --nats-port) NATS_PORT="$2"; shift 2 ;; + --enable-websocket) ENABLE_WEBSOCKET=1; shift ;; + --websocket-port) WS_PORT="$2"; shift 2 ;; + --websocket-allowed-origins) WS_ALLOWED_ORIGINS="$2"; shift 2 ;; + --websocket-tls-cert) WS_TLS_CERT="$2"; shift 2 ;; + --websocket-tls-key) WS_TLS_KEY="$2"; shift 2 ;; + -h|--help) usage ;; + *) echo "Unknown option: $1"; usage ;; esac done +# TLS pair: both or neither. +if { [ -n "$WS_TLS_CERT" ] && [ -z "$WS_TLS_KEY" ]; } || \ + { [ -z "$WS_TLS_CERT" ] && [ -n "$WS_TLS_KEY" ]; }; then + echo "Error: --websocket-tls-cert and --websocket-tls-key must be used together." 
+ exit 1 +fi + if [ -z "$NATS_HOST" ]; then echo "Error: --nats-host is required" echo "" @@ -112,6 +161,43 @@ http_port: 8222 max_payload: 8MB EOF +# Optional: WebSocket listener for browser-based devices. +# Operator-mode JWT auth applies identically to WS and TCP clients; this +# block adds a transport, not a new auth path. +if [ "$ENABLE_WEBSOCKET" -eq 1 ]; then + { + echo "" + echo "# WebSocket listener (added by --enable-websocket)." + echo "# Browsers reach NATS via this listener; same JWT auth as TCP." + echo "websocket {" + echo " port: ${WS_PORT}" + if [ -n "$WS_TLS_CERT" ] && [ -n "$WS_TLS_KEY" ]; then + echo " tls {" + echo " cert_file: \"${WS_TLS_CERT}\"" + echo " key_file: \"${WS_TLS_KEY}\"" + echo " }" + else + echo " # Plain WS. The compose override binds this to 127.0.0.1 only;" + echo " # a reverse proxy (Caddy/nginx) MUST terminate TLS before this" + echo " # port is exposed to the network." + echo " no_tls: true" + fi + if [ -n "$WS_ALLOWED_ORIGINS" ]; then + origins_json=$(echo "$WS_ALLOWED_ORIGINS" | awk -F, '{ + out=""; for (i=1; i<=NF; i++) { + gsub(/^[ \t]+|[ \t]+$/, "", $i); + out = out (i>1 ? ", " : "") "\"" $i "\""; + } print out + }') + echo " allowed_origins: [${origins_json}]" + fi + echo " compression: true" + echo "}" + } >> "${OUTPUT_CONF}" + echo "" + echo "==> WebSocket listener enabled on port ${WS_PORT}" +fi + echo "" echo "============================================" echo " Deployment infrastructure ready!" From a49ffa515a72f3212c1adc3157e7a52b99f512b1 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Thu, 28 May 2026 23:00:25 +0000 Subject: [PATCH 43/44] security_infra: address PR review (port-pair, empty origins, port validation) Three small follow-ups to the initial commit, all in service of avoiding silent footguns when the operator strays from the default settings. 1. 
Port pair: --websocket-port writes the listener port into the NATS config; the compose override maps the host->container port via DC_NATS_WS_PORT (default 8443). If the operator passes --websocket-port 9000 without also setting DC_NATS_WS_PORT=9000, NATS listens on 9000 inside the container but compose maps to 8443 and the listener is silently unreachable. * Documented DC_NATS_WS_PORT in usage text, in the override file's own comment header, and in security_infra/README.md. * At "WebSocket listener enabled" the script now prints the exact `DC_NATS_WS_PORT=<port> docker compose ...` invocation including the port value the operator passed, so the two can't drift. 2. Empty tokens in --websocket-allowed-origins: "a.com,,b.com" or a trailing comma previously produced an empty quoted entry in allowed_origins. Trim each token, skip empty ones, and skip emitting the line entirely if the input collapses to nothing after trimming. 3. Numeric validation: --websocket-port and --nats-port now require a numeric value. A typo (--websocket-port 84as3) is caught at the arg-parse stage with a clear error instead of flowing into the config and only surfacing at `nats-server -t`. Verified: all three fixes in isolation, plus a compose-merge with DC_NATS_WS_PORT=9000 DC_NATS_WS_BIND=127.0.0.1:9443 correctly produces "target: 9000, published: 9443, host_ip: 127.0.0.1". 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../infra/docker-compose-nats-websocket.yml | 21 +++++-- .../security_infra/README.md | 9 ++- .../security_infra/setup_deployment.sh | 56 ++++++++++++++++--- 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/packages/device-connect-server/infra/docker-compose-nats-websocket.yml b/packages/device-connect-server/infra/docker-compose-nats-websocket.yml index 837e738..46dba3c 100644 --- a/packages/device-connect-server/infra/docker-compose-nats-websocket.yml +++ b/packages/device-connect-server/infra/docker-compose-nats-websocket.yml @@ -18,11 +18,22 @@ # TLS termination on the host before the port is exposed to the network -- # without TLS, NATS JWTs travel in cleartext. # -# To deliberately expose the WS listener on a different interface, -# override DC_NATS_WS_BIND, e.g.: -# DC_NATS_WS_BIND=127.0.0.1:8443 docker compose -f ... up -d # default -# DC_NATS_WS_BIND=10.0.0.5:8443 docker compose -f ... up -d # LAN only -# Never set it to 0.0.0.0 without TLS termination. +# Env vars (read at bring-up time, both default to 8443): +# DC_NATS_WS_PORT The container-side port. MUST match the port that +# setup_deployment.sh --websocket-port wrote into the +# nats config. If they drift, NATS listens on one +# port and the host maps a different one, and the +# listener is silently unreachable. +# DC_NATS_WS_BIND The host-side bind address + port. Defaults to +# 127.0.0.1:8443. Examples: +# DC_NATS_WS_BIND=127.0.0.1:8443 # loopback only +# DC_NATS_WS_BIND=10.0.0.5:8443 # LAN only +# Never set it to 0.0.0.0 without TLS termination. 
+# +# Typical invocation (one-liner -- setup_deployment.sh prints this for you): +# DC_NATS_WS_PORT=8443 docker compose \ +# -f infra/docker-compose-multitenant-nats.yml \ +# -f infra/docker-compose-nats-websocket.yml up -d services: nats: diff --git a/packages/device-connect-server/security_infra/README.md b/packages/device-connect-server/security_infra/README.md index d0a45f6..b652b65 100644 --- a/packages/device-connect-server/security_infra/README.md +++ b/packages/device-connect-server/security_infra/README.md @@ -317,13 +317,16 @@ Browsers can't speak NATS over raw TCP — they need WebSocket. `setup_deploymen --websocket-allowed-origins https://lights.example.com ``` -Then bring NATS up with the WebSocket compose override so the port is exposed on the host: +Then bring NATS up with the WebSocket compose override so the port is exposed on the host. The container-side port is read from `DC_NATS_WS_PORT` (default `8443`) and **must match the `--websocket-port` you passed to `setup_deployment.sh`** — they're written to different files and would otherwise drift silently: ```bash -docker compose -f infra/docker-compose-multitenant-nats.yml \ - -f infra/docker-compose-nats-websocket.yml up -d +DC_NATS_WS_PORT=8443 \ + docker compose -f infra/docker-compose-multitenant-nats.yml \ + -f infra/docker-compose-nats-websocket.yml up -d ``` +`setup_deployment.sh` prints the exact one-liner (with the value you passed) at the end of its run so you can copy-paste it. + The override binds the port to `127.0.0.1:8443` by default. **Plain WS on the listener is intentional**: the assumption is that a reverse proxy (Caddy, nginx, Cloudflare Tunnel, ...) terminates TLS and proxies to loopback. Without TLS in front, NATS JWTs travel in cleartext. To skip the reverse proxy and have NATS do TLS termination natively, pass `--websocket-tls-cert` and `--websocket-tls-key` to `setup_deployment.sh`. The resulting block uses `tls { ... 
}` instead of `no_tls: true`, and you can safely expose the port directly (override `DC_NATS_WS_BIND=0.0.0.0:8443` in the env when running compose). diff --git a/packages/device-connect-server/security_infra/setup_deployment.sh b/packages/device-connect-server/security_infra/setup_deployment.sh index 2b17cf2..c0bad5f 100755 --- a/packages/device-connect-server/security_infra/setup_deployment.sh +++ b/packages/device-connect-server/security_infra/setup_deployment.sh @@ -67,9 +67,15 @@ After running this, use manage_tenants.sh to create tenants. Security notes for --enable-websocket: * The compose port for the WS listener is provided via infra/docker-compose-nats-websocket.yml (loopback-bound by default). - Combine it with the main compose file: - docker compose -f infra/docker-compose-multitenant-nats.yml \ - -f infra/docker-compose-nats-websocket.yml up -d + Combine it with the main compose file. The container-side WS port is + read from DC_NATS_WS_PORT (default 8443) and MUST match + --websocket-port -- otherwise NATS listens on one port and compose + maps a different one, silently leaving the listener unreachable. + DC_NATS_WS_PORT=8443 docker compose \ + -f infra/docker-compose-multitenant-nats.yml \ + -f infra/docker-compose-nats-websocket.yml up -d + (The "WebSocket listener enabled" message at the end of this script + prints the exact invocation including the value you passed.) * Do NOT change the loopback binding without putting TLS in front; without TLS, NATS JWTs travel in cleartext. USAGE @@ -98,6 +104,17 @@ if { [ -n "$WS_TLS_CERT" ] && [ -z "$WS_TLS_KEY" ]; } || \ exit 1 fi +# Port arguments must be numeric -- a typo like `--websocket-port 84as3` +# would otherwise flow into the generated config and only fail later. +if ! [[ "$NATS_PORT" =~ ^[0-9]+$ ]]; then + echo "Error: --nats-port must be numeric (got: ${NATS_PORT})." + exit 1 +fi +if [ "$ENABLE_WEBSOCKET" -eq 1 ] && ! 
[[ "$WS_PORT" =~ ^[0-9]+$ ]]; then + echo "Error: --websocket-port must be numeric (got: ${WS_PORT})." + exit 1 +fi + if [ -z "$NATS_HOST" ]; then echo "Error: --nats-host is required" echo "" @@ -183,19 +200,42 @@ if [ "$ENABLE_WEBSOCKET" -eq 1 ]; then echo " no_tls: true" fi if [ -n "$WS_ALLOWED_ORIGINS" ]; then + # Trim each token and skip empties so "a.com,,b.com" or a trailing + # comma doesn't produce a stray "" entry in allowed_origins. origins_json=$(echo "$WS_ALLOWED_ORIGINS" | awk -F, '{ - out=""; for (i=1; i<=NF; i++) { - gsub(/^[ \t]+|[ \t]+$/, "", $i); - out = out (i>1 ? ", " : "") "\"" $i "\""; - } print out + out=""; first=1; + for (i=1; i<=NF; i++) { + tok = $i; + gsub(/^[ \t]+|[ \t]+$/, "", tok); + if (tok == "") continue; + out = out (first ? "" : ", ") "\"" tok "\""; + first = 0; + } + print out }') - echo " allowed_origins: [${origins_json}]" + if [ -n "$origins_json" ]; then + echo " allowed_origins: [${origins_json}]" + fi fi echo " compression: true" echo "}" } >> "${OUTPUT_CONF}" echo "" echo "==> WebSocket listener enabled on port ${WS_PORT}" + echo "" + echo " Bring up the compose stack with BOTH files and pass the WS port" + echo " as DC_NATS_WS_PORT so the host->container mapping matches the" + echo " listener port written into the config (they live in different" + echo " files and would otherwise drift silently):" + echo "" + echo " DC_NATS_WS_PORT=${WS_PORT} \\" + echo " docker compose \\" + echo " -f infra/docker-compose-multitenant-nats.yml \\" + echo " -f infra/docker-compose-nats-websocket.yml up -d" + echo "" + echo " To bind the host port somewhere other than 127.0.0.1, also set" + echo " DC_NATS_WS_BIND=10.0.0.5:${WS_PORT} (LAN only, never 0.0.0.0" + echo " without TLS termination in front)." 
fi echo "" From c41f890ee118a08ef4472b196d1e71b64d0f6884 Mon Sep 17 00:00:00 2001 From: Alexander Tsyplikhin Date: Fri, 29 May 2026 15:11:06 +0000 Subject: [PATCH 44/44] security_infra: preserve appended directives on tenant-config regen manage_tenants.sh regenerate_nats_config() (used by create / add-device / reload-nats) reran `nsc generate config` and re-appended only listen + http_port, silently dropping every other directive below the "# Device Connect additions" marker -- including the websocket {} block this PR adds and the max_payload tuning. In production this took the browser WebSocket listener offline whenever a device was added: phones got a 502 on wss://.../nats and never registered. Capture the existing additions tail before regeneration and restore it afterward, falling back to the default listen/http_port only when no prior block exists. This keeps the WebSocket listener (and any other appended server config) alive across routine tenant operations. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../security_infra/manage_tenants.sh | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/device-connect-server/security_infra/manage_tenants.sh b/packages/device-connect-server/security_infra/manage_tenants.sh index 23b9d6f..44a0a0b 100755 --- a/packages/device-connect-server/security_infra/manage_tenants.sh +++ b/packages/device-connect-server/security_infra/manage_tenants.sh @@ -60,13 +60,28 @@ usage() { regenerate_nats_config() { local output="${SCRIPT_DIR}/nats-jwt-generated.conf" + # Preserve any previously-appended server directives (listen, http_port, + # max_payload, the websocket {} block, native tls {}, ...) across this + # regeneration. 
`nsc generate config` rewrites the file from scratch, so + # without this it silently drops everything below the marker -- which has + # taken the browser WebSocket listener (added via setup_deployment.sh + # --enable-websocket) offline in production every time a tenant or device + # was added. All appended directives live below the marker line. + local additions="" + if [ -f "${output}" ] && grep -q '^# Device Connect additions' "${output}"; then + additions=$(sed -n '/^# Device Connect additions/,$p' "${output}") + fi nsc generate config --mem-resolver --config-file "${output}" 2>/dev/null - cat >> "${output}" <> "${output}" + else + cat >> "${output}" <