diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fdbd3d..9b243c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,139 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev3] — 2026-06-01 + +### Added — Subject taxonomy + `DedupStore` + `ReflexContext` (foundation for multi-source sensing) + +Third sub-step of the brain refactor. Locks in two things that every +subsequent sub-step depends on: the **NATS subject taxonomy** and the +**dedup contract**. No new publishers and no behavior change in +production yet — that lands in `0.8.0-dev4`. This PR is intentionally a +pure refactor so the foundations can be reviewed in isolation. + +#### Subject taxonomy + +`docs/architecture/subjects.md` is the new authoritative spec. +Companion machine-readable constants in +`netcortex/contracts/subjects.py`: + +* `SENSORY_EVENT_CLASSES` — closed vocabulary (`link_down`, `link_up`, + `bgp_drop`, `bgp_up`, `device_reboot`, `device_unreachable`, + `device_reachable`, `security_alert`, `config_change`, + `topology_change`, `route_advertisement_change`). Adding a class + requires a doc + constant change in the same PR. +* `SENSORY_SOURCES` — `_` tokens + (`snmp_trap`, `snmp_poll`, `meraki_webhook`, `gnmi_dialout`, …). +* `sensory_subject(event_class, source, *target_parts)` — validated + builder. Refuses unknown classes/sources, empty parts, embedded + dots, whitespace. +* `parse_sensory_subject(subject)` — inverse extractor; handlers use + it to derive the `fact_key` from the incoming subject. + +**Why event-class-first**: `sensory...` +lets a single handler subscribe to `sensory.link_down.>` and catch +every source of link-down. The earlier `sensory....` +ordering forced per-source subscriptions and made the +"same-event-multiple-sources" dedup story awkward. + +#### `DedupStore` Protocol + `InMemoryDedupStore` + +`netcortex/contracts/dedup_store.py` defines the atomic check-and-record +contract. 
`netcortex/working/dedup/in_memory.py` ships the only 0.8.0 +implementation: + +* Asyncio-safe (lock-protected mutations). +* TTL-bounded with lazy expired-entry sweep (bounded budget per call + so tail latency is predictable). +* Size-bounded with LRU eviction so a misbehaving publisher cannot OOM. +* Accepts an injectable clock for fast deterministic unit tests. +* Cap warnings and long-TTL warnings on long-lived state that would + mask flap behavior. + +Redis-backed implementation lands in 0.9.0 alongside working memory. +Contract tests (9 cases) parametrize over every registered +implementation — Redis only needs a factory function and a registry +row to gain full coverage when it arrives. + +#### `ReflexContext` (handler dependency injection) + +`netcortex.reflex.protocol.ReflexContext` is the runtime dependency +bag every handler receives on `handle(event, ctx)`. Frozen dataclass, +all fields optional, new resources added by appending fields so old +handlers are unaffected. + +* `ctx.dedup_store: DedupStore | None` — 0.8.0 +* `ctx.semantic_memory`, `ctx.working_memory`, `ctx.policy_engine`, + … — appended in later releases + +The `ReflexRunner` owns one `ReflexContext` (default-constructed if +not supplied) and threads it through every dispatch. Existing failure- +isolation and lifecycle behavior unchanged. + +#### Handler refactor — new patterns + dedup logic + +| Handler | Old pattern (dev2) | New pattern (dev3) | Dedup window | +|---|---|---|---| +| `link_down` | `sensory.snmp.trap.link_down.>` | `sensory.link_down.>` | 60 s | +| `bgp_drop` | `sensory.snmp.trap.bgp_backward_transition.>` | `sensory.bgp_drop.>` | 60 s | +| `security_alert` (renamed from `security_webhook`) | `sensory.meraki.webhook.security.>` | `sensory.security_alert.>` | 300 s | + +Renaming `security_webhook` → `security_alert` because the new handler +is source-agnostic (Meraki today, Cisco AMP / future SIEM tomorrow); +the file moved from `security_webhook.py` → `security_alert.py`. 
The +operator-facing handler id is renamed accordingly. + +Each handler now constructs a `fact_key = "|"` +(plus `event_type` for `security_alert`) and consults `ctx.dedup_store` +when present. Duplicates return `outcome="skipped"` with the dedup +rationale; first arrivals return `outcome="logged"` as before. Severity +is intentionally demoted to `info` on skipped outcomes — they are +corroboration telemetry, not a second incident. + +**Known limitation in 0.8.0**: a real flap (down/up/down within one +window) collapses to a single fact. Tracked: the fusion stage in +0.9.0 handles state transitions explicitly. See subjects.md "Dedup +model" section. + +#### Tests + +* `tests/contracts/dedup_store/test_dedup_store_contract.py` — 9 cases + parametrized over every registered store implementation + (atomicity-under-concurrency, TTL expiry, empty-key rejection, + non-positive-TTL rejection, close idempotency, use-after-close raises). +* `tests/contracts/test_subjects.py` — 13 cases for the taxonomy + builders, parser, vocabulary integrity checks. +* `tests/working/dedup/test_in_memory.py` — 6 cases for the in-memory + store specifics (LRU eviction, lazy sweep, fake clock, ctor + validation, close clears state). +* `tests/reflex/test_handlers.py` — updated for new patterns + 5 new + dedup cases (cross-source dedup for `link_down`, different-target + independence, missing-target skip-dedup, Meraki retry dedup for + `security_alert`, distinct-event-type-no-dedup, trap+gnmi dedup for + `bgp_drop`). +* `tests/reflex/test_runner.py` — updated for new signature + 2 new + cases (default-context wiring, explicit-context threading). +* `tests/reflex/test_registry.py` — updated for new handler signature + on the stub. + +### Breaking — pre-release only + +* `ReflexHandler.handle(event)` → `ReflexHandler.handle(event, ctx)` — + every handler implementation must take the context. The three + first-party handlers were updated in this PR; no external consumers + exist yet. 
+* Handler ids: `security_webhook` → `security_alert`. The dev2 release + never persisted these to anywhere stable, so this rename is + cost-free; future renames after publishers exist will require the + dual-publish dance described in subjects.md. + +### Not yet wired + +* Still no publishers. Pollers continue to call correlator + writeback + directly. The first dual-write publisher lands in `0.8.0-dev4`. +* Outcomes are logged only — Neo4j `:ReflexEvent` persistence + NetBox + journal mirror also land in `0.8.0-dev4`. + ## [0.8.0-dev2] — 2026-06-01 ### Added — Reflex skeleton: registry + runner + 3 first-party handlers diff --git a/docs/architecture/subjects.md b/docs/architecture/subjects.md new file mode 100644 index 0000000..564bb1a --- /dev/null +++ b/docs/architecture/subjects.md @@ -0,0 +1,177 @@ +# NATS Subject Taxonomy + +The bus (Thalamus, see [`brain.md`](./brain.md)) carries every observation, +every derived fact, every reflex outcome, and every motor action through a +single shared subject namespace. The shape of that namespace determines +how easily handlers can subscribe to **all sources of one thing** versus +**all things from one source** — those two needs pull in different +directions, and the taxonomy below is what we landed on. + +## Top-level namespaces + +| Namespace | Who publishes | Who subscribes | Lifetime | +|---|---|---|---| +| `sensory.>` | Receivers (webhook / trap / telemetry) and pollers (SNMP, Meraki API, etc.) 
| Resolver (Stage 2), fusion (Stage 3), reflex (Stage 4 fallback), episodic memory | Raw observations, retained briefly in JetStream | +| `fact.>` (0.9.0+) | Fusion stage, after dedup + corroboration | Reflex, working memory, semantic memory | Derived facts, retained longer | +| `reflex.>` | Reflex handlers | UI, episodic memory, downstream agents | Handler outcomes | +| `motor.>` (future) | Conductor / agents | Action executors, audit, semantic memory | Outbound actions | +| `consolidation.>` (future) | Consolidation cycles | Semantic memory, pattern memory | Bulk derived state | + +Anything published outside these five namespaces will be rejected by the +receiver-side validators once those are in place; for now (0.8.0) it is +strongly discouraged. + +## `sensory.<event_class>.<source>.<target>` + +Event-class first. This is the **only** ordering that lets a single +handler subscribe to "all link-down observations regardless of where +they came from" with a NATS wildcard. + +### `<event_class>` — a closed vocabulary + +| Class | Meaning | Typical sources | +|---|---|---| +| `link_down` | An interface transitioned to operationally-down | SNMP linkDown trap, SNMP poll diff, Meraki webhook, gNMI dial-out | +| `link_up` | An interface transitioned to operationally-up | same | +| `bgp_drop` | A BGP session entered a non-Established state | BGP4-MIB trap, CISCO-BGP4-MIB trap, gNMI BGP neighbor sample | +| `bgp_up` | A BGP session entered Established | same | +| `device_reboot` | A device restarted | coldStart trap, sysUpTime reset on poll, Meraki status webhook | +| `device_unreachable` | A device stopped responding to our reachability probes | poller probe failure, Meraki status webhook | +| `device_reachable` | The inverse | same | +| `security_alert` | A security-class event observed | Meraki webhook (IDS/malware/blocked-URL), Cisco AMP webhook, future SIEM | +| `config_change` | A device configuration changed | Meraki webhook, NETCONF change notification, RANCID-style diff | +| `topology_change` | A new
neighbor appeared or an existing one disappeared | LLDP/CDP poll diff | +| `route_advertisement_change` | Advertised prefix set changed on a session | BGP RIB poll diff | + +New classes are added by amending this table **and** the +`SENSORY_EVENT_CLASSES` constant in `netcortex/contracts/subjects.py` +in the same PR. Reviewers reject PRs that grow one without the other. + +### `<source>` — `<modality>_<provenance>` joined as one token + +Why one token: NATS wildcards match exactly one token (`*`) or trailing +greedy (`>`). Compound sources as separate tokens would make +"all snmp sources" subscriptions require multiple subscriptions. + +| Modality | Source token | Notes | +|---|---|---| +| SNMP | `snmp_trap`, `snmp_poll`, `snmp_walk` | `_walk` reserved for bulk MIB walks distinct from per-OID polls | +| HTTP webhook | `meraki_webhook`, `thousandeyes_webhook`, `cisco_amp_webhook`, `catalyst_center_webhook` | Source names match the upstream platform | +| Streaming telemetry | `gnmi_dialout`, `gnmi_dialin`, `netconf_yangpush`, `cisco_mdt` | | +| API poll | `meraki_api`, `intersight_api`, `vsphere_api`, `fmc_api`, `nexus_dashboard_api`, `catalyst_center_api` | One per platform adapter | +| Synthetic | `netcortex_inference` | When the system itself derives an observation (rare; prefer `fact.*`) | + +### `<target>` — canonical identifiers, dot-separated tokens + +Targets are the entities the event is about. Multi-part keys use `|` +(URL-safe, unambiguous) **within** a token, dots between tokens: + +| Event class | Target shape | Example | +|---|---|---| +| `link_down`, `link_up` | `\|` | `sensory.link_down.snmp_trap.cpn-ful-cat9k1\|Gi1/0/12` | +| `bgp_drop`, `bgp_up` | `\|` | `sensory.bgp_drop.gnmi_dialout.cpn-ful-cat8k1\|10.0.1.5` | +| `device_*` | `` | `sensory.device_reboot.snmp_trap.cpn-ful-cat9k1` | +| `security_alert` | `\|` | `sensory.security_alert.meraki_webhook.N_1\|aa:bb:cc:dd:ee:ff` | +| `config_change` | `` or `\|
` | `sensory.config_change.meraki_webhook.Q2XX-YYYY-ZZZZ` | +| `topology_change` | `\|\|\|` | `sensory.topology_change.snmp_poll.r1\|Gi0/1\|r2\|Gi0/24` | +| `route_advertisement_change` | `\|` | `sensory.route_advertisement_change.snmp_poll.r1\|10.0.1.5` | + +**Canonicalization happens before publish.** Receivers publish with the +identifier shape they observed (raw IP, MAC, serial, ifIndex). The +resolver stage (0.9.0+) canonicalizes to NetBox-blessed names by querying +semantic memory and re-publishes under the same subject with the +canonical target. Until then (0.8.0), best-effort canonicalization +happens inline in the receiver; un-canonicalized observations still pass +through, they just may dedup imperfectly. + +## `fact..` (lands in 0.9.0) + +After the fusion stage dedups same-event-different-source observations +within a per-class time window, the surviving fact is republished under +`fact.>`. Reflex handlers eventually move from `sensory.*` subscription +to `fact.*` subscription, gaining: + +* one-fire-per-real-event semantics for free (no per-handler dedup code) +* corroboration metadata (which sources observed it) +* identity already canonicalized + +## `reflex..` (future) + +Reflex handlers may emit their own events back to the bus so other +consumers (UI live tail, downstream automation, episodic memory) can +react. Not used in 0.8.0-dev3 but the namespace is reserved. + +Examples: + +* `reflex.link_down.applied.cpn-ful-cat9k1|Gi1/0/12` +* `reflex.bgp_drop.skipped.cpn-ful-cat8k1|10.0.1.5` (skipped because deduped) +* `reflex.security_alert.errored.N_1|aa:bb:cc:dd:ee:ff` + +## `motor...` (future) + +Reserved for actuation. Not in scope until the policy library + write-gate +land. + +## Dedup model (0.8.0) + +Same-event-multiple-sources is handled at the **reflex handler layer** +using a `DedupStore` (in-memory in 0.8.0, Redis in 0.9.0+, see +[`netcortex.contracts.dedup_store.DedupStore`](../../netcortex/contracts/dedup_store.py)).
+The fact key is intentionally short and scoped by event class so two +handlers reacting to the same target on different conditions (e.g. +`link_down` and `link_up`) do not collide: + +``` +fact_key = f"{event_class}|{canonical_target}" +``` + +The store enforces a TTL window per call. First arrival succeeds and +records the key with TTL = `handler.dedup_window_seconds`. Any later +arrival of the same `fact_key` before TTL expiry is treated as a +duplicate and the handler returns a `skipped` outcome (still recorded, +so operators see corroboration in the UI). + +**Known limitation in 0.8.0:** a real flap whose down→up→down +transitions all land within one window collapses to one fire. We accept +this because flap detection is a working-memory concern (0.9.0) and +adding a transition counter in 0.8.0 would require publishers to emit +one, which they cannot uniformly do. The fusion stage in 0.9.0 tracks +state transitions explicitly. + +Per-handler defaults: + +| Handler | `dedup_window_seconds` | Rationale | +|---|---|---| +| `link_down` | 60 | Duplicate-source arrivals collapse to one fact; a flap slower than the window still surfaces as multiple facts | +| `bgp_drop` | 60 | Same as link_down — BGP sessions don't flap meaningfully faster | +| `security_alert` | 300 | Meraki retries delivery; same alert may arrive 2-3 times within minutes | + +Handlers can override per-event by computing a different `fact_key` (e.g., +include a `transition_id` from the payload) — see each handler's docstring.
+ +## Wildcards and worked subscriptions + +Common reflex subscriptions and what they catch: + +```text +sensory.link_down.> # ALL link-down regardless of source +sensory.link_down.snmp_trap.> # link-down ONLY from SNMP traps +sensory.link_down.*.cpn-ful-cat9k1|Gi1/0/12 # link-down from any source for one interface (the device|interface target is a single token) +sensory.bgp_drop.> # ALL bgp drops +sensory.security_alert.meraki_webhook.> # Meraki security alerts only +sensory.*.snmp_trap.> # Everything from any SNMP trap source +sensory.> # Firehose (episodic memory only) +fact.link_down.> # 0.9.0+: post-fusion link-down facts +``` + +## Versioning and breaking changes + +Subject names are part of the **operator-facing contract**. Renaming an +event class or source token is a breaking change and requires: + +1. A `BREAKING CHANGE:` note in the changelog +2. Dual-publish under both old and new subjects for one minor version +3. A deprecation warning in the CHANGELOG entry +4. Removal of the old subject in the version after that + +Adding a new event class or source token is **not** a breaking change. diff --git a/netcortex/__init__.py b/netcortex/__init__.py index 6dc8033..ed242a3 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes.
""" -__version__ = "0.8.0-dev2" +__version__ = "0.8.0-dev3" diff --git a/netcortex/contracts/__init__.py b/netcortex/contracts/__init__.py index 78aea96..f911429 100644 --- a/netcortex/contracts/__init__.py +++ b/netcortex/contracts/__init__.py @@ -33,12 +33,14 @@ from __future__ import annotations +from netcortex.contracts.dedup_store import DedupStore from netcortex.contracts.event_bus import EventBus, EventBusValidationError, EventMessage from netcortex.contracts.policy import Decision, Policy, PolicyContext from netcortex.contracts.sensory_adapter import SensoryAdapter, SensoryEvent __all__ = [ "Decision", + "DedupStore", "EventBus", "EventBusValidationError", "EventMessage", diff --git a/netcortex/contracts/dedup_store.py b/netcortex/contracts/dedup_store.py new file mode 100644 index 0000000..b042b6d --- /dev/null +++ b/netcortex/contracts/dedup_store.py @@ -0,0 +1,91 @@ +"""``DedupStore`` Protocol — time-windowed deduplication for sensory events. + +When the same real-world event arrives via multiple modalities (trap + +webhook + poll), reflex handlers consult a ``DedupStore`` to avoid firing +once per source. The store's only job is atomic check-and-record over a +TTL window. + +Implementations +--------------- +* ``netcortex.working.dedup.in_memory.InMemoryDedupStore`` — single-process, + asyncio-safe, used everywhere in 0.8.0 (reflex runner, tests, dev) and + the only implementation in CI. +* ``netcortex.working.dedup.redis.RedisDedupStore`` (lands in 0.9.0) — + multi-replica-safe, uses ``SET key 1 NX EX ttl`` for atomic insert. + +Both implementations conform to the same contract tests in +``tests/contracts/dedup_store/`` — Redis-backed tests skip unless +``REDIS_URL`` is set in the environment, matching the pattern dev1 +established for ``NatsEventBus``. + +Atomicity guarantee +------------------- +``record_unless_duplicate`` MUST be atomic with respect to other concurrent +calls for the same ``fact_key``. 
Without this, two concurrent observations +could both pass the check and both fire the reflex — which is the exact +race we're trying to suppress. + +Redis gives us this for free via ``SET NX``. The in-memory implementation +uses an ``asyncio.Lock`` per call. Future implementations must document +their atomicity story. + +Cross-process atomicity is only required of multi-replica stores; the +in-memory store is by definition single-process and operators MUST NOT +deploy multi-replica reflex runners with the in-memory store. The +runtime config that picks the store is responsible for enforcing this. +""" + +from __future__ import annotations + +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class DedupStore(Protocol): + """Atomic check-and-record over a TTL window, keyed by string. + + The store is not a general key-value cache — it intentionally exposes + only the one operation the reflex layer needs, so adding new + implementations is a small, well-scoped task. + """ + + async def record_unless_duplicate( + self, + fact_key: str, + *, + ttl_seconds: float, + ) -> bool: + """Atomically record ``fact_key`` if it is not already present. + + Returns ``True`` when this call recorded the key (i.e. the event + is **new** within the current TTL window — the caller should + proceed). Returns ``False`` when the key was already present + (the event is a **duplicate** — the caller should short-circuit). + + ``ttl_seconds`` MUST be > 0. The store rounds sub-millisecond + precision down. Very long TTLs (> 1 day) are accepted but a + warning is logged because long-lived dedup state tends to mask + real flap behavior. + + The atomicity guarantee applies per ``fact_key`` only. Different + ``fact_key`` values may interleave freely. + + Raises + ------ + ValueError + If ``fact_key`` is empty or ``ttl_seconds`` <= 0. + """ + ... + + async def close(self) -> None: + """Release any underlying resources. + + Idempotent: a second call is a no-op. 
After ``close()`` further + calls to ``record_unless_duplicate`` raise ``RuntimeError``. + In-memory implementations make this trivial; Redis-backed + implementations close their connection pool here. + """ + ... + + +__all__ = ["DedupStore"] diff --git a/netcortex/contracts/subjects.py b/netcortex/contracts/subjects.py new file mode 100644 index 0000000..78f6e25 --- /dev/null +++ b/netcortex/contracts/subjects.py @@ -0,0 +1,206 @@ +"""NATS subject taxonomy — constants and helpers. + +The full specification lives in [`docs/architecture/subjects.md`](../../docs/architecture/subjects.md). +This module is the **machine-readable** half of that contract: an +authoritative enumeration of the closed event-class vocabulary, plus a +handful of small helpers for constructing and parsing subjects in a way +that matches what receivers actually emit. + +Why a closed vocabulary +----------------------- +Reflex handlers, fusion rules, and operator alerts all switch on event +class. If publishers were free to invent new classes ad-hoc, downstream +consumers would have to defensively handle unknown values everywhere. +Instead we lock the vocabulary here and force a documentation + code +change to introduce a new one — see the doc's "Versioning and breaking +changes" section. 
+""" + +from __future__ import annotations + +from typing import Final + +# --------------------------------------------------------------------------- +# Top-level namespaces +# --------------------------------------------------------------------------- + +SENSORY_NS: Final[str] = "sensory" +FACT_NS: Final[str] = "fact" +REFLEX_NS: Final[str] = "reflex" +MOTOR_NS: Final[str] = "motor" +CONSOLIDATION_NS: Final[str] = "consolidation" + +NAMESPACES: Final[frozenset[str]] = frozenset({ + SENSORY_NS, + FACT_NS, + REFLEX_NS, + MOTOR_NS, + CONSOLIDATION_NS, +}) + +# --------------------------------------------------------------------------- +# Closed vocabulary of event classes used after `sensory.` and `fact.` +# --------------------------------------------------------------------------- + +#: Adding to this set MUST be accompanied by a documentation update in +#: docs/architecture/subjects.md in the same PR. Reviewers reject changes +#: that grow one without the other. +SENSORY_EVENT_CLASSES: Final[frozenset[str]] = frozenset({ + "link_down", + "link_up", + "bgp_drop", + "bgp_up", + "device_reboot", + "device_unreachable", + "device_reachable", + "security_alert", + "config_change", + "topology_change", + "route_advertisement_change", +}) + +# --------------------------------------------------------------------------- +# Source-token registry (modality_provenance, single NATS token) +# --------------------------------------------------------------------------- + +#: Sources we currently know how to receive from. Add to this set when a +#: new receiver lands. Receivers ARE expected to validate their own source +#: token against this set at startup, so a typo fails fast. 
+SENSORY_SOURCES: Final[frozenset[str]] = frozenset({ + # SNMP + "snmp_trap", + "snmp_poll", + "snmp_walk", + # Webhooks + "meraki_webhook", + "thousandeyes_webhook", + "cisco_amp_webhook", + "catalyst_center_webhook", + # Streaming telemetry + "gnmi_dialout", + "gnmi_dialin", + "netconf_yangpush", + "cisco_mdt", + # API pollers (one per platform adapter) + "meraki_api", + "intersight_api", + "vsphere_api", + "fmc_api", + "nexus_dashboard_api", + "catalyst_center_api", + # Synthetic — when NetCortex itself derives an observation. Rare; + # most synthetic state should publish to `fact.*` instead. + "netcortex_inference", +}) + + +# --------------------------------------------------------------------------- +# Builders +# --------------------------------------------------------------------------- + + +def sensory_subject(event_class: str, source: str, *target_parts: str) -> str: + """Build a validated `sensory...` subject. + + Receivers should use this rather than f-string concatenation. The + validation catches drift between the doc, this module, and what + publishers actually emit — the kind of drift that produces + "subscriptions silently match nothing" bugs that are miserable to + debug in production. + """ + if event_class not in SENSORY_EVENT_CLASSES: + raise ValueError( + f"unknown sensory event class {event_class!r}; " + f"add to SENSORY_EVENT_CLASSES + docs/architecture/subjects.md" + ) + if source not in SENSORY_SOURCES: + raise ValueError( + f"unknown sensory source {source!r}; " + f"add to SENSORY_SOURCES + docs/architecture/subjects.md" + ) + if not target_parts: + raise ValueError( + f"sensory subject {event_class}/{source} requires at least one target part; " + f"got 0 — an empty subject tail makes subjects with wildcards ambiguous" + ) + # NATS tokens cannot contain '.' (token separator) or whitespace. + # We don't try to validate the full grammar here; we just catch the + # two cases that produce silent-but-wrong subjects. 
+ for i, part in enumerate(target_parts): + if not part: + raise ValueError( + f"sensory subject target part {i} is empty — would produce " + f"a malformed subject like 'sensory.{event_class}.{source}..foo'" + ) + if "." in part: + raise ValueError( + f"sensory subject target part {i}={part!r} contains '.' which is " + f"the NATS token separator; use '|' to join compound identifiers" + ) + if any(ch.isspace() for ch in part): + raise ValueError( + f"sensory subject target part {i}={part!r} contains whitespace; " + f"NATS subjects must be whitespace-free" + ) + return ".".join([SENSORY_NS, event_class, source, *target_parts]) + + +def fact_subject(event_class: str, *target_parts: str) -> str: + """Build a validated `fact..` subject. + + Lands in production with the fusion stage (0.9.0). Provided here so + code written against the future fact namespace can validate today. + """ + if event_class not in SENSORY_EVENT_CLASSES: + raise ValueError( + f"unknown fact event class {event_class!r}; " + f"add to SENSORY_EVENT_CLASSES + docs/architecture/subjects.md" + ) + if not target_parts: + raise ValueError( + f"fact subject {event_class} requires at least one target part" + ) + return ".".join([FACT_NS, event_class, *target_parts]) + + +# --------------------------------------------------------------------------- +# Parser — used by handlers to extract the canonical fact_key +# --------------------------------------------------------------------------- + + +def parse_sensory_subject(subject: str) -> tuple[str, str, str]: + """Parse a sensory subject into (event_class, source, target_joined). + + `target_joined` is the remaining tokens after class+source, re-joined + with the NATS separator (``.``). Handlers typically use this to + compute their dedup fact_key. + + Returns a 3-tuple even when the subject is malformed — the caller + decides what to do. Empty fields signal "we don't know." 
+ + Examples + -------- + >>> parse_sensory_subject("sensory.link_down.snmp_trap.r1|Gi0/1") + ('link_down', 'snmp_trap', 'r1|Gi0/1') + >>> parse_sensory_subject("sensory.security_alert.meraki_webhook.N_1.aa:bb:cc:dd:ee:ff") + ('security_alert', 'meraki_webhook', 'N_1.aa:bb:cc:dd:ee:ff') + """ + parts = subject.split(".") + if len(parts) < 4 or parts[0] != SENSORY_NS: + return ("", "", "") + return (parts[1], parts[2], ".".join(parts[3:])) + + +__all__ = [ + "CONSOLIDATION_NS", + "FACT_NS", + "MOTOR_NS", + "NAMESPACES", + "REFLEX_NS", + "SENSORY_EVENT_CLASSES", + "SENSORY_NS", + "SENSORY_SOURCES", + "fact_subject", + "parse_sensory_subject", + "sensory_subject", +] diff --git a/netcortex/reflex/__init__.py b/netcortex/reflex/__init__.py index c443a5b..5f90c73 100644 --- a/netcortex/reflex/__init__.py +++ b/netcortex/reflex/__init__.py @@ -18,7 +18,7 @@ brain-mapped architecture. """ -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexHandler, ReflexOutcome from netcortex.reflex.registry import ( DuplicateHandlerError, all_handlers, @@ -29,6 +29,7 @@ __all__ = [ "DuplicateHandlerError", + "ReflexContext", "ReflexHandler", "ReflexOutcome", "ReflexRunner", diff --git a/netcortex/reflex/handlers/__init__.py b/netcortex/reflex/handlers/__init__.py index 83ecadb..cea7b0f 100644 --- a/netcortex/reflex/handlers/__init__.py +++ b/netcortex/reflex/handlers/__init__.py @@ -23,4 +23,4 @@ # Keep these imports alphabetical so a diff reviewer can spot additions. 
from netcortex.reflex.handlers import bgp_drop # noqa: F401 from netcortex.reflex.handlers import link_down # noqa: F401 -from netcortex.reflex.handlers import security_webhook # noqa: F401 +from netcortex.reflex.handlers import security_alert # noqa: F401 diff --git a/netcortex/reflex/handlers/bgp_drop.py b/netcortex/reflex/handlers/bgp_drop.py index 1cafee2..217a55e 100644 --- a/netcortex/reflex/handlers/bgp_drop.py +++ b/netcortex/reflex/handlers/bgp_drop.py @@ -1,28 +1,21 @@ """``bgp_drop`` — reflex handler for BGP session state-down signals. -Subscribes to the BGP4-MIB ``bgpBackwardTransition`` trap (and, once the -streaming telemetry adapter lands, also ``sensory.cisco.mdt.bgp.>`` -neighbor-down samples). The fast deterministic response is to record a -session-down outcome so the deliberative loop (route convergence -analysis, prefix advertisement drift) has the wall-clock anchor. - -This module is dev2 scaffolding. When publishers land in 0.8.0-dev3+ the -handler will additionally: - -* resolve the peer IP against semantic memory's ``:BgpSession`` nodes so - the outcome carries the canonical session identifier, not just the - peer address; -* check whether the device is in a maintenance window OR the peer is a - known-flapping route-server (operator policy); -* attach a NetBox journal entry to the BGP session object (once the - reconciliation engine starts surfacing those — they are not first- - class in NetBox today, so the journal will live on the device); -* trigger a deliberative follow-up to assess prefix-advertisement - impact, comparing the last-known advertised prefix set on this - session against the post-drop topology snapshot. - -None of that is in dev2. The current handler logs and returns a -``high``-severity outcome; downstream consumers can already key off it. +Subscribes to ``sensory.bgp_drop.>`` so the same handler reacts to +every source of a BGP session backward-transition (SNMP trap, gNMI +neighbor-state sample, future RIB poll diff). 
+ +The event class was ``bgp_backward_transition`` in dev2's draft +taxonomy — renamed to the cleaner ``bgp_drop`` in dev3's event-class- +first taxonomy. See ``docs/architecture/subjects.md``. + +Dedup +----- +A real BGP session drop typically produces one trap immediately and a +gNMI sample 1-2 seconds later. Both arrivals collapse to one fire via +the runner-supplied :class:`DedupStore`. Per-handler window default is +60 seconds — short enough that a real flap (drop, restore, drop within +a minute) collapses to one fact but a same-session re-drop after the +window fires fresh. Operators tune via constructor arg. """ from __future__ import annotations @@ -31,17 +24,13 @@ class in NetBox today, so the journal will live on the device); from typing import Final from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import register_handler -# Subject pattern. -# -# Real publishers will use -# ``sensory.snmp.trap.bgp_backward_transition.`` or -# ``sensory.cisco.mdt.bgp_neighbor_state.``. For dev2 the -# handler subscribes to the SNMP trap subject only; the second -# subscription (or a glob) lands once the telemetry adapter exists. 
-_PATTERN: Final[str] = "sensory.snmp.trap.bgp_backward_transition.>" +_PATTERN: Final[str] = "sensory.bgp_drop.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 60.0 class BgpDropHandler: @@ -50,8 +39,18 @@ class BgpDropHandler: id: Final[str] = "bgp_drop" pattern: Final[str] = _PATTERN - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: payload = event.payload + event_class, source, _ = parse_sensory_subject(event.subject) device = ( payload.get("device_id") or payload.get("device") @@ -60,27 +59,45 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: peer = payload.get("peer") or payload.get("peer_ip") peer_asn = payload.get("peer_asn") or payload.get("remote_as") last_state = payload.get("last_state") or payload.get("previous_state") - # Compose a target identifier that survives whether or not the peer - # IP is known — preferring the canonical session "device|peer" key - # when both are available, falling back to whichever is present. 
if device and peer: - target = f"{device}|{peer}" + target: str | None = f"{device}|{peer}" elif peer: target = str(peer) elif device: target = str(device) else: target = None + + now = datetime.now(tz=timezone.utc) + + if ctx.dedup_store is not None and target is not None and event_class: + fact_key = f"{event_class}|{target}" + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity="info", + occurred_at=now, + payload={"source": source}, + outcome="skipped", + rationale=( + f"duplicate bgp_drop within {self.dedup_window_seconds}s " + f"dedup window (source={source!r})" + ), + ) + return ReflexOutcome( handler=self.id, subject=event.subject, target=target, - # BGP session loss is high-severity by default. Operators can - # tune later via the policy library once it exists; we do not - # second-guess severity inside the handler. severity="high", - occurred_at=datetime.now(tz=timezone.utc), + occurred_at=now, payload={ + "source": source, "device": device, "peer": peer, "peer_asn": peer_asn, @@ -88,8 +105,8 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: }, outcome="logged", rationale=( - f"BGP session {target!r} backward-transition observed; " - f"last_state={last_state!r} — dev2 idle handler" + f"bgp_drop on {target!r} from {source!r}; " + f"last_state={last_state!r} — dev3 dedup wired" ), ) diff --git a/netcortex/reflex/handlers/link_down.py b/netcortex/reflex/handlers/link_down.py index c562a49..c5c3b53 100644 --- a/netcortex/reflex/handlers/link_down.py +++ b/netcortex/reflex/handlers/link_down.py @@ -1,29 +1,22 @@ """``link_down`` — reflex handler for interface-down signals. -Subscribes to the SNMP linkDown trap subject. 
In the brain-mapped -architecture this is the fast deterministic response to an interface -going hard-down: log it now, let the deliberative loop (prefrontal, -0.11.0) decide whether to open a ticket, page someone, or just wait -for the symmetric linkUp. - -This module is dev2 scaffolding — the handler is registered and the -runner will subscribe it to the bus, but no publisher exists yet. The -first real linkDown publish lands in 0.8.0-dev3 when the SNMP-trap -sensory adapter (``sensory/trap/snmp.py``) is wired in. - -When publishers exist, the handler will also: - -* fetch the affected (device, interface) from semantic memory and verify - it is not in a maintenance window; -* deduplicate against a Redis "recently seen" window so a flapping link - produces one outcome per minute, not one per trap; -* attach a NetBox journal entry on the Interface object so an operator - sees the trap immediately in the tool they live in; -* emit a follow-up ``reflex.link_down.applied`` event so consolidation - knows a synthetic interface-state transition has been recorded. - -None of that is in dev2. The current implementation captures the trap, -extracts the target, and returns a ``logged`` outcome. +Subscribes to ``sensory.link_down.>`` so the same handler reacts to +every source of a link-down observation (SNMP trap, SNMP poll diff, +Meraki webhook, gNMI dial-out sample) without needing per-source +duplicate registrations. + +Dedup +----- +The same physical link going down typically arrives multiple times +within a short window — the trap, then the Meraki webhook ~50ms later, +then the SNMP poll on its next 30-second pass. We collapse those to a +single fire via the runner-supplied :class:`DedupStore`. Per-handler +defaults documented in ``docs/architecture/subjects.md``. + +Dev2 was idle (no publishers). 
Dev3 wires the dedup contract but is +still idle in terms of upstream publishers — the first real +``sensory.link_down.snmp_poll.<target>`` publish lands in 0.8.0-dev4 +when the ingest worker starts dual-writing detected state changes. """ from __future__ import annotations @@ -32,50 +25,93 @@ from typing import Final from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import register_handler -# Subject pattern. -# -# Real publishers in 0.8.0-dev3+ will use -# ``sensory.snmp.trap.link_down.`` and emit one event per -# (device, interface) transition. The trailing ``>`` matches any number -# of further tokens so the handler can be subscribed today and the -# publisher's exact subject layout can evolve without redeploying the -# handler. -_PATTERN: Final[str] = "sensory.snmp.trap.link_down.>" +_PATTERN: Final[str] = "sensory.link_down.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 60.0 class LinkDownHandler: - """Reflex for IF-MIB linkDown traps.""" + """Reflex for link-down observations from any source.""" id: Final[str] = "link_down" pattern: Final[str] = _PATTERN - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: payload = event.payload - target = ( + event_class, source, _target_from_subject = parse_sensory_subject( + event.subject + ) + device = ( payload.get("device_id") or payload.get("device") or payload.get("target") ) interface = payload.get("interface") or payload.get("if_name") + if device and interface: + target: str | None = f"{device}|{interface}" + elif device: + target
= str(device) + else: + target = None + + now = datetime.now(tz=timezone.utc) + + # Dedup check — only when both a store is provided AND we have + # a canonical target. An event with no target identifier cannot + # be meaningfully deduped (every arrival is treated as unique). + if ctx.dedup_store is not None and target is not None and event_class: + fact_key = f"{event_class}|{target}" + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + # Severity is intentionally demoted on the skipped + # outcome — it is informational corroboration, not a + # second incident. + severity="info", + occurred_at=now, + payload={"source": source}, + outcome="skipped", + rationale=( + f"duplicate link_down within {self.dedup_window_seconds}s " + f"dedup window (source={source!r})" + ), + ) + return ReflexOutcome( handler=self.id, subject=event.subject, - target=str(target) if target else None, + target=target, severity="high", - occurred_at=datetime.now(tz=timezone.utc), + occurred_at=now, payload={ + "source": source, "interface": interface, - # Cap the upstream payload echo so a chatty publisher - # cannot blow up the outcome record. + # Cap upstream key echo at 16 — same rationale as dev2 + # (a chatty publisher cannot blow up the outcome). "upstream_keys": sorted(payload.keys())[:16], }, outcome="logged", rationale=( - f"linkDown observed on {target!r} interface {interface!r}; " - "dev2 idle handler — no remediation yet" + f"link_down on {target!r} from {source!r}; " + "dev3 dedup wired, no remediation yet" ), ) diff --git a/netcortex/reflex/handlers/security_alert.py b/netcortex/reflex/handlers/security_alert.py new file mode 100644 index 0000000..f0cb982 --- /dev/null +++ b/netcortex/reflex/handlers/security_alert.py @@ -0,0 +1,129 @@ +"""``security_alert`` — reflex handler for security webhook events. 
+ +Renamed from ``security_webhook`` (dev2) to ``security_alert`` in dev3 +to reflect the event-class-first taxonomy: the handler subscribes to +``sensory.security_alert.>`` and reacts to every webhook source +(Meraki today; Cisco AMP, future SIEM webhooks tomorrow) without +needing per-source registrations. + +Dedup +----- +Meraki retries webhook delivery on failure (typically 2-3 retries spaced +over a minute or two), so the same ``alertId`` can arrive several times +within a few minutes. Default dedup window is 300 seconds — long enough +to absorb the retry pattern, short enough that distinct alerts on the +same target later in the day still fire fresh. + +Note that ``alertId`` is the *upstream* dedup identifier; we still +construct our own ``fact_key`` from (event_class + target) so the same +underlying incident observed via Meraki AND via a future Cisco AMP +webhook would still collapse to one outcome — assuming both publishers +agree on the canonical target. Identity reconciliation across webhook +sources is a 0.9.0 fusion-layer concern; in 0.8.0 we accept some +slippage (e.g., Meraki using clientMac while AMP uses clientIp). +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +from netcortex.contracts.event_bus import EventMessage +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome, Severity +from netcortex.reflex.registry import register_handler + +_PATTERN: Final[str] = "sensory.security_alert.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 300.0 + +# Coarse mapping for dev3. Real severity policy lives in +# policy/security_severity.py once the policy library lands (0.8.x). 
+_MERAKI_SEVERITY_MAP: Final[dict[str, Severity]] = { + "informational": "info", + "info": "info", + "warning": "warn", + "warn": "warn", + "high": "high", + "critical": "critical", +} + + +class SecurityAlertHandler: + """Reflex for security-class webhook events.""" + + id: Final[str] = "security_alert" + pattern: Final[str] = _PATTERN + + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: + payload = event.payload + event_class, source, _ = parse_sensory_subject(event.subject) + upstream_sev = str(payload.get("severity") or "").lower() + severity: Severity = _MERAKI_SEVERITY_MAP.get(upstream_sev, "warn") + target = ( + payload.get("clientMac") + or payload.get("deviceSerial") + or payload.get("networkId") + or payload.get("target") + ) + target = str(target) if target else None + event_type = payload.get("alertType") or payload.get("eventType") + + now = datetime.now(tz=timezone.utc) + + if ctx.dedup_store is not None and target is not None and event_class: + # Include event_type in the fact_key when present so two + # different alert types on the same client don't dedup with + # each other. 
+ fact_key_parts = [event_class, target] + if event_type: + fact_key_parts.append(str(event_type)) + fact_key = "|".join(fact_key_parts) + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity="info", + occurred_at=now, + payload={"source": source, "event_type": event_type}, + outcome="skipped", + rationale=( + f"duplicate security_alert within {self.dedup_window_seconds}s " + f"dedup window (source={source!r}, event_type={event_type!r})" + ), + ) + + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity=severity, + occurred_at=now, + payload={ + "source": source, + "alert_id": payload.get("alertId"), + "event_type": event_type, + "network_id": payload.get("networkId"), + }, + outcome="logged", + rationale=( + f"security_alert {event_type!r} on {target!r} from {source!r}; " + "dev3 dedup wired" + ), + ) + + +_HANDLER = register_handler(SecurityAlertHandler()) diff --git a/netcortex/reflex/handlers/security_webhook.py b/netcortex/reflex/handlers/security_webhook.py deleted file mode 100644 index c9a40d7..0000000 --- a/netcortex/reflex/handlers/security_webhook.py +++ /dev/null @@ -1,88 +0,0 @@ -"""``security_webhook`` — reflex handler for Meraki security webhooks. - -Subscribes to security-class Meraki Dashboard webhooks (IDS alerts, -malware, anomalous traffic, blocked URL hit, etc.) once the webhook -receiver in ``sensory/webhook/meraki.py`` lands (0.8.x patch). Dev2 -ships the handler skeleton so the pattern is reserved and the runner's -subscription map is complete from day one. 
- -When publishers exist, this handler will: - -* dedupe via Meraki's ``alertId`` (a Redis "seen recently" set with a - short TTL — Meraki retries the same alert on delivery failure); -* check whether the affected client is known to semantic memory; if not, - promote it to the working-memory "unknown clients" set so the - operator UI can surface unrecognized endpoints; -* compute a severity from the ``occurredAt`` + ``eventType`` cross - product using a policy in ``policy/security_severity.py`` (so the - threshold is operator-tunable, not hardcoded); -* attach a NetBox journal entry on the affected IPAddress/Interface - when it can be resolved. - -None of that is in dev2. The current implementation logs the inbound -event and returns an outcome whose severity comes verbatim from the -Meraki payload's ``severity`` field when present, defaulting to -``warn`` (Meraki's own scale runs informational/warning/critical; we -collapse to our four-bucket scale in the policy module later). -""" - -from __future__ import annotations - -from datetime import datetime, timezone -from typing import Final - -from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome, Severity -from netcortex.reflex.registry import register_handler - -_PATTERN: Final[str] = "sensory.meraki.webhook.security.>" - -# Coarse mapping for dev2. Real severity policy lives in -# policy/security_severity.py once the policy library lands (0.8.x). 
-_MERAKI_SEVERITY_MAP: Final[dict[str, Severity]] = { - "informational": "info", - "info": "info", - "warning": "warn", - "warn": "warn", - "high": "high", - "critical": "critical", -} - - -class SecurityWebhookHandler: - """Reflex for Meraki security-class webhook events.""" - - id: Final[str] = "security_webhook" - pattern: Final[str] = _PATTERN - - async def handle(self, event: EventMessage) -> ReflexOutcome | None: - payload = event.payload - upstream_sev = str(payload.get("severity") or "").lower() - severity: Severity = _MERAKI_SEVERITY_MAP.get(upstream_sev, "warn") - target = ( - payload.get("clientMac") - or payload.get("deviceSerial") - or payload.get("networkId") - or payload.get("target") - ) - event_type = payload.get("alertType") or payload.get("eventType") - return ReflexOutcome( - handler=self.id, - subject=event.subject, - target=str(target) if target else None, - severity=severity, - occurred_at=datetime.now(tz=timezone.utc), - payload={ - "alert_id": payload.get("alertId"), - "event_type": event_type, - "network_id": payload.get("networkId"), - }, - outcome="logged", - rationale=( - f"meraki security webhook {event_type!r} on {target!r}; " - "dev2 idle handler — dedupe + classification pending" - ), - ) - - -_HANDLER = register_handler(SecurityWebhookHandler()) diff --git a/netcortex/reflex/protocol.py b/netcortex/reflex/protocol.py index 77ef3f8..1026dcb 100644 --- a/netcortex/reflex/protocol.py +++ b/netcortex/reflex/protocol.py @@ -30,6 +30,7 @@ from datetime import datetime from typing import Any, Literal, Protocol, runtime_checkable +from netcortex.contracts.dedup_store import DedupStore from netcortex.contracts.event_bus import EventMessage Severity = Literal["info", "warn", "high", "critical"] @@ -101,6 +102,36 @@ class of "the value I saw at log time differs from what I wrote to used). 
Not surfaced in the operator UI by default.""" +@dataclass(frozen=True) +class ReflexContext: + """Runtime dependencies a handler may use during ``handle()``. + + Threaded through by the :class:`netcortex.reflex.runner.ReflexRunner` + so handlers can consult shared resources (the dedup store today; + semantic memory, working memory, and the policy engine in later + releases) without each one carrying its own constructor wiring. + + Frozen so a handler cannot replace another handler's view of the + world mid-dispatch. New shared resources are added by appending + fields here; handlers that don't consume them are unaffected. That + forward-compatibility is the whole point of using a context dataclass + rather than positional arguments. + + All fields default to ``None`` so the runner can be constructed + without ever wiring a context (the default-context path) and tests + can pass partial contexts that exercise only the resources they + care about. + """ + + dedup_store: DedupStore | None = None + """If set, handlers should consult it via + :meth:`DedupStore.record_unless_duplicate` to suppress duplicate + arrivals of the same logical event (e.g. trap + webhook + poll all + observing one interface going down). Handlers that opt out of dedup + — because their event class is inherently de-duplicated upstream — + may ignore this field.""" + + @runtime_checkable class ReflexHandler(Protocol): """Minimum surface every reflex handler must expose. @@ -126,11 +157,21 @@ def pattern(self) -> str: Validated by the runner against the same grammar :class:`netcortex.thalamus.NatsEventBus` enforces. One pattern per - handler in 0.8.0-dev2. + handler in 0.8.0-dev3. + + Patterns follow the event-class-first taxonomy documented in + ``docs/architecture/subjects.md`` — + ``sensory.<event_class>.<source>.<target>`` — so a single + wildcard like ``sensory.link_down.>`` catches every source of a + link-down observation. """ ...
- async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, + event: EventMessage, + ctx: ReflexContext, + ) -> ReflexOutcome | None: """Process one event. Returning :class:`ReflexOutcome` instructs the runner to log / @@ -141,5 +182,14 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: way to silently skip an error — raise instead, and the runner will convert it to an ``errored`` outcome with the traceback in ``diagnostic``. + + ``ctx`` carries shared runtime resources (dedup store and, in + later releases, semantic memory / working memory / policy). A + handler that consults the dedup store and finds the event is a + duplicate SHOULD return a ``ReflexOutcome`` with + ``outcome="skipped"`` and a ``rationale`` naming the dedup + window — the skipped outcome is itself useful telemetry + (corroboration count, visibility gaps) so the runner persists + it the same as any other outcome. """ ... diff --git a/netcortex/reflex/runner.py b/netcortex/reflex/runner.py index e36d67d..63b2579 100644 --- a/netcortex/reflex/runner.py +++ b/netcortex/reflex/runner.py @@ -40,7 +40,7 @@ from datetime import datetime, timezone from netcortex.contracts.event_bus import EventBus, EventMessage -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexHandler, ReflexOutcome from netcortex.reflex.registry import all_handlers _LOG = logging.getLogger(__name__) @@ -67,11 +67,17 @@ def __init__( bus: EventBus, *, handlers: list[ReflexHandler] | None = None, + context: ReflexContext | None = None, ) -> None: self._bus = bus self._handlers: list[ReflexHandler] = ( list(handlers) if handlers is not None else list(all_handlers()) ) + # Default context has every shared resource set to None — the + # opt-out path for handlers that don't need any of them. 
The + # production wiring (web pod / worker pod) replaces this with a + # context that has dedup_store + future memory layers attached. + self._context: ReflexContext = context or ReflexContext() self._tasks: list[asyncio.Task[None]] = [] self._started = False self._stopping = False @@ -89,6 +95,11 @@ def handlers(self) -> list[ReflexHandler]: """Snapshot of handlers this runner is driving (read-only).""" return list(self._handlers) + @property + def context(self) -> ReflexContext: + """The :class:`ReflexContext` every handler will receive on dispatch.""" + return self._context + # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ @@ -172,7 +183,7 @@ async def _dispatch_one( ) -> None: """Invoke one handler safely and persist its outcome.""" try: - outcome = await handler.handle(event) + outcome = await handler.handle(event, self._context) except asyncio.CancelledError: raise except Exception as exc: diff --git a/netcortex/thalamus/nats_bus.py b/netcortex/thalamus/nats_bus.py index af7f68b..1b80411 100644 --- a/netcortex/thalamus/nats_bus.py +++ b/netcortex/thalamus/nats_bus.py @@ -58,7 +58,16 @@ # Subject syntax — kept in sync with InMemoryEventBus so both backends reject # the same set of malformed subjects (the contract tests publish these as # negative cases and expect rejection from any conforming implementation). -_VALID_PUBLISH_SUBJECT = re.compile(r"^[A-Za-z0-9_\-]+(?:\.[A-Za-z0-9_\-]+)*$") +# NATS subject grammar: dot-separated tokens where each token is one or +# more printable, non-whitespace characters EXCLUDING the three reserved +# meta-characters: '.' (token separator), '*' (single-token wildcard), +# and '>' (multi-token wildcard). This admits real-world identifier +# characters like ':' (MAC addresses), '|' (compound NetCortex targets +# like 'device|interface'), '/' (interface names like Gi0/1), and '+'. 
+# Earlier revisions used [A-Za-z0-9_-] which silently rejected +# realistic subjects; that mismatch is now fixed so the bus accepts +# everything the taxonomy in docs/architecture/subjects.md emits. +_VALID_PUBLISH_SUBJECT = re.compile(r"^[^.\s*>]+(?:\.[^.\s*>]+)*$") def _validate_pattern(pattern: str) -> None: diff --git a/netcortex/working/__init__.py b/netcortex/working/__init__.py new file mode 100644 index 0000000..f21ceb7 --- /dev/null +++ b/netcortex/working/__init__.py @@ -0,0 +1,23 @@ +"""Working memory — short-lived, fast, replaceable state stores. + +Working memory in the brain-mapped architecture (see +[`docs/architecture/brain.md`](../../docs/architecture/brain.md)) is the +layer that holds **right now** state: active alerts, in-flight +reconciliation jobs, dedup windows, sliding metric aggregates. It is +deliberately separate from semantic memory (Neo4j, long-lived structural +knowledge) and episodic memory (Splunk, raw event history). + +Production target storage is Redis. The interfaces are defined in +``netcortex/contracts/`` and concrete backends live in the per-concern +subpackages here: + +* ``netcortex.working.dedup`` — TTL-windowed dedup (in-memory for 0.8.0, + Redis from 0.9.0) +* ``netcortex.working.activity`` (future) — sliding-window counters +* ``netcortex.working.queues`` (future) — per-flow rate budgets + +In 0.8.0 we ship only the in-memory implementations. They are functional +and used in CI; production deployments will swap them for Redis-backed +implementations in 0.9.0 with no call-site changes (the swap is wired +through the Protocols in ``netcortex/contracts/``). +""" diff --git a/netcortex/working/dedup/__init__.py b/netcortex/working/dedup/__init__.py new file mode 100644 index 0000000..5947605 --- /dev/null +++ b/netcortex/working/dedup/__init__.py @@ -0,0 +1,10 @@ +"""Working-memory dedup implementations. + +The Protocol lives in :mod:`netcortex.contracts.dedup_store`. 
This +package contains the concrete implementations the runner can be wired +to. +""" + +from netcortex.working.dedup.in_memory import InMemoryDedupStore + +__all__ = ["InMemoryDedupStore"] diff --git a/netcortex/working/dedup/in_memory.py b/netcortex/working/dedup/in_memory.py new file mode 100644 index 0000000..0a94da9 --- /dev/null +++ b/netcortex/working/dedup/in_memory.py @@ -0,0 +1,155 @@ +"""Single-process in-memory :class:`DedupStore`. + +Asyncio-safe (one ``asyncio.Lock`` serializes mutations), TTL-bounded, +and size-bounded with LRU eviction so a misbehaving publisher cannot OOM +the runner. The implementation is small on purpose — production +multi-replica deployments use the Redis-backed store starting in 0.9.0, +and the in-memory version is meant for single-replica production, +single-process CI, and unit tests. + +Operators MUST NOT deploy multi-replica reflex runners against this +store: there is no cross-process visibility, so two replicas would each +fire the reflex for the same event. The runtime config code that wires +the store is responsible for refusing this configuration, not this +class. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections import OrderedDict +from typing import Final + +_LOG = logging.getLogger(__name__) + +#: Hard upper bound on entries we keep. Keys beyond this are LRU-evicted +#: even if not yet expired. The number is large enough that a normal +#: production cardinality (10k devices × 10 event classes × 30 active +#: facts per device) sits comfortably below; small enough that a +#: pathological 1000x explosion still fits in a few MB. +_DEFAULT_MAX_ENTRIES: Final[int] = 100_000 + +#: Warn if a caller asks for a TTL longer than this — long-lived dedup +#: state tends to mask real flap behavior the operator needs to see. +_LONG_TTL_WARN_SECONDS: Final[float] = 24 * 60 * 60 + + +class InMemoryDedupStore: + """Time-windowed dedup, single-process, asyncio-safe. 
+ + Parameters + ---------- + max_entries: + Cap on stored keys. When exceeded, the LRU entry (the one we + haven't touched longest) is evicted, even if its TTL has not + expired. Default sized for normal production workloads — raise + if you legitimately have more concurrent fact keys. + clock: + Override for the monotonic clock; tests pass a controllable + clock so they can advance time without ``asyncio.sleep``. + Production code passes nothing. + """ + + def __init__( + self, + *, + max_entries: int = _DEFAULT_MAX_ENTRIES, + clock: "callable[..., float] | None" = None, + ) -> None: + if max_entries <= 0: + raise ValueError(f"max_entries must be > 0, got {max_entries}") + self._max_entries = max_entries + self._clock = clock or time.monotonic + # OrderedDict tracks insertion order, which we mutate on every + # access so dict iteration is LRU. value = expiry_monotonic. + self._entries: OrderedDict[str, float] = OrderedDict() + self._lock = asyncio.Lock() + self._closed = False + + async def record_unless_duplicate( + self, + fact_key: str, + *, + ttl_seconds: float, + ) -> bool: + """Atomically record ``fact_key`` if absent. See Protocol docs.""" + if not fact_key: + raise ValueError("fact_key must be non-empty") + if ttl_seconds <= 0: + raise ValueError(f"ttl_seconds must be > 0, got {ttl_seconds}") + if self._closed: + raise RuntimeError("InMemoryDedupStore.record_unless_duplicate after close()") + if ttl_seconds > _LONG_TTL_WARN_SECONDS: + _LOG.warning( + "dedup_store.long_ttl ttl_seconds=%s fact_key=%s", + ttl_seconds, fact_key, + ) + + async with self._lock: + now = self._clock() + existing = self._entries.get(fact_key) + if existing is not None and existing > now: + # Touch for LRU — even a duplicate "refreshes interest". + # This matters under sustained duplicate floods: if we + # didn't touch, the duplicated key would age out for LRU + # eviction faster than fresh keys, which is the wrong + # direction. 
+ self._entries.move_to_end(fact_key) + return False + + # Lazy GC: when we're at capacity OR have any stale entries + # adjacent to insertion, sweep a bounded number of expired + # heads off the LRU. Bounded so worst-case latency is O(B). + self._sweep_expired(now, budget=32) + + # Enforce hard cap by evicting LRU(s) if needed. + while len(self._entries) >= self._max_entries: + evicted_key, evicted_exp = self._entries.popitem(last=False) + _LOG.debug( + "dedup_store.evicted_lru key=%s expired_in_s=%s", + evicted_key, round(evicted_exp - now, 3), + ) + + self._entries[fact_key] = now + ttl_seconds + return True + + def _sweep_expired(self, now: float, *, budget: int) -> None: + """Evict up to ``budget`` expired entries from the LRU head. + + Called under lock. We sweep only from the head (oldest entries) + because that's where expired keys cluster — the move_to_end on + every touch means recently-touched keys are at the tail. A + bounded sweep keeps tail latency predictable; the rare + worst-case (all entries expired in one burst) just means future + calls each chip away another 32 entries until the head is clean. + """ + swept = 0 + while swept < budget and self._entries: + head_key = next(iter(self._entries)) + head_exp = self._entries[head_key] + if head_exp > now: + return # head is fresh — by LRU ordering, nothing behind is expired enough to matter for this sweep + del self._entries[head_key] + swept += 1 + + async def close(self) -> None: + """Drop all state. Idempotent.""" + if self._closed: + return + self._closed = True + async with self._lock: + self._entries.clear() + + # ------------------------------------------------------------------ + # Test helpers — not on the Protocol, do not depend on them in + # production code. + # ------------------------------------------------------------------ + + def size(self) -> int: + """Current number of tracked keys. 
Lock-free read for inspection.""" + return len(self._entries) + + +__all__ = ["InMemoryDedupStore"] diff --git a/pyproject.toml b/pyproject.toml index 5f2e4a0..75e8f6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev2" +version = "0.8.0.dev3" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/contracts/conftest.py b/tests/contracts/conftest.py index 83a3690..10d3adb 100644 --- a/tests/contracts/conftest.py +++ b/tests/contracts/conftest.py @@ -19,7 +19,7 @@ import pytest -from netcortex.contracts import EventBus, Policy, SensoryAdapter +from netcortex.contracts import DedupStore, EventBus, Policy, SensoryAdapter # --------------------------------------------------------------------------- # EventBus registry. @@ -120,10 +120,40 @@ def policy_factory(request: pytest.FixtureRequest) -> Callable[[], Policy]: raise RuntimeError(f"unknown policy impl: {name}") +# --------------------------------------------------------------------------- +# DedupStore registry. +# --------------------------------------------------------------------------- + + +def _make_in_memory_dedup_store() -> DedupStore: + from netcortex.working.dedup import InMemoryDedupStore + + return InMemoryDedupStore() + + +# Redis-backed store lands in 0.9.0; when it does, add a +# _make_redis_dedup_store factory here that pytest.skips when REDIS_URL is +# absent, exactly the same pattern as _make_nats_event_bus above. 
+DEDUP_STORE_IMPLEMENTATIONS: list[tuple[str, Callable[[], DedupStore]]] = [ + ("in_memory", _make_in_memory_dedup_store), +] + + +@pytest.fixture(params=[name for name, _ in DEDUP_STORE_IMPLEMENTATIONS], ids=lambda n: n) +def dedup_store_factory(request: pytest.FixtureRequest) -> Callable[[], DedupStore]: + name: str = request.param + for n, factory in DEDUP_STORE_IMPLEMENTATIONS: + if n == name: + return factory + raise RuntimeError(f"unknown dedup store impl: {name}") + + __all__: Iterable[str] = ( + "DEDUP_STORE_IMPLEMENTATIONS", "EVENT_BUS_IMPLEMENTATIONS", "SENSORY_ADAPTER_IMPLEMENTATIONS", "POLICY_IMPLEMENTATIONS", + "dedup_store_factory", "event_bus_factory", "sensory_adapter_factory", "policy_factory", diff --git a/tests/contracts/dedup_store/__init__.py b/tests/contracts/dedup_store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/contracts/dedup_store/test_dedup_store_contract.py b/tests/contracts/dedup_store/test_dedup_store_contract.py new file mode 100644 index 0000000..ebeef3f --- /dev/null +++ b/tests/contracts/dedup_store/test_dedup_store_contract.py @@ -0,0 +1,138 @@ +"""Contract suite for :class:`netcortex.contracts.dedup_store.DedupStore`. + +Every implementation registered in +``tests/contracts/conftest.py::DEDUP_STORE_IMPLEMENTATIONS`` runs against +the same assertions. A future Redis-backed implementation only needs a +factory function and a row in the registry to gain full coverage. + +The cases below are the **observable** behavior the Protocol promises; +anything implementation-specific (LRU cap, eviction sweep, etc.) lives +in the implementation's own unit tests. 
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable + +import pytest + +from netcortex.contracts import DedupStore + +pytestmark = pytest.mark.asyncio + + +async def test_first_call_returns_true( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=1.0) is True + finally: + await store.close() + + +async def test_second_call_within_ttl_returns_false( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is True + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is False + finally: + await store.close() + + +async def test_different_keys_are_independent( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is True + assert await store.record_unless_duplicate("k2", ttl_seconds=5.0) is True + # ``k1`` and ``k2`` did not collide. + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is False + assert await store.record_unless_duplicate("k2", ttl_seconds=5.0) is False + finally: + await store.close() + + +async def test_call_after_ttl_expiry_returns_true( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """Once the TTL window closes, the key is treatable as new again.""" + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=0.1) is True + # Sleep ~3x the TTL so the window has definitely closed even on + # heavily-loaded CI runners. Real-clock dependency is acceptable + # here because the TTL is short. 
+ await asyncio.sleep(0.35) + assert await store.record_unless_duplicate("k1", ttl_seconds=0.1) is True + finally: + await store.close() + + +async def test_empty_key_rejected( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """An empty fact_key is almost always a publisher bug — fail loud.""" + store = dedup_store_factory() + try: + with pytest.raises(ValueError): + await store.record_unless_duplicate("", ttl_seconds=1.0) + finally: + await store.close() + + +@pytest.mark.parametrize("bad_ttl", [0.0, -1.0, -0.001]) +async def test_non_positive_ttl_rejected( + dedup_store_factory: Callable[[], DedupStore], + bad_ttl: float, +) -> None: + """A non-positive TTL is meaningless and likely a config error.""" + store = dedup_store_factory() + try: + with pytest.raises(ValueError): + await store.record_unless_duplicate("k1", ttl_seconds=bad_ttl) + finally: + await store.close() + + +async def test_close_is_idempotent( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + await store.close() + await store.close() + + +async def test_use_after_close_raises( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """The Protocol requires post-close calls to raise RuntimeError.""" + store = dedup_store_factory() + await store.close() + with pytest.raises(RuntimeError): + await store.record_unless_duplicate("k1", ttl_seconds=1.0) + + +async def test_concurrent_callers_for_same_key_dedupe( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """The Protocol's atomicity guarantee — only one concurrent caller wins. + + Spawn many coroutines that all race on the same key simultaneously; + exactly one must observe ``True`` and the rest must observe ``False``. + A non-atomic implementation would let multiple racers all return True. 
+ """ + store = dedup_store_factory() + try: + results = await asyncio.gather(*[ + store.record_unless_duplicate("contested", ttl_seconds=10.0) + for _ in range(50) + ]) + assert sum(1 for r in results if r is True) == 1 + assert sum(1 for r in results if r is False) == 49 + finally: + await store.close() diff --git a/tests/contracts/event_bus/in_memory.py b/tests/contracts/event_bus/in_memory.py index 8c58dcc..0201974 100644 --- a/tests/contracts/event_bus/in_memory.py +++ b/tests/contracts/event_bus/in_memory.py @@ -32,7 +32,19 @@ ) _LOG = logging.getLogger(__name__) -_VALID_PUBLISH_SUBJECT = re.compile(r"^[A-Za-z0-9_\-]+(?:\.[A-Za-z0-9_\-]+)*$") +# NATS subject grammar: dot-separated tokens where each token is one or +# more printable, non-whitespace characters EXCLUDING the three reserved +# meta-characters: '.' (token separator), '*' (single-token wildcard), +# and '>' (multi-token wildcard). This admits real-world identifier +# characters like ':' (MACs), '|' (compound NetCortex targets), '/' +# (interface names like Gi0/1), and '+' (URL-encoded payloads). +# +# Earlier revisions of this validator only allowed [A-Za-z0-9_-] which +# silently rejected publishes like +# 'sensory.link_down.snmp_trap.r1|Gi0/1', leading to subjects-look- +# correct-but-publish-throws bugs. Match the canonical taxonomy in +# docs/architecture/subjects.md. +_VALID_PUBLISH_SUBJECT = re.compile(r"^[^.\s*>]+(?:\.[^.\s*>]+)*$") def _compile_pattern(pattern: str) -> re.Pattern[str]: diff --git a/tests/contracts/test_subjects.py b/tests/contracts/test_subjects.py new file mode 100644 index 0000000..549d13b --- /dev/null +++ b/tests/contracts/test_subjects.py @@ -0,0 +1,140 @@ +"""Tests for the subject taxonomy module. + +These check the **validation** the builders perform — empty parts, +unknown event classes, unknown sources, embedded dots, whitespace — +because those are exactly the silent-but-wrong subject failures that +plague NATS-based systems when there is no centralized builder. 
The +closed-vocabulary checks are the contract that production publishers +will rely on starting in 0.8.0-dev4. +""" + +from __future__ import annotations + +import pytest + +from netcortex.contracts.subjects import ( + SENSORY_EVENT_CLASSES, + SENSORY_SOURCES, + fact_subject, + parse_sensory_subject, + sensory_subject, +) + + +# --------------------------------------------------------------------------- +# sensory_subject builder +# --------------------------------------------------------------------------- + + +def test_builds_well_formed_subject() -> None: + s = sensory_subject("link_down", "snmp_trap", "r1|Gi0/1") + assert s == "sensory.link_down.snmp_trap.r1|Gi0/1" + + +def test_supports_multi_token_targets() -> None: + s = sensory_subject( + "security_alert", "meraki_webhook", "N_1", "aa:bb:cc:dd:ee:ff" + ) + # Multi-token targets give downstream subscribers more wildcard + # options (e.g. sensory.security_alert.*.N_1.>). + assert s == "sensory.security_alert.meraki_webhook.N_1.aa:bb:cc:dd:ee:ff" + + +def test_rejects_unknown_event_class() -> None: + with pytest.raises(ValueError, match="unknown sensory event class"): + sensory_subject("not_an_event", "snmp_trap", "r1") + + +def test_rejects_unknown_source() -> None: + with pytest.raises(ValueError, match="unknown sensory source"): + sensory_subject("link_down", "carrier_pigeon", "r1") + + +def test_rejects_no_target_parts() -> None: + with pytest.raises(ValueError, match="at least one target part"): + sensory_subject("link_down", "snmp_trap") + + +def test_rejects_empty_target_part() -> None: + with pytest.raises(ValueError, match="empty"): + sensory_subject("link_down", "snmp_trap", "") + + +def test_rejects_dot_in_target_part() -> None: + """`.` is the NATS token separator — sneaking one in produces a + malformed subject with the wrong token count.""" + with pytest.raises(ValueError, match="separator"): + sensory_subject("link_down", "snmp_trap", "r1.Gi0/1") + + +def 
test_rejects_whitespace_in_target_part() -> None: + with pytest.raises(ValueError, match="whitespace"): + sensory_subject("link_down", "snmp_trap", "r1 problem") + + +# --------------------------------------------------------------------------- +# fact_subject builder +# --------------------------------------------------------------------------- + + +def test_fact_subject_basic() -> None: + s = fact_subject("link_down", "r1|Gi0/1") + assert s == "fact.link_down.r1|Gi0/1" + + +def test_fact_subject_rejects_unknown_class() -> None: + with pytest.raises(ValueError): + fact_subject("not_an_event", "r1") + + +def test_fact_subject_requires_target() -> None: + with pytest.raises(ValueError): + fact_subject("link_down") + + +# --------------------------------------------------------------------------- +# parse_sensory_subject +# --------------------------------------------------------------------------- + + +def test_parse_extracts_class_source_and_target() -> None: + assert parse_sensory_subject("sensory.link_down.snmp_trap.r1|Gi0/1") == ( + "link_down", + "snmp_trap", + "r1|Gi0/1", + ) + + +def test_parse_joins_multi_token_target() -> None: + assert parse_sensory_subject( + "sensory.security_alert.meraki_webhook.N_1.aabbccddeeff" + ) == ("security_alert", "meraki_webhook", "N_1.aabbccddeeff") + + +def test_parse_returns_empty_tuple_for_non_sensory() -> None: + assert parse_sensory_subject("fact.link_down.r1") == ("", "", "") + + +def test_parse_returns_empty_tuple_for_too_few_tokens() -> None: + assert parse_sensory_subject("sensory.link_down.snmp_trap") == ("", "", "") + + +# --------------------------------------------------------------------------- +# Vocabulary integrity +# --------------------------------------------------------------------------- + + +def test_all_event_classes_are_lower_snake_case() -> None: + """A taxonomy with mixed case becomes a footgun on case-sensitive + matching. 
Lock it down here.""" + for cls in SENSORY_EVENT_CLASSES: + assert cls == cls.lower(), f"event class {cls!r} must be lowercase" + assert " " not in cls, f"event class {cls!r} must not contain whitespace" + assert "." not in cls, f"event class {cls!r} must not contain '.'" + + +def test_all_sources_are_lower_snake_case() -> None: + for src in SENSORY_SOURCES: + assert src == src.lower(), f"source {src!r} must be lowercase" + assert " " not in src, f"source {src!r} must not contain whitespace" + assert "." not in src, f"source {src!r} must not contain '.'" diff --git a/tests/reflex/test_handlers.py b/tests/reflex/test_handlers.py index 49389ce..5661841 100644 --- a/tests/reflex/test_handlers.py +++ b/tests/reflex/test_handlers.py @@ -1,11 +1,19 @@ -"""Tests for the first-party reflex handlers. +"""Tests for the first-party reflex handlers (dev3 taxonomy). -These cover the per-handler outcome shape and the target-extraction -fallbacks. The runner-level dispatch is covered in ``test_runner.py``. +Covers: -Importing the handlers module side-registers them with the global -registry. We pin those identities here so a rename to a handler id — -which is an operator-facing field on every outcome — fails CI loudly. +* the operator-facing identity surface (handler id + subject pattern); +* per-handler outcome shape on the happy path; +* target-extraction fallbacks for partial / missing payload fields; +* dedup behavior when a :class:`DedupStore` is wired through the + :class:`ReflexContext`. + +The runner-level dispatch is covered separately in ``test_runner.py``. + +Importing ``netcortex.reflex.handlers`` side-registers all three first- +party handlers. We then read them back out of the registry by id; this +exercises the same import-for-side-effect path the runner uses in +production, rather than constructing fresh instances ourselves. 
""" from __future__ import annotations @@ -15,14 +23,14 @@ import pytest from netcortex.contracts.event_bus import EventMessage - -# Import the package so the handlers register themselves. We then read -# them back out of the registry by id rather than constructing fresh -# instances — that way a future change to the registration mechanics is -# automatically exercised by these tests too. -from netcortex.reflex import handlers as _handlers # noqa: F401 -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex import handlers as _handlers # noqa: F401 — side-effect import +from netcortex.reflex.protocol import ( + ReflexContext, + ReflexHandler, + ReflexOutcome, +) from netcortex.reflex.registry import get_handler +from netcortex.working.dedup import InMemoryDedupStore pytestmark = pytest.mark.asyncio @@ -36,6 +44,10 @@ def _event(subject: str, payload: dict[str, object]) -> EventMessage: ) +def _empty_ctx() -> ReflexContext: + return ReflexContext() + + # --------------------------------------------------------------------------- # Identity / wiring smoke tests # --------------------------------------------------------------------------- @@ -44,9 +56,9 @@ def _event(subject: str, payload: dict[str, object]) -> EventMessage: @pytest.mark.parametrize( "handler_id,expected_pattern", [ - ("link_down", "sensory.snmp.trap.link_down.>"), - ("security_webhook", "sensory.meraki.webhook.security.>"), - ("bgp_drop", "sensory.snmp.trap.bgp_backward_transition.>"), + ("link_down", "sensory.link_down.>"), + ("security_alert", "sensory.security_alert.>"), + ("bgp_drop", "sensory.bgp_drop.>"), ], ) async def test_handler_registered_with_expected_pattern( @@ -57,10 +69,8 @@ async def test_handler_registered_with_expected_pattern( A rename here is fine, but it MUST be intentional — production operators read these ids in alerts and on the reconciliation UI. 
- Declared ``async`` purely to match the module-level - ``pytestmark = pytest.mark.asyncio``; there is nothing to await - here. Splitting this into its own file just to avoid the marker - would be a worse trade-off. + Declared async to match the module-level ``pytestmark = + pytest.mark.asyncio`` (no await needed here). """ h = get_handler(handler_id) assert isinstance(h, ReflexHandler) @@ -75,25 +85,29 @@ async def test_handler_registered_with_expected_pattern( async def test_link_down_extracts_device_and_interface() -> None: h = get_handler("link_down") - outcome = await h.handle(_event( - "sensory.snmp.trap.link_down.r1", - {"device_id": "r1", "interface": "Gi0/1"}, - )) + outcome = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.handler == "link_down" - assert outcome.target == "r1" + assert outcome.target == "r1|Gi0/1" assert outcome.severity == "high" assert outcome.payload["interface"] == "Gi0/1" + assert outcome.payload["source"] == "snmp_trap" assert outcome.outcome == "logged" async def test_link_down_handles_missing_target_field() -> None: - """No device field at all — outcome.target is None, not a stringified ``None``.""" + """No device field at all — outcome.target is None.""" h = get_handler("link_down") - outcome = await h.handle(_event( - "sensory.snmp.trap.link_down.unknown", - {"interface": "Gi0/1"}, - )) + outcome = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {"interface": "Gi0/1"}), + _empty_ctx(), + ) assert outcome is not None assert outcome.target is None @@ -102,13 +116,114 @@ async def test_link_down_caps_upstream_keys() -> None: """A pathologically wide payload doesn't blow up the outcome record.""" h = get_handler("link_down") wide = {f"k{i}": i for i in range(100)} - outcome = await h.handle(_event("sensory.snmp.trap.link_down.r1", wide)) + outcome = await h.handle( + 
_event("sensory.link_down.snmp_trap.r1", wide), _empty_ctx() + ) assert outcome is not None assert len(outcome.payload["upstream_keys"]) <= 16 +async def test_link_down_dedups_across_sources() -> None: + """The whole point of dev3: trap + webhook + poll on same target dedup.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + # First arrival: SNMP trap → fires + out1 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out1 is not None + assert out1.outcome == "logged" + + # Same physical event arrives via Meraki webhook ~50ms later + out2 = await h.handle( + _event( + "sensory.link_down.meraki_webhook.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out2 is not None + assert out2.outcome == "skipped" + assert "duplicate" in out2.rationale.lower() + # Severity demoted on the skipped outcome. + assert out2.severity == "info" + + # And via the SNMP poll diff + out3 = await h.handle( + _event( + "sensory.link_down.snmp_poll.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out3 is not None + assert out3.outcome == "skipped" + finally: + await store.close() + + +async def test_link_down_does_not_dedup_different_targets() -> None: + """Different (device, interface) tuples are independent facts.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + out1 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + out2 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/2"}, + ), + ctx, + ) + out3 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r2", + {"device_id": "r2", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and 
out2.outcome == "logged" + assert out3 is not None and out3.outcome == "logged" + finally: + await store.close() + + +async def test_link_down_skips_dedup_when_target_unknown() -> None: + """Without a canonical target we can't meaningfully dedup.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + # No device_id / interface — handler returns logged outcome twice, + # because there's no fact_key to dedup against. + out1 = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {}), ctx + ) + out2 = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {}), ctx + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "logged" + finally: + await store.close() + + # --------------------------------------------------------------------------- -# security_webhook +# security_alert # --------------------------------------------------------------------------- @@ -127,43 +242,124 @@ async def test_link_down_caps_upstream_keys() -> None: ("CRITICAL", "critical"), ], ) -async def test_security_webhook_severity_mapping( +async def test_security_alert_severity_mapping( upstream: str, expected: str ) -> None: - h = get_handler("security_webhook") - outcome = await h.handle(_event( - "sensory.meraki.webhook.security.ids_alerted", - {"severity": upstream, "alertId": "abc-123", - "networkId": "N_1", "clientMac": "aa:bb:cc:dd:ee:ff", - "alertType": "ids_alerted"}, - )) + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + { + "severity": upstream, + "alertId": "abc-123", + "networkId": "N_1", + "clientMac": "aa:bb:cc:dd:ee:ff", + "alertType": "ids_alerted", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.severity == expected -async def test_security_webhook_prefers_client_mac_as_target() -> None: - h = get_handler("security_webhook") - outcome = await 
h.handle(_event( - "sensory.meraki.webhook.security.ids_alerted", - {"clientMac": "aa:bb:cc:dd:ee:ff", - "deviceSerial": "Q2XX-YYYY-ZZZZ", - "networkId": "N_1", - "severity": "warning"}, - )) +async def test_security_alert_prefers_client_mac_as_target() -> None: + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + { + "clientMac": "aa:bb:cc:dd:ee:ff", + "deviceSerial": "Q2XX-YYYY-ZZZZ", + "networkId": "N_1", + "severity": "warning", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "aa:bb:cc:dd:ee:ff" -async def test_security_webhook_falls_back_to_device_serial() -> None: - h = get_handler("security_webhook") - outcome = await h.handle(_event( - "sensory.meraki.webhook.security.malware_detected", - {"deviceSerial": "Q2XX-YYYY-ZZZZ", "networkId": "N_1"}, - )) +async def test_security_alert_falls_back_to_device_serial() -> None: + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|Q2XX-YYYY-ZZZZ", + {"deviceSerial": "Q2XX-YYYY-ZZZZ", "networkId": "N_1"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "Q2XX-YYYY-ZZZZ" +async def test_security_alert_dedups_repeated_meraki_retries() -> None: + """Meraki retries the same alert; we collapse to one outcome.""" + h = get_handler("security_alert") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + payload = { + "clientMac": "aa:bb:cc:dd:ee:ff", + "networkId": "N_1", + "severity": "warning", + "alertId": "retry-test", + "alertType": "ids_alerted", + } + # First delivery + out1 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + payload, + ), + ctx, + ) + # Second delivery (Meraki retry) + out2 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + payload, + ), + ctx, + ) + assert out1 is not None and out1.outcome == 
"logged" + assert out2 is not None and out2.outcome == "skipped" + finally: + await store.close() + + +async def test_security_alert_different_event_types_dont_dedup() -> None: + """Two distinct alert types on the same client are different facts.""" + h = get_handler("security_alert") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + base_payload = { + "clientMac": "aa:bb:cc:dd:ee:ff", + "networkId": "N_1", + "severity": "warning", + } + out1 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + {**base_payload, "alertType": "ids_alerted"}, + ), + ctx, + ) + out2 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + {**base_payload, "alertType": "malware_detected"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "logged" + finally: + await store.close() + + # --------------------------------------------------------------------------- # bgp_drop # --------------------------------------------------------------------------- @@ -172,63 +368,99 @@ async def test_security_webhook_falls_back_to_device_serial() -> None: async def test_bgp_drop_composes_session_target() -> None: """device+peer -> ``device|peer`` canonical session id.""" h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.r1", - {"device_id": "r1", "peer": "10.0.1.5", - "peer_asn": 65001, "last_state": "Established"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.r1|10.0.1.5", + { + "device_id": "r1", + "peer": "10.0.1.5", + "peer_asn": 65001, + "last_state": "Established", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "r1|10.0.1.5" assert outcome.severity == "high" assert outcome.payload["peer_asn"] == 65001 assert outcome.payload["last_state"] == "Established" + assert outcome.payload["source"] == "snmp_trap" async def 
test_bgp_drop_target_falls_back_to_peer_only() -> None: h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.unknown", - {"peer": "10.0.1.5"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.unknown|10.0.1.5", + {"peer": "10.0.1.5"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "10.0.1.5" async def test_bgp_drop_target_none_when_no_identifiers() -> None: h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.unknown", - {"last_state": "Established"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.unknown", + {"last_state": "Established"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target is None +async def test_bgp_drop_dedups_trap_and_gnmi_for_same_session() -> None: + h = get_handler("bgp_drop") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + out1 = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.r1|10.0.1.5", + {"device_id": "r1", "peer": "10.0.1.5"}, + ), + ctx, + ) + # Same session, observed via gNMI a moment later + out2 = await h.handle( + _event( + "sensory.bgp_drop.gnmi_dialout.r1|10.0.1.5", + {"device_id": "r1", "peer": "10.0.1.5"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "skipped" + finally: + await store.close() + + # --------------------------------------------------------------------------- # All-handler invariants # --------------------------------------------------------------------------- @pytest.mark.parametrize( - "handler_id", - ["link_down", "security_webhook", "bgp_drop"], + "handler_id,sample_subject", + [ + ("link_down", "sensory.link_down.snmp_trap.test"), + ("security_alert", "sensory.security_alert.meraki_webhook.test"), + ("bgp_drop", "sensory.bgp_drop.snmp_trap.test"), + ], ) async def 
test_handler_returns_frozen_outcome_with_required_fields( - handler_id: str, + handler_id: str, sample_subject: str, ) -> None: - """Every handler must produce a well-formed :class:`ReflexOutcome`. - - Catches the common bug of forgetting to set ``severity`` or returning - a dict instead of a dataclass. - """ + """Every handler must produce a well-formed :class:`ReflexOutcome`.""" h = get_handler(handler_id) - outcome = await h.handle(_event( - # Use a subject that matches each handler's pattern hierarchically. - # We don't strictly need this — handle() doesn't re-validate the - # subject — but it makes the test inputs realistic. - f"sensory.test.invocation.{handler_id}", - {"target": "test-target"}, - )) + outcome = await h.handle( + _event(sample_subject, {"target": "test-target"}), _empty_ctx() + ) assert outcome is not None assert isinstance(outcome, ReflexOutcome) assert outcome.handler == handler_id diff --git a/tests/reflex/test_registry.py b/tests/reflex/test_registry.py index 00f656a..c50f6af 100644 --- a/tests/reflex/test_registry.py +++ b/tests/reflex/test_registry.py @@ -14,7 +14,7 @@ import pytest from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import ( DuplicateHandlerError, all_handlers, @@ -27,11 +27,13 @@ class _StubHandler: """Minimal structural ReflexHandler used by these tests.""" - def __init__(self, hid: str, pattern: str = "test.>") -> None: + def __init__(self, hid: str, pattern: str = "sensory.test.test_src.test") -> None: self.id: Final[str] = hid self.pattern: Final[str] = pattern - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: return ReflexOutcome( handler=self.id, subject=event.subject, @@ -63,7 +65,6 @@ def _isolated_registry(): def test_register_and_lookup() -> 
None: h = _StubHandler("alpha") returned = register_handler(h) - # Decorator-style usage returns the handler unchanged. assert returned is h assert get_handler("alpha") is h assert list(all_handlers()) == [h] @@ -81,19 +82,11 @@ def test_register_duplicate_id_raises() -> None: register_handler(_StubHandler("dup")) with pytest.raises(DuplicateHandlerError) as ei: register_handler(_StubHandler("dup")) - # Error message names BOTH handler classes so the operator can find - # the offending file quickly. We do not pin the exact message; only - # that the registered id appears in it. assert "dup" in str(ei.value) def test_register_same_instance_is_idempotent() -> None: - """Re-registering the exact same instance is a no-op. - - Modules can be imported twice in the same process (test suite - isolation, plugin auto-discovery), so the registry treats an - identical re-register as the harmless event it is. - """ + """Re-registering the exact same instance is a no-op.""" h = _StubHandler("same") register_handler(h) register_handler(h) @@ -104,7 +97,6 @@ def test_register_rejects_non_handler() -> None: """Registration is type-checked at registration time.""" class NotAHandler: - # Missing handle(), pattern, id. pass with pytest.raises(TypeError): diff --git a/tests/reflex/test_runner.py b/tests/reflex/test_runner.py index 01bb033..4c53cc7 100644 --- a/tests/reflex/test_runner.py +++ b/tests/reflex/test_runner.py @@ -5,6 +5,12 @@ same Protocol, so passing here means the runner will behave identically against the production NATS backend (verified by the contract suite that NATS satisfies that Protocol). + +In dev3 the runner threads a :class:`ReflexContext` to every handler +call. These tests pin the wiring: default-context path (no context +supplied → empty context), explicit-context path (with a dedup store +that the handler can consult), and the unchanged failure-isolation / +lifecycle behavior. 
""" from __future__ import annotations @@ -16,8 +22,9 @@ import pytest from netcortex.contracts.event_bus import EventBus, EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.runner import ReflexRunner +from netcortex.working.dedup import InMemoryDedupStore from tests.contracts.event_bus.in_memory import InMemoryEventBus pytestmark = pytest.mark.asyncio @@ -28,15 +35,23 @@ def _bus() -> EventBus: class _RecordingHandler: - """Handler that records every event it sees for test introspection.""" + """Handler that records every event it sees for test introspection. + + Also records the context it received so tests can confirm the runner + threaded a non-default context through. + """ def __init__(self, hid: str, pattern: str) -> None: self.id: Final[str] = hid self.pattern: Final[str] = pattern self.seen: list[EventMessage] = [] + self.contexts: list[ReflexContext] = [] - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: self.seen.append(event) + self.contexts.append(ctx) return ReflexOutcome( handler=self.id, subject=event.subject, @@ -51,9 +66,11 @@ class _RaisingHandler: """Handler that raises — used to verify per-handler isolation.""" id: Final[str] = "boom" - pattern: Final[str] = "sensory.boom.>" + pattern: Final[str] = "sensory.link_down.test_src.boom" - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: raise RuntimeError("simulated handler failure") @@ -61,9 +78,11 @@ class _SkippingHandler: """Handler that returns ``None`` — used to verify None-skips-recording.""" id: Final[str] = "skip" - pattern: Final[str] = "sensory.skip.>" + pattern: Final[str] = "sensory.link_up.test_src.skip" - async def handle(self, event: EventMessage) -> 
ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: return None @@ -79,13 +98,13 @@ async def _wait_for(predicate: Any, timeout: float = 1.5) -> None: async def test_dispatches_event_to_matching_handler() -> None: bus = _bus() - handler = _RecordingHandler("link_down", "sensory.snmp.trap.link_down.>") + handler = _RecordingHandler("link_down", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() try: - await asyncio.sleep(0.05) # let subscription register + await asyncio.sleep(0.05) await bus.publish( - "sensory.snmp.trap.link_down.r1", + "sensory.link_down.snmp_trap.r1|Gi0/1", {"interface": "Gi0/1", "target": "r1"}, ) await _wait_for(lambda: len(handler.seen) == 1) @@ -93,7 +112,7 @@ async def test_dispatches_event_to_matching_handler() -> None: await runner.stop() await bus.close() - assert handler.seen[0].subject == "sensory.snmp.trap.link_down.r1" + assert handler.seen[0].subject == "sensory.link_down.snmp_trap.r1|Gi0/1" assert handler.seen[0].payload == {"interface": "Gi0/1", "target": "r1"} assert len(runner.outcomes) == 1 outcome = runner.outcomes[0] @@ -103,16 +122,14 @@ async def test_dispatches_event_to_matching_handler() -> None: async def test_pattern_filters_out_non_matching_events() -> None: bus = _bus() - handler = _RecordingHandler("only_a", "sensory.a.>") + handler = _RecordingHandler("only_link_down", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.a.event", {"i": 0}) - await bus.publish("sensory.b.event", {"i": 1}) # must NOT match - await bus.publish("sensory.a.deeper.event", {"i": 2}) - # Give the bus time to deliver any non-matching events too (and - # have them be filtered out, not silently delivered). 
+ await bus.publish("sensory.link_down.snmp_trap.r1", {"i": 0}) + await bus.publish("sensory.link_up.snmp_trap.r1", {"i": 1}) # not match + await bus.publish("sensory.link_down.meraki_webhook.r2", {"i": 2}) await _wait_for(lambda: len(handler.seen) == 2) finally: await runner.stop() @@ -124,13 +141,13 @@ async def test_pattern_filters_out_non_matching_events() -> None: async def test_multiple_handlers_fan_out() -> None: """One event whose subject matches two handlers reaches both.""" bus = _bus() - a = _RecordingHandler("a", "sensory.shared.>") - b = _RecordingHandler("b", "sensory.shared.>") + a = _RecordingHandler("a", "sensory.link_down.>") + b = _RecordingHandler("b", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[a, b]) await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.shared.event", {"i": 0}) + await bus.publish("sensory.link_down.snmp_trap.r1", {"i": 0}) await _wait_for(lambda: len(a.seen) == 1 and len(b.seen) == 1) finally: await runner.stop() @@ -145,8 +162,8 @@ async def test_handler_exception_does_not_kill_dispatcher() -> None: await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.boom.event", {"n": 1}) - await bus.publish("sensory.boom.event", {"n": 2}) + await bus.publish("sensory.link_down.test_src.boom", {"n": 1}) + await bus.publish("sensory.link_down.test_src.boom", {"n": 2}) await _wait_for(lambda: len(runner.outcomes) == 2) finally: await runner.stop() @@ -154,7 +171,6 @@ async def test_handler_exception_does_not_kill_dispatcher() -> None: assert all(o.outcome == "errored" for o in runner.outcomes) assert all("RuntimeError" in o.rationale for o in runner.outcomes) - # Diagnostic carries a traceback for debugging. 
assert "traceback" in runner.outcomes[0].diagnostic assert ( "simulated handler failure" in runner.outcomes[0].diagnostic["traceback"] @@ -169,9 +185,7 @@ async def test_none_outcome_is_not_recorded() -> None: await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.skip.event", {}) - # Wait long enough that, if an outcome were going to be recorded, - # it would have been. Then assert it was not. + await bus.publish("sensory.link_up.test_src.skip", {}) await asyncio.sleep(0.2) finally: await runner.stop() @@ -182,12 +196,11 @@ async def test_none_outcome_is_not_recorded() -> None: async def test_start_is_idempotent() -> None: bus = _bus() - handler = _RecordingHandler("x", "sensory.x.>") + handler = _RecordingHandler("x", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() - await runner.start() # second call must be a no-op + await runner.start() try: - # The handler list is one task, not two. assert len([t for t in runner._tasks if not t.done()]) == 1 # noqa: SLF001 finally: await runner.stop() @@ -197,18 +210,18 @@ async def test_start_is_idempotent() -> None: async def test_stop_is_idempotent() -> None: bus = _bus() runner = ReflexRunner( - bus, handlers=[_RecordingHandler("x", "sensory.x.>")] + bus, handlers=[_RecordingHandler("x", "sensory.link_down.>")] ) await runner.start() await runner.stop() - await runner.stop() # second call must be a no-op + await runner.stop() await bus.close() async def test_stop_without_start_is_safe() -> None: """Calling stop on a runner that never started is a no-op.""" bus = _bus() - runner = ReflexRunner(bus, handlers=[_RecordingHandler("x", "sensory.x.>")]) + runner = ReflexRunner(bus, handlers=[_RecordingHandler("x", "sensory.link_down.>")]) await runner.stop() await bus.close() @@ -221,13 +234,11 @@ async def test_runner_enumerates_registry_when_no_handlers_given() -> None: register_handler, ) - # Snapshot the production registry so we don't leak the cleared 
state - # to sibling test files that depend on the first-party handlers. snapshot = list(all_handlers()) clear_registry() try: - a = _RecordingHandler("reg-a", "sensory.reg.>") - b = _RecordingHandler("reg-b", "sensory.reg.>") + a = _RecordingHandler("reg-a", "sensory.link_down.>") + b = _RecordingHandler("reg-b", "sensory.link_up.>") register_handler(a) register_handler(b) bus = _bus() @@ -239,3 +250,47 @@ async def test_runner_enumerates_registry_when_no_handlers_given() -> None: clear_registry() for h in snapshot: register_handler(h) + + +# --------------------------------------------------------------------------- +# ReflexContext threading (dev3) +# --------------------------------------------------------------------------- + + +async def test_default_context_has_no_dedup_store() -> None: + """Runners built without an explicit context still pass one through.""" + bus = _bus() + handler = _RecordingHandler("ctx_default", "sensory.link_down.>") + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.link_down.snmp_trap.r1", {}) + await _wait_for(lambda: len(handler.contexts) == 1) + finally: + await runner.stop() + await bus.close() + + assert isinstance(handler.contexts[0], ReflexContext) + assert handler.contexts[0].dedup_store is None + + +async def test_explicit_context_is_threaded_to_handler() -> None: + """A context passed to ReflexRunner reaches every handle() call.""" + bus = _bus() + handler = _RecordingHandler("ctx_explicit", "sensory.link_down.>") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + runner = ReflexRunner(bus, handlers=[handler], context=ctx) + try: + assert runner.context is ctx + await runner.start() + await asyncio.sleep(0.05) + await bus.publish("sensory.link_down.snmp_trap.r1", {}) + await _wait_for(lambda: len(handler.contexts) == 1) + assert handler.contexts[0] is ctx + assert handler.contexts[0].dedup_store is store + finally: + 
await runner.stop() + await store.close() + await bus.close() diff --git a/tests/working/__init__.py b/tests/working/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/working/dedup/__init__.py b/tests/working/dedup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/working/dedup/test_in_memory.py b/tests/working/dedup/test_in_memory.py new file mode 100644 index 0000000..2e9cdbf --- /dev/null +++ b/tests/working/dedup/test_in_memory.py @@ -0,0 +1,149 @@ +"""Implementation-specific tests for :class:`InMemoryDedupStore`. + +The Protocol-level behavior (atomic check-and-record, TTL expiry, +empty-key rejection, etc.) is covered by the contract suite in +``tests/contracts/dedup_store/``. This module covers the bits that are +**specific** to the in-memory implementation: LRU eviction at the size +cap, lazy expired-entry sweep, controllable-clock fast-forward, the +constructor validation, and the ``size()`` helper. + +The use of a controllable monotonic clock is the key idea: we don't want +unit tests to depend on real-clock ``asyncio.sleep`` for the dedup-window +logic (slow + flaky), so the store accepts a ``clock`` callable that the +tests advance manually. 
+""" + +from __future__ import annotations + +import pytest + +from netcortex.working.dedup import InMemoryDedupStore + +pytestmark = pytest.mark.asyncio + + +class _FakeClock: + """Monotonically-non-decreasing time source we drive from the test.""" + + def __init__(self, start: float = 1000.0) -> None: + self.now = start + + def __call__(self) -> float: + return self.now + + def advance(self, seconds: float) -> None: + if seconds < 0: + raise ValueError("clock must not move backward") + self.now += seconds + + +async def test_constructor_rejects_non_positive_max_entries() -> None: + with pytest.raises(ValueError): + InMemoryDedupStore(max_entries=0) + with pytest.raises(ValueError): + InMemoryDedupStore(max_entries=-5) + + +async def test_size_starts_at_zero_and_tracks_inserts() -> None: + store = InMemoryDedupStore() + try: + assert store.size() == 0 + await store.record_unless_duplicate("a", ttl_seconds=10) + await store.record_unless_duplicate("b", ttl_seconds=10) + assert store.size() == 2 + # Duplicate insert does not grow size. + await store.record_unless_duplicate("a", ttl_seconds=10) + assert store.size() == 2 + finally: + await store.close() + + +async def test_fake_clock_drives_ttl_expiry_without_real_sleep() -> None: + """Whole point of accepting a clock parameter — fast deterministic tests.""" + clk = _FakeClock() + store = InMemoryDedupStore(clock=clk) + try: + assert await store.record_unless_duplicate("k", ttl_seconds=60) is True + clk.advance(30) + # Still within TTL — dup. + assert await store.record_unless_duplicate("k", ttl_seconds=60) is False + clk.advance(31) + # Now past TTL — fresh again. + assert await store.record_unless_duplicate("k", ttl_seconds=60) is True + finally: + await store.close() + + +async def test_lru_eviction_when_cap_exceeded() -> None: + """At capacity, the LRU entry is evicted to make room for a new one. 
+ + The "least recently used" definition includes duplicate hits: a key + that gets hit (even as a dup) is moved to the tail. This test checks + that touching ``oldest`` mid-flow rescues it from eviction. + """ + clk = _FakeClock() + store = InMemoryDedupStore(max_entries=3, clock=clk) + try: + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + await store.record_unless_duplicate("middle", ttl_seconds=1000) + await store.record_unless_duplicate("newest", ttl_seconds=1000) + assert store.size() == 3 + + # Touch "oldest" — dup hit — should move it to the tail. + assert ( + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + is False + ) + + # Insert a fourth — should evict "middle" (now the LRU), not "oldest". + await store.record_unless_duplicate("fourth", ttl_seconds=1000) + assert store.size() == 3 + + # "middle" gone — a fresh insert should succeed. + assert ( + await store.record_unless_duplicate("middle", ttl_seconds=1000) + is True + ) + # "oldest" should still be deduped (was rescued). + assert ( + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + is False + ) + finally: + await store.close() + + +async def test_expired_entries_are_swept_lazily() -> None: + """Stale head-of-LRU entries are reclaimed when new inserts happen. + + We don't promise eager cleanup — the sweep runs when ``record`` is + called, bounded so worst-case latency stays predictable. After many + inserts, expired-and-stale entries should be gone. + """ + clk = _FakeClock() + store = InMemoryDedupStore(clock=clk) + try: + # Plant some short-TTL keys, then age them all out. + for i in range(10): + await store.record_unless_duplicate(f"old{i}", ttl_seconds=1.0) + assert store.size() == 10 + + clk.advance(60) + + # New inserts trigger the sweep; the old keys should drop out + # after enough inserts to exhaust the per-call sweep budget. 
+ for i in range(40): + await store.record_unless_duplicate(f"new{i}", ttl_seconds=10) + + # Only "new*" keys remain (40 of them, all within TTL). + assert store.size() == 40 + finally: + await store.close() + + +async def test_close_clears_state() -> None: + store = InMemoryDedupStore() + await store.record_unless_duplicate("k1", ttl_seconds=10) + assert store.size() == 1 + await store.close() + assert store.size() == 0