From c201b543839232b2ceaa415c14f9ed3e55f4aceb Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 26 May 2026 10:08:45 -0600 Subject: [PATCH 1/2] feat(deltas): add CountMin/CountSketch/HLL delta proto + compute_delta/apply_delta_bytes Extends the delta-encoding support previously added for DDSketch to three more sketch families so the runtime can emit and apply sparse deltas for them. Per family: - CountMinSketch: additive cell deltas (signed sint64 on the wire; CMS counters only grow) carried as packed cell_rows/cell_cols/d_counts arrays plus full per-row L1/L2 norm deltas. - CountSketch: signed cell deltas carried as packed arrays plus per-row L2 norm deltas; heavy-hitter candidate keys (hh_keys) are forwarded when an upstream tracker provides them. - HyperLogLog: lossless register max-updates -- a register's new value when it increased, applied via max-merge on the receiver. No update is ever dropped (there is no threshold to apply). Each family gains a *Delta proto message (mirroring the Go reference implementation's field numbers and wire encoding exactly), regenerated prost bindings, and compute_delta(snapshot, threshold) + apply_delta_bytes(&[u8]) on its portable wrapper. The emitted delta bytes are byte-identical to the Go reference implementation's delta output for identical input (cross-language byte parity), pinned by per-family golden tests. These deltas carry only sketch-internal state (cells / register updates), so there are no DataPoint-level metric scalars to drop. Tests: per-family round-trip (build -> compute_delta against a snapshot -> apply_delta_bytes -> reconstructs) and a byte-parity assertion against a golden captured from the Go reference implementation. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- proto/countminsketch/countminsketch.proto | 45 ++++ proto/countsketch/countsketch.proto | 49 ++++ proto/hll/hll.proto | 23 ++ .../portable/countminsketch.rs | 207 +++++++++++++++- .../portable/countsketch.rs | 223 +++++++++++++++++- src/message_pack_format/portable/hll.rs | 155 +++++++++++- src/proto/generated/sketchlib.v1.rs | 149 ++++++++++++ 7 files changed, 845 insertions(+), 6 deletions(-) diff --git a/proto/countminsketch/countminsketch.proto b/proto/countminsketch/countminsketch.proto index c3be044..95cf902 100644 --- a/proto/countminsketch/countminsketch.proto +++ b/proto/countminsketch/countminsketch.proto @@ -57,3 +57,48 @@ message CountMinState { reserved 10 to 15; } + +// ============================================================================ +// Count-Min Sketch Delta +// ============================================================================ + +// CountMinDelta carries only the matrix cells that changed by at least the +// threshold between two consecutive snapshots, plus the full per-row L1/L2 +// norm deltas (one entry per row, negligible size). +// +// Cells apply additively on the receiver: matrix[row][col] += d_count. CMS +// counters only ever grow, but the delta encodes a signed count (sint64) so +// the same wire form covers weighted/decay variants. +// +// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) is +// the canonical form; the legacy per-cell message (tag 3) is retained only so +// payloads from older producers still decode. This message is byte-identical +// to the Go reference implementation's CountMinDelta so the two runtimes emit +// byte-identical delta frames for identical window state (cross-language byte +// parity). +message CountMinDelta { + uint32 rows = 1; + uint32 cols = 2; + // Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for + // backward-compatible decode of payloads from older producers. 
+ repeated CountMinCell cells_legacy = 3; + // Per-row L1 norm deltas, length = rows. + repeated double l1 = 4 [packed = true]; + // Per-row L2 norm deltas, length = rows. + repeated double l2 = 5 [packed = true]; + // Packed cell encoding (canonical form): + repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell + repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell + repeated sint64 d_counts = 11 [packed = true]; // signed integer count delta +} + +// CountMinCell is the deprecated per-cell delta record. Producers emit the +// packed cell arrays on CountMinDelta instead; this message exists only to +// decode payloads from older producers. +message CountMinCell { + uint32 row = 1; + uint32 col = 2; + double d_count = 3; // deprecated: use CountMinDelta.d_counts + double d_sum = 4; // deprecated: omitted in the packed encoding + double d_sum2 = 5; // deprecated: omitted in the packed encoding +} diff --git a/proto/countsketch/countsketch.proto b/proto/countsketch/countsketch.proto index 8195922..ae92110 100644 --- a/proto/countsketch/countsketch.proto +++ b/proto/countsketch/countsketch.proto @@ -72,3 +72,52 @@ message HeapEntry { reserved 3; // Reserved for bytes_key alternative. } + +// ============================================================================ +// Count Sketch Delta +// ============================================================================ + +// CountSketchDelta carries only the matrix cells that changed by at least the +// threshold between two consecutive snapshots, plus the full per-row L2 norm +// deltas and any heavy-hitter candidate keys. +// +// Cells apply additively on the receiver: matrix[row][col] += d_count. Count +// Sketch counters are signed (±1 increments), so d_count is a signed integer +// (sint64). 
Heavy-hitter candidate keys (hh_keys) are re-queried against the +// merged matrix downstream to rebuild the receiver's Top-K with accurate, +// globally-merged estimates. +// +// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) plus +// l2 (tag 12) is the canonical form; the legacy per-cell message (tag 3), +// l2_legacy (tag 4) and topk (tag 5) are retained only so payloads from older +// producers still decode. This message is byte-identical to the Go reference +// implementation's CountSketchDelta so the two runtimes emit byte-identical +// delta frames for identical window state (cross-language byte parity). +message CountSketchDelta { + uint32 rows = 1; + uint32 cols = 2; + // Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for + // backward-compatible decode of payloads from older producers. + repeated CountSketchCell cells_legacy = 3; + // Deprecated: use l2 (tag 12) instead. + repeated double l2_legacy = 4 [packed = true]; + // Deprecated: use hh_keys (tag 6) instead. + TopKState topk = 5; + // Heavy-hitter candidate keys from the upstream tracker. Downstream queries + // the merged Count Sketch matrix for each key to build its Top-K. + repeated string hh_keys = 6; + // Packed cell encoding (canonical form): + repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell + repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell + repeated sint64 d_counts = 11 [packed = true]; // signed integer count delta + repeated double l2 = 12 [packed = true]; // per-row L2 norm deltas +} + +// CountSketchCell is the deprecated per-cell delta record. Producers emit the +// packed cell arrays on CountSketchDelta instead; this message exists only to +// decode payloads from older producers. 
+message CountSketchCell { + uint32 row = 1; + uint32 col = 2; + double d_count = 3; // deprecated: use CountSketchDelta.d_counts +} diff --git a/proto/hll/hll.proto b/proto/hll/hll.proto index 39eb3d7..6a835dd 100644 --- a/proto/hll/hll.proto +++ b/proto/hll/hll.proto @@ -97,3 +97,26 @@ enum HLLVariant { // Requires hip_kxq0, hip_kxq1, hip_est to be populated. HLL_VARIANT_HIP = 3; } + +// ============================================================================ +// HyperLogLog Delta +// ============================================================================ + +// HLLDelta carries the registers that increased between two consecutive +// snapshots. HLL uses max semantics, so only increases are meaningful; +// at a fixed precision a register can never decrease. Each update is applied +// losslessly on the receiver via register[index] = max(register[index], value) +// — no register update is ever dropped (there is no threshold to apply). +// +// This message is byte-identical to the Go reference implementation's HLLDelta +// so the two runtimes emit byte-identical delta frames for identical window +// state (cross-language byte parity). +message HLLDelta { + repeated HLLRegisterUpdate updates = 1; +} + +// HLLRegisterUpdate is one register whose value increased. +message HLLRegisterUpdate { + uint32 index = 1; // register index, 0 – 2^precision-1 + uint32 value = 2; // new value (only sent when > snapshot value) +} diff --git a/src/message_pack_format/portable/countminsketch.rs b/src/message_pack_format/portable/countminsketch.rs index cf2bf4e..12a46d1 100644 --- a/src/message_pack_format/portable/countminsketch.rs +++ b/src/message_pack_format/portable/countminsketch.rs @@ -243,6 +243,134 @@ impl CountMinSketch { Ok(()) } + /// Compute a sparse, proto-marshalled `CountMinDelta` of `self` + /// against a `snapshot`. A cell is included when its absolute count + /// delta `|Δcount|` (self − snapshot) is `>= threshold` and non-zero. 
+ /// The full per-row L1/L2 norm deltas are always carried (one entry + /// per row, negligible size). + /// + /// This is the Rust twin of the Go reference implementation's + /// `ComputeDelta` + `SerializeDelta`: it iterates the matrix in + /// row-major order, subtracts the snapshot's value for each cell, and + /// emits the surviving cell deltas using the packed-array proto + /// encoding (`cell_rows`/`cell_cols`/`d_counts`). Each L1/L2 row delta is + /// `norm(self) − norm(snapshot)`, where `l1[r] = Σ_c count[r][c]` and + /// `l2[r] = Σ_c count[r][c]^2` are recomputed from each matrix; this + /// equals the value the Go producer maintains incrementally. The returned + /// bytes are a `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`], + /// byte-identical to the Go `proto.Marshal(CountMinDelta)` output for + /// the same inputs (cross-language byte parity). + /// + /// Delta-against-empty: when `snapshot` is the all-zero sketch, every + /// surviving cell delta equals this window's own cell count, so the + /// result is this window's full state encoded as a delta (no + /// cross-window subtraction). CMS deltas carry only sketch-internal + /// cells/norms — there are no DataPoint-level metric scalars to drop. 
+ pub fn compute_delta( + &self, + snapshot: &CountMinSketch, + threshold: f64, + ) -> Result, Box> { + use crate::proto::sketchlib::CountMinDelta as ProtoDelta; + use prost::Message; + + if self.rows != snapshot.rows || self.cols != snapshot.cols { + return Err(format!( + "CountMinSketch dimension mismatch: self={}x{}, snapshot={}x{}", + self.rows, self.cols, snapshot.rows, snapshot.cols + ) + .into()); + } + + let cur = self.sketch(); + let snap = snapshot.sketch(); + + let mut cell_rows: Vec = Vec::new(); + let mut cell_cols: Vec = Vec::new(); + let mut d_counts: Vec = Vec::new(); + let mut l1: Vec = Vec::with_capacity(self.rows); + let mut l2: Vec = Vec::with_capacity(self.rows); + + for r in 0..self.rows { + let mut cur_l1 = 0.0f64; + let mut cur_l2 = 0.0f64; + let mut snap_l1 = 0.0f64; + let mut snap_l2 = 0.0f64; + for c in 0..self.cols { + let cv = cur[r][c]; + let sv = snap[r][c]; + cur_l1 += cv; + cur_l2 += cv * cv; + snap_l1 += sv; + snap_l2 += sv * sv; + // CMS counts are non-negative integers in the wire form; + // mirror the Go reference's signed Δ + |Δ| threshold test. + let dc = (cv - sv) as i64; + if dc != 0 && (dc.unsigned_abs() as f64) >= threshold { + cell_rows.push(r as u32); + cell_cols.push(c as u32); + d_counts.push(dc); + } + } + l1.push(cur_l1 - snap_l1); + l2.push(cur_l2 - snap_l2); + } + + let delta = ProtoDelta { + rows: self.rows as u32, + cols: self.cols as u32, + cells_legacy: Vec::new(), + l1, + l2, + cell_rows, + cell_cols, + d_counts, + }; + Ok(delta.encode_to_vec()) + } + + /// Apply a `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`] + /// to this sketch in place (additive cell merge). The Rust twin of the + /// Go reference implementation's `DeserializeDelta` + `ApplyDelta`. + /// Reads the packed cell arrays (`cell_rows`/`cell_cols`/`d_counts`) + /// and falls back to the legacy per-cell records for payloads from + /// older producers. 
+ /// + /// Returns `Err` if `bytes` is not a valid `CountMinDelta` proto or a + /// cell is out of range for this sketch's dimensions. + pub fn apply_delta_bytes( + &mut self, + bytes: &[u8], + ) -> Result<(), Box> { + use crate::proto::sketchlib::CountMinDelta as ProtoDelta; + use prost::Message; + + let proto = ProtoDelta::decode(bytes)?; + let cells: Vec<(u32, u32, i64)> = if !proto.cell_rows.is_empty() { + proto + .cell_rows + .iter() + .zip(proto.cell_cols.iter()) + .zip(proto.d_counts.iter()) + .map(|((&r, &c), &d)| (r, c, d)) + .collect() + } else { + proto + .cells_legacy + .iter() + .map(|c| (c.row, c.col, c.d_count as i64)) + .collect() + }; + let delta = CountMinSketchDelta { + rows: proto.rows, + cols: proto.cols, + cells, + l1: proto.l1, + l2: proto.l2, + }; + self.apply_delta(&delta) + } + /// One-shot aggregation: build a sketch from parallel key/value slices /// and return the msgpack bytes. pub fn aggregate_count( @@ -484,7 +612,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - CountMinState, CounterType, SketchEnvelope, sketch_envelope::SketchState, + sketch_envelope::SketchState, CountMinState, CounterType, SketchEnvelope, }; use prost::Message; @@ -585,4 +713,81 @@ mod tests { _ => panic!("non-hex byte {}", c as char), } } + + /// `compute_delta` against an EMPTY snapshot reconstructs the window's + /// full state when its bytes are applied to a fresh empty sketch + /// (round-trip). With `threshold = 1.0` every changed cell survives, so + /// applying the delta to a zero base yields the same matrix as the + /// original window. 
+ #[test] + fn test_compute_delta_against_empty_round_trips() { + let rows = 4usize; + let cols = 2048usize; + let mut window = CountMinSketch::new(rows, cols); + for i in 0..200u64 { + window.update(&format!("flow-{}", i % 37), 1.0); + } + let empty = CountMinSketch::new(rows, cols); + + let delta_bytes = window.compute_delta(&empty, 1.0).unwrap(); + + let mut reconstructed = CountMinSketch::new(rows, cols); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + + assert_eq!(reconstructed.sketch(), window.sketch()); + } + + /// A delta computed between two non-empty snapshots reconstructs the + /// current sketch when applied to the base — matching a direct merge of + /// the cell-wise difference. + #[test] + fn test_compute_delta_then_apply_matches_current() { + let mut base = CountMinSketch::new(2, 64); + for i in 0..40u64 { + base.update(&format!("k{}", i % 8), 1.0); + } + let mut current = base.clone(); + for i in 0..30u64 { + current.update(&format!("k{}", i % 8), 1.0); + } + + let delta_bytes = current.compute_delta(&base, 1.0).unwrap(); + let mut reconstructed = base.clone(); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + assert_eq!(reconstructed.sketch(), current.sketch()); + } + + /// Cross-language byte-parity guard: `compute_delta` against an empty + /// snapshot must emit bytes identical to the Go reference + /// implementation's `SerializeDelta(ComputeDelta(empty, current, 1.0))` + /// for the same `(rows=4, cols=2048)` × "flow-0".."flow-9" (each 5×, + /// 50 unweighted updates) input. The golden hex was captured from a + /// `proto.Marshal` of the Go reference's `CountMinDelta` (packed-array + /// encoding) for that input. A delta-against-empty carries the window's + /// full state, so this also pins the per-row L1 (=50) / L2 (=250) norm + /// deltas and the packed cell arrays. 
+ #[test] + fn test_compute_delta_matches_go_golden_bytes() { + let rows = 4usize; + let cols = 2048usize; + let mut current = CountMinSketch::new(rows, cols); + for i in 0..50u64 { + current.update(&format!("flow-{}", i % 10), 1.0); + } + let empty = CountMinSketch::new(rows, cols); + let got = current.compute_delta(&empty, 1.0).unwrap(); + + // Captured from the Go reference implementation's + // SerializeDelta(ComputeDelta(empty, current, 1.0)) for the same input. + const GOLDEN_HEX: &str = "0804108010222000000000000049400000000000004940000000000000494000000000000049402a200000000000406f400000000000406f400000000000406f400000000000406f404a28000000000000000000000101010101010101010102020202020202020202030303030303030303035250c101b202b608b7088809bc09c20dc60d910fbb0f9f018d04a604c004d6058507a909920b900ec70ead038304bc058007c207cc0a820bd50cbe0fcd0fe2019e02cd02e702a707b809a00ac20cee0dfe0f5a280a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a"; + let want = decode_hex_cms(GOLDEN_HEX); + assert_eq!( + got, + want, + "CountMin delta bytes diverge from the Go reference golden \ + ({} bytes got vs {} bytes want)", + got.len(), + want.len(), + ); + } } diff --git a/src/message_pack_format/portable/countsketch.rs b/src/message_pack_format/portable/countsketch.rs index 359e1fa..af8ca9e 100644 --- a/src/message_pack_format/portable/countsketch.rs +++ b/src/message_pack_format/portable/countsketch.rs @@ -140,8 +140,8 @@ impl CountSketch { .iter() .enumerate() .min_by(|a, b| { - a.1.1 - .partial_cmp(&b.1.1) + a.1 .1 + .partial_cmp(&b.1 .1) .unwrap_or(std::cmp::Ordering::Equal) }) .map(|(i, e)| (i, e.1)) @@ -274,6 +274,147 @@ impl CountSketch { Ok(()) } + /// Compute a sparse, proto-marshalled `CountSketchDelta` of `self` + /// against a `snapshot`. 
A cell is included when its signed count + /// delta `Δcount` (self − snapshot) is non-zero and its magnitude is + /// `>= threshold` (the threshold is truncated to an integer, matching + /// the Go reference). The full per-row L2 norm deltas are always + /// carried (one entry per row). + /// + /// This is the Rust twin of the Go reference implementation's + /// `ComputeDelta` + `SerializeDelta`: it iterates the matrix in + /// row-major order, subtracts the snapshot's value for each cell, and + /// emits the surviving signed cell deltas using the packed-array proto + /// encoding (`cell_rows`/`cell_cols`/`d_counts`). Each L2 row delta is + /// `l2(self)[r] − l2(snapshot)[r]` with `l2[r] = Σ_c count[r][c]^2`, matching + /// the Go producer's incrementally-maintained norm. Heavy-hitter + /// candidate keys (`hh_keys`) are sourced from an upstream Space-Saving + /// tracker that this minimal wrapper does not maintain, so the field is + /// left empty (an empty repeated field encodes to nothing on the wire). + /// The returned bytes are a `prost`-encoded + /// [`crate::proto::sketchlib::CountSketchDelta`], byte-identical to the + /// Go `proto.Marshal(CountSketchDelta)` output for the same inputs when + /// no heavy-hitter keys are forwarded (cross-language byte parity). + /// + /// Delta-against-empty: when `snapshot` is the all-zero sketch, every + /// surviving cell delta equals this window's own (signed) cell count, + /// so the result is this window's full state encoded as a delta (no + /// cross-window subtraction). CountSketch deltas carry only + /// sketch-internal cells/norms — there are no DataPoint-level metric + /// scalars to drop. 
+ pub fn compute_delta( + &self, + snapshot: &CountSketch, + threshold: f64, + ) -> Result, Box> { + use crate::proto::sketchlib::CountSketchDelta as ProtoDelta; + use prost::Message; + + if self.rows != snapshot.rows || self.cols != snapshot.cols { + return Err(format!( + "CountSketch dimension mismatch: self={}x{}, snapshot={}x{}", + self.rows, self.cols, snapshot.rows, snapshot.cols + ) + .into()); + } + + // Threshold is compared against |Δ| after truncation to an + // integer, mirroring the Go reference's `int64(threshold)` cast. + let thresh = threshold as i64; + + let mut cell_rows: Vec = Vec::new(); + let mut cell_cols: Vec = Vec::new(); + let mut d_counts: Vec = Vec::new(); + let mut l2: Vec = Vec::with_capacity(self.rows); + + for r in 0..self.rows { + let mut cur_l2 = 0.0f64; + let mut snap_l2 = 0.0f64; + for c in 0..self.cols { + let cv = self.matrix[r][c]; + let sv = snapshot.matrix[r][c]; + cur_l2 += cv * cv; + snap_l2 += sv * sv; + let dc = (cv - sv) as i64; + if dc != 0 && (dc <= -thresh || dc >= thresh) { + cell_rows.push(r as u32); + cell_cols.push(c as u32); + d_counts.push(dc); + } + } + l2.push(cur_l2 - snap_l2); + } + + let delta = ProtoDelta { + rows: self.rows as u32, + cols: self.cols as u32, + cells_legacy: Vec::new(), + l2_legacy: Vec::new(), + topk: None, + hh_keys: Vec::new(), + cell_rows, + cell_cols, + d_counts, + l2, + }; + Ok(delta.encode_to_vec()) + } + + /// Apply a `prost`-encoded [`crate::proto::sketchlib::CountSketchDelta`] + /// to this sketch in place (additive signed-cell merge + heavy-hitter + /// rebuild). The Rust twin of the Go reference implementation's + /// `DeserializeDelta` + `ApplyDelta`. Reads the packed cell arrays + /// (`cell_rows`/`cell_cols`/`d_counts`) and the heavy-hitter keys + /// (`hh_keys`), falling back to the legacy per-cell records and TopK + /// entries for payloads from older producers. 
+ /// + /// Returns `Err` if `bytes` is not a valid `CountSketchDelta` proto or + /// a cell is out of range for this sketch's dimensions. + pub fn apply_delta_bytes( + &mut self, + bytes: &[u8], + ) -> Result<(), Box> { + use crate::proto::sketchlib::CountSketchDelta as ProtoDelta; + use prost::Message; + + let proto = ProtoDelta::decode(bytes)?; + let cells: Vec<(u32, u32, i64)> = if !proto.cell_rows.is_empty() { + proto + .cell_rows + .iter() + .zip(proto.cell_cols.iter()) + .zip(proto.d_counts.iter()) + .map(|((&r, &c), &d)| (r, c, d)) + .collect() + } else { + proto + .cells_legacy + .iter() + .map(|c| (c.row, c.col, c.d_count as i64)) + .collect() + }; + let l2 = if !proto.l2.is_empty() { + proto.l2 + } else { + proto.l2_legacy + }; + let hh_keys = if !proto.hh_keys.is_empty() { + proto.hh_keys + } else if let Some(topk) = proto.topk { + topk.entries.into_iter().map(|e| e.key).collect() + } else { + Vec::new() + }; + let delta = CountSketchDelta { + rows: proto.rows, + cols: proto.cols, + cells, + l2, + hh_keys, + }; + self.apply_delta(&delta) + } + /// Merge a slice of references into a single new sketch. All inputs /// must share the same dimensions; returns `Err` on mismatch or an /// empty input. @@ -494,7 +635,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - CountSketchState, CounterType, SketchEnvelope, sketch_envelope::SketchState, + sketch_envelope::SketchState, CountSketchState, CounterType, SketchEnvelope, }; use prost::Message; @@ -578,4 +719,80 @@ mod tests { _ => panic!("non-hex byte {}", c as char), } } + + /// `compute_delta` against an EMPTY snapshot reconstructs the window's + /// full (signed) state when its bytes are applied to a fresh empty + /// sketch (round-trip). With `threshold = 1.0` every changed cell + /// survives. 
+ #[test] + fn test_compute_delta_against_empty_round_trips() { + let rows = 3usize; + let cols = 512usize; + let mut window = CountSketch::new(rows, cols); + for i in 0..200u64 { + window.update(&format!("k-{}", i % 23), 1.0); + } + let empty = CountSketch::new(rows, cols); + + let delta_bytes = window.compute_delta(&empty, 1.0).unwrap(); + + let mut reconstructed = CountSketch::new(rows, cols); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + + assert_eq!(reconstructed.sketch(), window.sketch()); + } + + /// A delta computed between two non-empty snapshots reconstructs the + /// current sketch when applied to the base. + #[test] + fn test_compute_delta_then_apply_matches_current() { + let mut base = CountSketch::new(3, 64); + for i in 0..40u64 { + base.update(&format!("x{}", i % 8), 1.0); + } + let mut current = base.clone(); + for i in 0..30u64 { + current.update(&format!("x{}", i % 8), 1.0); + } + + let delta_bytes = current.compute_delta(&base, 1.0).unwrap(); + let mut reconstructed = base.clone(); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + assert_eq!(reconstructed.sketch(), current.sketch()); + } + + /// Cross-language byte-parity guard: `compute_delta` against an empty + /// snapshot must emit bytes identical to the Go reference + /// implementation's `SerializeDelta(ComputeDelta(empty, current, 1.0))` + /// for the same `(rows=3, cols=512)` × "k-a".."k-e" (each 5×, 25 + /// unweighted updates) input. The golden hex was captured from a + /// `proto.Marshal` of the Go reference's `CountSketchDelta` + /// (packed-array encoding). No heavy-hitter keys are forwarded by this + /// minimal wrapper, so `hh_keys` is empty in both producers; the golden + /// pins the packed cell arrays and the per-row L2 (=125) norm deltas. 
+ #[test] + fn test_compute_delta_matches_go_golden_bytes() { + let rows = 3usize; + let cols = 512usize; + let mut current = CountSketch::new(rows, cols); + for i in 0..25 { + let key = format!("k-{}", (b'a' + (i % 5) as u8) as char); + current.update(&key, 1.0); + } + let empty = CountSketch::new(rows, cols); + let got = current.compute_delta(&empty, 1.0).unwrap(); + + // Captured from the Go reference implementation's + // SerializeDelta(ComputeDelta(empty, current, 1.0)) for the same input. + const GOLDEN_HEX: &str = "08031080044a0f000000000001010101010202020202521a3d7fd9019402d3034eb9019602f703fe0306a501c601d201ba025a0f090a0909090a090a090a09090a090962180000000000405f400000000000405f400000000000405f40"; + let want = decode_hex(GOLDEN_HEX); + assert_eq!( + got, + want, + "CountSketch delta bytes diverge from the Go reference golden \ + ({} bytes got vs {} bytes want)", + got.len(), + want.len(), + ); + } } diff --git a/src/message_pack_format/portable/hll.rs b/src/message_pack_format/portable/hll.rs index 2a66fb3..3e38acd 100644 --- a/src/message_pack_format/portable/hll.rs +++ b/src/message_pack_format/portable/hll.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::message_pack_format::{Error as MsgPackError, MessagePackCodec}; -use crate::{CANONICAL_HASH_SEED, DataInput, hash64_seeded}; +use crate::{hash64_seeded, DataInput, CANONICAL_HASH_SEED}; /// HLL estimator variant. Mirrors `asap_sketchlib::proto::sketchlib::HllVariant`. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -131,6 +131,85 @@ impl HllSketch { Ok(()) } + /// Compute a sparse, proto-marshalled `HLLDelta` of `self` against a + /// `snapshot`. A register update is included when `self`'s register + /// value increased over the snapshot's (`self[i] > snapshot[i]`); its + /// carried value is the new (larger) register value. + /// + /// This is the Rust twin of the Go reference implementation's + /// `ComputeRegisterDelta` + `SerializeRegisterDelta`. 
HLL uses max + /// semantics, so only increases are meaningful and the delta is + /// **lossless** — every increased register is carried, regardless of + /// `threshold` (the parameter is accepted for a uniform delta API and + /// has no effect here; an HLL register update is never dropped). The + /// returned bytes are a `prost`-encoded + /// [`crate::proto::sketchlib::HllDelta`], byte-identical to the Go + /// `proto.Marshal(HLLDelta)` output for the same inputs (cross-language + /// byte parity). + /// + /// Delta-against-empty: when `snapshot` is the all-zero sketch, every + /// non-zero register of `self` is carried, so the result is this + /// window's full register state encoded as a delta (no cross-window + /// subtraction). HLL deltas carry only register max-updates — there are + /// no DataPoint-level metric scalars to drop. + pub fn compute_delta(&self, snapshot: &HllSketch, _threshold: u64) -> Vec { + use crate::proto::sketchlib::{HllDelta as ProtoDelta, HllRegisterUpdate}; + use prost::Message; + + let cur = &self.registers; + let snap = &snapshot.registers; + let n = cur.len().min(snap.len()); + + let mut updates: Vec = Vec::new(); + for i in 0..n { + if cur[i] > snap[i] { + updates.push(HllRegisterUpdate { + index: i as u32, + value: cur[i] as u32, + }); + } + } + // Guard: if `self` has more registers than the snapshot (should not + // happen at a fixed precision), carry all non-zero extras. Matches + // the Go reference's trailing-register guard. + for (i, &v) in cur.iter().enumerate().take(cur.len()).skip(n) { + if v > 0 { + updates.push(HllRegisterUpdate { + index: i as u32, + value: v as u32, + }); + } + } + + ProtoDelta { updates }.encode_to_vec() + } + + /// Apply a `prost`-encoded [`crate::proto::sketchlib::HllDelta`] to this + /// sketch in place (register max-merge). 
The Rust twin of the Go + /// reference implementation's `DeserializeRegisterDelta` + + /// `ApplyRegisterDelta`: each update sets + /// `register[index] = max(register[index], value)`. + /// + /// Returns `Err` if `bytes` is not a valid `HLLDelta` proto or a + /// register index is out of range for this sketch's precision. + pub fn apply_delta_bytes( + &mut self, + bytes: &[u8], + ) -> Result<(), Box> { + use crate::proto::sketchlib::HllDelta as ProtoDelta; + use prost::Message; + + let proto = ProtoDelta::decode(bytes)?; + let delta = HllSketchDelta { + updates: proto + .updates + .into_iter() + .map(|u| (u.index, u.value as u8)) + .collect(), + }; + self.apply_delta(&delta) + } + pub fn merge_refs( inputs: &[&HllSketch], ) -> Result> { @@ -450,7 +529,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - HyperLogLogState, SketchEnvelope, sketch_envelope::SketchState, + sketch_envelope::SketchState, HyperLogLogState, SketchEnvelope, }; use prost::Message; @@ -590,4 +669,76 @@ mod tests { }; assert!(decode_sparse_registers(&sp).is_err()); } + + /// `compute_delta` against an EMPTY snapshot reconstructs the window's + /// full register state when its bytes are applied to a fresh empty + /// sketch (register max-merge round-trip). HLL deltas are lossless — + /// every increased register is carried. 
+ #[test] + fn test_compute_delta_against_empty_round_trips() { + let mut window = HllSketch::new(HllVariant::Datafusion, 14); + for i in 0..5000u64 { + window.update(&i.to_le_bytes()); + } + let empty = HllSketch::new(HllVariant::Datafusion, 14); + + let delta_bytes = window.compute_delta(&empty, 0); + + let mut reconstructed = HllSketch::new(HllVariant::Datafusion, 14); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + + assert_eq!(reconstructed.registers, window.registers); + } + + /// A delta computed between two non-empty snapshots reconstructs the + /// current sketch when applied to the base (max-merge of the registers + /// that increased). + #[test] + fn test_compute_delta_then_apply_matches_current() { + let mut base = HllSketch::new(HllVariant::Datafusion, 12); + for i in 0..2000u64 { + base.update(&i.to_le_bytes()); + } + let mut current = base.clone(); + for i in 2000..5000u64 { + current.update(&i.to_le_bytes()); + } + + let delta_bytes = current.compute_delta(&base, 0); + let mut reconstructed = base.clone(); + reconstructed.apply_delta_bytes(&delta_bytes).unwrap(); + assert_eq!(reconstructed.registers, current.registers); + } + + /// Cross-language byte-parity guard: `compute_delta` against an empty + /// snapshot must emit bytes identical to the Go reference + /// implementation's `SerializeRegisterDelta(ComputeRegisterDelta(empty, + /// current))` for the same precision-14 sketch fed `(1..=50)` as f64 + /// little-endian byte values. A delta-against-empty carries every + /// non-zero register as an `(index, value)` update; the golden hex was + /// captured from a `proto.Marshal` of the Go reference's `HLLDelta`. 
+    #[test]
+    fn test_compute_delta_matches_go_golden_bytes() {
+        let mut current = HllSketch::new(HllVariant::Datafusion, 14);
+        for i in 1..=50i32 {
+            let v = i as f64;
+            current.update(&v.to_le_bytes());
+        }
+        let empty = HllSketch::new(HllVariant::Datafusion, 14);
+        let got = current.compute_delta(&empty, 0);
+
+        // Captured from the Go reference implementation's
+        // SerializeRegisterDelta(ComputeRegisterDelta(empty, current)) for the
+        // same input.
+        const GOLDEN_HEX: &str = "0a04085810010a0508930510020a0508931110010a0508e31110010a0508fa1510010a0508d71910010a0508881b10010a0508ba2310010a0508ec2310010a0508d62410010a0508ff2510020a0508ae2610010a0508b22810020a0508bb2810020a0508dc3210020a0508f63510020a0508eb3610010a0508b23910040a0508da3910040a0508853a10050a0508f14110020a0508974310020a05089c4510050a0508de4810020a0508b64910020a0508c74910010a0508c54d10010a0508ed4d10020a05088d4e10020a0508b35210030a0508805410020a0508e75a10020a0508f85a10060a0508835b10030a0508b75b10020a0508c25b10060a0508de5c10010a0508dd6310010a0508876410010a0508e56410050a0508e66610010a0508d66710040a0508e86710010a0508e06c10010a0508877110040a0508ad7210010a0508e97510010a0508877710030a0508d37910010a0508db7b1003";
+        let want = decode_hex(GOLDEN_HEX);
+        assert_eq!(
+            got,
+            want,
+            "HLL delta bytes diverge from the Go reference golden \
+             ({} bytes got vs {} bytes want)",
+            got.len(),
+            want.len(),
+        );
+    }
 }
diff --git a/src/proto/generated/sketchlib.v1.rs b/src/proto/generated/sketchlib.v1.rs
index 8e6843d..7969920 100644
--- a/src/proto/generated/sketchlib.v1.rs
+++ b/src/proto/generated/sketchlib.v1.rs
@@ -198,6 +198,67 @@ pub struct CountMinState {
     #[prost(double, repeated, tag = "9")]
     pub l2: ::prost::alloc::vec::Vec<f64>,
 }
+/// CountMinDelta carries only the matrix cells that changed by at least the
+/// threshold between two consecutive snapshots, plus the full per-row L1/L2
+/// norm deltas (one entry per row, negligible size).
+///
+/// Cells apply additively on the receiver: matrix[row][col] += d_count. CMS
+/// counters only ever grow, but the delta encodes a signed count (sint64) so
+/// the same wire form covers weighted/decay variants.
+///
+/// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) is
+/// the canonical form; the legacy per-cell message (tag 3) is retained only so
+/// payloads from older producers still decode. This message is byte-identical
+/// to the Go reference implementation's CountMinDelta so the two runtimes emit
+/// byte-identical delta frames for identical window state (cross-language byte
+/// parity).
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CountMinDelta {
+    #[prost(uint32, tag = "1")]
+    pub rows: u32,
+    #[prost(uint32, tag = "2")]
+    pub cols: u32,
+    /// Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for
+    /// backward-compatible decode of payloads from older producers.
+    #[prost(message, repeated, tag = "3")]
+    pub cells_legacy: ::prost::alloc::vec::Vec<CountMinCell>,
+    /// Per-row L1 norm deltas, length = rows.
+    #[prost(double, repeated, tag = "4")]
+    pub l1: ::prost::alloc::vec::Vec<f64>,
+    /// Per-row L2 norm deltas, length = rows.
+    #[prost(double, repeated, tag = "5")]
+    pub l2: ::prost::alloc::vec::Vec<f64>,
+    /// Packed cell encoding (canonical form):
+    ///
+    /// row index of each changed cell
+    #[prost(uint32, repeated, tag = "9")]
+    pub cell_rows: ::prost::alloc::vec::Vec<u32>,
+    /// col index of each changed cell
+    #[prost(uint32, repeated, tag = "10")]
+    pub cell_cols: ::prost::alloc::vec::Vec<u32>,
+    /// signed integer count delta
+    #[prost(sint64, repeated, tag = "11")]
+    pub d_counts: ::prost::alloc::vec::Vec<i64>,
+}
+/// CountMinCell is the deprecated per-cell delta record. Producers emit the
+/// packed cell arrays on CountMinDelta instead; this message exists only to
+/// decode payloads from older producers.
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct CountMinCell {
+    #[prost(uint32, tag = "1")]
+    pub row: u32,
+    #[prost(uint32, tag = "2")]
+    pub col: u32,
+    /// deprecated: use CountMinDelta.d_counts
+    #[prost(double, tag = "3")]
+    pub d_count: f64,
+    /// deprecated: omitted in the packed encoding
+    #[prost(double, tag = "4")]
+    pub d_sum: f64,
+    /// deprecated: omitted in the packed encoding
+    #[prost(double, tag = "5")]
+    pub d_sum2: f64,
+}
 /// CountSketchState is the portable state of a Count (±1) Sketch.
 ///
 /// The counter matrix is signed because Count Sketch uses ±1 increments.
@@ -254,6 +315,70 @@ pub struct HeapEntry {
     #[prost(double, tag = "2")]
     pub count: f64,
 }
+/// CountSketchDelta carries only the matrix cells that changed by at least the
+/// threshold between two consecutive snapshots, plus the full per-row L2 norm
+/// deltas and any heavy-hitter candidate keys.
+///
+/// Cells apply additively on the receiver: matrix[row][col] += d_count. Count
+/// Sketch counters are signed (±1 increments), so d_count is a signed integer
+/// (sint64). Heavy-hitter candidate keys (hh_keys) are re-queried against the
+/// merged matrix downstream to rebuild the receiver's Top-K with accurate,
+/// globally-merged estimates.
+///
+/// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) plus
+/// l2 (tag 12) is the canonical form; the legacy per-cell message (tag 3),
+/// l2_legacy (tag 4) and topk (tag 5) are retained only so payloads from older
+/// producers still decode. This message is byte-identical to the Go reference
+/// implementation's CountSketchDelta so the two runtimes emit byte-identical
+/// delta frames for identical window state (cross-language byte parity).
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CountSketchDelta {
+    #[prost(uint32, tag = "1")]
+    pub rows: u32,
+    #[prost(uint32, tag = "2")]
+    pub cols: u32,
+    /// Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for
+    /// backward-compatible decode of payloads from older producers.
+    #[prost(message, repeated, tag = "3")]
+    pub cells_legacy: ::prost::alloc::vec::Vec<CountSketchCell>,
+    /// Deprecated: use l2 (tag 12) instead.
+    #[prost(double, repeated, tag = "4")]
+    pub l2_legacy: ::prost::alloc::vec::Vec<f64>,
+    /// Deprecated: use hh_keys (tag 6) instead.
+    #[prost(message, optional, tag = "5")]
+    pub topk: ::core::option::Option,
+    /// Heavy-hitter candidate keys from the upstream tracker. Downstream queries
+    /// the merged Count Sketch matrix for each key to build its Top-K.
+    #[prost(string, repeated, tag = "6")]
+    pub hh_keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+    /// Packed cell encoding (canonical form):
+    ///
+    /// row index of each changed cell
+    #[prost(uint32, repeated, tag = "9")]
+    pub cell_rows: ::prost::alloc::vec::Vec<u32>,
+    /// col index of each changed cell
+    #[prost(uint32, repeated, tag = "10")]
+    pub cell_cols: ::prost::alloc::vec::Vec<u32>,
+    /// signed integer count delta
+    #[prost(sint64, repeated, tag = "11")]
+    pub d_counts: ::prost::alloc::vec::Vec<i64>,
+    /// per-row L2 norm deltas
+    #[prost(double, repeated, tag = "12")]
+    pub l2: ::prost::alloc::vec::Vec<f64>,
+}
+/// CountSketchCell is the deprecated per-cell delta record. Producers emit the
+/// packed cell arrays on CountSketchDelta instead; this message exists only to
+/// decode payloads from older producers.
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct CountSketchCell {
+    #[prost(uint32, tag = "1")]
+    pub row: u32,
+    #[prost(uint32, tag = "2")]
+    pub col: u32,
+    /// deprecated: use CountSketchDelta.d_counts
+    #[prost(double, tag = "3")]
+    pub d_count: f64,
+}
 /// HyperLogLogState is the portable state of a HyperLogLog cardinality sketch.
 ///
 /// Precision p determines the register count: num_registers = 2^precision.
@@ -323,6 +448,30 @@ pub struct HllSparseRegisters {
     #[prost(bytes = "vec", tag = "2")]
     pub packed: ::prost::alloc::vec::Vec<u8>,
 }
+/// HLLDelta carries the registers that increased between two consecutive
+/// snapshots. HLL uses max semantics, so only increases are meaningful;
+/// at a fixed precision a register can never decrease. Each update is applied
+/// losslessly on the receiver via register\[index\] = max(register\[index\], value)
+/// — no register update is ever dropped (there is no threshold to apply).
+///
+/// This message is byte-identical to the Go reference implementation's HLLDelta
+/// so the two runtimes emit byte-identical delta frames for identical window
+/// state (cross-language byte parity).
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HllDelta {
+    #[prost(message, repeated, tag = "1")]
+    pub updates: ::prost::alloc::vec::Vec<HllRegisterUpdate>,
+}
+/// HLLRegisterUpdate is one register whose value increased.
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct HllRegisterUpdate {
+    /// register index, 0 – 2^precision-1
+    #[prost(uint32, tag = "1")]
+    pub index: u32,
+    /// new value (only sent when > snapshot value)
+    #[prost(uint32, tag = "2")]
+    pub value: u32,
+}
 /// HLLVariant identifies which HLL estimator algorithm the registers belong to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] From 60c5b1de2eb31077c038d7494da629d0222b9172 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 26 May 2026 10:26:32 -0600 Subject: [PATCH 2/2] style: apply rustfmt to the new delta methods (fix CI Check Format) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/message_pack_format/portable/countminsketch.rs | 2 +- src/message_pack_format/portable/countsketch.rs | 6 +++--- src/message_pack_format/portable/hll.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/message_pack_format/portable/countminsketch.rs b/src/message_pack_format/portable/countminsketch.rs index 12a46d1..0a1a0e4 100644 --- a/src/message_pack_format/portable/countminsketch.rs +++ b/src/message_pack_format/portable/countminsketch.rs @@ -612,7 +612,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - sketch_envelope::SketchState, CountMinState, CounterType, SketchEnvelope, + CountMinState, CounterType, SketchEnvelope, sketch_envelope::SketchState, }; use prost::Message; diff --git a/src/message_pack_format/portable/countsketch.rs b/src/message_pack_format/portable/countsketch.rs index af8ca9e..cb1d4cb 100644 --- a/src/message_pack_format/portable/countsketch.rs +++ b/src/message_pack_format/portable/countsketch.rs @@ -140,8 +140,8 @@ impl CountSketch { .iter() .enumerate() .min_by(|a, b| { - a.1 .1 - .partial_cmp(&b.1 .1) + a.1.1 + .partial_cmp(&b.1.1) .unwrap_or(std::cmp::Ordering::Equal) }) .map(|(i, e)| (i, e.1)) @@ -635,7 +635,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - sketch_envelope::SketchState, CountSketchState, CounterType, SketchEnvelope, + CountSketchState, CounterType, SketchEnvelope, sketch_envelope::SketchState, }; use prost::Message; diff --git a/src/message_pack_format/portable/hll.rs 
b/src/message_pack_format/portable/hll.rs index 3e38acd..597d747 100644 --- a/src/message_pack_format/portable/hll.rs +++ b/src/message_pack_format/portable/hll.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::message_pack_format::{Error as MsgPackError, MessagePackCodec}; -use crate::{hash64_seeded, DataInput, CANONICAL_HASH_SEED}; +use crate::{CANONICAL_HASH_SEED, DataInput, hash64_seeded}; /// HLL estimator variant. Mirrors `asap_sketchlib::proto::sketchlib::HllVariant`. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -529,7 +529,7 @@ mod tests { #[test] fn test_update_then_envelope_matches_sketchlib_go_bytes() { use crate::proto::sketchlib::{ - sketch_envelope::SketchState, HyperLogLogState, SketchEnvelope, + HyperLogLogState, SketchEnvelope, sketch_envelope::SketchState, }; use prost::Message;