Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
508e2e1
feat(registry): paginate discovery/listDevices to bound NATS payloads
atsyplikhin May 21, 2026
eb70ca9
lint: move _DEFAULT_LIST_PAGE_SIZE below imports (ruff E402)
atsyplikhin May 21, 2026
2349130
feat(registry): cap listDevices replies server-side to prevent NATS p…
atsyplikhin May 21, 2026
6193c3e
feat(portal): lazy-load device detail rows on the live dashboard
atsyplikhin May 21, 2026
a070061
feat(edge,registry): break the registration herd at fleet scale
atsyplikhin May 21, 2026
8eb1437
fix(registry,tests): etcd pool enlargement compatible with etcd3gw 2.…
atsyplikhin May 21, 2026
0673652
fix(edge): make @on subscription behavior in portal mode less confusing
atsyplikhin May 21, 2026
88f6668
perf(portal): cache the NATS invoke client + log RPC timing
atsyplikhin May 21, 2026
09c85aa
fix(portal): load device-detail rows on click, not on htmx revealed
atsyplikhin May 21, 2026
38a5117
fix(portal): refresh header counters on every dashboard poll
atsyplikhin May 21, 2026
79247e6
fix(registry): preserve legacy unpaged reply when limit is absent
atsyplikhin May 21, 2026
c6049de
fix(portal): lazy-init invoke lock, best-effort close stale NATS client
atsyplikhin May 21, 2026
614823f
docs(registry): ACL+pagination caveat; warn on etcd3gw session mismatch
atsyplikhin May 21, 2026
52aec09
fix(registry,portal,edge): address review feedback on pagination PR
atsyplikhin May 26, 2026
e234278
fix(tests): drop unused json import flagged by ruff
atsyplikhin May 26, 2026
1e581bb
fix(portal,registry,edge): address review feedback round 3
atsyplikhin May 26, 2026
f075901
fix(portal): don't capture fragment-endpoint URLs as post-login `next`
atsyplikhin May 27, 2026
1716f8d
fix(edge): use asyncio.Lock idiom in _resubscribe_after_reconnect
atsyplikhin May 27, 2026
c5ddac7
fix(portal,edge): tighten lifecycle + reconnect handling; lock review…
atsyplikhin May 27, 2026
4976a2e
fix(portal): keep live-devices poll running while a row is expanded
atsyplikhin May 27, 2026
67cface
fix(registry): warn when discovery/listDevices clamps caller's limit
atsyplikhin May 27, 2026
39d9914
fix(portal): preserve live event-log panel across the 10s poll swap
atsyplikhin May 27, 2026
725b671
feat(portal): mark UI as testing-only, not for production
atsyplikhin May 26, 2026
17dbf77
fix(portal): hoist event-log drawer out of #live-devices so the SSE
atsyplikhin May 27, 2026
747a708
fix(portal): use MutationObserver to keep rows expanded across poll swap
atsyplikhin May 27, 2026
cc454cf
debug(portal): instrument dashboard.html to trace row-collapse bug
atsyplikhin May 27, 2026
e925b0a
debug+fix(portal): unconditionally re-apply expansion + log row state
atsyplikhin May 28, 2026
46ff757
debug(portal): log row state immediately + after next animation frame
atsyplikhin May 28, 2026
f6d9619
debug(portal): bright in-row marker + fetch instrumentation
atsyplikhin May 28, 2026
f0644f8
debug(portal): fixed-position banner + slot.innerHTML readback
atsyplikhin May 28, 2026
afefd20
debug(portal): check if the row we're modifying is the painted one
atsyplikhin May 28, 2026
b55065c
debug(portal): check paint state of slot+marker and watch for reverts
atsyplikhin May 28, 2026
e8c6c72
debug(portal): check row position in tbody vs summary row
atsyplikhin May 28, 2026
351da98
fix(portal): replace table-wide htmx swap with in-place JSON poll
atsyplikhin May 28, 2026
82b0049
feat(portal): handle add/remove devices in the JSON poll
atsyplikhin May 28, 2026
1dafead
feat(portal): move event-log panel back into the device's detail row
atsyplikhin May 28, 2026
69d0923
feat(portal): full revoke for credentials (UI button + CLI command)
atsyplikhin May 28, 2026
35b7ebe
docs(portal): document devices revoke in AGENTS.md agent playbook
atsyplikhin May 28, 2026
55aeb1f
fix(portal,edge): tighten revoke failure modes; close dashboard poll …
atsyplikhin May 28, 2026
b6cc3bf
Merge remote-tracking branch 'origin/main' into feat/registry-pagination
atsyplikhin May 28, 2026
add28a6
chore: bump all three packages 0.2.3 -> 0.2.4
atsyplikhin May 28, 2026
32c814f
fix(portal): route live-fragment reload through htmx so OOB counters …
atsyplikhin May 28, 2026
3be6265
security_infra: optional WebSocket listener for browser-based devices
atsyplikhin May 28, 2026
a49ffa5
security_infra: address PR review (port-pair, empty origins, port val…
atsyplikhin May 28, 2026
c41f890
security_infra: preserve appended directives on tenant-config regen
atsyplikhin May 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/device-connect-agent-tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "device-connect-agent-tools"
version = "0.2.3"
version = "0.2.4"
description = "Framework-agnostic tools for Device Connect — discover and invoke IoT devices over NATS/Zenoh"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
87 changes: 83 additions & 4 deletions packages/device-connect-edge/device_connect_edge/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def capture_image(self, resolution: str = "1080p") -> dict:
import json
import logging
import os
import random
import re
import time
import uuid
Expand Down Expand Up @@ -90,6 +91,44 @@ async def capture_image(self, resolution: str = "1080p") -> dict:
logger = logging.getLogger(__name__)


def _env_float(name: str, default: float) -> float:
    """Read a float from the environment, falling back to ``default``.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset, empty,
            unparseable, or not a finite number.

    Returns:
        The parsed value, or ``default`` when the variable is unusable.
    """
    import math  # local import: only needed for the finiteness check

    raw = os.getenv(name)
    if not raw:
        return default
    try:
        value = float(raw)
    except ValueError:
        return default
    # float() happily parses "nan", "inf" and "-inf". None of those is a
    # usable duration (an infinite sleep never returns, NaN compares false
    # against everything), so treat them as garbage as well.
    return value if math.isfinite(value) else default


# Registration knobs. At fleet scale a 2s request timeout combined with N
# phones starting in lockstep produces congestion collapse on the
# registry: every queued-but-late reply triggers a retry that re-enters
# the queue. A larger timeout lets the registry catch up before any
# retry fires, and an up-front jitter spreads the initial herd so the
# registry never sees a synchronized burst in the first place. Both are
# env-tunable (jitter can be disabled by setting it to 0). The 2s jitter
# default is a compromise: it decorrelates ~1000 devices into ~500/sec
# (much better than lockstep) while staying tolerable for single-device
# development. Operators at fleet scale should bump this via
# DEVICE_CONNECT_REGISTER_JITTER=10 (or higher) to spread the herd
# further.
#
# Lease-TTL interaction: the registry creates the etcd lease at the
# moment _do_register runs, so if a slow registry takes ~timeout
# seconds to reply, the lease can be near-expired before the heartbeat
# loop emits its first beat (`run()` awaits _register before starting
# the heartbeat task). With the 15s timeout default and the 15s `ttl`
# default that race is real; it self-heals — the next heartbeat fires
# `has_lease()=False` on the registry and triggers a requestRegistration
# round-trip — but operators raising DEVICE_CONNECT_REGISTER_TIMEOUT
# (or running a stressed registry where requests routinely take >ttl/3)
# should raise `ttl` in lockstep or shorten `heartbeat_interval` so
# the first beat lands inside the lease window.
# Timeout, in seconds, passed to the registerDevice request.
_REGISTER_REQUEST_TIMEOUT = _env_float("DEVICE_CONNECT_REGISTER_TIMEOUT", 15.0)
# Upper bound, in seconds, of the random sleep taken before the first
# registration request; a value of 0 disables the jitter.
_REGISTER_STARTUP_JITTER = _env_float("DEVICE_CONNECT_REGISTER_JITTER", 2.0)


def build_rpc_response(id_: str, result: Any) -> bytes:
    """Serialize a successful JSON-RPC 2.0 response envelope to bytes.

    Args:
        id_: Request id echoed back in the ``id`` field.
        result: JSON-serializable payload placed under ``result``.

    Returns:
        The JSON document encoded with the default string encoding.
    """
    envelope = {"jsonrpc": "2.0", "id": id_, "result": result}
    serialized = json.dumps(envelope)
    return serialized.encode()
Expand Down Expand Up @@ -974,6 +1013,19 @@ async def _register(self, force: bool = False) -> None:
self._logger.debug("Registration completed by another task, skipping")
return

# Spread the herd. With 1000+ phones spinning up in lockstep
# the registry sees a single synchronized burst that times
# out most callers and amplifies into a retry storm. A small
# randomized delay before the first request decorrelates the
# arrivals; subsequent retries already have exponential
# backoff so we only jitter once per _register call.
if _REGISTER_STARTUP_JITTER > 0:
jitter = random.uniform(0, _REGISTER_STARTUP_JITTER)
self._logger.debug(
"Pre-registration jitter: sleeping %.2fs before first request", jitter,
)
await asyncio.sleep(jitter)

delay = 1 # initial retry delay in seconds
while True:
req_id = f"{self.device_id}-{int(time.time()*1000)}"
Expand All @@ -983,7 +1035,7 @@ async def _register(self, force: bool = False) -> None:
response_data = await self.messaging.request(
f"device-connect.{self.tenant}.registry",
json.dumps({"jsonrpc": "2.0", "id": req_id, "method": "registerDevice", "params": params}).encode(),
timeout=2,
timeout=_REGISTER_REQUEST_TIMEOUT,
)
self._handle_registration_reply(response_data)
# Note: device/online event is published by the registry service
Expand Down Expand Up @@ -1762,7 +1814,10 @@ async def _setup_agentic_driver(self) -> None:
if not isinstance(self._driver, DeviceDriver):
return

self._logger.info("Setting up DeviceDriver D2D capabilities")
self._logger.info(
"Setting up DeviceDriver inter-device messaging "
"(router, registry, @on subscriptions)"
)

# Create and set D2D router (inline — no orchestration dependency).
router = _RemoteInvoker(
Expand Down Expand Up @@ -1796,7 +1851,11 @@ async def _setup_agentic_driver(self) -> None:

# Set up event subscriptions
await self._driver.setup_subscriptions()
self._logger.info("DeviceDriver D2D setup complete")
registry_kind = "D2DRegistry" if self._d2d_mode else "RegistryClient"
self._logger.info(
"DeviceDriver inter-device messaging ready (registry=%s)",
registry_kind,
)

async def _teardown_agentic_driver(self) -> None:
"""Teardown DeviceDriver subscriptions if applicable."""
Expand Down Expand Up @@ -1825,10 +1884,30 @@ async def _resubscribe_after_reconnect(self) -> None:
Uses ``_subscription_lock`` to prevent concurrent invocations
from rapid reconnects.
"""
if not self._subscription_lock.acquire_nowait():
# Review notes (do not re-litigate without reading these):
#
# 1. ``asyncio.Lock`` does NOT have ``acquire_nowait()``. That
# was a latent bug in the original implementation — the
# method only exists on ``threading.Lock``. At fleet scale
# during a reconnect storm it raised ``AttributeError`` on
# every reconnect and silently killed @on resubscription.
# See commit 1716f8d.
#
# 2. The ``locked() then await acquire()`` pattern below looks
# like a TOCTOU race but is safe under single-loop asyncio:
# ``Lock.locked()`` is synchronous and ``Lock.acquire()``
# has a fast path that returns without yielding when the
# lock is free. Two concurrent callers cannot both observe
# ``locked() is False`` between the check and the take
# because there is no event-loop yield in that window.
# If you switch to a multi-loop primitive (anyio, trio,
# threading) this assumption breaks — use ``wait_for(...,
# timeout=0)`` over ``acquire()`` instead.
if self._subscription_lock.locked():
self._logger.debug("Subscription re-establishment already in progress, skipping")
return

await self._subscription_lock.acquire()
try:
delay = 1
while True:
Expand Down
53 changes: 51 additions & 2 deletions packages/device-connect-edge/device_connect_edge/drivers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1049,16 +1049,47 @@ async def wait_for_device(
def _collect_event_subscriptions(self) -> List[Dict[str, Any]]:
"""Collect all @on decorated methods.

Scans single-underscore-prefixed methods as well as public ones so
drivers can keep ``@on`` handlers conventionally private without
them silently becoming no-ops. Dunders are still skipped.

Returns:
List of subscription definitions

Review notes (do not re-litigate without reading):
- Skipping all ``_``-prefixed attrs (the original behavior)
silently dropped ``@on async def _on_foo`` handlers — Python
convention puts callbacks behind ``_`` and drivers expected
that to work. Fixed in 0673652.
- The ``_is_event_subscription`` marker check below is the
authoritative filter; the name prefix is *only* used to skip
dunders so we don't resolve descriptors like ``__class__``.
"""
subscriptions = []

# We iterate ``dir(self)`` rather than ``__dict__`` so handlers
# inherited from a base class are still picked up. The trade-off
# is that ``getattr`` here will invoke ``@property`` descriptors,
# which may have side effects on driver subclasses (the @on
# decorator only marks methods, but properties live in the same
# namespace). We swallow exceptions from the resolve step so a
# broken / lazy property never breaks subscription setup for an
# unrelated handler. ``inspect.getattr_static`` would avoid this
# entirely but also bypasses descriptors we *do* want resolved
# (classmethod / staticmethod) -- so dynamic ``getattr`` plus a
# narrow try/except is the right balance here.
for attr_name in dir(self):
if attr_name.startswith("_"):
if attr_name.startswith("__"):
continue

try:
attr = getattr(self, attr_name, None)
except Exception:
# A property raised. Not a subscription candidate (the
# @on decorator marks methods, not descriptors) so skip
# silently rather than failing the whole driver.
continue

attr = getattr(self, attr_name, None)
if attr is None or not callable(attr):
continue

Expand Down Expand Up @@ -1225,6 +1256,24 @@ async def _setup_subscription(self, sub: Dict[str, Any]) -> None:

logger.info("[%s] Subscribing to: %s", self_id, subject)

# device_type filtering relies on the D2D peer cache to resolve the
# source device's type. In portal/registry mode there is no peer
# cache, so the cache miss path passes the event through unfiltered.
# Warn once at setup so subscribers don't silently see events from
# other device types. Strict filtering can be added in-handler.
if (
device_type
and not is_lifecycle
and getattr(self._device, "_d2d_collector", None) is None
):
logger.warning(
"[%s] @on(device_type=%r) on %s: device_type filtering is "
"best-effort in registry/portal mode. The wildcard broker "
"subject delivers every device's matching event; add an "
"in-handler type check if you need strict filtering.",
self_id, device_type, subject,
)

# Use subscribe_with_subject to get the matched subject in callback
# This allows extracting device_id from wildcard subscriptions
messaging_client = self._router._messaging
Expand Down
124 changes: 110 additions & 14 deletions packages/device-connect-edge/device_connect_edge/registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,24 @@
import asyncio
import json
import logging
import os
import time
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from device_connect_edge.messaging.base import MessagingClient
from device_connect_edge.messaging.exceptions import RequestTimeoutError

logger = logging.getLogger(__name__)

def _env_positive_int(name: str, default: int) -> int:
    """Read a positive integer from the environment, else ``default``.

    A malformed value must not raise at import time, and a zero or
    negative page size is never a usable ``limit``, so both cases fall
    back to ``default``.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset, empty,
            unparseable, or not strictly positive.

    Returns:
        The parsed positive integer, or ``default``.
    """
    raw = os.getenv(name)
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value > 0 else default


# Per-page chunk size when the client transparently iterates the full fleet.
# Sized to keep one JSON-RPC reply within the default NATS max_payload of
# 1 MB: typical device records (~6 KB) put a 100-device page at ~600 KB,
# while the richest function schemas observed (~10 KB each) bring a page
# close to the limit at ~1 MB. Operators on unusually rich schemas can drop
# this via DEVICE_CONNECT_LIST_PAGE_SIZE.
_DEFAULT_LIST_PAGE_SIZE = _env_positive_int("DEVICE_CONNECT_LIST_PAGE_SIZE", 100)


class RegistryClient:
"""JSON-RPC client for the device registry service.
Expand Down Expand Up @@ -157,8 +166,103 @@ async def list_devices(
self._cache, device_type, location, capabilities,
)

# Page through the registry transparently so the wire never carries
# a fleet-sized reply (NATS default max_payload is 1 MB and was
# being exceeded at ~1400 devices). Older servers that don't
# understand ``limit`` just return everything in one reply with
# ``next_offset`` absent, so the loop exits after a single
# iteration — fully backward compatible.
devices: List[Dict[str, Any]] = []
offset = 0
while True:
page, next_offset, _total = await self._list_devices_page(
device_type=device_type,
location=location,
capabilities=capabilities,
offset=offset,
limit=_DEFAULT_LIST_PAGE_SIZE,
timeout=timeout,
)
devices.extend(page)
if next_offset is None:
break
# Defense-in-depth: a buggy or future server returning a
# non-advancing cursor would loop forever otherwise. Break
# with a warning so a fleet-scale incident becomes a
# recoverable log line.
if next_offset <= offset:
logger.warning(
"Registry returned non-advancing next_offset=%s (current offset=%s); "
"stopping page walk to avoid infinite loop",
next_offset, offset,
)
break
offset = next_offset
logger.debug("Discovered %d devices from registry", len(devices))

# Update cache (store unfiltered if we fetched without filters)
if (
self._cache_ttl > 0
and device_type is None
and location is None
and not capabilities
):
self._cache = devices
self._cache_time = time.time()

return devices

async def list_devices_page(
    self,
    *,
    offset: int = 0,
    limit: int = _DEFAULT_LIST_PAGE_SIZE,
    device_type: Optional[str] = None,
    location: Optional[str] = None,
    capabilities: Optional[List[str]] = None,
    timeout: Optional[float] = None,
) -> Tuple[List[Dict[str, Any]], Optional[int], int]:
    """Return one page of devices together with its pagination metadata.

    Intended for paged UIs or streaming consumers. Callers that simply
    want every device should prefer :meth:`list_devices`, which walks
    the pages internally and returns the full fleet.

    Returns:
        A ``(devices, next_offset, total_matched)`` tuple; ``next_offset``
        is ``None`` once the final page has been reached.

    ACL caveat:
        With registry ACLs enabled, server-side filtering is applied
        *after* the page is sliced. A page can therefore contain fewer
        than ``limit`` devices even though further pages exist, and
        ``total_matched`` counts matches before the caller's ACL is
        applied. Treat ``total_matched`` as an upper bound on what the
        caller can ever see, and do not infer "full page" from
        ``len(devices) == limit``.
    """
    page_query = {
        "device_type": device_type,
        "location": location,
        "capabilities": capabilities,
        "offset": offset,
        "limit": limit,
        "timeout": timeout,
    }
    page_result = await self._list_devices_page(**page_query)
    return page_result

async def _list_devices_page(
self,
*,
device_type: Optional[str],
location: Optional[str],
capabilities: Optional[List[str]],
offset: int,
limit: int,
timeout: Optional[float],
) -> Tuple[List[Dict[str, Any]], Optional[int], int]:
subject = f"device-connect.{self._tenant}.discovery"
params: Dict[str, Any] = {}
params: Dict[str, Any] = {"offset": int(offset), "limit": int(limit)}
if device_type:
params["device_type"] = device_type
if location:
Expand All @@ -167,20 +271,12 @@ async def list_devices(
params["capabilities"] = capabilities

result = await self._request(
subject,
"discovery/listDevices",
params if params else None,
timeout,
subject, "discovery/listDevices", params, timeout,
)
devices = result.get("devices", [])
logger.debug("Discovered %d devices from registry", len(devices))

# Update cache (store unfiltered if we fetched without filters)
if self._cache_ttl > 0 and not params:
self._cache = devices
self._cache_time = time.time()

return devices
next_offset = result.get("next_offset")
total = result.get("total_matched", len(devices))
return devices, next_offset, total

async def get_device(
self,
Expand Down
2 changes: 1 addition & 1 deletion packages/device-connect-edge/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "device-connect-edge"
version = "0.2.3"
version = "0.2.4"
description = "Device Connect Edge — lightweight edge device runtime with Zenoh/NATS messaging and D2D communication"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
Loading