From 928950589d13df2a865d023a8d19e56b3dbd4766 Mon Sep 17 00:00:00 2001 From: zz_y Date: Tue, 26 May 2026 13:47:11 -0600 Subject: [PATCH] delta wire-format: pack HLL delta + add CountMin hh_keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two delta encoding updates (cross-language byte parity verified via golden tests against the Go reference implementation): - HLL: replace HLLDelta's repeated per-register sub-messages with a packed varint (index_delta, value) blob — the layout HLLSparseRegisters.packed already uses for full sparse state. ~62% smaller deltas; a single-emit delta is about the size of the full sparse frame. - CountMin: add an optional hh_keys (repeated string, field 6) to CountMinDelta so it matches CountSketchDelta. Population is control-plane-gated (empty unless a heavy-hitter source is supplied); no byte change when unset. cargo test --lib: 426 passed. Co-Authored-By: Claude Opus 4.7 (1M context) --- proto/countminsketch/countminsketch.proto | 12 ++ proto/hll/hll.proto | 26 +++-- .../portable/countminsketch.rs | 78 ++++++++++++- .../portable/countsketch.rs | 39 +++++++ src/message_pack_format/portable/hll.rs | 110 ++++++++++++++---- src/proto/generated/sketchlib.v1.rs | 45 +++++-- 6 files changed, 266 insertions(+), 44 deletions(-) diff --git a/proto/countminsketch/countminsketch.proto b/proto/countminsketch/countminsketch.proto index 95cf902..27e09d3 100644 --- a/proto/countminsketch/countminsketch.proto +++ b/proto/countminsketch/countminsketch.proto @@ -76,6 +76,12 @@ message CountMinState { // to the Go reference implementation's CountMinDelta so the two runtimes emit // byte-identical delta frames for identical window state (cross-language byte // parity). +// +// CountMinDelta is structurally identical to CountSketchDelta: rows, cols, the +// packed cell encoding, per-row norm deltas, and an optional repeated hh_keys +// at the SAME tag number (6). Both sketches can track heavy hitters; whether +// hh_keys is populated is a control-plane decision (it is empty/omitted when +// heavy-hitter tracking is not enabled for this sketch). message CountMinDelta { uint32 rows = 1; uint32 cols = 2; @@ -86,6 +92,12 @@ message CountMinDelta { repeated double l1 = 4 [packed = true]; // Per-row L2 norm deltas, length = rows. repeated double l2 = 5 [packed = true]; + // Heavy-hitter candidate keys from an upstream tracker, mirroring + // CountSketchDelta.hh_keys (same tag number 6, same wire shape). Downstream + // queries the merged Count-Min matrix for each key to (re)build its Top-K + // with globally-merged estimates. Empty/omitted when heavy-hitter tracking + // is not enabled. + repeated string hh_keys = 6; // Packed cell encoding (canonical form): repeated uint32 cell_rows = 9 [packed = true]; // row index of each changed cell repeated uint32 cell_cols = 10 [packed = true]; // col index of each changed cell diff --git a/proto/hll/hll.proto b/proto/hll/hll.proto index 6a835dd..1cdf612 100644 --- a/proto/hll/hll.proto +++ b/proto/hll/hll.proto @@ -108,15 +108,27 @@ enum HLLVariant { // losslessly on the receiver via register[index] = max(register[index], value) // — no register update is ever dropped (there is no threshold to apply). // +// The increased registers are varint-packed exactly like +// HLLSparseRegisters.packed: sorted ascending by index, each register emitted +// as (index_delta, value) where index_delta = index - prev_index (prev_index +// starts at 0) and value is the new (larger) register value: +// +// for each increased register, in ascending index order: +// uvarint(index - prev_index) // prev_index starts at 0; deltas are >= 0 +// uvarint(value) // 1..=Q+1, always 1 byte in practice +// +// A single-emit delta against the all-zero snapshot is therefore the same size +// as the full sparse frame, and a sub-window delta is strictly smaller (it +// packs only the registers that actually grew). This replaces the previous +// per-register sub-message encoding, eliminating ~6–8 bytes of tag/length +// overhead per register. +// // This message is byte-identical to the Go reference implementation's HLLDelta // so the two runtimes emit byte-identical delta frames for identical window // state (cross-language byte parity). message HLLDelta { - repeated HLLRegisterUpdate updates = 1; -} - -// HLLRegisterUpdate is one register whose value increased. -message HLLRegisterUpdate { - uint32 index = 1; // register index, 0 – 2^precision-1 - uint32 value = 2; // new value (only sent when > snapshot value) + // Varint-packed (index_delta, value) pairs for the registers that increased, + // in ascending index order. Same layout as HLLSparseRegisters.packed. Empty + // when no register changed. + bytes packed_updates = 1; } diff --git a/src/message_pack_format/portable/countminsketch.rs b/src/message_pack_format/portable/countminsketch.rs index 0a1a0e4..41fd91a 100644 --- a/src/message_pack_format/portable/countminsketch.rs +++ b/src/message_pack_format/portable/countminsketch.rs @@ -91,6 +91,14 @@ pub fn sketchlib_cms_query(inner: &SketchlibCms, key: &str) -> f64 { /// Cells apply additively: `matrix[row][col] += d_count`. Per-row /// L1 and L2 norm deltas are carried for downstream error-accounting /// but are not consumed by `apply_delta` itself. +/// +/// Structurally identical to [`super::countsketch::CountSketchDelta`]: the +/// optional `hh_keys` channel carries heavy-hitter candidate keys forwarded by +/// an upstream tracker. Count-Min can track heavy hitters, but whether +/// `hh_keys` is populated is a control-plane decision — it is empty when +/// heavy-hitter tracking is not enabled. The receiver re-queries the merged +/// matrix for each key to (re)build its Top-K. Mirrors the Go reference +/// implementation's `Delta.HHKeys`. #[derive(Debug, Clone, Default)] pub struct CountMinSketchDelta { pub rows: u32, @@ -98,6 +106,7 @@ pub struct CountMinSketchDelta { pub cells: Vec<(u32, u32, i64)>, pub l1: Vec, pub l2: Vec, + pub hh_keys: Vec, } /// Provides approximate frequency counts with error bounds. @@ -256,10 +265,14 @@ impl CountMinSketch { /// encoding (`cell_rows`/`cell_cols`/`d_counts`). L1/L2 row deltas are /// derived from the matrices — `l1[r] = Σ_c count[r][c]` and /// `l2[r] = Σ_c count[r][c]^2` — which telescopes to the same value the - /// Go producer maintains incrementally. The returned bytes are a - /// `prost`-encoded [`crate::proto::sketchlib::CountMinDelta`], - /// byte-identical to the Go `proto.Marshal(CountMinDelta)` output for - /// the same inputs (cross-language byte parity). + /// Go producer maintains incrementally. Heavy-hitter candidate keys + /// (`hh_keys`) are sourced from an upstream tracker that this minimal + /// wrapper does not maintain, so the field is left empty (an empty repeated + /// field encodes to nothing on the wire), exactly as CountSketch's + /// `compute_delta` does. The returned bytes are a `prost`-encoded + /// [`crate::proto::sketchlib::CountMinDelta`], byte-identical to the Go + /// `proto.Marshal(CountMinDelta)` output for the same inputs when no + /// heavy-hitter keys are forwarded (cross-language byte parity). /// /// Delta-against-empty: when `snapshot` is the all-zero sketch, every /// surviving cell delta equals this window's own cell count, so the @@ -322,6 +335,9 @@ impl CountMinSketch { cells_legacy: Vec::new(), l1, l2, + // Heavy-hitter keys are control-plane-gated; this wrapper has no + // tracker, so the field is empty (mirrors CountSketch::compute_delta). + hh_keys: Vec::new(), cell_rows, cell_cols, d_counts, @@ -334,7 +350,9 @@ impl CountMinSketch { /// Go reference implementation's `DeserializeDelta` + `ApplyDelta`. /// Reads the packed cell arrays (`cell_rows`/`cell_cols`/`d_counts`) /// and falls back to the legacy per-cell records for payloads from - /// older producers. + /// older producers. Heavy-hitter keys (`hh_keys`) are read symmetrically + /// (mirroring CountSketch) and carried onto the decoded delta; a plain CMS + /// keeps no Top-K, so they are not otherwise consumed here. /// /// Returns `Err` if `bytes` is not a valid `CountMinDelta` proto or a /// cell is out of range for this sketch's dimensions. @@ -367,6 +385,11 @@ impl CountMinSketch { cells, l1: proto.l1, l2: proto.l2, + // Heavy-hitter keys are carried through symmetrically (mirrors + // CountSketch). This wrapper keeps no Top-K, so they are not + // otherwise consumed; a control-plane sink would re-query the + // merged matrix for each key. Empty for a plain CMS. + hh_keys: proto.hh_keys, }; self.apply_delta(&delta) } @@ -542,6 +565,7 @@ mod tests { cells: vec![(0, 0, 10), (1, 2, 100)], l1: vec![], l2: vec![], + hh_keys: vec![], }; cms.apply_delta(&delta).unwrap(); assert_eq!( @@ -564,6 +588,7 @@ mod tests { cells: vec![(0, 0, 10), (1, 1, 20)], l1: vec![], l2: vec![], + hh_keys: vec![], }; let mut via_delta = base; via_delta.apply_delta(&delta).unwrap(); @@ -579,6 +604,7 @@ mod tests { cells: vec![(5, 0, 1)], l1: vec![], l2: vec![], + hh_keys: vec![], }; assert!(cms.apply_delta(&delta).is_err()); } @@ -693,6 +719,48 @@ mod tests { ); } + /// Cross-language byte-parity guard for CountMinDelta hh_keys: encoding the + /// canonical packed delta with heavy-hitter keys at tag 6 must be + /// byte-identical to the Go reference's `SerializeDelta` output for the same + /// state and keys. The empty-`hh_keys` path is already covered by + /// `test_compute_delta_matches_go_golden_bytes`. + #[test] + fn test_hh_keys_matches_go_golden_bytes() { + use crate::proto::sketchlib::CountMinDelta as ProtoDelta; + use prost::Message; + + let rows = 4usize; + let cols = 2048usize; + let mut current = CountMinSketch::new(rows, cols); + for i in 0..50u64 { + current.update(&format!("flow-{}", i % 10), 1.0); + } + let empty = CountMinSketch::new(rows, cols); + + // Inject heavy-hitter keys onto the canonical packed delta at tag 6. + let no_hh = current.compute_delta(&empty, 1.0).unwrap(); + let mut proto = ProtoDelta::decode(no_hh.as_slice()).unwrap(); + proto.hh_keys = vec!["flow-0".into(), "flow-3".into(), "flow-7".into()]; + let got = proto.encode_to_vec(); + + // Captured from the Go reference implementation's SerializeDelta for the + // same input with HHKeys = ["flow-0","flow-3","flow-7"]. + const GOLDEN_HEX: &str = "0804108010222000000000000049400000000000004940000000000000494000000000000049402a200000000000406f400000000000406f400000000000406f400000000000406f403206666c6f772d303206666c6f772d333206666c6f772d374a28000000000000000000000101010101010101010102020202020202020202030303030303030303035250c101b202b608b7088809bc09c20dc60d910fbb0f9f018d04a604c004d6058507a909920b900ec70ead038304bc058007c207cc0a820bd50cbe0fcd0fe2019e02cd02e702a707b809a00ac20cee0dfe0f5a280a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a"; + let want = decode_hex_cms(GOLDEN_HEX); + assert_eq!( + got, + want, + "CountMin delta hh_keys bytes diverge from the Go reference golden \ + ({} bytes got vs {} bytes want)", + got.len(), + want.len(), + ); + + // Symmetric decode: apply_delta_bytes round-trips hh_keys without error. + let mut sink = CountMinSketch::new(rows, cols); + sink.apply_delta_bytes(&got).unwrap(); + } + fn decode_hex_cms(s: &str) -> Vec { let s = s.trim(); s.as_bytes() diff --git a/src/message_pack_format/portable/countsketch.rs b/src/message_pack_format/portable/countsketch.rs index cb1d4cb..408c693 100644 --- a/src/message_pack_format/portable/countsketch.rs +++ b/src/message_pack_format/portable/countsketch.rs @@ -700,6 +700,45 @@ mod tests { ); } + /// Cross-language byte-parity guard for CountSketchDelta hh_keys: encoding + /// the canonical packed delta with heavy-hitter keys at tag 6 must be + /// byte-identical to the Go reference's `SerializeDelta` output for the same + /// state and keys. The empty-`hh_keys` path is already covered by + /// `test_compute_delta_matches_go_golden_bytes`. + #[test] + fn test_hh_keys_matches_go_golden_bytes() { + use crate::proto::sketchlib::CountSketchDelta as ProtoDelta; + use prost::Message; + + let rows = 3usize; + let cols = 512usize; + let mut current = CountSketch::new(rows, cols); + for i in 0..25 { + let key = format!("k-{}", (b'a' + (i % 5) as u8) as char); + current.update(&key, 1.0); + } + let empty = CountSketch::new(rows, cols); + + // Inject heavy-hitter keys onto the canonical packed delta at tag 6. + let no_hh = current.compute_delta(&empty, 1.0).unwrap(); + let mut proto = ProtoDelta::decode(no_hh.as_slice()).unwrap(); + proto.hh_keys = vec!["flow-0".into(), "flow-3".into(), "flow-7".into()]; + let got = proto.encode_to_vec(); + + // Captured from the Go reference implementation's SerializeDelta for the + // same input with HHKeys = ["flow-0","flow-3","flow-7"]. + const GOLDEN_HEX: &str = "08031080043206666c6f772d303206666c6f772d333206666c6f772d374a0f000000000001010101010202020202521a3d7fd9019402d3034eb9019602f703fe0306a501c601d201ba025a0f090a0909090a090a090a09090a090962180000000000405f400000000000405f400000000000405f40"; + let want = decode_hex(GOLDEN_HEX); + assert_eq!( + got, + want, + "CountSketch delta hh_keys bytes diverge from the Go reference golden \ + ({} bytes got vs {} bytes want)", + got.len(), + want.len(), + ); + } + fn decode_hex(s: &str) -> Vec { s.as_bytes() .chunks(2) diff --git a/src/message_pack_format/portable/hll.rs b/src/message_pack_format/portable/hll.rs index 597d747..17b8686 100644 --- a/src/message_pack_format/portable/hll.rs +++ b/src/message_pack_format/portable/hll.rs @@ -153,20 +153,24 @@ impl HllSketch { /// subtraction). HLL deltas carry only register max-updates — there are /// no DataPoint-level metric scalars to drop. pub fn compute_delta(&self, snapshot: &HllSketch, _threshold: u64) -> Vec { - use crate::proto::sketchlib::{HllDelta as ProtoDelta, HllRegisterUpdate}; + use crate::proto::sketchlib::HllDelta as ProtoDelta; use prost::Message; let cur = &self.registers; let snap = &snapshot.registers; let n = cur.len().min(snap.len()); - let mut updates: Vec = Vec::new(); + // Varint-pack the increased registers as (index_delta, value) pairs in + // ascending index order — the same layout `encode_sparse_registers` + // uses for the full sparse state. Walking registers in index order + // yields a sorted sequence, so `prev` only ever advances. + let mut packed: Vec = Vec::new(); + let mut prev: usize = 0; for i in 0..n { if cur[i] > snap[i] { - updates.push(HllRegisterUpdate { - index: i as u32, - value: cur[i] as u32, - }); + write_uvarint(&mut packed, (i - prev) as u64); + write_uvarint(&mut packed, cur[i] as u64); + prev = i; } } // Guard: if `self` has more registers than the snapshot (should not @@ -174,14 +178,16 @@ impl HllSketch { // the Go reference's trailing-register guard. for (i, &v) in cur.iter().enumerate().take(cur.len()).skip(n) { if v > 0 { - updates.push(HllRegisterUpdate { - index: i as u32, - value: v as u32, - }); + write_uvarint(&mut packed, (i - prev) as u64); + write_uvarint(&mut packed, v as u64); + prev = i; } } - ProtoDelta { updates }.encode_to_vec() + ProtoDelta { + packed_updates: packed, + } + .encode_to_vec() } /// Apply a `prost`-encoded [`crate::proto::sketchlib::HllDelta`] to this @@ -200,13 +206,38 @@ impl HllSketch { use prost::Message; let proto = ProtoDelta::decode(bytes)?; - let delta = HllSketchDelta { - updates: proto - .updates - .into_iter() - .map(|u| (u.index, u.value as u8)) - .collect(), - }; + + // Unpack the (index_delta, value) blob — inverse of `compute_delta`, + // same layout as `decode_sparse_registers` but bounded by the blob's + // own contents rather than a fixed register count. + let packed = &proto.packed_updates; + let mut updates: Vec<(u32, u8)> = Vec::new(); + let mut off = 0usize; + let mut prev: u32 = 0; + let mut first = true; + while off < packed.len() { + let (delta, used) = + read_uvarint(&packed[off..]).ok_or("hll: delta index-delta varint corrupt")?; + off += used; + let (val, used) = + read_uvarint(&packed[off..]).ok_or("hll: delta value varint corrupt")?; + off += used; + + let idx = prev + delta as u32; + // First delta is an absolute index (prev=0); later deltas must be + // > 0 so indices stay strictly increasing and unique. + if !first && delta == 0 { + return Err(format!("hll: delta non-increasing index at idx {idx}").into()); + } + first = false; + if val > 0xff { + return Err(format!("hll: delta value {val} exceeds u8 at idx {idx}").into()); + } + updates.push((idx, val as u8)); + prev = idx; + } + + let delta = HllSketchDelta { updates }; self.apply_delta(&delta) } @@ -729,8 +760,8 @@ mod tests { // Captured from the Go reference implementation's // SerializeRegisterDelta(ComputeRegisterDelta(empty, current)) for the - // same input. - const GOLDEN_HEX: &str = "0a04085810010a0508930510020a0508931110010a0508e31110010a0508fa1510010a0508d71910010a0508881b10010a0508ba2310010a0508ec2310010a0508d62410010a0508ff2510020a0508ae2610010a0508b22810020a0508bb2810020a0508dc3210020a0508f63510020a0508eb3610010a0508b23910040a0508da3910040a0508853a10050a0508f14110020a0508974310020a05089c4510050a0508de4810020a0508b64910020a0508c74910010a0508c54d10010a0508ed4d10020a05088d4e10020a0508b35210030a0508805410020a0508e75a10020a0508f85a10060a0508835b10030a0508b75b10020a0508c25b10060a0508de5c10010a0508dd6310010a0508876410010a0508e56410050a0508e66610010a0508d66710040a0508e86710010a0508e06c10010a0508877110040a0508ad7210010a0508e97510010a0508877710030a0508d37910010a0508db7b1003"; + // same input, with the packed (index_delta, value) HLLDelta encoding. + const GOLDEN_HEX: &str = "0a81015801bb0402800c015001970401dd0301b10101b2080132016a01a901022f018402020902a10a029a03027501c7020428042b05ec0702a60102850205c2030258021101fe030128022002a60403cd0102e7060211060b0334020b069c0101ff06012a015e0581020170041201f80401a70404a60101bc03019e0103cc0201880203"; let want = decode_hex(GOLDEN_HEX); assert_eq!( got, @@ -741,4 +772,43 @@ mod tests { want.len(), ); } + + /// Cross-language byte-parity guard for a SUB-WINDOW delta (not against + /// empty): a precision-14 sketch is fed `(0..2000)` as f64 little-endian + /// bytes (the base/snapshot), then `(2000..5000)`; the delta carries only + /// the registers that grew between the two, packed as (index_delta, value) + /// pairs. Must be byte-identical to the Go reference implementation's + /// `SerializeRegisterDelta(ComputeRegisterDelta(base, current))`. + #[test] + fn test_compute_subwindow_delta_matches_go_golden_bytes() { + let mut base = HllSketch::new(HllVariant::Datafusion, 14); + for i in 0..2000i32 { + base.update(&(i as f64).to_le_bytes()); + } + let mut current = base.clone(); + for i in 2000..5000i32 { + current.update(&(i as f64).to_le_bytes()); + } + let got = current.compute_delta(&base, 0); + + // Captured from the Go reference implementation's + // SerializeRegisterDelta(ComputeRegisterDelta(base, current)) for the + // same input, with the packed (index_delta, value) HLLDelta encoding. + const GOLDEN_HEX: &str = "0ae627000402030401020302030c0109020502040201011108100202021304030101010b010e02040102030b0407020201050405021404060105010301010101020101030306020201040102011101010407010106080104030101010103020c0204021105040108010101180201030f010b01010103010e01120201030101010303030301030107030a0108010a0108012106060309010302070203010202030101010e0102020a020102060101020a01040101011702020203021003060103030e02110206010102040604010d010a050c011104010104050a0103020f0203010305050209010301020401020805010101010201020104010201060501040601020203020601020304010401020101010601010419010e010503050107020302010102010d010501110105050503020104050201020215050401030101020d041a010601190209020301020404070e01090202010b01030107040a0103010403010117050602040601010301040101030203030105010401020108030b0517020c01010506010402030103040704010102020d010a011201060304020c010c020e010b020702070101040101010104021504070109040804060502021802030104021605010107020603010101010201100104010605020205010e010406040102020f0203020a02040201030201010303010401030302010a030201060514020d02050104010201050103020802040105020602080213020d02020105020301140204010a0509060d03070206020902070101080a0202010a010e0102020503080201010e0101010403080104020d0102010501040403060401050405011203010101040e010202040303020c010203090201030d010a0306020201030208020e0103010701030113020202030501020c010b0101040502010105010203060105020a0315050b030c0101010f060405040204030b03010502020101080302020c010b02060403010404030102010f020f0107030601040109060301040301010703040303020801010104010b011101070102011101010102030201070106010601030201020301030302020e030501190108030201040205020503030106030a01050112020102010107030b02060209050201010304010502030106060601050108020602080201010905080205010204010203020303060205010102070201010308160304040f02010105020605070407030b0104010701010205020c02060103010b040c0102030202060102010e030102070108030301030205010101040301030303010203020802190105020601120101020701010101010302080303010f0104020202060402020403010103010503060203030701010105030903030508010101020103020305060605030e0203020603010102060f010201040202030504030103020f01130204010201020104010102010401010403050201030502040306060d0302010a010a02080103020101020208030301030102010403030305011003030206010f01010609030302020102030402020102010103020408010502130306010303060102010c01030202030401030101020101070202010901150104010f010401080107010601010206020b0201010d040303070202011403090210010302090403010403050201010b05010401010301090102010c010b04040206010d0102010b05040301020702020302010d0204030102110106010803020108030101070104020104040201030605050102040101010302010b01020103030e031101030101020103070112010403010101010b011001060313030301070203030803050101030601050207030a0104030302020202030c010101080103010c0102020404020102030c01110102030901140307010602020303020401070102010b020402030707010101010201070705130102011b0201060501010101010201020109010601020205010b0103040504070202010501080108020703030104030401010201030a010201030206010101020104020d02040102010102040301040605100201020801030306020e010402110106030602080105011101060106020d0216010702050301020101040409010403010201040102020205010401070207010d0205040704050202021201030510040303020602020703090204060102070102010301020101010401200101030202040103021301010107050a010702030311010303020103020b010505100302060102010104020a010402070210010704030304050a02010409010101010106030901030110031002020103010d0301020202040101020806080105010401060126020201080109020e0102030503120106010901010301010b01040201020402020115020d0108030f010c0303020c0101010602080504030802050202030f030202050404010603030208010a0204010d02020202030a0209030e030304030101060d0104020702010209020a0406010101070204020b0201030d021201020106031a0504050202040102010101020103030601080102020f021a020201120112020d02030405020403090304020102010108010203080101010704060104010902010401020203040107010e02070102060601080105010101060204010e010c020d010601050108010801020508010102070302030201030101010b020b02060107010c020b01020104020302060203040c02020105010301030202020902090103030503070606010103040216020401050205020404020208010b0207010502050108010502030102040f0206021203050105010a010a010b04090101030401040c03010102040104020c04090203020301040202020101060205020d010303040202020b010c010f0205030d010301050202030a02030309020203110103010102090106020402010202010303020206010a0104010601030208020803080110020702030205050201130102022102060204021504040102010a040c0401010503030207010701050103010d0104040601100201010201020302010a010501110102080504010106030101060601040a01050202080203020509010c010102090203010d0101020102120713021104070204010201010208020502060408010309020101020d0104010206030102010201010202050202040101010102050102030201060209020b01020105040202060401020f010103090504030101030111020e031001110204020602010201011101050204010a01040402030501010202010602070503011002020406010401030103020f0403050e0104020201010101040b0105030103010106020803080201010101010202050404010103011c0306030201010105010202060206010102020101010703010502010c010303020206030c021601130121020f03020307030e0109010602040102011f01090506010b01050402020e01020303020a040201020202010301030204030a010201080404030304110104010403070203060301020107020c050502020109030901020102010801030203090d0104021502010104040504090105030a010503110102020a010203050303040e030502030101030105020108010103020102030103010209030202010205010301010202020804010207010d020d020a010605080306010c0202021101090202030f010101040109020f0301020501010103020501040109021202080601070f0204020105070119030101110208040a0203020c010201070503030b0108030201040204020402010203010c022603040505010e04010103010702010109050b020801020405020e01140204020201050406020a020301060404010c010102050106020207010102010801050202020105040103010108010113050d0109020703070205010f0703050b0112010f02010103070b010201050101030201010203010705090605020f0103040b010801020210020501060204040c010104070105020a020104040201010403040109010c0101050a0305010301090108010b030b0201010601030202030e020203030101050202010106010d010d010601020101010c0304010901010103010401030201010902010101050b01010107010105020105020c01030405011404020104030a01030105010801050202020a0107010c04150105030301050401020603020203040b01100101010301030103070101010215020302010101010101080203020a010a021d031302030104020201030305030b01030203030b03010212020302030202050d010703040202040501040101030203060405031802010206020402020109020202070305021c01010105031c010503050305030d010e01090105030401070104010302010103010201100103020b0107050301090104010203030205030103010105010504080104020302020111050601050404022202070102010201090705080b01030109012903090206030104040107010102070201040f020201030103030b020804020207012d0105010301080103020201030202050501070105010202010304020303020401030802020103020102010501010c0106021201050317010a0304010101020207090102030205010b0204010b010801080312010f01150102020802040102011102080101040a0102010c020403020203010603050203010301040111031505040202030e01010102050607050101010502020301010c0210010d0407010903020202010e01050106020401070402010c010602090311060202100206020202020316020103010101010a030301060101010203150201050e04030204021c020c020c030102030101010502060102010d02010411040c0208010101010101040201010205050101060214010901010407020206160101050407050104010101010205050501060103010101060411020302070211010201090203021a0602010f0106030a04010106010301090101070501020107060102010404020b0102010202040110010201050106020901090309020d0105020601080101010501010205030403180401030d0105040802240209010a02020305020502040102010301090303020802040108010405020201040d0403010501020102020503040215010d030902030116010f010d0107030204070204040a0109011002090119031e020502090104031b050605020201010c0207020503010104010c0102010a01060502010c020d020202050101011102050107040f0102010201010106020305020101010402010302010c0306051505010104020802010102070c010803010506011101050207020302040103010301080103080d030c0103040d01010209040501030101010402020301010f01020102010301010401010202050408011c050904080106040401040105010402120102020103030201010a030103020402021303020305020403030201030901010504010902010305010301030203020a0108010702050107020a01160205020a060201030106030e02030103031001070102030d0119020202070505021202030108010101060808010202050108010f0102010701030404020802020214030c0211010b020301080303020105080105010401100106011309040203010103010208030b020102010301010e0107021501080203010402010101010606030204010d0108010e0102010401010105030903020302030c0103010104080101010405030201020c020101070201020102010401010301080102041f01070110030101060107010201060108010d010a030a0114041003020104010302020115011a02070802010301010106020b0101010201050109020703020205010a05050402030b0108010901070201020101020105040401110101020101040103010301010107010201180101020a030201030211011a02110103010102050103040e02160106010b02050a0e0202010203030101010402120305010301030305010805070304010402050102020402040106040105060105010b01110104010c0202020b01020104020e0201020704070206010c02030105010601030106011303070103010c0601010e021a01010704020801020201030c0506010a010203010202030e0311010201050105010b010504040203040a0102010302030302010102090301010403010104010e010501050a04070201060304030b0203010e01030201011b0102020c010701040303020d0604011001130202021304020505020201010105010a010f0106020801030113010802020104040b020104080104010e03090205020905020205010e040306020304010204100106010101150103050501050101010c0102010401050201030203010315010f0111020e0403010702030203030c0101020d02030205030a01040605050a02020101011201060502020201020101010a030f010706040105010101080502020301010608040b050101010104010d0103032001030201010f01040103011003010102060401070107010f01030116030301030205030902020611030102010101010101010201020202050403011b0208010b02030202010e0108010b010e01010508040701010103050604040515020202030104020703060203010601060106020f0101020302040102010c0302010101030105010a03020311020301030201010d03040107020502010109030b0302090801070304010301010402020501010201010501050202040302030305010101040201020802020101020f0107010b020302070313010b06050201030c010b04010106050302030102030401090208010901050307020d030b0309030e01090106010302020202010501040708010e0102010401010101020304010303010505050104020201070107020101040102010e0104011a0105050a030d030f010a02130103030b0107050e0203040403010208030d0701030101080206010a010e0203020501070303011102030203010101050117010e040d040701180201010e0201010402090204012c031c010e0204020502080113010201"; + let want = decode_hex(GOLDEN_HEX); + assert_eq!( + got, + want, + "HLL sub-window delta bytes diverge from the Go reference golden \ + ({} bytes got vs {} bytes want)", + got.len(), + want.len(), + ); + + // Sanity: applying the packed sub-window delta to the base + // reconstructs the current register state (register max-merge). + let mut reconstructed = base.clone(); + reconstructed.apply_delta_bytes(&got).unwrap(); + assert_eq!(reconstructed.registers, current.registers); + } } diff --git a/src/proto/generated/sketchlib.v1.rs b/src/proto/generated/sketchlib.v1.rs index 7969920..e3b6194 100644 --- a/src/proto/generated/sketchlib.v1.rs +++ b/src/proto/generated/sketchlib.v1.rs @@ -212,6 +212,12 @@ pub struct CountMinState { /// to the Go reference implementation's CountMinDelta so the two runtimes emit /// byte-identical delta frames for identical window state (cross-language byte /// parity). +/// +/// CountMinDelta is structurally identical to CountSketchDelta: rows, cols, the +/// packed cell encoding, per-row norm deltas, and an optional repeated hh_keys +/// at the SAME tag number (6). Both sketches can track heavy hitters; whether +/// hh_keys is populated is a control-plane decision (it is empty/omitted when +/// heavy-hitter tracking is not enabled for this sketch). #[derive(Clone, PartialEq, ::prost::Message)] pub struct CountMinDelta { #[prost(uint32, tag = "1")] @@ -228,6 +234,13 @@ pub struct CountMinDelta { /// Per-row L2 norm deltas, length = rows. #[prost(double, repeated, tag = "5")] pub l2: ::prost::alloc::vec::Vec, + /// Heavy-hitter candidate keys from an upstream tracker, mirroring + /// CountSketchDelta.hh_keys (same tag number 6, same wire shape). Downstream + /// queries the merged Count-Min matrix for each key to (re)build its Top-K + /// with globally-merged estimates. Empty/omitted when heavy-hitter tracking + /// is not enabled. + #[prost(string, repeated, tag = "6")] + pub hh_keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Packed cell encoding (canonical form): /// /// row index of each changed cell @@ -454,23 +467,31 @@ pub struct HllSparseRegisters { /// losslessly on the receiver via register\[index\] = max(register\[index\], value) /// — no register update is ever dropped (there is no threshold to apply). /// +/// The increased registers are varint-packed exactly like +/// HLLSparseRegisters.packed: sorted ascending by index, each register emitted +/// as (index_delta, value) where index_delta = index - prev_index (prev_index +/// starts at 0) and value is the new (larger) register value: +/// +/// for each increased register, in ascending index order: +/// uvarint(index - prev_index) // prev_index starts at 0; deltas are >= 0 +/// uvarint(value) // 1..=Q+1, always 1 byte in practice +/// +/// A single-emit delta against the all-zero snapshot is therefore the same size +/// as the full sparse frame, and a sub-window delta is strictly smaller (it +/// packs only the registers that actually grew). This replaces the previous +/// per-register sub-message encoding, eliminating ~6–8 bytes of tag/length +/// overhead per register. +/// /// This message is byte-identical to the Go reference implementation's HLLDelta /// so the two runtimes emit byte-identical delta frames for identical window /// state (cross-language byte parity). #[derive(Clone, PartialEq, ::prost::Message)] pub struct HllDelta { - #[prost(message, repeated, tag = "1")] - pub updates: ::prost::alloc::vec::Vec, -} -/// HLLRegisterUpdate is one register whose value increased. -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct HllRegisterUpdate { - /// register index, 0 – 2^precision-1 - #[prost(uint32, tag = "1")] - pub index: u32, - /// new value (only sent when > snapshot value) - #[prost(uint32, tag = "2")] - pub value: u32, + /// Varint-packed (index_delta, value) pairs for the registers that increased, + /// in ascending index order. Same layout as HLLSparseRegisters.packed. Empty + /// when no register changed. + #[prost(bytes = "vec", tag = "1")] + pub packed_updates: ::prost::alloc::vec::Vec, } /// HLLVariant identifies which HLL estimator algorithm the registers belong to. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]