Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions proto/countminsketch/countminsketch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,48 @@ message CountMinState {

reserved 10 to 15;
}

// ============================================================================
// Count-Min Sketch Delta
// ============================================================================

// CountMinDelta carries only the matrix cells that changed by at least the
// threshold between two consecutive snapshots, plus the full per-row L1/L2
// norm deltas (one entry per row, negligible size).
//
// Cells apply additively on the receiver: matrix[row][col] += d_count. CMS
// counters only ever grow, but the delta encodes a signed count (sint64) so
// the same wire form covers weighted/decay variants.
//
// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) is
// the canonical form; the legacy per-cell message (tag 3) is retained only so
// payloads from older producers still decode. This message is byte-identical
// to the Go reference implementation's CountMinDelta so the two runtimes emit
// byte-identical delta frames for identical window state (cross-language byte
// parity).
message CountMinDelta {
uint32 rows = 1;
uint32 cols = 2;
// Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for
// backward-compatible decode of payloads from older producers.
repeated CountMinCell cells_legacy = 3;
// Per-row L1 norm deltas, length = rows.
repeated double l1 = 4 [packed = true];
// Per-row L2 norm deltas, length = rows.
repeated double l2 = 5 [packed = true];
// Packed cell encoding (canonical form):
repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell
repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell
repeated sint64 d_counts = 11 [packed = true]; // signed integer count delta
}

// CountMinCell is the deprecated per-cell delta record. Producers emit the
// packed cell arrays on CountMinDelta instead; this message exists only to
// decode payloads from older producers.
message CountMinCell {
uint32 row = 1;
uint32 col = 2;
double d_count = 3; // deprecated: use CountMinDelta.d_counts
double d_sum = 4; // deprecated: omitted in the packed encoding
double d_sum2 = 5; // deprecated: omitted in the packed encoding
}
49 changes: 49 additions & 0 deletions proto/countsketch/countsketch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,52 @@ message HeapEntry {

reserved 3; // Reserved for bytes_key alternative.
}

// ============================================================================
// Count Sketch Delta
// ============================================================================

// CountSketchDelta carries only the matrix cells that changed by at least the
// threshold between two consecutive snapshots, plus the full per-row L2 norm
// deltas and any heavy-hitter candidate keys.
//
// Cells apply additively on the receiver: matrix[row][col] += d_count. Count
// Sketch counters are signed (±1 increments), so d_count is a signed integer
// (sint64). Heavy-hitter candidate keys (hh_keys) are re-queried against the
// merged matrix downstream to rebuild the receiver's Top-K with accurate,
// globally-merged estimates.
//
// The packed cell encoding (cell_rows / cell_cols / d_counts, tags 9-11) plus
// l2 (tag 12) is the canonical form; the legacy per-cell message (tag 3),
// l2_legacy (tag 4) and topk (tag 5) are retained only so payloads from older
// producers still decode. This message is byte-identical to the Go reference
// implementation's CountSketchDelta so the two runtimes emit byte-identical
// delta frames for identical window state (cross-language byte parity).
message CountSketchDelta {
uint32 rows = 1;
uint32 cols = 2;
// Deprecated: use cell_rows/cell_cols/d_counts (tags 9-11). Retained for
// backward-compatible decode of payloads from older producers.
repeated CountSketchCell cells_legacy = 3;
// Deprecated: use l2 (tag 12) instead.
repeated double l2_legacy = 4 [packed = true];
// Deprecated: use hh_keys (tag 6) instead.
TopKState topk = 5;
// Heavy-hitter candidate keys from the upstream tracker. Downstream queries
// the merged Count Sketch matrix for each key to build its Top-K.
repeated string hh_keys = 6;
// Packed cell encoding (canonical form):
repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell
repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell
repeated sint64 d_counts = 11 [packed = true]; // signed integer count delta
repeated double l2 = 12 [packed = true]; // per-row L2 norm deltas
}

// CountSketchCell is the deprecated per-cell delta record. Producers emit the
// packed cell arrays on CountSketchDelta instead; this message exists only to
// decode payloads from older producers.
message CountSketchCell {
uint32 row = 1;
uint32 col = 2;
double d_count = 3; // deprecated: use CountSketchDelta.d_counts
}
23 changes: 23 additions & 0 deletions proto/hll/hll.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,26 @@ enum HLLVariant {
// Requires hip_kxq0, hip_kxq1, hip_est to be populated.
HLL_VARIANT_HIP = 3;
}

// ============================================================================
// HyperLogLog Delta
// ============================================================================

