From 41f274f9b605b31c28b72a4266ef785a74beb704 Mon Sep 17 00:00:00 2001 From: Steve NCA Date: Mon, 1 Jun 2026 14:32:13 +0000 Subject: [PATCH] =?UTF-8?q?feat(taxonomy+dedup):=200.8.0-dev3=20=E2=80=94?= =?UTF-8?q?=20subject=20taxonomy,=20DedupStore,=20ReflexContext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third sub-step of the brain refactor. Locks in two things every later sub-step depends on: the NATS SUBJECT TAXONOMY and the DEDUP CONTRACT. No new publishers and no production behavior change yet — first publisher lands in 0.8.0-dev4. This PR is a pure refactor so the foundations can be reviewed in isolation. SUBJECT TAXONOMY docs/architecture/subjects.md is the authoritative spec. Machine- readable constants in netcortex/contracts/subjects.py: * SENSORY_EVENT_CLASSES — closed vocabulary (link_down, link_up, bgp_drop, bgp_up, device_reboot, device_unreachable, device_reachable, security_alert, config_change, topology_change, route_advertisement_change). Adding a class requires doc + constant change in the same PR. * SENSORY_SOURCES — _ tokens (snmp_trap, snmp_poll, meraki_webhook, gnmi_dialout, ...). * sensory_subject(class, source, *target_parts) — validated builder. * parse_sensory_subject(subject) — handlers use this to derive the fact_key. Why event-class-first: sensory... lets one handler subscribe to sensory.link_down.> and catch every source. The earlier modality-first ordering forced per-source subscriptions and made same-event-multi-source dedup awkward. DEDUPSTORE PROTOCOL + INMEMORYDEDUPSTORE netcortex/contracts/dedup_store.py defines atomic check-and-record. netcortex/working/dedup/in_memory.py ships the only 0.8.0 impl: * Asyncio-safe (lock-protected mutations). * TTL-bounded with lazy expired-entry sweep (bounded budget). * Size-bounded with LRU eviction. * Injectable clock for fast deterministic unit tests. Redis-backed impl lands in 0.9.0 alongside working memory. 
Contract tests (9 cases) parametrize over every registered implementation; Redis only needs a factory + registry row to gain full coverage. REFLEXCONTEXT (HANDLER DEPENDENCY INJECTION) netcortex.reflex.protocol.ReflexContext is the runtime dependency bag every handler receives on handle(event, ctx). Frozen dataclass, all fields optional, new resources added by appending fields so old handlers are unaffected. * ctx.dedup_store: DedupStore | None — 0.8.0 * future: semantic_memory, working_memory, policy_engine ReflexRunner owns one ReflexContext (default-constructed if not supplied) and threads it through every dispatch. HANDLER REFACTOR — new patterns + dedup logic Handler | Old pattern (dev2) | New pattern (dev3) | Window -----------------|-------------------------------------------------|---------------------------------|-------- link_down | sensory.snmp.trap.link_down.> | sensory.link_down.> | 60s bgp_drop | sensory.snmp.trap.bgp_backward_transition.> | sensory.bgp_drop.> | 60s security_alert | sensory.meraki.webhook.security.> | sensory.security_alert.> | 300s Renamed security_webhook → security_alert because the new handler is source-agnostic (Meraki today, Cisco AMP / future SIEM tomorrow). File moved to security_alert.py via git mv (rename preserved). Each handler now constructs fact_key = "|" (plus event_type for security_alert) and consults ctx.dedup_store when present. Duplicates return outcome="skipped" with rationale; first arrivals return outcome="logged" as before. Severity demoted to info on skipped outcomes — they are corroboration telemetry, not a second incident. Known limitation in 0.8.0: real flap (down/up/down within one window) collapses to a single fact. Fusion stage in 0.9.0 will track state transitions explicitly. See subjects.md. 
EVENT BUS GRAMMAR FIX Both InMemoryEventBus and NatsEventBus had an overly-strict subject validator (^[A-Za-z0-9_-]+(\.[A-Za-z0-9_-]+)*$) that rejected real-world identifier characters (':' in MACs, '|' in compound targets, '/' in interface names). Widened to match the actual NATS grammar: ^[^.\s*>]+(\.[^.\s*>]+)*$ — any printable except '.', whitespace, '*', '>'. Existing rejection cases (wildcards, empty tokens, whitespace) still rejected. TESTS * tests/contracts/dedup_store/test_dedup_store_contract.py — 9 cases parametrized over every store impl: atomicity-under-concurrency, TTL expiry, empty-key rejection, non-positive-TTL rejection, close idempotency, use-after-close raises. * tests/contracts/test_subjects.py — 13 cases for builders, parser, vocabulary integrity. * tests/working/dedup/test_in_memory.py — 6 cases for in-memory store specifics (LRU eviction, lazy sweep, fake clock, ctor validation, close clears state). * tests/reflex/test_handlers.py — new patterns + 5 new dedup cases (cross-source link_down, different-target independence, missing-target skip-dedup, Meraki retry dedup, distinct-event- type-no-dedup, trap+gnmi dedup for bgp_drop). * tests/reflex/test_runner.py — new signature + 2 new cases (default-context wiring, explicit-context threading). * tests/reflex/test_registry.py — updated stub signature. End-to-end smoke: 6 publishes across 3 unique facts → 3 logged + 3 skipped outcomes. Same result expected against real NATS via the contract suite. BREAKING — pre-release only * ReflexHandler.handle(event) → ReflexHandler.handle(event, ctx). The three first-party handlers updated; no external consumers. * Handler id security_webhook → security_alert. dev2 release never persisted these to anywhere stable, so rename is cost-free. NOT YET WIRED * No publishers — pollers still call correlator + writeback directly. First dual-write publisher lands in 0.8.0-dev4. 
* Outcomes are logged only — Neo4j :ReflexEvent persistence + NetBox journal mirror also land in 0.8.0-dev4. Co-authored-by: Cursor --- CHANGELOG.md | 133 ++++++ docs/architecture/subjects.md | 177 ++++++++ netcortex/__init__.py | 2 +- netcortex/contracts/__init__.py | 2 + netcortex/contracts/dedup_store.py | 91 ++++ netcortex/contracts/subjects.py | 206 +++++++++ netcortex/reflex/__init__.py | 3 +- netcortex/reflex/handlers/__init__.py | 2 +- netcortex/reflex/handlers/bgp_drop.py | 103 +++-- netcortex/reflex/handlers/link_down.py | 122 ++++-- netcortex/reflex/handlers/security_alert.py | 129 ++++++ netcortex/reflex/handlers/security_webhook.py | 88 ---- netcortex/reflex/protocol.py | 54 ++- netcortex/reflex/runner.py | 15 +- netcortex/thalamus/nats_bus.py | 11 +- netcortex/working/__init__.py | 23 + netcortex/working/dedup/__init__.py | 10 + netcortex/working/dedup/in_memory.py | 155 +++++++ pyproject.toml | 2 +- tests/contracts/conftest.py | 32 +- tests/contracts/dedup_store/__init__.py | 0 .../dedup_store/test_dedup_store_contract.py | 138 ++++++ tests/contracts/event_bus/in_memory.py | 14 +- tests/contracts/test_subjects.py | 140 ++++++ tests/reflex/test_handlers.py | 398 ++++++++++++++---- tests/reflex/test_registry.py | 20 +- tests/reflex/test_runner.py | 127 ++++-- tests/working/__init__.py | 0 tests/working/dedup/__init__.py | 0 tests/working/dedup/test_in_memory.py | 149 +++++++ 30 files changed, 2028 insertions(+), 318 deletions(-) create mode 100644 docs/architecture/subjects.md create mode 100644 netcortex/contracts/dedup_store.py create mode 100644 netcortex/contracts/subjects.py create mode 100644 netcortex/reflex/handlers/security_alert.py delete mode 100644 netcortex/reflex/handlers/security_webhook.py create mode 100644 netcortex/working/__init__.py create mode 100644 netcortex/working/dedup/__init__.py create mode 100644 netcortex/working/dedup/in_memory.py create mode 100644 tests/contracts/dedup_store/__init__.py create mode 100644 
tests/contracts/dedup_store/test_dedup_store_contract.py create mode 100644 tests/contracts/test_subjects.py create mode 100644 tests/working/__init__.py create mode 100644 tests/working/dedup/__init__.py create mode 100644 tests/working/dedup/test_in_memory.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fdbd3d..9b243c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,139 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev3] — 2026-06-01 + +### Added — Subject taxonomy + `DedupStore` + `ReflexContext` (foundation for multi-source sensing) + +Third sub-step of the brain refactor. Locks in two things that every +subsequent sub-step depends on: the **NATS subject taxonomy** and the +**dedup contract**. No new publishers and no behavior change in +production yet — that lands in `0.8.0-dev4`. This PR is intentionally a +pure refactor so the foundations can be reviewed in isolation. + +#### Subject taxonomy + +`docs/architecture/subjects.md` is the new authoritative spec. +Companion machine-readable constants in +`netcortex/contracts/subjects.py`: + +* `SENSORY_EVENT_CLASSES` — closed vocabulary (`link_down`, `link_up`, + `bgp_drop`, `bgp_up`, `device_reboot`, `device_unreachable`, + `device_reachable`, `security_alert`, `config_change`, + `topology_change`, `route_advertisement_change`). Adding a class + requires a doc + constant change in the same PR. +* `SENSORY_SOURCES` — `_` tokens + (`snmp_trap`, `snmp_poll`, `meraki_webhook`, `gnmi_dialout`, …). +* `sensory_subject(event_class, source, *target_parts)` — validated + builder. Refuses unknown classes/sources, empty parts, embedded + dots, whitespace. +* `parse_sensory_subject(subject)` — inverse extractor; handlers use + it to derive the `fact_key` from the incoming subject. + +**Why event-class-first**: `sensory...` +lets a single handler subscribe to `sensory.link_down.>` and catch +every source of link-down. 
The earlier `sensory.<modality>.<provenance>.<event_class>.<target>` +ordering forced per-source subscriptions and made the +"same-event-multiple-sources" dedup story awkward. + +#### `DedupStore` Protocol + `InMemoryDedupStore` + +`netcortex/contracts/dedup_store.py` defines the atomic check-and-record +contract. `netcortex/working/dedup/in_memory.py` ships the only 0.8.0 +implementation: + +* Asyncio-safe (lock-protected mutations). +* TTL-bounded with lazy expired-entry sweep (bounded budget per call + so tail latency is predictable). +* Size-bounded with LRU eviction so a misbehaving publisher cannot OOM. +* Accepts an injectable clock for fast deterministic unit tests. +* Cap warnings and long-TTL warnings on long-lived state that would + mask flap behavior. + +Redis-backed implementation lands in 0.9.0 alongside working memory. +Contract tests (9 cases) parametrize over every registered +implementation — Redis only needs a factory function and a registry +row to gain full coverage when it arrives. + +#### `ReflexContext` (handler dependency injection) + +`netcortex.reflex.protocol.ReflexContext` is the runtime dependency +bag every handler receives on `handle(event, ctx)`. Frozen dataclass, +all fields optional, new resources added by appending fields so old +handlers are unaffected. + +* `ctx.dedup_store: DedupStore | None` — 0.8.0 +* `ctx.semantic_memory`, `ctx.working_memory`, `ctx.policy_engine`, + … — appended in later releases + +The `ReflexRunner` owns one `ReflexContext` (default-constructed if +not supplied) and threads it through every dispatch. Existing failure- +isolation and lifecycle behavior unchanged. 
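
The context-passing pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code in `netcortex/reflex/protocol.py`: `SketchContext`, `OneShotStore`, the dict-shaped event, and the free function `handle` are all hypothetical stand-ins, and the real `ReflexContext` may differ in its details.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class SketchContext:
    """Hypothetical stand-in for ReflexContext: frozen, every field optional."""

    dedup_store: Optional[Any] = None
    # Later resources would be appended here with a None default, so a
    # handler that only reads dedup_store keeps working unchanged.


class OneShotStore:
    """Toy store for the demo only: remembers keys forever, ignores the TTL."""

    def __init__(self) -> None:
        self._seen: set = set()

    async def record_unless_duplicate(self, fact_key: str, *, ttl_seconds: float) -> bool:
        if fact_key in self._seen:
            return False
        self._seen.add(fact_key)
        return True


async def handle(event: dict, ctx: SketchContext) -> str:
    """Return 'logged' for a first arrival and 'skipped' for a duplicate."""
    if ctx.dedup_store is None:
        # No store injected: behave as before and never dedup.
        return "logged"
    is_new = await ctx.dedup_store.record_unless_duplicate(
        event["fact_key"], ttl_seconds=60
    )
    return "logged" if is_new else "skipped"
```

Because every field defaults to `None`, a handler written against this shape has to tolerate a missing store, which is why the sketch falls back to plain logging in that case.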
+ +#### Handler refactor — new patterns + dedup logic + +| Handler | Old pattern (dev2) | New pattern (dev3) | Dedup window | +|---|---|---|---| +| `link_down` | `sensory.snmp.trap.link_down.>` | `sensory.link_down.>` | 60 s | +| `bgp_drop` | `sensory.snmp.trap.bgp_backward_transition.>` | `sensory.bgp_drop.>` | 60 s | +| `security_alert` (renamed from `security_webhook`) | `sensory.meraki.webhook.security.>` | `sensory.security_alert.>` | 300 s | + +Renamed `security_webhook` → `security_alert` because the new handler +is source-agnostic (Meraki today, Cisco AMP / future SIEM tomorrow); +the file moved from `security_webhook.py` → `security_alert.py`. The +operator-facing handler id is renamed accordingly. + +Each handler now constructs a `fact_key = "<event_class>|<canonical_target>"` +(plus `event_type` for `security_alert`) and consults `ctx.dedup_store` +when present. Duplicates return `outcome="skipped"` with the dedup +rationale; first arrivals return `outcome="logged"` as before. Severity +is intentionally demoted to `info` on skipped outcomes — they are +corroboration telemetry, not a second incident. + +**Known limitation in 0.8.0**: a real flap (down/up/down within one +window) collapses to a single fact. Tracked: the fusion stage in +0.9.0 handles state transitions explicitly. See subjects.md "Dedup +model" section. + +#### Tests + +* `tests/contracts/dedup_store/test_dedup_store_contract.py` — 9 cases + parametrized over every registered store implementation + (atomicity-under-concurrency, TTL expiry, empty-key rejection, + non-positive-TTL rejection, close idempotency, use-after-close raises). +* `tests/contracts/test_subjects.py` — 13 cases for the taxonomy + builders, parser, vocabulary integrity checks. +* `tests/working/dedup/test_in_memory.py` — 6 cases for the in-memory + store specifics (LRU eviction, lazy sweep, fake clock, ctor + validation, close clears state). 
+* `tests/reflex/test_handlers.py` — updated for new patterns + 5 new + dedup cases (cross-source dedup for `link_down`, different-target + independence, missing-target skip-dedup, Meraki retry dedup for + `security_alert`, distinct-event-type-no-dedup, trap+gnmi dedup for + `bgp_drop`). +* `tests/reflex/test_runner.py` — updated for new signature + 2 new + cases (default-context wiring, explicit-context threading). +* `tests/reflex/test_registry.py` — updated for new handler signature + on the stub. + +### Breaking — pre-release only + +* `ReflexHandler.handle(event)` → `ReflexHandler.handle(event, ctx)` — + every handler implementation must take the context. The three + first-party handlers were updated in this PR; no external consumers + exist yet. +* Handler ids: `security_webhook` → `security_alert`. The dev2 release + never persisted these to anywhere stable, so this rename is + cost-free; future renames after publishers exist will require the + dual-publish dance described in subjects.md. + +### Not yet wired + +* Still no publishers. Pollers continue to call correlator + writeback + directly. The first dual-write publisher lands in `0.8.0-dev4`. +* Outcomes are logged only — Neo4j `:ReflexEvent` persistence + NetBox + journal mirror also land in `0.8.0-dev4`. + ## [0.8.0-dev2] — 2026-06-01 ### Added — Reflex skeleton: registry + runner + 3 first-party handlers diff --git a/docs/architecture/subjects.md b/docs/architecture/subjects.md new file mode 100644 index 0000000..564bb1a --- /dev/null +++ b/docs/architecture/subjects.md @@ -0,0 +1,177 @@ +# NATS Subject Taxonomy + +The bus (Thalamus, see [`brain.md`](./brain.md)) carries every observation, +every derived fact, every reflex outcome, and every motor action through a +single shared subject namespace. 
The shape of that namespace determines +how easily handlers can subscribe to **all sources of one thing** versus +**all things from one source** — those two needs pull in different +directions, and the taxonomy below is what we landed on. + +## Top-level namespaces + +| Namespace | Who publishes | Who subscribes | Lifetime | +|---|---|---|---| +| `sensory.>` | Receivers (webhook / trap / telemetry) and pollers (SNMP, Meraki API, etc.) | Resolver (Stage 2), fusion (Stage 3), reflex (Stage 4 fallback), episodic memory | Raw observations, retained briefly in JetStream | +| `fact.>` (0.9.0+) | Fusion stage, after dedup + corroboration | Reflex, working memory, semantic memory | Derived facts, retained longer | +| `reflex.>` | Reflex handlers | UI, episodic memory, downstream agents | Handler outcomes | +| `motor.>` (future) | Conductor / agents | Action executors, audit, semantic memory | Outbound actions | +| `consolidation.>` (future) | Consolidation cycles | Semantic memory, pattern memory | Bulk derived state | + +Anything published outside these five namespaces will be rejected by the +receiver-side validators once those are in place; for now (0.8.0) it is +strongly discouraged. + +## `sensory.<event_class>.<source>.<target>` + +Event-class first. This is the **only** ordering that lets a single +handler subscribe to "all link-down observations regardless of where +they came from" with a NATS wildcard. 
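
To make the wildcard argument concrete, here is a small sketch of NATS-style token matching (`*` matches exactly one token, `>` matches one or more trailing tokens). The helper `subject_matches` is hypothetical and exists only for illustration; a real NATS server does this matching itself and clients never need to.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Illustrative NATS-style matcher; not part of the NetCortex codebase."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the final token and needs at least
            # one remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # pattern is longer than the subject
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without a trailing '>', token counts must agree exactly.
    return len(p_tokens) == len(s_tokens)


# Event-class first: one pattern covers every source of link-down.
trap = "sensory.link_down.snmp_trap.r1|Gi0/1"
gnmi = "sensory.link_down.gnmi_dialout.r1|Gi0/1"
print(subject_matches("sensory.link_down.>", trap))    # True
print(subject_matches("sensory.link_down.>", gnmi))    # True
print(subject_matches("sensory.*.snmp_trap.>", gnmi))  # False
```

With the class in the second position, the "all sources of one thing" subscription is a single pattern, while the "all things from one source" case still works through a `*` in the class position.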
+ +### `<event_class>` — a closed vocabulary + +| Class | Meaning | Typical sources | +|---|---|---| +| `link_down` | An interface transitioned to operationally-down | SNMP linkDown trap, SNMP poll diff, Meraki webhook, gNMI dial-out | +| `link_up` | An interface transitioned to operationally-up | same | +| `bgp_drop` | A BGP session entered a non-Established state | BGP4-MIB trap, CISCO-BGP4-MIB trap, gNMI BGP neighbor sample | +| `bgp_up` | A BGP session entered Established | same | +| `device_reboot` | A device restarted | coldStart trap, sysUpTime reset on poll, Meraki status webhook | +| `device_unreachable` | A device stopped responding to our reachability probes | poller probe failure, Meraki status webhook | +| `device_reachable` | The inverse | same | +| `security_alert` | A security-class event observed | Meraki webhook (IDS/malware/blocked-URL), Cisco AMP webhook, future SIEM | +| `config_change` | A device configuration changed | Meraki webhook, NETCONF change notification, RANCID-style diff | +| `topology_change` | A new neighbor appeared or an existing one disappeared | LLDP/CDP poll diff | +| `route_advertisement_change` | Advertised prefix set changed on a session | BGP RIB poll diff | + +New classes are added by amending this table **and** the +`SENSORY_EVENT_CLASSES` constant in `netcortex/contracts/subjects.py` +in the same PR. Reviewers reject PRs that grow one without the other. + +### `<source>` — `<modality>_<provenance>` joined as one token + +Why one token: NATS wildcards match exactly one token (`*`) or trailing +greedy (`>`). Compound sources as separate tokens would make +"all snmp sources" subscriptions require multiple subscriptions. 
+ +| Modality | Source token | Notes | +|---|---|---| +| SNMP | `snmp_trap`, `snmp_poll`, `snmp_walk` | `_walk` reserved for bulk MIB walks distinct from per-OID polls | +| HTTP webhook | `meraki_webhook`, `thousandeyes_webhook`, `cisco_amp_webhook`, `catalyst_center_webhook` | Source names match the upstream platform | +| Streaming telemetry | `gnmi_dialout`, `gnmi_dialin`, `netconf_yangpush`, `cisco_mdt` | | +| API poll | `meraki_api`, `intersight_api`, `vsphere_api`, `fmc_api`, `nexus_dashboard_api`, `catalyst_center_api` | One per platform adapter | +| Synthetic | `netcortex_inference` | When the system itself derives an observation (rare; prefer `fact.*`) | + +### `<target>` — canonical identifiers, dot-separated tokens + +Targets are the entities the event is about. Multi-part keys use `|` +(URL-safe, unambiguous) **within** a token, dots between tokens: + +| Event class | Target shape | Example | +|---|---|---| +| `link_down`, `link_up` | `\|` | `sensory.link_down.snmp_trap.cpn-ful-cat9k1\|Gi1/0/12` | +| `bgp_drop`, `bgp_up` | `\|` | `sensory.bgp_drop.gnmi_dialout.cpn-ful-cat8k1\|10.0.1.5` | +| `device_*` | `` | `sensory.device_reboot.snmp_trap.cpn-ful-cat9k1` | +| `security_alert` | `\|` | `sensory.security_alert.meraki_webhook.N_1\|aa:bb:cc:dd:ee:ff` | +| `config_change` | `` or `\|
` | `sensory.config_change.meraki_webhook.Q2XX-YYYY-ZZZZ` | +| `topology_change` | `\|\|\|` | `sensory.topology_change.snmp_poll.r1\|Gi0/1\|r2\|Gi0/24` | +| `route_advertisement_change` | `\|` | `sensory.route_advertisement_change.snmp_poll.r1\|10.0.1.5` | + +**Canonicalization happens before publish.** Receivers publish with the +identifier shape they observed (raw IP, MAC, serial, ifIndex). The +resolver stage (0.9.0+) canonicalizes to NetBox-blessed names by querying +semantic memory and re-publishes under the same subject with the +canonical target. Until then (0.8.0), best-effort canonicalization +happens inline in the receiver; un-canonicalized observations still pass +through, they just may dedup imperfectly. + +## `fact.<event_class>.<target>` (lands in 0.9.0) + +After the fusion stage dedups same-event-different-source observations +within a per-class time window, the surviving fact is republished under +`fact.>`. Reflex handlers eventually move from `sensory.*` subscription +to `fact.*` subscription, gaining: + +* one-fire-per-real-event semantics for free (no per-handler dedup code) +* corroboration metadata (which sources observed it) +* identity already canonicalized + +## `reflex..` (future) + +Reflex handlers may emit their own events back to the bus so other +consumers (UI live tail, downstream automation, episodic memory) can +react. Not used in 0.8.0-dev3 but the namespace is reserved. + +Examples: + +* `reflex.link_down.applied.cpn-ful-cat9k1|Gi1/0/12` +* `reflex.bgp_drop.skipped.cpn-ful-cat8k1|10.0.1.5` (skipped because deduped) +* `reflex.security_alert.errored.N_1|aa:bb:cc:dd:ee:ff` + +## `motor...` (future) + +Reserved for actuation. Not in scope until the policy library + write-gate +land. + +## Dedup model (0.8.0) + +Same-event-multiple-sources is handled at the **reflex handler layer** +using a `DedupStore` (in-memory in 0.8.0, Redis in 0.9.0+, see +[`netcortex.contracts.dedup_store.DedupStore`](../../netcortex/contracts/dedup_store.py)). 
+The fact key is intentionally short and scoped by event class so two +handlers reacting to the same target on different conditions (e.g. +`link_down` and `link_up`) do not collide: + +``` +fact_key = f"{event_class}|{canonical_target}" +``` + +The store enforces a TTL window per call. First arrival succeeds and +records the key with TTL = `handler.dedup_window_seconds`. Any later +arrival of the same `fact_key` before TTL expiry is treated as a +duplicate and the handler returns a `skipped` outcome (still recorded, +so operators see corroboration in the UI). + +**Known limitation in 0.8.0:** a real flap whose down→up→down +transitions all land within one window collapses to one fire. We accept +this because flap detection is a working-memory concern (0.9.0) and +adding a transition counter in 0.8.0 would require publishers to emit +one, which they cannot uniformly do. The fusion stage in 0.9.0 tracks +state transitions explicitly. + +Per-handler defaults: + +| Handler | `dedup_window_seconds` | Rationale | +|---|---|---| +| `link_down` | 60 | A real flap surfaces as multiple facts; a duplicate-source surfaces once | +| `bgp_drop` | 60 | Same as link_down — BGP sessions don't flap meaningfully faster | +| `security_alert` | 300 | Meraki retries delivery; same alert may arrive 2-3 times within minutes | + +Handlers can override per-event by computing a different `fact_key` (e.g., +include a `transition_id` from the payload) — see each handler's docstring. 
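
The windowing behavior described in this section can be sketched as a tiny TTL store. This is an illustration under stated assumptions, not the shipped `InMemoryDedupStore`: the class name `SketchDedupStore` is hypothetical, and the sketch omits the LRU cap, lazy sweep, warnings, and `close()` that the real implementation is described as having.

```python
import asyncio


class SketchDedupStore:
    """Illustrative only: atomic check-and-record over a TTL window."""

    def __init__(self, clock):
        self._clock = clock          # injectable clock: callable returning seconds
        self._expiry = {}            # fact_key -> absolute expiry time
        self._lock = asyncio.Lock()  # one shared lock guards check-and-record

    async def record_unless_duplicate(self, fact_key, *, ttl_seconds):
        if not fact_key or ttl_seconds <= 0:
            raise ValueError("fact_key must be non-empty and ttl_seconds > 0")
        async with self._lock:
            now = self._clock()
            expires_at = self._expiry.get(fact_key)
            if expires_at is not None and expires_at > now:
                return False  # duplicate inside the window
            self._expiry[fact_key] = now + ttl_seconds
            return True       # first arrival, or the old window has expired


async def demo():
    now = [0.0]  # fake clock so the example needs no sleeping
    store = SketchDedupStore(clock=lambda: now[0])
    key = "link_down|cpn-ful-cat9k1|Gi1/0/12"
    results = [await store.record_unless_duplicate(key, ttl_seconds=60)]      # trap at t=0
    now[0] = 5.0
    results.append(await store.record_unless_duplicate(key, ttl_seconds=60))  # gNMI at t=5
    now[0] = 61.0
    results.append(await store.record_unless_duplicate(key, ttl_seconds=60))  # after window
    return results


print(asyncio.run(demo()))  # [True, False, True]
```

The second observation arrives five seconds after the first and is treated as corroboration. The third falls outside the 60-second window and is recorded as a new fact, which is also why a down/up/down flap inside one window collapses to a single fire.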
+ +## Wildcards and worked subscriptions + +Common reflex subscriptions and what they catch: + +```text +sensory.link_down.> # ALL link-down regardless of source +sensory.link_down.snmp_trap.> # link-down ONLY from SNMP traps +sensory.link_down.*.cpn-ful-cat9k1.* # link-down from any source for one device +sensory.bgp_drop.> # ALL bgp drops +sensory.security_alert.meraki_webhook.> # Meraki security alerts only +sensory.*.snmp_trap.> # Everything from any SNMP trap source +sensory.> # Firehose (episodic memory only) +fact.link_down.> # 0.9.0+: post-fusion link-down facts +``` + +## Versioning and breaking changes + +Subject names are part of the **operator-facing contract**. Renaming an +event class or source token is a breaking change and requires: + +1. A `BREAKING CHANGE:` note in the changelog +2. Dual-publish under both old and new subjects for one minor version +3. A deprecation warning in the CHANGELOG entry +4. Removal of the old subject in the version after that + +Adding a new event class or source token is **not** a breaking change. diff --git a/netcortex/__init__.py b/netcortex/__init__.py index 6dc8033..ed242a3 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes. 
""" -__version__ = "0.8.0-dev2" +__version__ = "0.8.0-dev3" diff --git a/netcortex/contracts/__init__.py b/netcortex/contracts/__init__.py index 78aea96..f911429 100644 --- a/netcortex/contracts/__init__.py +++ b/netcortex/contracts/__init__.py @@ -33,12 +33,14 @@ from __future__ import annotations +from netcortex.contracts.dedup_store import DedupStore from netcortex.contracts.event_bus import EventBus, EventBusValidationError, EventMessage from netcortex.contracts.policy import Decision, Policy, PolicyContext from netcortex.contracts.sensory_adapter import SensoryAdapter, SensoryEvent __all__ = [ "Decision", + "DedupStore", "EventBus", "EventBusValidationError", "EventMessage", diff --git a/netcortex/contracts/dedup_store.py b/netcortex/contracts/dedup_store.py new file mode 100644 index 0000000..b042b6d --- /dev/null +++ b/netcortex/contracts/dedup_store.py @@ -0,0 +1,91 @@ +"""``DedupStore`` Protocol — time-windowed deduplication for sensory events. + +When the same real-world event arrives via multiple modalities (trap + +webhook + poll), reflex handlers consult a ``DedupStore`` to avoid firing +once per source. The store's only job is atomic check-and-record over a +TTL window. + +Implementations +--------------- +* ``netcortex.working.dedup.in_memory.InMemoryDedupStore`` — single-process, + asyncio-safe, used everywhere in 0.8.0 (reflex runner, tests, dev) and + the only implementation in CI. +* ``netcortex.working.dedup.redis.RedisDedupStore`` (lands in 0.9.0) — + multi-replica-safe, uses ``SET key 1 NX EX ttl`` for atomic insert. + +Both implementations conform to the same contract tests in +``tests/contracts/dedup_store/`` — Redis-backed tests skip unless +``REDIS_URL`` is set in the environment, matching the pattern dev1 +established for ``NatsEventBus``. + +Atomicity guarantee +------------------- +``record_unless_duplicate`` MUST be atomic with respect to other concurrent +calls for the same ``fact_key``. 
Without this, two concurrent observations +could both pass the check and both fire the reflex — which is the exact +race we're trying to suppress. + +Redis gives us this for free via ``SET NX``. The in-memory implementation +uses an ``asyncio.Lock`` per call. Future implementations must document +their atomicity story. + +Cross-process atomicity is only required of multi-replica stores; the +in-memory store is by definition single-process and operators MUST NOT +deploy multi-replica reflex runners with the in-memory store. The +runtime config that picks the store is responsible for enforcing this. +""" + +from __future__ import annotations + +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class DedupStore(Protocol): + """Atomic check-and-record over a TTL window, keyed by string. + + The store is not a general key-value cache — it intentionally exposes + only the one operation the reflex layer needs, so adding new + implementations is a small, well-scoped task. + """ + + async def record_unless_duplicate( + self, + fact_key: str, + *, + ttl_seconds: float, + ) -> bool: + """Atomically record ``fact_key`` if it is not already present. + + Returns ``True`` when this call recorded the key (i.e. the event + is **new** within the current TTL window — the caller should + proceed). Returns ``False`` when the key was already present + (the event is a **duplicate** — the caller should short-circuit). + + ``ttl_seconds`` MUST be > 0. The store rounds sub-millisecond + precision down. Very long TTLs (> 1 day) are accepted but a + warning is logged because long-lived dedup state tends to mask + real flap behavior. + + The atomicity guarantee applies per ``fact_key`` only. Different + ``fact_key`` values may interleave freely. + + Raises + ------ + ValueError + If ``fact_key`` is empty or ``ttl_seconds`` <= 0. + """ + ... + + async def close(self) -> None: + """Release any underlying resources. + + Idempotent: a second call is a no-op. 
After ``close()`` further + calls to ``record_unless_duplicate`` raise ``RuntimeError``. + In-memory implementations make this trivial; Redis-backed + implementations close their connection pool here. + """ + ... + + +__all__ = ["DedupStore"] diff --git a/netcortex/contracts/subjects.py b/netcortex/contracts/subjects.py new file mode 100644 index 0000000..78f6e25 --- /dev/null +++ b/netcortex/contracts/subjects.py @@ -0,0 +1,206 @@ +"""NATS subject taxonomy — constants and helpers. + +The full specification lives in [`docs/architecture/subjects.md`](../../docs/architecture/subjects.md). +This module is the **machine-readable** half of that contract: an +authoritative enumeration of the closed event-class vocabulary, plus a +handful of small helpers for constructing and parsing subjects in a way +that matches what receivers actually emit. + +Why a closed vocabulary +----------------------- +Reflex handlers, fusion rules, and operator alerts all switch on event +class. If publishers were free to invent new classes ad-hoc, downstream +consumers would have to defensively handle unknown values everywhere. +Instead we lock the vocabulary here and force a documentation + code +change to introduce a new one — see the doc's "Versioning and breaking +changes" section. 
+""" + +from __future__ import annotations + +from typing import Final + +# --------------------------------------------------------------------------- +# Top-level namespaces +# --------------------------------------------------------------------------- + +SENSORY_NS: Final[str] = "sensory" +FACT_NS: Final[str] = "fact" +REFLEX_NS: Final[str] = "reflex" +MOTOR_NS: Final[str] = "motor" +CONSOLIDATION_NS: Final[str] = "consolidation" + +NAMESPACES: Final[frozenset[str]] = frozenset({ + SENSORY_NS, + FACT_NS, + REFLEX_NS, + MOTOR_NS, + CONSOLIDATION_NS, +}) + +# --------------------------------------------------------------------------- +# Closed vocabulary of event classes used after `sensory.` and `fact.` +# --------------------------------------------------------------------------- + +#: Adding to this set MUST be accompanied by a documentation update in +#: docs/architecture/subjects.md in the same PR. Reviewers reject changes +#: that grow one without the other. +SENSORY_EVENT_CLASSES: Final[frozenset[str]] = frozenset({ + "link_down", + "link_up", + "bgp_drop", + "bgp_up", + "device_reboot", + "device_unreachable", + "device_reachable", + "security_alert", + "config_change", + "topology_change", + "route_advertisement_change", +}) + +# --------------------------------------------------------------------------- +# Source-token registry (modality_provenance, single NATS token) +# --------------------------------------------------------------------------- + +#: Sources we currently know how to receive from. Add to this set when a +#: new receiver lands. Receivers ARE expected to validate their own source +#: token against this set at startup, so a typo fails fast. 
+SENSORY_SOURCES: Final[frozenset[str]] = frozenset({ + # SNMP + "snmp_trap", + "snmp_poll", + "snmp_walk", + # Webhooks + "meraki_webhook", + "thousandeyes_webhook", + "cisco_amp_webhook", + "catalyst_center_webhook", + # Streaming telemetry + "gnmi_dialout", + "gnmi_dialin", + "netconf_yangpush", + "cisco_mdt", + # API pollers (one per platform adapter) + "meraki_api", + "intersight_api", + "vsphere_api", + "fmc_api", + "nexus_dashboard_api", + "catalyst_center_api", + # Synthetic — when NetCortex itself derives an observation. Rare; + # most synthetic state should publish to `fact.*` instead. + "netcortex_inference", +}) + + +# --------------------------------------------------------------------------- +# Builders +# --------------------------------------------------------------------------- + + +def sensory_subject(event_class: str, source: str, *target_parts: str) -> str: + """Build a validated `sensory.<event_class>.<source>.<target>` subject. + + Receivers should use this rather than f-string concatenation. The + validation catches drift between the doc, this module, and what + publishers actually emit — the kind of drift that produces + "subscriptions silently match nothing" bugs that are miserable to + debug in production. + """ + if event_class not in SENSORY_EVENT_CLASSES: + raise ValueError( + f"unknown sensory event class {event_class!r}; " + f"add to SENSORY_EVENT_CLASSES + docs/architecture/subjects.md" + ) + if source not in SENSORY_SOURCES: + raise ValueError( + f"unknown sensory source {source!r}; " + f"add to SENSORY_SOURCES + docs/architecture/subjects.md" + ) + if not target_parts: + raise ValueError( + f"sensory subject {event_class}/{source} requires at least one target part; " + f"got 0 — an empty subject tail makes subjects with wildcards ambiguous" + ) + # NATS tokens cannot contain '.' (token separator) or whitespace. + # We don't try to validate the full grammar here; we just catch the + # two cases that produce silent-but-wrong subjects. 
+ for i, part in enumerate(target_parts): + if not part: + raise ValueError( + f"sensory subject target part {i} is empty — would produce " + f"a malformed subject like 'sensory.{event_class}.{source}..foo'" + ) + if "." in part: + raise ValueError( + f"sensory subject target part {i}={part!r} contains '.' which is " + f"the NATS token separator; use '|' to join compound identifiers" + ) + if any(ch.isspace() for ch in part): + raise ValueError( + f"sensory subject target part {i}={part!r} contains whitespace; " + f"NATS subjects must be whitespace-free" + ) + return ".".join([SENSORY_NS, event_class, source, *target_parts]) + + +def fact_subject(event_class: str, *target_parts: str) -> str: + """Build a validated `fact.{event_class}.{target...}` subject. + + Lands in production with the fusion stage (0.9.0). Provided here so + code written against the future fact namespace can validate today. + """ + if event_class not in SENSORY_EVENT_CLASSES: + raise ValueError( + f"unknown fact event class {event_class!r}; " + f"add to SENSORY_EVENT_CLASSES + docs/architecture/subjects.md" + ) + if not target_parts: + raise ValueError( + f"fact subject {event_class} requires at least one target part" + ) + return ".".join([FACT_NS, event_class, *target_parts]) + + +# --------------------------------------------------------------------------- +# Parser — used by handlers to extract the canonical fact_key +# --------------------------------------------------------------------------- + + +def parse_sensory_subject(subject: str) -> tuple[str, str, str]: + """Parse a sensory subject into (event_class, source, target_joined). + + `target_joined` is the remaining tokens after class+source, re-joined + with the NATS separator (``.``). Handlers typically use this to + compute their dedup fact_key. + + Returns a 3-tuple even when the subject is malformed — the caller + decides what to do. Empty fields signal "we don't know."
+ + Examples + -------- + >>> parse_sensory_subject("sensory.link_down.snmp_trap.r1|Gi0/1") + ('link_down', 'snmp_trap', 'r1|Gi0/1') + >>> parse_sensory_subject("sensory.security_alert.meraki_webhook.N_1.aa:bb:cc:dd:ee:ff") + ('security_alert', 'meraki_webhook', 'N_1.aa:bb:cc:dd:ee:ff') + """ + parts = subject.split(".") + if len(parts) < 4 or parts[0] != SENSORY_NS: + return ("", "", "") + return (parts[1], parts[2], ".".join(parts[3:])) + + +__all__ = [ + "CONSOLIDATION_NS", + "FACT_NS", + "MOTOR_NS", + "NAMESPACES", + "REFLEX_NS", + "SENSORY_EVENT_CLASSES", + "SENSORY_NS", + "SENSORY_SOURCES", + "fact_subject", + "parse_sensory_subject", + "sensory_subject", +] diff --git a/netcortex/reflex/__init__.py b/netcortex/reflex/__init__.py index c443a5b..5f90c73 100644 --- a/netcortex/reflex/__init__.py +++ b/netcortex/reflex/__init__.py @@ -18,7 +18,7 @@ brain-mapped architecture. """ -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexHandler, ReflexOutcome from netcortex.reflex.registry import ( DuplicateHandlerError, all_handlers, @@ -29,6 +29,7 @@ __all__ = [ "DuplicateHandlerError", + "ReflexContext", "ReflexHandler", "ReflexOutcome", "ReflexRunner", diff --git a/netcortex/reflex/handlers/__init__.py b/netcortex/reflex/handlers/__init__.py index 83ecadb..cea7b0f 100644 --- a/netcortex/reflex/handlers/__init__.py +++ b/netcortex/reflex/handlers/__init__.py @@ -23,4 +23,4 @@ # Keep these imports alphabetical so a diff reviewer can spot additions. 
from netcortex.reflex.handlers import bgp_drop # noqa: F401 from netcortex.reflex.handlers import link_down # noqa: F401 -from netcortex.reflex.handlers import security_webhook # noqa: F401 +from netcortex.reflex.handlers import security_alert # noqa: F401 diff --git a/netcortex/reflex/handlers/bgp_drop.py b/netcortex/reflex/handlers/bgp_drop.py index 1cafee2..217a55e 100644 --- a/netcortex/reflex/handlers/bgp_drop.py +++ b/netcortex/reflex/handlers/bgp_drop.py @@ -1,28 +1,21 @@ """``bgp_drop`` — reflex handler for BGP session state-down signals. -Subscribes to the BGP4-MIB ``bgpBackwardTransition`` trap (and, once the -streaming telemetry adapter lands, also ``sensory.cisco.mdt.bgp.>`` -neighbor-down samples). The fast deterministic response is to record a -session-down outcome so the deliberative loop (route convergence -analysis, prefix advertisement drift) has the wall-clock anchor. - -This module is dev2 scaffolding. When publishers land in 0.8.0-dev3+ the -handler will additionally: - -* resolve the peer IP against semantic memory's ``:BgpSession`` nodes so - the outcome carries the canonical session identifier, not just the - peer address; -* check whether the device is in a maintenance window OR the peer is a - known-flapping route-server (operator policy); -* attach a NetBox journal entry to the BGP session object (once the - reconciliation engine starts surfacing those — they are not first- - class in NetBox today, so the journal will live on the device); -* trigger a deliberative follow-up to assess prefix-advertisement - impact, comparing the last-known advertised prefix set on this - session against the post-drop topology snapshot. - -None of that is in dev2. The current handler logs and returns a -``high``-severity outcome; downstream consumers can already key off it. +Subscribes to ``sensory.bgp_drop.>`` so the same handler reacts to +every source of a BGP session backward-transition (SNMP trap, gNMI +neighbor-state sample, future RIB poll diff). 
+ +The event class was ``bgp_backward_transition`` in dev2's draft +taxonomy — renamed to the cleaner ``bgp_drop`` in dev3's event-class- +first taxonomy. See ``docs/architecture/subjects.md``. + +Dedup +----- +A real BGP session drop typically produces one trap immediately and a +gNMI sample 1-2 seconds later. Both arrivals collapse to one fire via +the runner-supplied :class:`DedupStore`. The default window is 60 +seconds: it absorbs the trap/gNMI pair, but a real flap (drop, restore, +drop within a minute) also collapses to one fact. A same-session re-drop +after the window fires fresh. Operators tune via constructor arg. """ from __future__ import annotations @@ -31,17 +24,13 @@ class in NetBox today, so the journal will live on the device); from typing import Final from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import register_handler -# Subject pattern. -# -# Real publishers will use -# ``sensory.snmp.trap.bgp_backward_transition.`` or -# ``sensory.cisco.mdt.bgp_neighbor_state.``. For dev2 the -# handler subscribes to the SNMP trap subject only; the second -# subscription (or a glob) lands once the telemetry adapter exists.
-_PATTERN: Final[str] = "sensory.snmp.trap.bgp_backward_transition.>" +_PATTERN: Final[str] = "sensory.bgp_drop.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 60.0 class BgpDropHandler: @@ -50,8 +39,18 @@ class BgpDropHandler: id: Final[str] = "bgp_drop" pattern: Final[str] = _PATTERN - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: payload = event.payload + event_class, source, _ = parse_sensory_subject(event.subject) device = ( payload.get("device_id") or payload.get("device") @@ -60,27 +59,45 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: peer = payload.get("peer") or payload.get("peer_ip") peer_asn = payload.get("peer_asn") or payload.get("remote_as") last_state = payload.get("last_state") or payload.get("previous_state") - # Compose a target identifier that survives whether or not the peer - # IP is known — preferring the canonical session "device|peer" key - # when both are available, falling back to whichever is present. 
if device and peer: - target = f"{device}|{peer}" + target: str | None = f"{device}|{peer}" elif peer: target = str(peer) elif device: target = str(device) else: target = None + + now = datetime.now(tz=timezone.utc) + + if ctx.dedup_store is not None and target is not None and event_class: + fact_key = f"{event_class}|{target}" + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity="info", + occurred_at=now, + payload={"source": source}, + outcome="skipped", + rationale=( + f"duplicate bgp_drop within {self.dedup_window_seconds}s " + f"dedup window (source={source!r})" + ), + ) + return ReflexOutcome( handler=self.id, subject=event.subject, target=target, - # BGP session loss is high-severity by default. Operators can - # tune later via the policy library once it exists; we do not - # second-guess severity inside the handler. severity="high", - occurred_at=datetime.now(tz=timezone.utc), + occurred_at=now, payload={ + "source": source, "device": device, "peer": peer, "peer_asn": peer_asn, @@ -88,8 +105,8 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: }, outcome="logged", rationale=( - f"BGP session {target!r} backward-transition observed; " - f"last_state={last_state!r} — dev2 idle handler" + f"bgp_drop on {target!r} from {source!r}; " + f"last_state={last_state!r} — dev3 dedup wired" ), ) diff --git a/netcortex/reflex/handlers/link_down.py b/netcortex/reflex/handlers/link_down.py index c562a49..c5c3b53 100644 --- a/netcortex/reflex/handlers/link_down.py +++ b/netcortex/reflex/handlers/link_down.py @@ -1,29 +1,22 @@ """``link_down`` — reflex handler for interface-down signals. -Subscribes to the SNMP linkDown trap subject. 
In the brain-mapped -architecture this is the fast deterministic response to an interface -going hard-down: log it now, let the deliberative loop (prefrontal, -0.11.0) decide whether to open a ticket, page someone, or just wait -for the symmetric linkUp. - -This module is dev2 scaffolding — the handler is registered and the -runner will subscribe it to the bus, but no publisher exists yet. The -first real linkDown publish lands in 0.8.0-dev3 when the SNMP-trap -sensory adapter (``sensory/trap/snmp.py``) is wired in. - -When publishers exist, the handler will also: - -* fetch the affected (device, interface) from semantic memory and verify - it is not in a maintenance window; -* deduplicate against a Redis "recently seen" window so a flapping link - produces one outcome per minute, not one per trap; -* attach a NetBox journal entry on the Interface object so an operator - sees the trap immediately in the tool they live in; -* emit a follow-up ``reflex.link_down.applied`` event so consolidation - knows a synthetic interface-state transition has been recorded. - -None of that is in dev2. The current implementation captures the trap, -extracts the target, and returns a ``logged`` outcome. +Subscribes to ``sensory.link_down.>`` so the same handler reacts to +every source of a link-down observation (SNMP trap, SNMP poll diff, +Meraki webhook, gNMI dial-out sample) without needing per-source +duplicate registrations. + +Dedup +----- +The same physical link going down typically arrives multiple times +within a short window — the trap, then the Meraki webhook ~50ms later, +then the SNMP poll on its next 30-second pass. We collapse those to a +single fire via the runner-supplied :class:`DedupStore`. Per-handler +defaults documented in ``docs/architecture/subjects.md``. + +Dev2 was idle (no publishers). 
Dev3 wires the dedup contract but is +still idle in terms of upstream publishers — the first real +``sensory.link_down.snmp_poll.`` publish lands in 0.8.0-dev4 +when the ingest worker starts dual-writing detected state changes. """ from __future__ import annotations @@ -32,50 +25,93 @@ from typing import Final from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import register_handler -# Subject pattern. -# -# Real publishers in 0.8.0-dev3+ will use -# ``sensory.snmp.trap.link_down.`` and emit one event per -# (device, interface) transition. The trailing ``>`` matches any number -# of further tokens so the handler can be subscribed today and the -# publisher's exact subject layout can evolve without redeploying the -# handler. -_PATTERN: Final[str] = "sensory.snmp.trap.link_down.>" +_PATTERN: Final[str] = "sensory.link_down.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 60.0 class LinkDownHandler: - """Reflex for IF-MIB linkDown traps.""" + """Reflex for link-down observations from any source.""" id: Final[str] = "link_down" pattern: Final[str] = _PATTERN - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: payload = event.payload - target = ( + event_class, source, _target_from_subject = parse_sensory_subject( + event.subject + ) + device = ( payload.get("device_id") or payload.get("device") or payload.get("target") ) interface = payload.get("interface") or payload.get("if_name") + if device and interface: + target: str | None = f"{device}|{interface}" + elif device: + target 
= str(device) + else: + target = None + + now = datetime.now(tz=timezone.utc) + + # Dedup check — only when both a store is provided AND we have + # a canonical target. An event with no target identifier cannot + # be meaningfully deduped (every arrival is treated as unique). + if ctx.dedup_store is not None and target is not None and event_class: + fact_key = f"{event_class}|{target}" + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + # Severity is intentionally demoted on the skipped + # outcome — it is informational corroboration, not a + # second incident. + severity="info", + occurred_at=now, + payload={"source": source}, + outcome="skipped", + rationale=( + f"duplicate link_down within {self.dedup_window_seconds}s " + f"dedup window (source={source!r})" + ), + ) + return ReflexOutcome( handler=self.id, subject=event.subject, - target=str(target) if target else None, + target=target, severity="high", - occurred_at=datetime.now(tz=timezone.utc), + occurred_at=now, payload={ + "source": source, "interface": interface, - # Cap the upstream payload echo so a chatty publisher - # cannot blow up the outcome record. + # Cap upstream key echo at 16 — same rationale as dev2 + # (a chatty publisher cannot blow up the outcome). "upstream_keys": sorted(payload.keys())[:16], }, outcome="logged", rationale=( - f"linkDown observed on {target!r} interface {interface!r}; " - "dev2 idle handler — no remediation yet" + f"link_down on {target!r} from {source!r}; " + "dev3 dedup wired, no remediation yet" ), ) diff --git a/netcortex/reflex/handlers/security_alert.py b/netcortex/reflex/handlers/security_alert.py new file mode 100644 index 0000000..f0cb982 --- /dev/null +++ b/netcortex/reflex/handlers/security_alert.py @@ -0,0 +1,129 @@ +"""``security_alert`` — reflex handler for security webhook events. 
+ +Renamed from ``security_webhook`` (dev2) to ``security_alert`` in dev3 +to reflect the event-class-first taxonomy: the handler subscribes to +``sensory.security_alert.>`` and reacts to every webhook source +(Meraki today; Cisco AMP, future SIEM webhooks tomorrow) without +needing per-source registrations. + +Dedup +----- +Meraki retries webhook delivery on failure (typically 2-3 retries spaced +over a minute or two), so the same ``alertId`` can arrive several times +within a few minutes. Default dedup window is 300 seconds — long enough +to absorb the retry pattern, short enough that distinct alerts on the +same target later in the day still fire fresh. + +``alertId`` is the *upstream* dedup identifier; we still construct our +own ``fact_key`` from (event_class + target, plus event_type when +present) so the same underlying incident observed via Meraki AND via a +future Cisco AMP webhook still collapses to one outcome, assuming both +publishers agree on the canonical target. Identity reconciliation +across webhook sources is a 0.9.0 fusion-layer concern; in 0.8.0 we +accept some slippage (e.g., Meraki using clientMac while AMP uses clientIp). +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +from netcortex.contracts.event_bus import EventMessage +from netcortex.contracts.subjects import parse_sensory_subject +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome, Severity +from netcortex.reflex.registry import register_handler + +_PATTERN: Final[str] = "sensory.security_alert.>" + +_DEFAULT_DEDUP_WINDOW_SECONDS: Final[float] = 300.0 + +# Coarse mapping for dev3. Real severity policy lives in +# policy/security_severity.py once the policy library lands (0.8.x).
+_MERAKI_SEVERITY_MAP: Final[dict[str, Severity]] = { + "informational": "info", + "info": "info", + "warning": "warn", + "warn": "warn", + "high": "high", + "critical": "critical", +} + + +class SecurityAlertHandler: + """Reflex for security-class webhook events.""" + + id: Final[str] = "security_alert" + pattern: Final[str] = _PATTERN + + def __init__( + self, + *, + dedup_window_seconds: float = _DEFAULT_DEDUP_WINDOW_SECONDS, + ) -> None: + self.dedup_window_seconds = dedup_window_seconds + + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: + payload = event.payload + event_class, source, _ = parse_sensory_subject(event.subject) + upstream_sev = str(payload.get("severity") or "").lower() + severity: Severity = _MERAKI_SEVERITY_MAP.get(upstream_sev, "warn") + target = ( + payload.get("clientMac") + or payload.get("deviceSerial") + or payload.get("networkId") + or payload.get("target") + ) + target = str(target) if target else None + event_type = payload.get("alertType") or payload.get("eventType") + + now = datetime.now(tz=timezone.utc) + + if ctx.dedup_store is not None and target is not None and event_class: + # Include event_type in the fact_key when present so two + # different alert types on the same client don't dedup with + # each other. 
+ fact_key_parts = [event_class, target] + if event_type: + fact_key_parts.append(str(event_type)) + fact_key = "|".join(fact_key_parts) + is_new = await ctx.dedup_store.record_unless_duplicate( + fact_key, ttl_seconds=self.dedup_window_seconds + ) + if not is_new: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity="info", + occurred_at=now, + payload={"source": source, "event_type": event_type}, + outcome="skipped", + rationale=( + f"duplicate security_alert within {self.dedup_window_seconds}s " + f"dedup window (source={source!r}, event_type={event_type!r})" + ), + ) + + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + severity=severity, + occurred_at=now, + payload={ + "source": source, + "alert_id": payload.get("alertId"), + "event_type": event_type, + "network_id": payload.get("networkId"), + }, + outcome="logged", + rationale=( + f"security_alert {event_type!r} on {target!r} from {source!r}; " + "dev3 dedup wired" + ), + ) + + +_HANDLER = register_handler(SecurityAlertHandler()) diff --git a/netcortex/reflex/handlers/security_webhook.py b/netcortex/reflex/handlers/security_webhook.py deleted file mode 100644 index c9a40d7..0000000 --- a/netcortex/reflex/handlers/security_webhook.py +++ /dev/null @@ -1,88 +0,0 @@ -"""``security_webhook`` — reflex handler for Meraki security webhooks. - -Subscribes to security-class Meraki Dashboard webhooks (IDS alerts, -malware, anomalous traffic, blocked URL hit, etc.) once the webhook -receiver in ``sensory/webhook/meraki.py`` lands (0.8.x patch). Dev2 -ships the handler skeleton so the pattern is reserved and the runner's -subscription map is complete from day one. 
- -When publishers exist, this handler will: - -* dedupe via Meraki's ``alertId`` (a Redis "seen recently" set with a - short TTL — Meraki retries the same alert on delivery failure); -* check whether the affected client is known to semantic memory; if not, - promote it to the working-memory "unknown clients" set so the - operator UI can surface unrecognized endpoints; -* compute a severity from the ``occurredAt`` + ``eventType`` cross - product using a policy in ``policy/security_severity.py`` (so the - threshold is operator-tunable, not hardcoded); -* attach a NetBox journal entry on the affected IPAddress/Interface - when it can be resolved. - -None of that is in dev2. The current implementation logs the inbound -event and returns an outcome whose severity comes verbatim from the -Meraki payload's ``severity`` field when present, defaulting to -``warn`` (Meraki's own scale runs informational/warning/critical; we -collapse to our four-bucket scale in the policy module later). -""" - -from __future__ import annotations - -from datetime import datetime, timezone -from typing import Final - -from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome, Severity -from netcortex.reflex.registry import register_handler - -_PATTERN: Final[str] = "sensory.meraki.webhook.security.>" - -# Coarse mapping for dev2. Real severity policy lives in -# policy/security_severity.py once the policy library lands (0.8.x). 
-_MERAKI_SEVERITY_MAP: Final[dict[str, Severity]] = { - "informational": "info", - "info": "info", - "warning": "warn", - "warn": "warn", - "high": "high", - "critical": "critical", -} - - -class SecurityWebhookHandler: - """Reflex for Meraki security-class webhook events.""" - - id: Final[str] = "security_webhook" - pattern: Final[str] = _PATTERN - - async def handle(self, event: EventMessage) -> ReflexOutcome | None: - payload = event.payload - upstream_sev = str(payload.get("severity") or "").lower() - severity: Severity = _MERAKI_SEVERITY_MAP.get(upstream_sev, "warn") - target = ( - payload.get("clientMac") - or payload.get("deviceSerial") - or payload.get("networkId") - or payload.get("target") - ) - event_type = payload.get("alertType") or payload.get("eventType") - return ReflexOutcome( - handler=self.id, - subject=event.subject, - target=str(target) if target else None, - severity=severity, - occurred_at=datetime.now(tz=timezone.utc), - payload={ - "alert_id": payload.get("alertId"), - "event_type": event_type, - "network_id": payload.get("networkId"), - }, - outcome="logged", - rationale=( - f"meraki security webhook {event_type!r} on {target!r}; " - "dev2 idle handler — dedupe + classification pending" - ), - ) - - -_HANDLER = register_handler(SecurityWebhookHandler()) diff --git a/netcortex/reflex/protocol.py b/netcortex/reflex/protocol.py index 77ef3f8..1026dcb 100644 --- a/netcortex/reflex/protocol.py +++ b/netcortex/reflex/protocol.py @@ -30,6 +30,7 @@ from datetime import datetime from typing import Any, Literal, Protocol, runtime_checkable +from netcortex.contracts.dedup_store import DedupStore from netcortex.contracts.event_bus import EventMessage Severity = Literal["info", "warn", "high", "critical"] @@ -101,6 +102,36 @@ class of "the value I saw at log time differs from what I wrote to used). 
Not surfaced in the operator UI by default.""" +@dataclass(frozen=True) +class ReflexContext: + """Runtime dependencies a handler may use during ``handle()``. + + Threaded through by the :class:`netcortex.reflex.runner.ReflexRunner` + so handlers can consult shared resources (the dedup store today; + semantic memory, working memory, and the policy engine in later + releases) without each one carrying its own constructor wiring. + + Frozen so a handler cannot replace another handler's view of the + world mid-dispatch. New shared resources are added by appending + fields here; handlers that don't consume them are unaffected. That + forward-compatibility is the whole point of using a context dataclass + rather than positional arguments. + + All fields default to ``None`` so the runner can be constructed + without ever wiring a context (the default-context path) and tests + can pass partial contexts that exercise only the resources they + care about. + """ + + dedup_store: DedupStore | None = None + """If set, handlers should consult it via + :meth:`DedupStore.record_unless_duplicate` to suppress duplicate + arrivals of the same logical event (e.g. trap + webhook + poll all + observing one interface going down). Handlers that opt out of dedup + — because their event class is inherently de-duplicated upstream — + may ignore this field.""" + + @runtime_checkable class ReflexHandler(Protocol): """Minimum surface every reflex handler must expose. @@ -126,11 +157,21 @@ def pattern(self) -> str: Validated by the runner against the same grammar :class:`netcortex.thalamus.NatsEventBus` enforces. One pattern per - handler in 0.8.0-dev2. + handler in 0.8.0-dev3. + + Patterns follow the event-class-first taxonomy documented in + ``docs/architecture/subjects.md`` — + ``sensory.{event_class}.{source}.{target}`` — so a single + wildcard like ``sensory.link_down.>`` catches every source of a + link-down observation. """ ...
- async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, + event: EventMessage, + ctx: ReflexContext, + ) -> ReflexOutcome | None: """Process one event. Returning :class:`ReflexOutcome` instructs the runner to log / @@ -141,5 +182,14 @@ async def handle(self, event: EventMessage) -> ReflexOutcome | None: way to silently skip an error — raise instead, and the runner will convert it to an ``errored`` outcome with the traceback in ``diagnostic``. + + ``ctx`` carries shared runtime resources (dedup store and, in + later releases, semantic memory / working memory / policy). A + handler that consults the dedup store and finds the event is a + duplicate SHOULD return a ``ReflexOutcome`` with + ``outcome="skipped"`` and a ``rationale`` naming the dedup + window — the skipped outcome is itself useful telemetry + (corroboration count, visibility gaps) so the runner persists + it the same as any other outcome. """ ... diff --git a/netcortex/reflex/runner.py b/netcortex/reflex/runner.py index e36d67d..63b2579 100644 --- a/netcortex/reflex/runner.py +++ b/netcortex/reflex/runner.py @@ -40,7 +40,7 @@ from datetime import datetime, timezone from netcortex.contracts.event_bus import EventBus, EventMessage -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexHandler, ReflexOutcome from netcortex.reflex.registry import all_handlers _LOG = logging.getLogger(__name__) @@ -67,11 +67,17 @@ def __init__( bus: EventBus, *, handlers: list[ReflexHandler] | None = None, + context: ReflexContext | None = None, ) -> None: self._bus = bus self._handlers: list[ReflexHandler] = ( list(handlers) if handlers is not None else list(all_handlers()) ) + # Default context has every shared resource set to None — the + # opt-out path for handlers that don't need any of them. 
The + # production wiring (web pod / worker pod) replaces this with a + # context that has dedup_store + future memory layers attached. + self._context: ReflexContext = context or ReflexContext() self._tasks: list[asyncio.Task[None]] = [] self._started = False self._stopping = False @@ -89,6 +95,11 @@ def handlers(self) -> list[ReflexHandler]: """Snapshot of handlers this runner is driving (read-only).""" return list(self._handlers) + @property + def context(self) -> ReflexContext: + """The :class:`ReflexContext` every handler will receive on dispatch.""" + return self._context + # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ @@ -172,7 +183,7 @@ async def _dispatch_one( ) -> None: """Invoke one handler safely and persist its outcome.""" try: - outcome = await handler.handle(event) + outcome = await handler.handle(event, self._context) except asyncio.CancelledError: raise except Exception as exc: diff --git a/netcortex/thalamus/nats_bus.py b/netcortex/thalamus/nats_bus.py index af7f68b..1b80411 100644 --- a/netcortex/thalamus/nats_bus.py +++ b/netcortex/thalamus/nats_bus.py @@ -58,7 +58,16 @@ # Subject syntax — kept in sync with InMemoryEventBus so both backends reject # the same set of malformed subjects (the contract tests publish these as # negative cases and expect rejection from any conforming implementation). -_VALID_PUBLISH_SUBJECT = re.compile(r"^[A-Za-z0-9_\-]+(?:\.[A-Za-z0-9_\-]+)*$") +# NATS subject grammar: dot-separated tokens where each token is one or +# more printable, non-whitespace characters EXCLUDING the three reserved +# meta-characters: '.' (token separator), '*' (single-token wildcard), +# and '>' (multi-token wildcard). This admits real-world identifier +# characters like ':' (MAC addresses), '|' (compound NetCortex targets +# like 'device|interface'), '/' (interface names like Gi0/1), and '+'. 
+# Earlier revisions used [A-Za-z0-9_-] which silently rejected +# realistic subjects; that mismatch is now fixed so the bus accepts +# everything the taxonomy in docs/architecture/subjects.md emits. +_VALID_PUBLISH_SUBJECT = re.compile(r"^[^.\s*>]+(?:\.[^.\s*>]+)*$") def _validate_pattern(pattern: str) -> None: diff --git a/netcortex/working/__init__.py b/netcortex/working/__init__.py new file mode 100644 index 0000000..f21ceb7 --- /dev/null +++ b/netcortex/working/__init__.py @@ -0,0 +1,23 @@ +"""Working memory — short-lived, fast, replaceable state stores. + +Working memory in the brain-mapped architecture (see +[`docs/architecture/brain.md`](../../docs/architecture/brain.md)) is the +layer that holds **right now** state: active alerts, in-flight +reconciliation jobs, dedup windows, sliding metric aggregates. It is +deliberately separate from semantic memory (Neo4j, long-lived structural +knowledge) and episodic memory (Splunk, raw event history). + +Production target storage is Redis. The interfaces are defined in +``netcortex/contracts/`` and concrete backends live in the per-concern +subpackages here: + +* ``netcortex.working.dedup`` — TTL-windowed dedup (in-memory for 0.8.0, + Redis from 0.9.0) +* ``netcortex.working.activity`` (future) — sliding-window counters +* ``netcortex.working.queues`` (future) — per-flow rate budgets + +In 0.8.0 we ship only the in-memory implementations. They are functional +and used in CI; production deployments will swap them for Redis-backed +implementations in 0.9.0 with no call-site changes (the swap is wired +through the Protocols in ``netcortex/contracts/``). +""" diff --git a/netcortex/working/dedup/__init__.py b/netcortex/working/dedup/__init__.py new file mode 100644 index 0000000..5947605 --- /dev/null +++ b/netcortex/working/dedup/__init__.py @@ -0,0 +1,10 @@ +"""Working-memory dedup implementations. + +The Protocol lives in :mod:`netcortex.contracts.dedup_store`. 
This +package contains the concrete implementations the runner can be wired +to. +""" + +from netcortex.working.dedup.in_memory import InMemoryDedupStore + +__all__ = ["InMemoryDedupStore"] diff --git a/netcortex/working/dedup/in_memory.py b/netcortex/working/dedup/in_memory.py new file mode 100644 index 0000000..0a94da9 --- /dev/null +++ b/netcortex/working/dedup/in_memory.py @@ -0,0 +1,155 @@ +"""Single-process in-memory :class:`DedupStore`. + +Asyncio-safe (one ``asyncio.Lock`` serializes mutations), TTL-bounded, +and size-bounded with LRU eviction so a misbehaving publisher cannot OOM +the runner. The implementation is small on purpose — production +multi-replica deployments use the Redis-backed store starting in 0.9.0, +and the in-memory version is meant for single-replica production, +single-process CI, and unit tests. + +Operators MUST NOT deploy multi-replica reflex runners against this +store: there is no cross-process visibility, so two replicas would each +fire the reflex for the same event. The runtime config code that wires +the store is responsible for refusing this configuration, not this +class. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections import OrderedDict +from typing import Callable, Final + +_LOG = logging.getLogger(__name__) + +#: Hard upper bound on entries we keep. Keys beyond this are LRU-evicted +#: even if not yet expired. Sized for keys live inside one dedup window, +#: not for total cardinality: 10k devices × 10 event classes × 30 facts +#: per device is 3M possible keys, far above this cap. Raise it if that +#: many keys can be live at once; the cap itself keeps memory bounded. +_DEFAULT_MAX_ENTRIES: Final[int] = 100_000 + +#: Warn if a caller asks for a TTL longer than this — long-lived dedup +#: state tends to mask real flap behavior the operator needs to see. +_LONG_TTL_WARN_SECONDS: Final[float] = 24 * 60 * 60 + + +class InMemoryDedupStore: + """Time-windowed dedup, single-process, asyncio-safe.
+ + Parameters + ---------- + max_entries: + Cap on stored keys. When exceeded, the LRU entry (the one we + haven't touched longest) is evicted, even if its TTL has not + expired. Default sized for normal production workloads — raise + if you legitimately have more concurrent fact keys. + clock: + Override for the monotonic clock; tests pass a controllable + clock so they can advance time without ``asyncio.sleep``. + Production code passes nothing. + """ + + def __init__( + self, + *, + max_entries: int = _DEFAULT_MAX_ENTRIES, + clock: Callable[[], float] | None = None, + ) -> None: + if max_entries <= 0: + raise ValueError(f"max_entries must be > 0, got {max_entries}") + self._max_entries = max_entries + self._clock = clock or time.monotonic + # OrderedDict tracks insertion order, which we mutate on every + # access so dict iteration is LRU. value = expiry_monotonic. + self._entries: OrderedDict[str, float] = OrderedDict() + self._lock = asyncio.Lock() + self._closed = False + + async def record_unless_duplicate( + self, + fact_key: str, + *, + ttl_seconds: float, + ) -> bool: + """Atomically record ``fact_key`` if absent. See Protocol docs.""" + if not fact_key: + raise ValueError("fact_key must be non-empty") + if ttl_seconds <= 0: + raise ValueError(f"ttl_seconds must be > 0, got {ttl_seconds}") + if self._closed: + raise RuntimeError("InMemoryDedupStore.record_unless_duplicate after close()") + if ttl_seconds > _LONG_TTL_WARN_SECONDS: + _LOG.warning( + "dedup_store.long_ttl ttl_seconds=%s fact_key=%s", + ttl_seconds, fact_key, + ) + + async with self._lock: + now = self._clock() + existing = self._entries.get(fact_key) + if existing is not None and existing > now: + # Touch for LRU — even a duplicate "refreshes interest". + # This matters under sustained duplicate floods: if we + # didn't touch, the duplicated key would age out for LRU + # eviction faster than fresh keys, which is the wrong + # direction.
+ self._entries.move_to_end(fact_key) + return False + + # Drop any expired copy of this key so the re-insert lands at + # the LRU tail, then lazily sweep at most 32 expired heads. + self._entries.pop(fact_key, None) + self._sweep_expired(now, budget=32) + + # Enforce hard cap by evicting LRU(s) if needed. + while len(self._entries) >= self._max_entries: + evicted_key, evicted_exp = self._entries.popitem(last=False) + _LOG.debug( + "dedup_store.evicted_lru key=%s expired_in_s=%s", + evicted_key, round(evicted_exp - now, 3), + ) + + self._entries[fact_key] = now + ttl_seconds + return True + + def _sweep_expired(self, now: float, *, budget: int) -> None: + """Evict up to ``budget`` expired entries from the LRU head. + + Called under lock. We sweep only from the head (oldest entries) + because that's where expired keys cluster — the move_to_end on + every touch means recently-touched keys are at the tail. A + bounded sweep keeps tail latency predictable; the rare + worst-case (all entries expired in one burst) just means future + calls each chip away another 32 entries until the head is clean. + """ + swept = 0 + while swept < budget and self._entries: + head_key = next(iter(self._entries)) + head_exp = self._entries[head_key] + if head_exp > now: + return # head is fresh; later-touched entries behind it are left for a future sweep + del self._entries[head_key] + swept += 1 + + async def close(self) -> None: + """Drop all state. Idempotent.""" + if self._closed: + return + self._closed = True + async with self._lock: + self._entries.clear() + + # ------------------------------------------------------------------ + # Test helpers — not on the Protocol, do not depend on them in + # production code. + # ------------------------------------------------------------------ + + def size(self) -> int: + """Current number of tracked keys.
Lock-free read for inspection.""" + return len(self._entries) + + +__all__ = ["InMemoryDedupStore"] diff --git a/pyproject.toml b/pyproject.toml index 5f2e4a0..75e8f6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev2" +version = "0.8.0.dev3" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/contracts/conftest.py b/tests/contracts/conftest.py index 83a3690..10d3adb 100644 --- a/tests/contracts/conftest.py +++ b/tests/contracts/conftest.py @@ -19,7 +19,7 @@ import pytest -from netcortex.contracts import EventBus, Policy, SensoryAdapter +from netcortex.contracts import DedupStore, EventBus, Policy, SensoryAdapter # --------------------------------------------------------------------------- # EventBus registry. @@ -120,10 +120,40 @@ def policy_factory(request: pytest.FixtureRequest) -> Callable[[], Policy]: raise RuntimeError(f"unknown policy impl: {name}") +# --------------------------------------------------------------------------- +# DedupStore registry. +# --------------------------------------------------------------------------- + + +def _make_in_memory_dedup_store() -> DedupStore: + from netcortex.working.dedup import InMemoryDedupStore + + return InMemoryDedupStore() + + +# Redis-backed store lands in 0.9.0; when it does, add a +# _make_redis_dedup_store factory here that pytest.skips when REDIS_URL is +# absent, exactly the same pattern as _make_nats_event_bus above. 
+DEDUP_STORE_IMPLEMENTATIONS: list[tuple[str, Callable[[], DedupStore]]] = [ + ("in_memory", _make_in_memory_dedup_store), +] + + +@pytest.fixture(params=[name for name, _ in DEDUP_STORE_IMPLEMENTATIONS], ids=lambda n: n) +def dedup_store_factory(request: pytest.FixtureRequest) -> Callable[[], DedupStore]: + name: str = request.param + for n, factory in DEDUP_STORE_IMPLEMENTATIONS: + if n == name: + return factory + raise RuntimeError(f"unknown dedup store impl: {name}") + + __all__: Iterable[str] = ( + "DEDUP_STORE_IMPLEMENTATIONS", "EVENT_BUS_IMPLEMENTATIONS", "SENSORY_ADAPTER_IMPLEMENTATIONS", "POLICY_IMPLEMENTATIONS", + "dedup_store_factory", "event_bus_factory", "sensory_adapter_factory", "policy_factory", diff --git a/tests/contracts/dedup_store/__init__.py b/tests/contracts/dedup_store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/contracts/dedup_store/test_dedup_store_contract.py b/tests/contracts/dedup_store/test_dedup_store_contract.py new file mode 100644 index 0000000..ebeef3f --- /dev/null +++ b/tests/contracts/dedup_store/test_dedup_store_contract.py @@ -0,0 +1,138 @@ +"""Contract suite for :class:`netcortex.contracts.dedup_store.DedupStore`. + +Every implementation registered in +``tests/contracts/conftest.py::DEDUP_STORE_IMPLEMENTATIONS`` runs against +the same assertions. A future Redis-backed implementation only needs a +factory function and a row in the registry to gain full coverage. + +The cases below are the **observable** behavior the Protocol promises; +anything implementation-specific (LRU cap, eviction sweep, etc.) lives +in the implementation's own unit tests. 
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable + +import pytest + +from netcortex.contracts import DedupStore + +pytestmark = pytest.mark.asyncio + + +async def test_first_call_returns_true( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=1.0) is True + finally: + await store.close() + + +async def test_second_call_within_ttl_returns_false( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is True + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is False + finally: + await store.close() + + +async def test_different_keys_are_independent( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is True + assert await store.record_unless_duplicate("k2", ttl_seconds=5.0) is True + # ``k1`` and ``k2`` did not collide. + assert await store.record_unless_duplicate("k1", ttl_seconds=5.0) is False + assert await store.record_unless_duplicate("k2", ttl_seconds=5.0) is False + finally: + await store.close() + + +async def test_call_after_ttl_expiry_returns_true( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """Once the TTL window closes, the key is treatable as new again.""" + store = dedup_store_factory() + try: + assert await store.record_unless_duplicate("k1", ttl_seconds=0.1) is True + # Sleep ~3x the TTL so the window has definitely closed even on + # heavily-loaded CI runners. Real-clock dependency is acceptable + # here because the TTL is short. 
+ await asyncio.sleep(0.35) + assert await store.record_unless_duplicate("k1", ttl_seconds=0.1) is True + finally: + await store.close() + + +async def test_empty_key_rejected( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """An empty fact_key is almost always a publisher bug — fail loud.""" + store = dedup_store_factory() + try: + with pytest.raises(ValueError): + await store.record_unless_duplicate("", ttl_seconds=1.0) + finally: + await store.close() + + +@pytest.mark.parametrize("bad_ttl", [0.0, -1.0, -0.001]) +async def test_non_positive_ttl_rejected( + dedup_store_factory: Callable[[], DedupStore], + bad_ttl: float, +) -> None: + """A non-positive TTL is meaningless and likely a config error.""" + store = dedup_store_factory() + try: + with pytest.raises(ValueError): + await store.record_unless_duplicate("k1", ttl_seconds=bad_ttl) + finally: + await store.close() + + +async def test_close_is_idempotent( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + store = dedup_store_factory() + await store.close() + await store.close() + + +async def test_use_after_close_raises( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """The Protocol requires post-close calls to raise RuntimeError.""" + store = dedup_store_factory() + await store.close() + with pytest.raises(RuntimeError): + await store.record_unless_duplicate("k1", ttl_seconds=1.0) + + +async def test_concurrent_callers_for_same_key_dedupe( + dedup_store_factory: Callable[[], DedupStore], +) -> None: + """The Protocol's atomicity guarantee — only one concurrent caller wins. + + Spawn many coroutines that all race on the same key simultaneously; + exactly one must observe ``True`` and the rest must observe ``False``. + A non-atomic implementation would let multiple racers all return True. 
+ """ + store = dedup_store_factory() + try: + results = await asyncio.gather(*[ + store.record_unless_duplicate("contested", ttl_seconds=10.0) + for _ in range(50) + ]) + assert sum(1 for r in results if r is True) == 1 + assert sum(1 for r in results if r is False) == 49 + finally: + await store.close() diff --git a/tests/contracts/event_bus/in_memory.py b/tests/contracts/event_bus/in_memory.py index 8c58dcc..0201974 100644 --- a/tests/contracts/event_bus/in_memory.py +++ b/tests/contracts/event_bus/in_memory.py @@ -32,7 +32,19 @@ ) _LOG = logging.getLogger(__name__) -_VALID_PUBLISH_SUBJECT = re.compile(r"^[A-Za-z0-9_\-]+(?:\.[A-Za-z0-9_\-]+)*$") +# NATS subject grammar: dot-separated tokens where each token is one or +# more printable, non-whitespace characters EXCLUDING the three reserved +# meta-characters: '.' (token separator), '*' (single-token wildcard), +# and '>' (multi-token wildcard). This admits real-world identifier +# characters like ':' (MACs), '|' (compound NetCortex targets), '/' +# (interface names like Gi0/1), and '+' (URL-encoded payloads). +# +# Earlier revisions of this validator only allowed [A-Za-z0-9_-] which +# silently rejected publishes like +# 'sensory.link_down.snmp_trap.r1|Gi0/1', leading to subjects-look- +# correct-but-publish-throws bugs. Match the canonical taxonomy in +# docs/architecture/subjects.md. +_VALID_PUBLISH_SUBJECT = re.compile(r"^[^.\s*>]+(?:\.[^.\s*>]+)*$") def _compile_pattern(pattern: str) -> re.Pattern[str]: diff --git a/tests/contracts/test_subjects.py b/tests/contracts/test_subjects.py new file mode 100644 index 0000000..549d13b --- /dev/null +++ b/tests/contracts/test_subjects.py @@ -0,0 +1,140 @@ +"""Tests for the subject taxonomy module. + +These check the **validation** the builders perform — empty parts, +unknown event classes, unknown sources, embedded dots, whitespace — +because those are exactly the silent-but-wrong subject failures that +plague NATS-based systems when there is no centralized builder. 
The +closed-vocabulary checks are the contract that production publishers +will rely on starting in 0.8.0-dev4. +""" + +from __future__ import annotations + +import pytest + +from netcortex.contracts.subjects import ( + SENSORY_EVENT_CLASSES, + SENSORY_SOURCES, + fact_subject, + parse_sensory_subject, + sensory_subject, +) + + +# --------------------------------------------------------------------------- +# sensory_subject builder +# --------------------------------------------------------------------------- + + +def test_builds_well_formed_subject() -> None: + s = sensory_subject("link_down", "snmp_trap", "r1|Gi0/1") + assert s == "sensory.link_down.snmp_trap.r1|Gi0/1" + + +def test_supports_multi_token_targets() -> None: + s = sensory_subject( + "security_alert", "meraki_webhook", "N_1", "aa:bb:cc:dd:ee:ff" + ) + # Multi-token targets give downstream subscribers more wildcard + # options (e.g. sensory.security_alert.*.N_1.>). + assert s == "sensory.security_alert.meraki_webhook.N_1.aa:bb:cc:dd:ee:ff" + + +def test_rejects_unknown_event_class() -> None: + with pytest.raises(ValueError, match="unknown sensory event class"): + sensory_subject("not_an_event", "snmp_trap", "r1") + + +def test_rejects_unknown_source() -> None: + with pytest.raises(ValueError, match="unknown sensory source"): + sensory_subject("link_down", "carrier_pigeon", "r1") + + +def test_rejects_no_target_parts() -> None: + with pytest.raises(ValueError, match="at least one target part"): + sensory_subject("link_down", "snmp_trap") + + +def test_rejects_empty_target_part() -> None: + with pytest.raises(ValueError, match="empty"): + sensory_subject("link_down", "snmp_trap", "") + + +def test_rejects_dot_in_target_part() -> None: + """`.` is the NATS token separator — sneaking one in produces a + malformed subject with the wrong token count.""" + with pytest.raises(ValueError, match="separator"): + sensory_subject("link_down", "snmp_trap", "r1.Gi0/1") + + +def 
test_rejects_whitespace_in_target_part() -> None: + with pytest.raises(ValueError, match="whitespace"): + sensory_subject("link_down", "snmp_trap", "r1 problem") + + +# --------------------------------------------------------------------------- +# fact_subject builder +# --------------------------------------------------------------------------- + + +def test_fact_subject_basic() -> None: + s = fact_subject("link_down", "r1|Gi0/1") + assert s == "fact.link_down.r1|Gi0/1" + + +def test_fact_subject_rejects_unknown_class() -> None: + with pytest.raises(ValueError): + fact_subject("not_an_event", "r1") + + +def test_fact_subject_requires_target() -> None: + with pytest.raises(ValueError): + fact_subject("link_down") + + +# --------------------------------------------------------------------------- +# parse_sensory_subject +# --------------------------------------------------------------------------- + + +def test_parse_extracts_class_source_and_target() -> None: + assert parse_sensory_subject("sensory.link_down.snmp_trap.r1|Gi0/1") == ( + "link_down", + "snmp_trap", + "r1|Gi0/1", + ) + + +def test_parse_joins_multi_token_target() -> None: + assert parse_sensory_subject( + "sensory.security_alert.meraki_webhook.N_1.aabbccddeeff" + ) == ("security_alert", "meraki_webhook", "N_1.aabbccddeeff") + + +def test_parse_returns_empty_tuple_for_non_sensory() -> None: + assert parse_sensory_subject("fact.link_down.r1") == ("", "", "") + + +def test_parse_returns_empty_tuple_for_too_few_tokens() -> None: + assert parse_sensory_subject("sensory.link_down.snmp_trap") == ("", "", "") + + +# --------------------------------------------------------------------------- +# Vocabulary integrity +# --------------------------------------------------------------------------- + + +def test_all_event_classes_are_lower_snake_case() -> None: + """A taxonomy with mixed case becomes a footgun on case-sensitive + matching. 
Lock it down here.""" + for cls in SENSORY_EVENT_CLASSES: + assert cls == cls.lower(), f"event class {cls!r} must be lowercase" + assert " " not in cls, f"event class {cls!r} must not contain whitespace" + assert "." not in cls, f"event class {cls!r} must not contain '.'" + + +def test_all_sources_are_lower_snake_case() -> None: + for src in SENSORY_SOURCES: + assert src == src.lower(), f"source {src!r} must be lowercase" + assert " " not in src, f"source {src!r} must not contain whitespace" + assert "." not in src, f"source {src!r} must not contain '.'" diff --git a/tests/reflex/test_handlers.py b/tests/reflex/test_handlers.py index 49389ce..5661841 100644 --- a/tests/reflex/test_handlers.py +++ b/tests/reflex/test_handlers.py @@ -1,11 +1,19 @@ -"""Tests for the first-party reflex handlers. +"""Tests for the first-party reflex handlers (dev3 taxonomy). -These cover the per-handler outcome shape and the target-extraction -fallbacks. The runner-level dispatch is covered in ``test_runner.py``. +Covers: -Importing the handlers module side-registers them with the global -registry. We pin those identities here so a rename to a handler id — -which is an operator-facing field on every outcome — fails CI loudly. +* the operator-facing identity surface (handler id + subject pattern); +* per-handler outcome shape on the happy path; +* target-extraction fallbacks for partial / missing payload fields; +* dedup behavior when a :class:`DedupStore` is wired through the + :class:`ReflexContext`. + +The runner-level dispatch is covered separately in ``test_runner.py``. + +Importing ``netcortex.reflex.handlers`` side-registers all three first- +party handlers. We then read them back out of the registry by id; this +exercises the same import-for-side-effect path the runner uses in +production, rather than constructing fresh instances ourselves. 
""" from __future__ import annotations @@ -15,14 +23,14 @@ import pytest from netcortex.contracts.event_bus import EventMessage - -# Import the package so the handlers register themselves. We then read -# them back out of the registry by id rather than constructing fresh -# instances — that way a future change to the registration mechanics is -# automatically exercised by these tests too. -from netcortex.reflex import handlers as _handlers # noqa: F401 -from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex import handlers as _handlers # noqa: F401 — side-effect import +from netcortex.reflex.protocol import ( + ReflexContext, + ReflexHandler, + ReflexOutcome, +) from netcortex.reflex.registry import get_handler +from netcortex.working.dedup import InMemoryDedupStore pytestmark = pytest.mark.asyncio @@ -36,6 +44,10 @@ def _event(subject: str, payload: dict[str, object]) -> EventMessage: ) +def _empty_ctx() -> ReflexContext: + return ReflexContext() + + # --------------------------------------------------------------------------- # Identity / wiring smoke tests # --------------------------------------------------------------------------- @@ -44,9 +56,9 @@ def _event(subject: str, payload: dict[str, object]) -> EventMessage: @pytest.mark.parametrize( "handler_id,expected_pattern", [ - ("link_down", "sensory.snmp.trap.link_down.>"), - ("security_webhook", "sensory.meraki.webhook.security.>"), - ("bgp_drop", "sensory.snmp.trap.bgp_backward_transition.>"), + ("link_down", "sensory.link_down.>"), + ("security_alert", "sensory.security_alert.>"), + ("bgp_drop", "sensory.bgp_drop.>"), ], ) async def test_handler_registered_with_expected_pattern( @@ -57,10 +69,8 @@ async def test_handler_registered_with_expected_pattern( A rename here is fine, but it MUST be intentional — production operators read these ids in alerts and on the reconciliation UI. 
- Declared ``async`` purely to match the module-level - ``pytestmark = pytest.mark.asyncio``; there is nothing to await - here. Splitting this into its own file just to avoid the marker - would be a worse trade-off. + Declared async to match the module-level ``pytestmark = + pytest.mark.asyncio`` (no await needed here). """ h = get_handler(handler_id) assert isinstance(h, ReflexHandler) @@ -75,25 +85,29 @@ async def test_handler_registered_with_expected_pattern( async def test_link_down_extracts_device_and_interface() -> None: h = get_handler("link_down") - outcome = await h.handle(_event( - "sensory.snmp.trap.link_down.r1", - {"device_id": "r1", "interface": "Gi0/1"}, - )) + outcome = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.handler == "link_down" - assert outcome.target == "r1" + assert outcome.target == "r1|Gi0/1" assert outcome.severity == "high" assert outcome.payload["interface"] == "Gi0/1" + assert outcome.payload["source"] == "snmp_trap" assert outcome.outcome == "logged" async def test_link_down_handles_missing_target_field() -> None: - """No device field at all — outcome.target is None, not a stringified ``None``.""" + """No device field at all — outcome.target is None.""" h = get_handler("link_down") - outcome = await h.handle(_event( - "sensory.snmp.trap.link_down.unknown", - {"interface": "Gi0/1"}, - )) + outcome = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {"interface": "Gi0/1"}), + _empty_ctx(), + ) assert outcome is not None assert outcome.target is None @@ -102,13 +116,114 @@ async def test_link_down_caps_upstream_keys() -> None: """A pathologically wide payload doesn't blow up the outcome record.""" h = get_handler("link_down") wide = {f"k{i}": i for i in range(100)} - outcome = await h.handle(_event("sensory.snmp.trap.link_down.r1", wide)) + outcome = await h.handle( + 
_event("sensory.link_down.snmp_trap.r1", wide), _empty_ctx() + ) assert outcome is not None assert len(outcome.payload["upstream_keys"]) <= 16 +async def test_link_down_dedups_across_sources() -> None: + """The whole point of dev3: trap + webhook + poll on same target dedup.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + # First arrival: SNMP trap → fires + out1 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out1 is not None + assert out1.outcome == "logged" + + # Same physical event arrives via Meraki webhook ~50ms later + out2 = await h.handle( + _event( + "sensory.link_down.meraki_webhook.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out2 is not None + assert out2.outcome == "skipped" + assert "duplicate" in out2.rationale.lower() + # Severity demoted on the skipped outcome. + assert out2.severity == "info" + + # And via the SNMP poll diff + out3 = await h.handle( + _event( + "sensory.link_down.snmp_poll.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out3 is not None + assert out3.outcome == "skipped" + finally: + await store.close() + + +async def test_link_down_does_not_dedup_different_targets() -> None: + """Different (device, interface) tuples are independent facts.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + out1 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + ), + ctx, + ) + out2 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r1", + {"device_id": "r1", "interface": "Gi0/2"}, + ), + ctx, + ) + out3 = await h.handle( + _event( + "sensory.link_down.snmp_trap.r2", + {"device_id": "r2", "interface": "Gi0/1"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and 
out2.outcome == "logged" + assert out3 is not None and out3.outcome == "logged" + finally: + await store.close() + + +async def test_link_down_skips_dedup_when_target_unknown() -> None: + """Without a canonical target we can't meaningfully dedup.""" + h = get_handler("link_down") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + # No device_id / interface — handler returns logged outcome twice, + # because there's no fact_key to dedup against. + out1 = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {}), ctx + ) + out2 = await h.handle( + _event("sensory.link_down.snmp_trap.unknown", {}), ctx + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "logged" + finally: + await store.close() + + # --------------------------------------------------------------------------- -# security_webhook +# security_alert # --------------------------------------------------------------------------- @@ -127,43 +242,124 @@ async def test_link_down_caps_upstream_keys() -> None: ("CRITICAL", "critical"), ], ) -async def test_security_webhook_severity_mapping( +async def test_security_alert_severity_mapping( upstream: str, expected: str ) -> None: - h = get_handler("security_webhook") - outcome = await h.handle(_event( - "sensory.meraki.webhook.security.ids_alerted", - {"severity": upstream, "alertId": "abc-123", - "networkId": "N_1", "clientMac": "aa:bb:cc:dd:ee:ff", - "alertType": "ids_alerted"}, - )) + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + { + "severity": upstream, + "alertId": "abc-123", + "networkId": "N_1", + "clientMac": "aa:bb:cc:dd:ee:ff", + "alertType": "ids_alerted", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.severity == expected -async def test_security_webhook_prefers_client_mac_as_target() -> None: - h = get_handler("security_webhook") - outcome = await 
h.handle(_event( - "sensory.meraki.webhook.security.ids_alerted", - {"clientMac": "aa:bb:cc:dd:ee:ff", - "deviceSerial": "Q2XX-YYYY-ZZZZ", - "networkId": "N_1", - "severity": "warning"}, - )) +async def test_security_alert_prefers_client_mac_as_target() -> None: + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + { + "clientMac": "aa:bb:cc:dd:ee:ff", + "deviceSerial": "Q2XX-YYYY-ZZZZ", + "networkId": "N_1", + "severity": "warning", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "aa:bb:cc:dd:ee:ff" -async def test_security_webhook_falls_back_to_device_serial() -> None: - h = get_handler("security_webhook") - outcome = await h.handle(_event( - "sensory.meraki.webhook.security.malware_detected", - {"deviceSerial": "Q2XX-YYYY-ZZZZ", "networkId": "N_1"}, - )) +async def test_security_alert_falls_back_to_device_serial() -> None: + h = get_handler("security_alert") + outcome = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|Q2XX-YYYY-ZZZZ", + {"deviceSerial": "Q2XX-YYYY-ZZZZ", "networkId": "N_1"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "Q2XX-YYYY-ZZZZ" +async def test_security_alert_dedups_repeated_meraki_retries() -> None: + """Meraki retries the same alert; we collapse to one outcome.""" + h = get_handler("security_alert") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + payload = { + "clientMac": "aa:bb:cc:dd:ee:ff", + "networkId": "N_1", + "severity": "warning", + "alertId": "retry-test", + "alertType": "ids_alerted", + } + # First delivery + out1 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + payload, + ), + ctx, + ) + # Second delivery (Meraki retry) + out2 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + payload, + ), + ctx, + ) + assert out1 is not None and out1.outcome == 
"logged" + assert out2 is not None and out2.outcome == "skipped" + finally: + await store.close() + + +async def test_security_alert_different_event_types_dont_dedup() -> None: + """Two distinct alert types on the same client are different facts.""" + h = get_handler("security_alert") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + base_payload = { + "clientMac": "aa:bb:cc:dd:ee:ff", + "networkId": "N_1", + "severity": "warning", + } + out1 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + {**base_payload, "alertType": "ids_alerted"}, + ), + ctx, + ) + out2 = await h.handle( + _event( + "sensory.security_alert.meraki_webhook.N_1|aabbccddeeff", + {**base_payload, "alertType": "malware_detected"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "logged" + finally: + await store.close() + + # --------------------------------------------------------------------------- # bgp_drop # --------------------------------------------------------------------------- @@ -172,63 +368,99 @@ async def test_security_webhook_falls_back_to_device_serial() -> None: async def test_bgp_drop_composes_session_target() -> None: """device+peer -> ``device|peer`` canonical session id.""" h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.r1", - {"device_id": "r1", "peer": "10.0.1.5", - "peer_asn": 65001, "last_state": "Established"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.r1|10.0.1.5", + { + "device_id": "r1", + "peer": "10.0.1.5", + "peer_asn": 65001, + "last_state": "Established", + }, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "r1|10.0.1.5" assert outcome.severity == "high" assert outcome.payload["peer_asn"] == 65001 assert outcome.payload["last_state"] == "Established" + assert outcome.payload["source"] == "snmp_trap" async def 
test_bgp_drop_target_falls_back_to_peer_only() -> None: h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.unknown", - {"peer": "10.0.1.5"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.unknown|10.0.1.5", + {"peer": "10.0.1.5"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target == "10.0.1.5" async def test_bgp_drop_target_none_when_no_identifiers() -> None: h = get_handler("bgp_drop") - outcome = await h.handle(_event( - "sensory.snmp.trap.bgp_backward_transition.unknown", - {"last_state": "Established"}, - )) + outcome = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.unknown", + {"last_state": "Established"}, + ), + _empty_ctx(), + ) assert outcome is not None assert outcome.target is None +async def test_bgp_drop_dedups_trap_and_gnmi_for_same_session() -> None: + h = get_handler("bgp_drop") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + try: + out1 = await h.handle( + _event( + "sensory.bgp_drop.snmp_trap.r1|10.0.1.5", + {"device_id": "r1", "peer": "10.0.1.5"}, + ), + ctx, + ) + # Same session, observed via gNMI a moment later + out2 = await h.handle( + _event( + "sensory.bgp_drop.gnmi_dialout.r1|10.0.1.5", + {"device_id": "r1", "peer": "10.0.1.5"}, + ), + ctx, + ) + assert out1 is not None and out1.outcome == "logged" + assert out2 is not None and out2.outcome == "skipped" + finally: + await store.close() + + # --------------------------------------------------------------------------- # All-handler invariants # --------------------------------------------------------------------------- @pytest.mark.parametrize( - "handler_id", - ["link_down", "security_webhook", "bgp_drop"], + "handler_id,sample_subject", + [ + ("link_down", "sensory.link_down.snmp_trap.test"), + ("security_alert", "sensory.security_alert.meraki_webhook.test"), + ("bgp_drop", "sensory.bgp_drop.snmp_trap.test"), + ], ) async def 
test_handler_returns_frozen_outcome_with_required_fields( - handler_id: str, + handler_id: str, sample_subject: str, ) -> None: - """Every handler must produce a well-formed :class:`ReflexOutcome`. - - Catches the common bug of forgetting to set ``severity`` or returning - a dict instead of a dataclass. - """ + """Every handler must produce a well-formed :class:`ReflexOutcome`.""" h = get_handler(handler_id) - outcome = await h.handle(_event( - # Use a subject that matches each handler's pattern hierarchically. - # We don't strictly need this — handle() doesn't re-validate the - # subject — but it makes the test inputs realistic. - f"sensory.test.invocation.{handler_id}", - {"target": "test-target"}, - )) + outcome = await h.handle( + _event(sample_subject, {"target": "test-target"}), _empty_ctx() + ) assert outcome is not None assert isinstance(outcome, ReflexOutcome) assert outcome.handler == handler_id diff --git a/tests/reflex/test_registry.py b/tests/reflex/test_registry.py index 00f656a..c50f6af 100644 --- a/tests/reflex/test_registry.py +++ b/tests/reflex/test_registry.py @@ -14,7 +14,7 @@ import pytest from netcortex.contracts.event_bus import EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.registry import ( DuplicateHandlerError, all_handlers, @@ -27,11 +27,13 @@ class _StubHandler: """Minimal structural ReflexHandler used by these tests.""" - def __init__(self, hid: str, pattern: str = "test.>") -> None: + def __init__(self, hid: str, pattern: str = "sensory.test.test_src.test") -> None: self.id: Final[str] = hid self.pattern: Final[str] = pattern - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: return ReflexOutcome( handler=self.id, subject=event.subject, @@ -63,7 +65,6 @@ def _isolated_registry(): def test_register_and_lookup() -> 
None: h = _StubHandler("alpha") returned = register_handler(h) - # Decorator-style usage returns the handler unchanged. assert returned is h assert get_handler("alpha") is h assert list(all_handlers()) == [h] @@ -81,19 +82,11 @@ def test_register_duplicate_id_raises() -> None: register_handler(_StubHandler("dup")) with pytest.raises(DuplicateHandlerError) as ei: register_handler(_StubHandler("dup")) - # Error message names BOTH handler classes so the operator can find - # the offending file quickly. We do not pin the exact message; only - # that the registered id appears in it. assert "dup" in str(ei.value) def test_register_same_instance_is_idempotent() -> None: - """Re-registering the exact same instance is a no-op. - - Modules can be imported twice in the same process (test suite - isolation, plugin auto-discovery), so the registry treats an - identical re-register as the harmless event it is. - """ + """Re-registering the exact same instance is a no-op.""" h = _StubHandler("same") register_handler(h) register_handler(h) @@ -104,7 +97,6 @@ def test_register_rejects_non_handler() -> None: """Registration is type-checked at registration time.""" class NotAHandler: - # Missing handle(), pattern, id. pass with pytest.raises(TypeError): diff --git a/tests/reflex/test_runner.py b/tests/reflex/test_runner.py index 01bb033..4c53cc7 100644 --- a/tests/reflex/test_runner.py +++ b/tests/reflex/test_runner.py @@ -5,6 +5,12 @@ same Protocol, so passing here means the runner will behave identically against the production NATS backend (verified by the contract suite that NATS satisfies that Protocol). + +In dev3 the runner threads a :class:`ReflexContext` to every handler +call. These tests pin the wiring: default-context path (no context +supplied → empty context), explicit-context path (with a dedup store +that the handler can consult), and the unchanged failure-isolation / +lifecycle behavior. 
""" from __future__ import annotations @@ -16,8 +22,9 @@ import pytest from netcortex.contracts.event_bus import EventBus, EventMessage -from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.protocol import ReflexContext, ReflexOutcome from netcortex.reflex.runner import ReflexRunner +from netcortex.working.dedup import InMemoryDedupStore from tests.contracts.event_bus.in_memory import InMemoryEventBus pytestmark = pytest.mark.asyncio @@ -28,15 +35,23 @@ def _bus() -> EventBus: class _RecordingHandler: - """Handler that records every event it sees for test introspection.""" + """Handler that records every event it sees for test introspection. + + Also records the context it received so tests can confirm the runner + threaded a non-default context through. + """ def __init__(self, hid: str, pattern: str) -> None: self.id: Final[str] = hid self.pattern: Final[str] = pattern self.seen: list[EventMessage] = [] + self.contexts: list[ReflexContext] = [] - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: self.seen.append(event) + self.contexts.append(ctx) return ReflexOutcome( handler=self.id, subject=event.subject, @@ -51,9 +66,11 @@ class _RaisingHandler: """Handler that raises — used to verify per-handler isolation.""" id: Final[str] = "boom" - pattern: Final[str] = "sensory.boom.>" + pattern: Final[str] = "sensory.link_down.test_src.boom" - async def handle(self, event: EventMessage) -> ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: raise RuntimeError("simulated handler failure") @@ -61,9 +78,11 @@ class _SkippingHandler: """Handler that returns ``None`` — used to verify None-skips-recording.""" id: Final[str] = "skip" - pattern: Final[str] = "sensory.skip.>" + pattern: Final[str] = "sensory.link_up.test_src.skip" - async def handle(self, event: EventMessage) -> 
ReflexOutcome | None: + async def handle( + self, event: EventMessage, ctx: ReflexContext + ) -> ReflexOutcome | None: return None @@ -79,13 +98,13 @@ async def _wait_for(predicate: Any, timeout: float = 1.5) -> None: async def test_dispatches_event_to_matching_handler() -> None: bus = _bus() - handler = _RecordingHandler("link_down", "sensory.snmp.trap.link_down.>") + handler = _RecordingHandler("link_down", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() try: - await asyncio.sleep(0.05) # let subscription register + await asyncio.sleep(0.05) await bus.publish( - "sensory.snmp.trap.link_down.r1", + "sensory.link_down.snmp_trap.r1|Gi0/1", {"interface": "Gi0/1", "target": "r1"}, ) await _wait_for(lambda: len(handler.seen) == 1) @@ -93,7 +112,7 @@ async def test_dispatches_event_to_matching_handler() -> None: await runner.stop() await bus.close() - assert handler.seen[0].subject == "sensory.snmp.trap.link_down.r1" + assert handler.seen[0].subject == "sensory.link_down.snmp_trap.r1|Gi0/1" assert handler.seen[0].payload == {"interface": "Gi0/1", "target": "r1"} assert len(runner.outcomes) == 1 outcome = runner.outcomes[0] @@ -103,16 +122,14 @@ async def test_dispatches_event_to_matching_handler() -> None: async def test_pattern_filters_out_non_matching_events() -> None: bus = _bus() - handler = _RecordingHandler("only_a", "sensory.a.>") + handler = _RecordingHandler("only_link_down", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.a.event", {"i": 0}) - await bus.publish("sensory.b.event", {"i": 1}) # must NOT match - await bus.publish("sensory.a.deeper.event", {"i": 2}) - # Give the bus time to deliver any non-matching events too (and - # have them be filtered out, not silently delivered). 
+ await bus.publish("sensory.link_down.snmp_trap.r1", {"i": 0}) + await bus.publish("sensory.link_up.snmp_trap.r1", {"i": 1}) # not match + await bus.publish("sensory.link_down.meraki_webhook.r2", {"i": 2}) await _wait_for(lambda: len(handler.seen) == 2) finally: await runner.stop() @@ -124,13 +141,13 @@ async def test_pattern_filters_out_non_matching_events() -> None: async def test_multiple_handlers_fan_out() -> None: """One event whose subject matches two handlers reaches both.""" bus = _bus() - a = _RecordingHandler("a", "sensory.shared.>") - b = _RecordingHandler("b", "sensory.shared.>") + a = _RecordingHandler("a", "sensory.link_down.>") + b = _RecordingHandler("b", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[a, b]) await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.shared.event", {"i": 0}) + await bus.publish("sensory.link_down.snmp_trap.r1", {"i": 0}) await _wait_for(lambda: len(a.seen) == 1 and len(b.seen) == 1) finally: await runner.stop() @@ -145,8 +162,8 @@ async def test_handler_exception_does_not_kill_dispatcher() -> None: await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.boom.event", {"n": 1}) - await bus.publish("sensory.boom.event", {"n": 2}) + await bus.publish("sensory.link_down.test_src.boom", {"n": 1}) + await bus.publish("sensory.link_down.test_src.boom", {"n": 2}) await _wait_for(lambda: len(runner.outcomes) == 2) finally: await runner.stop() @@ -154,7 +171,6 @@ async def test_handler_exception_does_not_kill_dispatcher() -> None: assert all(o.outcome == "errored" for o in runner.outcomes) assert all("RuntimeError" in o.rationale for o in runner.outcomes) - # Diagnostic carries a traceback for debugging. 
assert "traceback" in runner.outcomes[0].diagnostic assert ( "simulated handler failure" in runner.outcomes[0].diagnostic["traceback"] @@ -169,9 +185,7 @@ async def test_none_outcome_is_not_recorded() -> None: await runner.start() try: await asyncio.sleep(0.05) - await bus.publish("sensory.skip.event", {}) - # Wait long enough that, if an outcome were going to be recorded, - # it would have been. Then assert it was not. + await bus.publish("sensory.link_up.test_src.skip", {}) await asyncio.sleep(0.2) finally: await runner.stop() @@ -182,12 +196,11 @@ async def test_none_outcome_is_not_recorded() -> None: async def test_start_is_idempotent() -> None: bus = _bus() - handler = _RecordingHandler("x", "sensory.x.>") + handler = _RecordingHandler("x", "sensory.link_down.>") runner = ReflexRunner(bus, handlers=[handler]) await runner.start() - await runner.start() # second call must be a no-op + await runner.start() try: - # The handler list is one task, not two. assert len([t for t in runner._tasks if not t.done()]) == 1 # noqa: SLF001 finally: await runner.stop() @@ -197,18 +210,18 @@ async def test_start_is_idempotent() -> None: async def test_stop_is_idempotent() -> None: bus = _bus() runner = ReflexRunner( - bus, handlers=[_RecordingHandler("x", "sensory.x.>")] + bus, handlers=[_RecordingHandler("x", "sensory.link_down.>")] ) await runner.start() await runner.stop() - await runner.stop() # second call must be a no-op + await runner.stop() await bus.close() async def test_stop_without_start_is_safe() -> None: """Calling stop on a runner that never started is a no-op.""" bus = _bus() - runner = ReflexRunner(bus, handlers=[_RecordingHandler("x", "sensory.x.>")]) + runner = ReflexRunner(bus, handlers=[_RecordingHandler("x", "sensory.link_down.>")]) await runner.stop() await bus.close() @@ -221,13 +234,11 @@ async def test_runner_enumerates_registry_when_no_handlers_given() -> None: register_handler, ) - # Snapshot the production registry so we don't leak the cleared 
state - # to sibling test files that depend on the first-party handlers. snapshot = list(all_handlers()) clear_registry() try: - a = _RecordingHandler("reg-a", "sensory.reg.>") - b = _RecordingHandler("reg-b", "sensory.reg.>") + a = _RecordingHandler("reg-a", "sensory.link_down.>") + b = _RecordingHandler("reg-b", "sensory.link_up.>") register_handler(a) register_handler(b) bus = _bus() @@ -239,3 +250,47 @@ async def test_runner_enumerates_registry_when_no_handlers_given() -> None: clear_registry() for h in snapshot: register_handler(h) + + +# --------------------------------------------------------------------------- +# ReflexContext threading (dev3) +# --------------------------------------------------------------------------- + + +async def test_default_context_has_no_dedup_store() -> None: + """Runners built without an explicit context still pass one through.""" + bus = _bus() + handler = _RecordingHandler("ctx_default", "sensory.link_down.>") + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.link_down.snmp_trap.r1", {}) + await _wait_for(lambda: len(handler.contexts) == 1) + finally: + await runner.stop() + await bus.close() + + assert isinstance(handler.contexts[0], ReflexContext) + assert handler.contexts[0].dedup_store is None + + +async def test_explicit_context_is_threaded_to_handler() -> None: + """A context passed to ReflexRunner reaches every handle() call.""" + bus = _bus() + handler = _RecordingHandler("ctx_explicit", "sensory.link_down.>") + store = InMemoryDedupStore() + ctx = ReflexContext(dedup_store=store) + runner = ReflexRunner(bus, handlers=[handler], context=ctx) + try: + assert runner.context is ctx + await runner.start() + await asyncio.sleep(0.05) + await bus.publish("sensory.link_down.snmp_trap.r1", {}) + await _wait_for(lambda: len(handler.contexts) == 1) + assert handler.contexts[0] is ctx + assert handler.contexts[0].dedup_store is store + finally: + 
await runner.stop() + await store.close() + await bus.close() diff --git a/tests/working/__init__.py b/tests/working/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/working/dedup/__init__.py b/tests/working/dedup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/working/dedup/test_in_memory.py b/tests/working/dedup/test_in_memory.py new file mode 100644 index 0000000..2e9cdbf --- /dev/null +++ b/tests/working/dedup/test_in_memory.py @@ -0,0 +1,149 @@ +"""Implementation-specific tests for :class:`InMemoryDedupStore`. + +The Protocol-level behavior (atomic check-and-record, TTL expiry, +empty-key rejection, etc.) is covered by the contract suite in +``tests/contracts/dedup_store/``. This module covers the bits that are +**specific** to the in-memory implementation: LRU eviction at the size +cap, lazy expired-entry sweep, controllable-clock fast-forward, the +constructor validation, and the ``size()`` helper. + +The use of a controllable monotonic clock is the key idea: we don't want +unit tests to depend on real-clock ``asyncio.sleep`` for the dedup-window +logic (slow + flaky), so the store accepts a ``clock`` callable that the +tests advance manually. 
+""" + +from __future__ import annotations + +import pytest + +from netcortex.working.dedup import InMemoryDedupStore + +pytestmark = pytest.mark.asyncio + + +class _FakeClock: + """Monotonically-non-decreasing time source we drive from the test.""" + + def __init__(self, start: float = 1000.0) -> None: + self.now = start + + def __call__(self) -> float: + return self.now + + def advance(self, seconds: float) -> None: + if seconds < 0: + raise ValueError("clock must not move backward") + self.now += seconds + + +async def test_constructor_rejects_non_positive_max_entries() -> None: + with pytest.raises(ValueError): + InMemoryDedupStore(max_entries=0) + with pytest.raises(ValueError): + InMemoryDedupStore(max_entries=-5) + + +async def test_size_starts_at_zero_and_tracks_inserts() -> None: + store = InMemoryDedupStore() + try: + assert store.size() == 0 + await store.record_unless_duplicate("a", ttl_seconds=10) + await store.record_unless_duplicate("b", ttl_seconds=10) + assert store.size() == 2 + # Duplicate insert does not grow size. + await store.record_unless_duplicate("a", ttl_seconds=10) + assert store.size() == 2 + finally: + await store.close() + + +async def test_fake_clock_drives_ttl_expiry_without_real_sleep() -> None: + """Whole point of accepting a clock parameter — fast deterministic tests.""" + clk = _FakeClock() + store = InMemoryDedupStore(clock=clk) + try: + assert await store.record_unless_duplicate("k", ttl_seconds=60) is True + clk.advance(30) + # Still within TTL — dup. + assert await store.record_unless_duplicate("k", ttl_seconds=60) is False + clk.advance(31) + # Now past TTL — fresh again. + assert await store.record_unless_duplicate("k", ttl_seconds=60) is True + finally: + await store.close() + + +async def test_lru_eviction_when_cap_exceeded() -> None: + """At capacity, the LRU entry is evicted to make room for a new one. 
+ + The "least recently used" definition includes duplicate hits: a key + that gets hit (even as a dup) is moved to the tail. This test checks + that touching ``oldest`` mid-flow rescues it from eviction. + """ + clk = _FakeClock() + store = InMemoryDedupStore(max_entries=3, clock=clk) + try: + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + await store.record_unless_duplicate("middle", ttl_seconds=1000) + await store.record_unless_duplicate("newest", ttl_seconds=1000) + assert store.size() == 3 + + # Touch "oldest" — dup hit — should move it to the tail. + assert ( + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + is False + ) + + # Insert a fourth — should evict "middle" (now the LRU), not "oldest". + await store.record_unless_duplicate("fourth", ttl_seconds=1000) + assert store.size() == 3 + + # "middle" gone — a fresh insert should succeed. + assert ( + await store.record_unless_duplicate("middle", ttl_seconds=1000) + is True + ) + # "oldest" should still be deduped (was rescued). + assert ( + await store.record_unless_duplicate("oldest", ttl_seconds=1000) + is False + ) + finally: + await store.close() + + +async def test_expired_entries_are_swept_lazily() -> None: + """Stale head-of-LRU entries are reclaimed when new inserts happen. + + We don't promise eager cleanup — the sweep runs when ``record`` is + called, bounded so worst-case latency stays predictable. After many + inserts, expired-and-stale entries should be gone. + """ + clk = _FakeClock() + store = InMemoryDedupStore(clock=clk) + try: + # Plant some short-TTL keys, then age them all out. + for i in range(10): + await store.record_unless_duplicate(f"old{i}", ttl_seconds=1.0) + assert store.size() == 10 + + clk.advance(60) + + # New inserts trigger the sweep; the old keys should drop out + # after enough inserts to exhaust the per-call sweep budget. 
+ for i in range(40): + await store.record_unless_duplicate(f"new{i}", ttl_seconds=10) + + # Only "new*" keys remain (40 of them, all within TTL). + assert store.size() == 40 + finally: + await store.close() + + +async def test_close_clears_state() -> None: + store = InMemoryDedupStore() + await store.record_unless_duplicate("k1", ttl_seconds=10) + assert store.size() == 1 + await store.close() + assert store.size() == 0