Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions proto/countminsketch/countminsketch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ message CountMinState {
// to the Go reference implementation's CountMinDelta so the two runtimes emit
// byte-identical delta frames for identical window state (cross-language byte
// parity).
//
// CountMinDelta is structurally identical to CountSketchDelta: rows, cols, the
// packed cell encoding, per-row norm deltas, and an optional repeated hh_keys
// at the SAME tag number (6). Both sketches can track heavy hitters; whether
// hh_keys is populated is a control-plane decision (it is empty/omitted when
// heavy-hitter tracking is not enabled for this sketch).
message CountMinDelta {
uint32 rows = 1;
uint32 cols = 2;
Expand All @@ -86,6 +92,12 @@ message CountMinDelta {
repeated double l1 = 4 [packed = true];
// Per-row L2 norm deltas, length = rows.
repeated double l2 = 5 [packed = true];
// Heavy-hitter candidate keys from an upstream tracker, mirroring
// CountSketchDelta.hh_keys (same tag number 6, same wire shape). Downstream
// queries the merged Count-Min matrix for each key to (re)build its Top-K
// with globally-merged estimates. Empty/omitted when heavy-hitter tracking
// is not enabled.
repeated string hh_keys = 6;
// Packed cell encoding (canonical form):
repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell
repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell
Expand Down
26 changes: 19 additions & 7 deletions proto/hll/hll.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,27 @@ enum HLLVariant {
// losslessly on the receiver via register[index] = max(register[index], value)
// — no register update is ever dropped (there is no threshold to apply).
//
// The increased registers are varint-packed exactly like
// HLLSparseRegisters.packed: sorted ascending by index, each register emitted
// as (index_delta, value) where index_delta = index - prev_index (prev_index
// starts at 0) and value is the new (larger) register value:
//
// for each increased register, in ascending index order:
// uvarint(index - prev_index) // prev_index starts at 0; deltas are >= 0
// uvarint(value) // 1..=Q+1, always 1 byte in practice
//
// A single-emit delta against the all-zero snapshot is therefore the same size
// as the full sparse frame, and a sub-window delta is strictly smaller (it
// packs only the registers that actually grew). This replaces the previous
// per-register sub-message encoding, eliminating ~6–8 bytes of tag/length
// overhead per register.
//
// This message is byte-identical to the Go reference implementation's HLLDelta
// so the two runtimes emit byte-identical delta frames for identical window
// state (cross-language byte parity).
message HLLDelta {
repeated HLLRegisterUpdate updates = 1;
}

// HLLRegisterUpdate is one register whose value increased.
message HLLRegisterUpdate {
uint32 index = 1; // register index, 0 – 2^precision-1
uint32 value = 2; // new value (only sent when > snapshot value)
// Varint-packed (index_delta, value) pairs for the registers that increased,
// in ascending index order. Same layout as HLLSparseRegisters.packed. Empty
// when no register changed.
bytes packed_updates = 1;
}
78 changes: 73 additions & 5 deletions src/message_pack_format/portable/countminsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,22 @@ pub fn sketchlib_cms_query(inner: &SketchlibCms, key: &str) -> f64 {
/// Cells apply additively: `matrix[row][col] += d_count`. Per-row
/// L1 and L2 norm deltas are carried for downstream error-accounting
/// but are not consumed by `apply_delta` itself.
///
/// Structurally identical to [`super::countsketch::CountSketchDelta`]: the
/// optional `hh_keys` channel carries heavy-hitter candidate keys forwarded by
/// an upstream tracker. Count-Min can track heavy hitters, but whether
/// `hh_keys` is populated is a control-plane decision — it is empty when
/// heavy-hitter tracking is not enabled. The receiver re-queries the merged
/// matrix for each key to (re)build its Top-K. Mirrors the Go reference
/// implementation's `Delta.HHKeys`.
#[derive(Debug, Clone, Default)]
pub struct CountMinSketchDelta {
pub rows: u32,
pub cols: u32,
pub cells: Vec<(u32, u32, i64)>,
pub l1: Vec<f64>,
pub l2: Vec<f64>,
pub hh_keys: Vec<String>,
}

/// Provides approximate frequency counts with error bounds.
Expand Down Expand Up @@ -256,10 +265,14 @@ impl CountMinSketch {
/// encoding (`cell_rows`/`cell_cols`/`d_counts`). L1/L2 row deltas are
/// derived from the matrices — `l1[r] = Σ_c count[r][c]` and
/// `l2[r] = Σ_c count[r][c]^2` — which telescopes to the same value the
/// Go producer maintains incrementally. The returned bytes are a
/// `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`],
/// byte-identical to the Go `proto.Marshal(CountMinDelta)` output for
/// the same inputs (cross-language byte parity).
/// Go producer maintains incrementally. Heavy-hitter candidate keys
/// (`hh_keys`) are sourced from an upstream tracker that this minimal
/// wrapper does not maintain, so the field is left empty (an empty repeated
/// field encodes to nothing on the wire), exactly as CountSketch's
/// `compute_delta` does. The returned bytes are a `prost`-encoded
/// [`crate::proto::sketchlib::CountMinDelta`], byte-identical to the Go
/// `proto.Marshal(CountMinDelta)` output for the same inputs when no
/// heavy-hitter keys are forwarded (cross-language byte parity).
///
/// Delta-against-empty: when `snapshot` is the all-zero sketch, every
/// surviving cell delta equals this window's own cell count, so the
Expand Down Expand Up @@ -322,6 +335,9 @@ impl CountMinSketch {
cells_legacy: Vec::new(),
l1,
l2,
// Heavy-hitter keys are control-plane-gated; this wrapper has no
// tracker, so the field is empty (mirrors CountSketch::compute_delta).
hh_keys: Vec::new(),
cell_rows,
cell_cols,
d_counts,
Expand All @@ -334,7 +350,9 @@ impl CountMinSketch {
/// Go reference implementation's `DeserializeDelta` + `ApplyDelta`.
/// Reads the packed cell arrays (`cell_rows`/`cell_cols`/`d_counts`)
/// and falls back to the legacy per-cell records for payloads from
/// older producers.
/// older producers. Heavy-hitter keys (`hh_keys`) are read symmetrically
/// (mirroring CountSketch) and carried onto the decoded delta; a plain CMS
/// keeps no Top-K, so they are not otherwise consumed here.
///
/// Returns `Err` if `bytes` is not a valid `CountMinDelta` proto or a
/// cell is out of range for this sketch's dimensions.
Expand Down Expand Up @@ -367,6 +385,11 @@ impl CountMinSketch {
cells,
l1: proto.l1,
l2: proto.l2,
// Heavy-hitter keys are carried through symmetrically (mirrors
// CountSketch). This wrapper keeps no Top-K, so they are not
// otherwise consumed; a control-plane sink would re-query the
// merged matrix for each key. Empty for a plain CMS.
hh_keys: proto.hh_keys,
};
self.apply_delta(&delta)
}
Expand Down Expand Up @@ -542,6 +565,7 @@ mod tests {
cells: vec![(0, 0, 10), (1, 2, 100)],
l1: vec![],
l2: vec![],
hh_keys: vec![],
};
cms.apply_delta(&delta).unwrap();
assert_eq!(
Expand All @@ -564,6 +588,7 @@ mod tests {
cells: vec![(0, 0, 10), (1, 1, 20)],
l1: vec![],
l2: vec![],
hh_keys: vec![],
};
let mut via_delta = base;
via_delta.apply_delta(&delta).unwrap();
Expand All @@ -579,6 +604,7 @@ mod tests {
cells: vec![(5, 0, 1)],
l1: vec![],
l2: vec![],
hh_keys: vec![],
};
assert!(cms.apply_delta(&delta).is_err());
}
Expand Down Expand Up @@ -693,6 +719,48 @@ mod tests {
);
}

/// Cross-language byte-parity guard for CountMinDelta hh_keys: encoding the
/// canonical packed delta with heavy-hitter keys at tag 6 must be
/// byte-identical to the Go reference's `SerializeDelta` output for the same
/// state and keys. The empty-`hh_keys` path is already covered by
/// `test_compute_delta_matches_go_golden_bytes`.
#[test]
fn test_hh_keys_matches_go_golden_bytes() {
use crate::proto::sketchlib::CountMinDelta as ProtoDelta;
use prost::Message;

let rows = 4usize;
let cols = 2048usize;
let mut current = CountMinSketch::new(rows, cols);
for i in 0..50u64 {
current.update(&format!("flow-{}", i % 10), 1.0);
}
let empty = CountMinSketch::new(rows, cols);

// Inject heavy-hitter keys onto the canonical packed delta at tag 6.
let no_hh = current.compute_delta(&empty, 1.0).unwrap();
let mut proto = ProtoDelta::decode(no_hh.as_slice()).unwrap();
proto.hh_keys = vec!["flow-0".into(), "flow-3".into(), "flow-7".into()];
let got = proto.encode_to_vec();

// Captured from the Go reference implementation's SerializeDelta for the
// same input with HHKeys = ["flow-0","flow-3","flow-7"].
const GOLDEN_HEX: &str = "0804108010222000000000000049400000000000004940000000000000494000000000000049402a200000000000406f400000000000406f400000000000406f400000000000406f403206666c6f772d303206666c6f772d333206666c6f772d374a28000000000000000000000101010101010101010102020202020202020202030303030303030303035250c101b202b608b7088809bc09c20dc60d910fbb0f9f018d04a604c004d6058507a909920b900ec70ead038304bc058007c207cc0a820bd50cbe0fcd0fe2019e02cd02e702a707b809a00ac20cee0dfe0f5a280a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a";
let want = decode_hex_cms(GOLDEN_HEX);
assert_eq!(
got,
want,
"CountMin delta hh_keys bytes diverge from the Go reference golden \
({} bytes got vs {} bytes want)",
got.len(),
want.len(),
);

// Symmetric decode: apply_delta_bytes round-trips hh_keys without error.
let mut sink = CountMinSketch::new(rows, cols);
sink.apply_delta_bytes(&got).unwrap();
}

fn decode_hex_cms(s: &str) -> Vec<u8> {
let s = s.trim();
s.as_bytes()
Expand Down
39 changes: 39 additions & 0 deletions src/message_pack_format/portable/countsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,45 @@ mod tests {
);
}

/// Cross-language byte-parity guard for CountSketchDelta hh_keys: encoding
/// the canonical packed delta with heavy-hitter keys at tag 6 must be
/// byte-identical to the Go reference's `SerializeDelta` output for the same
/// state and keys. The empty-`hh_keys` path is already covered by
/// `test_compute_delta_matches_go_golden_bytes`.
#[test]
fn test_hh_keys_matches_go_golden_bytes() {
use crate::proto::sketchlib::CountSketchDelta as ProtoDelta;
use prost::Message;

let rows = 3usize;
let cols = 512usize;
let mut current = CountSketch::new(rows, cols);
for i in 0..25 {
let key = format!("k-{}", (b'a' + (i % 5) as u8) as char);
current.update(&key, 1.0);
}
let empty = CountSketch::new(rows, cols);

// Inject heavy-hitter keys onto the canonical packed delta at tag 6.
let no_hh = current.compute_delta(&empty, 1.0).unwrap();
let mut proto = ProtoDelta::decode(no_hh.as_slice()).unwrap();
proto.hh_keys = vec!["flow-0".into(), "flow-3".into(), "flow-7".into()];
let got = proto.encode_to_vec();

// Captured from the Go reference implementation's SerializeDelta for the
// same input with HHKeys = ["flow-0","flow-3","flow-7"].
const GOLDEN_HEX: &str = "08031080043206666c6f772d303206666c6f772d333206666c6f772d374a0f000000000001010101010202020202521a3d7fd9019402d3034eb9019602f703fe0306a501c601d201ba025a0f090a0909090a090a090a09090a090962180000000000405f400000000000405f400000000000405f40";
let want = decode_hex(GOLDEN_HEX);
assert_eq!(
got,
want,
"CountSketch delta hh_keys bytes diverge from the Go reference golden \
({} bytes got vs {} bytes want)",
got.len(),
want.len(),
);
}

fn decode_hex(s: &str) -> Vec<u8> {
s.as_bytes()
.chunks(2)
Expand Down
Loading
Loading