// HLLDelta carries the registers that increased between two consecutive
// snapshots. HLL uses max semantics, so only increases are meaningful;
// at a fixed precision a register can never decrease. Each update is applied
// losslessly on the receiver via register[index] = max(register[index], value)
// — no register update is ever dropped (there is no threshold to apply).
//
// This message is byte-identical to the Go reference implementation's HLLDelta
// so the two runtimes emit byte-identical delta frames for identical window
// state (cross-language byte parity).
message HLLDelta {
repeated HLLRegisterUpdate updates = 1;
}

// HLLRegisterUpdate is one register whose value increased.
message HLLRegisterUpdate {
uint32 index = 1; // register index, 0 – 2^precision-1
uint32 value = 2; // new value (only sent when > snapshot value)
}
205 changes: 205 additions & 0 deletions src/message_pack_format/portable/countminsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,134 @@ impl CountMinSketch {
Ok(())
}

/// Compute a sparse, proto-marshalled `CountMinDelta` of `self`
/// against a `snapshot`. A cell is included when its absolute count
/// delta `|Δcount|` (self − snapshot) is `>= threshold` and non-zero.
/// The full per-row L1/L2 norm deltas are always carried (one entry
/// per row, negligible size).
///
/// This is the Rust twin of the Go reference implementation's
/// `ComputeDelta` + `SerializeDelta`: it iterates the matrix in
/// row-major order, subtracts the snapshot's value for each cell, and
/// emits the surviving cell deltas using the packed-array proto
/// encoding (`cell_rows`/`cell_cols`/`d_counts`). L1/L2 row deltas are
/// derived from the matrices — `l1[r] = Σ_c count[r][c]` and
/// `l2[r] = Σ_c count[r][c]^2` — which telescopes to the same value the
/// Go producer maintains incrementally. The returned bytes are a
/// `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`],
/// byte-identical to the Go `proto.Marshal(CountMinDelta)` output for
/// the same inputs (cross-language byte parity).
///
/// Delta-against-empty: when `snapshot` is the all-zero sketch, every
/// surviving cell delta equals this window's own cell count, so the
/// result is this window's full state encoded as a delta (no
/// cross-window subtraction). CMS deltas carry only sketch-internal
/// cells/norms — there are no DataPoint-level metric scalars to drop.
pub fn compute_delta(
&self,
snapshot: &CountMinSketch,
threshold: f64,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
use crate::proto::sketchlib::CountMinDelta as ProtoDelta;
use prost::Message;

if self.rows != snapshot.rows || self.cols != snapshot.cols {
return Err(format!(
"CountMinSketch dimension mismatch: self={}x{}, snapshot={}x{}",
self.rows, self.cols, snapshot.rows, snapshot.cols
)
.into());
}

let cur = self.sketch();
let snap = snapshot.sketch();

let mut cell_rows: Vec<u32> = Vec::new();
let mut cell_cols: Vec<u32> = Vec::new();
let mut d_counts: Vec<i64> = Vec::new();
let mut l1: Vec<f64> = Vec::with_capacity(self.rows);
let mut l2: Vec<f64> = Vec::with_capacity(self.rows);

for r in 0..self.rows {
let mut cur_l1 = 0.0f64;
let mut cur_l2 = 0.0f64;
let mut snap_l1 = 0.0f64;
let mut snap_l2 = 0.0f64;
for c in 0..self.cols {
let cv = cur[r][c];
let sv = snap[r][c];
cur_l1 += cv;
cur_l2 += cv * cv;
snap_l1 += sv;
snap_l2 += sv * sv;
// CMS counts are non-negative integers in the wire form;
// mirror the Go reference's signed Δ + |Δ| threshold test.
let dc = (cv - sv) as i64;
if dc != 0 && (dc.unsigned_abs() as f64) >= threshold {
cell_rows.push(r as u32);
cell_cols.push(c as u32);
d_counts.push(dc);
}
}
l1.push(cur_l1 - snap_l1);
l2.push(cur_l2 - snap_l2);
}

let delta = ProtoDelta {
rows: self.rows as u32,
cols: self.cols as u32,
cells_legacy: Vec::new(),
l1,
l2,
cell_rows,
cell_cols,
d_counts,
};
Ok(delta.encode_to_vec())
}

