Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions src/message_pack_format/portable/countminsketch_topk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,14 @@ impl std::fmt::Debug for CountMinSketchWithHeap {

impl Clone for CountMinSketchWithHeap {
fn clone(&self) -> Self {
let sketch = matrix_from_sketchlib_cms_heap(&self.backend);
let wire_heap: Vec<WireHeapItem> = heap_to_wire(&self.backend);
// Structural clone: `CMSHeap` derives `Clone` (its `CountMin` + `HHHeap`
// fields are `Clone`), so copy the backend directly instead of
// extracting the matrix + heap to wire form and rebuilding it.
Self {
rows: self.rows,
cols: self.cols,
heap_size: self.heap_size,
backend: sketchlib_cms_heap_from_matrix_and_heap(
self.rows,
self.cols,
self.heap_size,
&sketch,
&wire_heap,
),
backend: self.backend.clone(),
}
}
}
Expand Down
24 changes: 22 additions & 2 deletions src/message_pack_format/portable/kll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,34 @@ impl KllSketch {

impl Clone for KllSketch {
fn clone(&self) -> Self {
let bytes = bytes_from_sketchlib_kll(&self.backend);
// Structural clone: the backing `KLL<f64>` derives `Clone` (its fields
// are `Box<[..]>`/`Vec`/scalars), so copy it directly instead of a full
// msgpack serialize -> deserialize -> rebuild round-trip. Produces an
// identical sketch far more cheaply (no rmp encode/decode, far fewer
// allocations).
Self {
k: self.k,
backend: sketchlib_kll_from_bytes(&bytes).unwrap(),
backend: self.backend.clone(),
}
}
}

impl KllSketch {
/// Reconstruct directly from portable wire state (k + level-ordered items +
/// level boundaries) without replaying items through `update()`. Bit-exact.
pub fn from_portable_state(
k: u16,
items: &[f64],
levels: &[usize],
num_levels: usize,
) -> Result<Self, String> {
Ok(Self {
k,
backend: SketchlibKll::from_portable_state(k as usize, items, levels, num_levels)?,
})
}
}

impl std::fmt::Debug for KllSketch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KllSketch")
Expand Down
1 change: 1 addition & 0 deletions src/sketches/countminsketch_topk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const DEFAULT_TOP_K: usize = 32;
/// A Count-Min Sketch paired with a top-k heavy-hitter heap.
///
/// Generic over the same type parameters as [`CountMin`].
#[derive(Clone)]
pub struct CMSHeap<
S: MatrixStorage = Vector2D<i64>,
Mode = RegularPath,
Expand Down
93 changes: 93 additions & 0 deletions src/sketches/kll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,66 @@ impl<T: NumericalValue> KLL<T> {
s
}

/// Reconstruct a KLL directly from portable wire state — the retained
/// `items` in level order plus the `levels` boundary array (proto contract:
/// `levels[0] == 0`, `levels[num_levels] == items.len()`). Avoids replaying
/// every item through `update()`: it places the items straight into the
/// internal buffer and fixes up the level boundaries, so the result is
/// bit-identical to the source sketch (identical quantiles) at a fraction
/// of the cost. Errors if the supplied state is inconsistent.
pub fn from_portable_state(
k: usize,
items: &[T],
levels: &[usize],
num_levels: usize,
) -> Result<Self, String> {
let mut s = Self::init_kll(k as i32);
if items.is_empty() {
return Ok(s);
}
if num_levels == 0 || num_levels >= s.levels.len() {
return Err(format!(
"from_portable_state: invalid num_levels {num_levels}"
));
}
if levels.len() != num_levels + 1 {
return Err(format!(
"from_portable_state: levels.len()={} != num_levels+1={}",
levels.len(),
num_levels + 1
));
}
if levels[0] != 0 || levels[num_levels] != items.len() {
return Err(format!(
"from_portable_state: level bounds [{}..{}] inconsistent with items.len()={}",
levels[0],
levels[num_levels],
items.len()
));
}
if items.len() > s.max_capacity {
return Err(format!(
"from_portable_state: {} items exceed max_capacity {} for k={}",
items.len(),
s.max_capacity,
k
));
}
// Live data occupies the high end of the buffer; free space at the front.
let offset = s.max_capacity - items.len();
let max_cap = s.max_capacity;
s.items[offset..].clone_from_slice(items);
for (dst, &src) in s.levels.iter_mut().zip(levels[..=num_levels].iter()) {
*dst = src + offset;
}
for dst in s.levels.iter_mut().skip(num_levels + 1) {
*dst = max_cap;
}
s.num_levels = num_levels;
s.rebuild_capacity_cache(); // recomputes top_height + level0_capacity for num_levels
Ok(s)
}

/// Hot-path insert: decrement `levels[0]`, write item, check capacity.
#[inline]
fn push_value(&mut self, value: T) {
Expand Down Expand Up @@ -813,6 +873,39 @@ mod tests {
use super::*;
use crate::test_utils::{sample_uniform_f64, sample_zipf_f64};

// Direct reconstruction from portable wire state must be BIT-EXACT:
// identical quantiles to the source sketch (unlike a lossy
// replay-through-`update()` reconstruction).
#[test]
fn from_portable_state_reproduces_source_exactly() {
let k = 200usize;
let mut src = KLL::<f64>::init_kll(k as i32);
for i in 0..200_000u32 {
src.update(&((i as f64) * 0.0007 + 3.0));
}
// Extract the compacted portable form (proto contract: levels[0]==0,
// levels[num_levels]==items.len()).
let off = src.levels[0];
let items: Vec<f64> = src.items[off..src.max_capacity].to_vec();
let levels: Vec<usize> = (0..=src.num_levels).map(|h| src.levels[h] - off).collect();
let n = src.num_levels;

// (A) bit-exact: direct reconstruction reproduces the source exactly.
let rebuilt = KLL::<f64>::from_portable_state(k, &items, &levels, n).unwrap();
for &q in &[0.0, 0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99, 1.0] {
assert_eq!(
rebuilt.quantile(q),
src.quantile(q),
"direct reconstruction quantile mismatch at q={q}"
);
}

// An empty sketch round-trips to an empty sketch.
let empty = KLL::<f64>::from_portable_state(k, &[], &[], 0).unwrap();
assert_eq!(empty.num_levels, 1);
assert_eq!(empty.levels[0], empty.max_capacity);
}

// Ensure each 64-bit chunk is consumed bit-by-bit before refilling.
#[test]
fn coin_bit_cache_behavior() {
Expand Down
Loading