/// Apply a `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`]
/// to this sketch in place (additive cell merge). The Rust twin of the
/// Go reference implementation's `DeserializeDelta` + `ApplyDelta`.
/// Reads the packed cell arrays (`cell_rows`/`cell_cols`/`d_counts`)
/// and falls back to the legacy per-cell records for payloads from
/// older producers.
///
/// Returns `Err` if `bytes` is not a valid `CountMinDelta` proto or a
/// cell is out of range for this sketch's dimensions.
pub fn apply_delta_bytes(
&mut self,
bytes: &[u8],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use crate::proto::sketchlib::CountMinDelta as ProtoDelta;
use prost::Message;

let proto = ProtoDelta::decode(bytes)?;
let cells: Vec<(u32, u32, i64)> = if !proto.cell_rows.is_empty() {
proto
.cell_rows
.iter()
.zip(proto.cell_cols.iter())
.zip(proto.d_counts.iter())
.map(|((&r, &c), &d)| (r, c, d))
.collect()
} else {
proto
.cells_legacy
.iter()
.map(|c| (c.row, c.col, c.d_count as i64))
.collect()
};
let delta = CountMinSketchDelta {
rows: proto.rows,
cols: proto.cols,
cells,
l1: proto.l1,
l2: proto.l2,
};
self.apply_delta(&delta)
}

/// One-shot aggregation: build a sketch from parallel key/value slices
/// and return the msgpack bytes.
pub fn aggregate_count(
Expand Down Expand Up @@ -585,4 +713,81 @@ mod tests {
_ => panic!("non-hex byte {}", c as char),
}
}

/// `compute_delta` against an EMPTY snapshot reconstructs the window's
/// full state when its bytes are applied to a fresh empty sketch
/// (round-trip). With `threshold = 1.0` every changed cell survives, so
/// applying the delta to a zero base yields the same matrix as the
/// original window.
#[test]
fn test_compute_delta_against_empty_round_trips() {
let rows = 4usize;
let cols = 2048usize;
let mut window = CountMinSketch::new(rows, cols);
for i in 0..200u64 {
window.update(&format!("flow-{}", i % 37), 1.0);
}
let empty = CountMinSketch::new(rows, cols);

let delta_bytes = window.compute_delta(&empty, 1.0).unwrap();

let mut reconstructed = CountMinSketch::new(rows, cols);
reconstructed.apply_delta_bytes(&delta_bytes).unwrap();

assert_eq!(reconstructed.sketch(), window.sketch());
}

/// A delta computed between two non-empty snapshots reconstructs the
/// current sketch when applied to the base — matching a direct merge of
/// the cell-wise difference.
#[test]
fn test_compute_delta_then_apply_matches_current() {
let mut base = CountMinSketch::new(2, 64);
for i in 0..40u64 {
base.update(&format!("k{}", i % 8), 1.0);
}
let mut current = base.clone();
for i in 0..30u64 {
current.update(&format!("k{}", i % 8), 1.0);
}

let delta_bytes = current.compute_delta(&base, 1.0).unwrap();
let mut reconstructed = base.clone();
reconstructed.apply_delta_bytes(&delta_bytes).unwrap();
assert_eq!(reconstructed.sketch(), current.sketch());
}

/// Cross-language byte-parity guard: `compute_delta` against an empty
/// snapshot must emit bytes identical to the Go reference
/// implementation's `SerializeDelta(ComputeDelta(empty, current, 1.0))`
/// for the same `(rows=4, cols=2048)` × "flow-0".."flow-9" (each 5×,
/// 50 unweighted updates) input. The golden hex was captured from a
/// `proto.Marshal` of the Go reference's `CountMinDelta` (packed-array
/// encoding) for that input. A delta-against-empty carries the window's
/// full state, so this also pins the per-row L1 (=50) / L2 (=250) norm
/// deltas and the packed cell arrays.
#[test]
fn test_compute_delta_matches_go_golden_bytes() {
let rows = 4usize;
let cols = 2048usize;
let mut current = CountMinSketch::new(rows, cols);
for i in 0..50u64 {
current.update(&format!("flow-{}", i % 10), 1.0);
}
let empty = CountMinSketch::new(rows, cols);
let got = current.compute_delta(&empty, 1.0).unwrap();

// Captured from the Go reference implementation's
// SerializeDelta(ComputeDelta(empty, current, 1.0)) for the same input.
const GOLDEN_HEX: &str = "0804108010222000000000000049400000000000004940000000000000494000000000000049402a200000000000406f400000000000406f400000000000406f400000000000406f404a28000000000000000000000101010101010101010102020202020202020202030303030303030303035250c101b202b608b7088809bc09c20dc60d910fbb0f9f018d04a604c004d6058507a909920b900ec70ead038304bc058007c207cc0a820bd50cbe0fcd0fe2019e02cd02e702a707b809a00ac20cee0dfe0f5a280a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a";
let want = decode_hex_cms(GOLDEN_HEX);
assert_eq!(
got,
want,
"CountMin delta bytes diverge from the Go reference golden \
({} bytes got vs {} bytes want)",
got.len(),
want.len(),
);
}
}
Loading
Loading