diff --git a/Cargo.lock b/Cargo.lock index e6f69253..5b42d28f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib#81c3436dde44cc587c098d42bf42db77acdb4fa5" +source = "git+https://github.com/ProjectASAP/asap_sketchlib#cd1f10d90d315c36306ff3b10748bac33e5fded6" dependencies = [ "bytes", "prost", @@ -1435,7 +1435,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "petgraph 0.6.5", + "petgraph", ] [[package]] @@ -1675,12 +1675,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flatbuffers" version = "24.12.23" @@ -2400,15 +2394,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.18" @@ -3152,17 +3137,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset 0.4.2", - "indexmap 2.14.0", -] - -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "indexmap 2.14.0", ] @@ -3424,11 +3399,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.13.0", "log", "multimap", "once_cell", - "petgraph 0.7.1", + "petgraph", "prettyplease", "prost", "prost-types", @@ -3444,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -5105,7 +5080,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 13c86b07..30fbc7e9 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -464,7 +464,11 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { - self.acc.inner.update(&key.to_semicolon_str(), value); + crate::precompute_operators::sketchlib_runtime::cms_update( + &mut self.acc.inner, + &key.to_semicolon_str(), + value, + ); } impl_clone_accumulator_methods!(acc); @@ -855,6 +859,6 @@ mod tests { .as_any() .downcast_ref::() .expect("should be KLL"); - assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param"); + assert_eq!(kll.inner.k(), 50, "k should be 50 from capital-K param"); } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 02c330bd..05781979 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -776,7 +776,7 @@ mod 
tests { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; use crate::precompute_operators::sum_accumulator::SumAccumulator; - use asap_sketchlib::sketches::kll::KllSketch; + use asap_sketchlib::KllSketch; use asap_types::enums::{AggregationType, WindowType}; fn make_agg_config( @@ -1511,7 +1511,7 @@ mod tests { handcrafted_output.end_timestamp, arroyo_output.end_timestamp ); - assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k); + assert_eq!(handcrafted_acc.inner.k(), arroyo_acc.inner.k()); assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count()); for quantile in [0.0, 0.5, 1.0] { diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 6840cb58..613e908b 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -2,34 +2,45 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch::CountMinSketch; +use crate::precompute_operators::sketchlib_runtime::{ + cms_from_matrix, cms_from_msgpack, cms_matrix, cms_merge_refs, cms_to_msgpack, RuntimeCountMin, +}; +#[cfg(test)] +use crate::precompute_operators::sketchlib_runtime::cms_update; +use asap_sketchlib::DataInput; use serde_json::Value; use std::collections::HashMap; use promql_utilities::query_logics::enums::Statistic; -/// Count-Min Sketch accumulator — wraps asap_sketchlib::sketches::CountMinSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls, legacy deserializers, and JSON output. 
+/// Count-Min Sketch accumulator — holds `sketches::CountMin<.., FastPath>` +/// directly. Wire format (Go-compatible MessagePack envelope) and +/// matrix-shape conversions live in `sketchlib_runtime`. #[derive(Debug, Clone)] pub struct CountMinSketchAccumulator { - pub inner: CountMinSketch, + pub inner: RuntimeCountMin, } impl CountMinSketchAccumulator { pub fn new(row_num: usize, col_num: usize) -> Self { Self { - inner: CountMinSketch::new(row_num, col_num), + inner: RuntimeCountMin::with_dimensions(row_num, col_num), } } - // Marked as _update and kept private; only called internally. + // Test-only helper. Production goes straight through + // `sketchlib_runtime::cms_update` in `CmsAccumulatorUpdater::update_keyed`; + // this wrapper exists so the regression test + // `test_update_and_query_use_same_key_encoding` exercises the same + // `key.to_semicolon_str()` encoding path that production uses. + #[cfg(test)] fn _update(&mut self, key: &KeyByLabelValues, value: f64) { - self.inner.update(&key.to_semicolon_str(), value); + cms_update(&mut self.inner, &key.to_semicolon_str(), value); } pub fn query_key(&self, key: &KeyByLabelValues) -> f64 { - self.inner.estimate(&key.to_semicolon_str()) + self.inner + .estimate(&DataInput::String(key.to_semicolon_str())) } pub fn deserialize_from_json(data: &Value) -> Result> { @@ -56,7 +67,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num)?, }) } @@ -64,8 +75,7 @@ impl CountMinSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketch::deserialize_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: cms_from_msgpack(buffer)?, }) } @@ -108,7 +118,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num)?, }) } @@ -136,20 +146,9 @@ impl 
CountMinSketchAccumulator { cms_accumulators.push(cms_acc); } - // Check dimensions are consistent - let rows = cms_accumulators[0].inner.rows(); - let cols = cms_accumulators[0].inner.cols(); - for acc in &cms_accumulators { - if acc.inner.rows() != rows || acc.inner.cols() != cols { - return Err( - "Cannot merge CountMinSketch accumulators with different dimensions".into(), - ); - } - } - - let inner_refs: Vec<&CountMinSketch> = + let inner_refs: Vec<&RuntimeCountMin> = cms_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = CountMinSketch::merge_refs(&inner_refs)?; + let merged_inner = cms_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) @@ -161,12 +160,12 @@ impl SerializableToSink for CountMinSketchAccumulator { serde_json::json!({ "row_num": self.inner.rows(), "col_num": self.inner.cols(), - "sketch": self.inner.sketch() + "sketch": cms_matrix(&self.inner) }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + cms_to_msgpack(&self.inner) } } @@ -200,7 +199,7 @@ impl AggregateCore for CountMinSketchAccumulator { .downcast_ref::() .ok_or("Failed to downcast to CountMinSketchAccumulator")?; - let merged_inner = CountMinSketch::merge_refs(&[&self.inner, &other_cms.inner])?; + let merged_inner = cms_merge_refs(&[&self.inner, &other_cms.inner])?; Ok(Box::new(Self { inner: merged_inner, })) @@ -250,12 +249,11 @@ impl MergeableAccumulator for CountMinSketchAccumulat if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeCountMin> = accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = cms_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -268,7 +266,7 @@ mod tests { let cms = CountMinSketchAccumulator::new(4, 1000); assert_eq!(cms.inner.rows(), 
4); assert_eq!(cms.inner.cols(), 1000); - let sketch = cms.inner.sketch(); + let sketch = cms_matrix(&cms.inner); assert_eq!(sketch.len(), 4); assert_eq!(sketch[0].len(), 1000); @@ -300,25 +298,16 @@ mod tests { #[test] fn test_count_min_sketch_merge() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3).unwrap(), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3).unwrap(), }; let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 8.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 10.0); @@ -335,11 +324,8 @@ mod tests { #[test] fn test_count_min_sketch_serialization() { let cms = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3) + .unwrap(), }; let bytes = cms.serialize_to_bytes(); @@ -348,7 +334,7 @@ mod tests { assert_eq!(deserialized.inner.rows(), 2); assert_eq!(deserialized.inner.cols(), 3); - let deser_sketch = deserialized.inner.sketch(); + let deser_sketch = cms_matrix(&deserialized.inner); assert_eq!(deser_sketch[0][1], 42.0); assert_eq!(deser_sketch[1][2], 100.0); } @@ -393,9 +379,9 @@ mod tests { // Also verify a different key does not interfere. 
let other_key = KeyByLabelValues::new_with_labels(vec!["api".to_string()]); - // other_key was never updated; its estimate should be lower than key's. + // other_key was never updated; in a sketch this large there should be no + // collision, so its estimate should be exactly 0. let other_result = cms.query_key(&other_key); - // In a sketch this large there should be no collision, so other_result == 0. assert_eq!( other_result, 0.0, "unrelated key returned non-zero: {other_result}" @@ -418,27 +404,14 @@ mod tests { #[test] fn test_count_min_sketch_merge_multiple() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3).unwrap(), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3).unwrap(), }; let cms3 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( - vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3).unwrap(), }; let boxed_accs: Vec> = @@ -446,7 +419,7 @@ mod tests { let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 10.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 15.0); diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index c96cde98..119bbc30 100644 --- 
a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch_topk::{CmsHeapItem, CountMinSketchWithHeap}; +use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::{CmsHeapItem, CountMinSketchWithHeap}; use serde_json::Value; use std::collections::HashMap; @@ -17,7 +18,7 @@ pub struct CountMinSketchWithHeapAccumulator { } // Re-export HeapItem so existing code using CountMinSketchWithHeapAccumulator::HeapItem still works. -pub use asap_sketchlib::sketches::countminsketch_topk::CmsHeapItem as HeapItemReexport; +pub use asap_sketchlib::CmsHeapItem as HeapItemReexport; impl CountMinSketchWithHeapAccumulator { pub fn new(row_num: usize, col_num: usize, heap_size: usize) -> Self { @@ -85,7 +86,7 @@ impl CountMinSketchWithHeapAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketchWithHeap::deserialize_msgpack(buffer) + inner: CountMinSketchWithHeap::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -131,7 +132,7 @@ impl SerializableToSink for CountMinSketchWithHeapAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 33e085d6..3ca025a8 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -2,7 +2,9 @@ use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, 
SerializableToSink, SingleSubpopulationAggregate, }; -use asap_sketchlib::sketches::kll::KllSketch; +use crate::precompute_operators::sketchlib_runtime::{ + kll_from_msgpack, kll_merge_refs, kll_quantile, kll_sketch_bytes, kll_to_msgpack, RuntimeKll, +}; use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use std::collections::HashMap; @@ -12,26 +14,26 @@ use tracing::debug; use promql_utilities::query_logics::enums::Statistic; -/// KLL sketch accumulator — wraps asap_sketchlib::sketches::KllSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls and JSON output. +/// KLL sketch accumulator — holds `sketches::KLL` directly. Wire +/// format (`KllSketchData { k, sketch_bytes }`) and base64-JSON output +/// live in `sketchlib_runtime`. pub struct DatasketchesKLLAccumulator { - pub inner: KllSketch, + pub inner: RuntimeKll, } impl DatasketchesKLLAccumulator { pub fn new(k: u16) -> Self { Self { - inner: KllSketch::new(k), + inner: RuntimeKll::init_kll(k as i32), } } pub fn update(&mut self, value: f64) { - self.inner.update(value); + self.inner.update(&value); } pub fn get_quantile(&self, quantile: f64) -> f64 { - self.inner.quantile(quantile) + kll_quantile(&self.inner, quantile) } pub fn deserialize_from_bytes_arroyo( @@ -42,8 +44,7 @@ impl DatasketchesKLLAccumulator { buffer.len() ); Ok(Self { - inner: KllSketch::deserialize_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: kll_from_msgpack(buffer)?, }) } @@ -71,15 +72,14 @@ impl DatasketchesKLLAccumulator { kll_accumulators.push(kll_acc); } - let inner_refs: Vec<&KllSketch> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = KllSketch::merge_refs(&inner_refs)?; + let inner_refs: Vec<&RuntimeKll> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) } } -// Manual trait 
implementations since the C++ library doesn't provide them impl Clone for DatasketchesKLLAccumulator { fn clone(&self) -> Self { Self { @@ -91,29 +91,28 @@ impl Clone for DatasketchesKLLAccumulator { impl std::fmt::Debug for DatasketchesKLLAccumulator { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DatasketchesKLLAccumulator") - .field("k", &self.inner.k) + .field("k", &self.inner.k()) .field("sketch_n", &self.inner.count()) .finish() } } -// TODO: verify this -// Thread safety: The C++ library is not thread-safe by default, but since we're using it -// in a single-threaded context per accumulator instance and only sharing read-only operations, -// this should be safe. -unsafe impl Send for DatasketchesKLLAccumulator {} -unsafe impl Sync for DatasketchesKLLAccumulator {} +// `RuntimeKll` (= `KLL`) is a pure-Rust type with no interior FFI +// handles, so `DatasketchesKLLAccumulator` auto-derives `Send + Sync`. The +// hand-written `unsafe impl`s that were needed when the backend wrapped a +// C++ sketch are gone; `tests::accumulator_is_send_sync` statically pins the +// auto-derivation so a future field can't silently regress it. 
impl SerializableToSink for DatasketchesKLLAccumulator { fn serialize_to_json(&self) -> Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.sketch_bytes(); + let sketch_bytes = kll_sketch_bytes(&self.inner); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + kll_to_msgpack(&self.inner) } } @@ -139,7 +138,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { #[cfg(feature = "extra_debugging")] debug!( "[PERF] DatasketchesKLLAccumulator::merge_with() started - self.k={}, self.n={}", - self.inner.k, + self.inner.k(), self.inner.count() ); @@ -156,7 +155,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { .downcast_ref::() .ok_or("Failed to downcast to DatasketchesKLLAccumulator")?; - let merged_inner = KllSketch::merge_refs(&[&self.inner, &other_kll.inner])?; + let merged_inner = kll_merge_refs(&[&self.inner, &other_kll.inner])?; let merged = Self { inner: merged_inner, }; @@ -232,12 +231,11 @@ impl MergeableAccumulator for DatasketchesKLLAccumul if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeKll> = accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -245,11 +243,21 @@ impl MergeableAccumulator for DatasketchesKLLAccumul mod tests { use super::*; + // Pins the auto-derived thread-safety that replaced the old `unsafe impl + // Send/Sync`. If a future field makes the accumulator non-`Send`/`Sync`, + // this stops compiling instead of silently breaking callers that require + // the bounds. 
+ #[test] + fn accumulator_is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } + #[test] fn test_datasketches_kll_creation() { let kll = DatasketchesKLLAccumulator::new(200); assert!(kll.inner.count() == 0); - assert_eq!(kll.inner.k, 200); + assert_eq!(kll.inner.k(), 200); } #[test] @@ -269,7 +277,6 @@ mod tests { } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. let q50 = kll.get_quantile(0.5); assert!((q50 - 6.0).abs() <= 1.0, "expected median ~6, got {q50}"); } @@ -284,7 +291,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -322,7 +328,7 @@ mod tests { let deserialized = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(&bytes).unwrap(); - assert_eq!(deserialized.inner.k, 200); + assert_eq!(deserialized.inner.k(), 200); assert_eq!(deserialized.inner.count(), 5); assert_eq!(deserialized.get_quantile(0.0), 1.0); assert_eq!(deserialized.get_quantile(1.0), 5.0); @@ -352,7 +358,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -360,7 +365,6 @@ mod tests { query_kwargs.insert("quantile".to_string(), "0.9".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.9 quantile of 1..10 may be 9 or 10. 
assert!( (9.0..=10.0).contains(&result), "expected 0.9 quantile in [9,10], got {result}" diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index f323426a..a8354bea 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack}; +use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::DeltaResult; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -153,7 +154,7 @@ impl DeltaSetAggregatorAccumulator { buffer: &[u8], ) -> Result> { // Delegate to sketch-core canonical DeltaResult msgpack format - let delta = deserialize_msgpack(buffer) + let delta = DeltaResult::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let mut added = HashSet::new(); @@ -203,7 +204,9 @@ impl SerializableToSink for DeltaSetAggregatorAccumulator { .iter() .map(|key| key.to_semicolon_str()) .collect(); - serialize_msgpack(&added, &removed).unwrap_or_default() + DeltaResult { added, removed } + .to_msgpack() + .unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index f33012da..52b2f3d9 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -5,7 +5,8 @@ use crate::{ }, KeyByLabelValues, }; -use asap_sketchlib::sketches::hydra_kll::HydraKllSketch; +use asap_sketchlib::message_pack_format::MessagePackCodec; +use 
asap_sketchlib::HydraKllSketch; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; @@ -38,7 +39,7 @@ impl HydraKllSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: HydraKllSketch::deserialize_msgpack(buffer) + inner: HydraKllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -51,13 +52,13 @@ impl HydraKllSketchAccumulator { impl SerializableToSink for HydraKllSketchAccumulator { fn serialize_to_json(&self) -> serde_json::Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let sketch_bytes = self.inner.to_msgpack().unwrap_or_default(); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/mod.rs b/asap-query-engine/src/precompute_operators/mod.rs index fbbff1cc..01d06082 100644 --- a/asap-query-engine/src/precompute_operators/mod.rs +++ b/asap-query-engine/src/precompute_operators/mod.rs @@ -9,6 +9,7 @@ pub mod multiple_increase_accumulator; pub mod multiple_min_max_accumulator; pub mod multiple_sum_accumulator; pub mod set_aggregator_accumulator; +pub mod sketchlib_runtime; pub mod sum_accumulator; pub use count_min_sketch_accumulator::*; diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 45b74d55..1f569dbf 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, 
SerializableToSink, }; -use asap_sketchlib::sketches::set_aggregator::SetAggregator; +use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::SetAggregator; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -92,7 +93,7 @@ impl SetAggregatorAccumulator { pub fn deserialize_from_bytes_arroyo( buffer: &[u8], ) -> Result> { - let sa = SetAggregator::deserialize_msgpack(buffer) + let sa = SetAggregator::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let added = sa .values @@ -109,7 +110,7 @@ impl SetAggregatorAccumulator { for key in &self.added { sa.update(&key.to_semicolon_str()); } - sa.serialize_msgpack().unwrap_or_default() + sa.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs new file mode 100644 index 00000000..d1c0b582 --- /dev/null +++ b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs @@ -0,0 +1,251 @@ +//! Shared sketch logic for ASAPQuery's accumulators. +//! +//! This is **not** an isolation layer: accumulators hold the concrete +//! runtime types (`RuntimeCountMin`, `RuntimeKll`) and call their inherent +//! API (`with_dimensions`, `update`, `estimate`, `rows`, `cols`, `k`, …) +//! directly. This module only owns the operations that carry real logic +//! and shouldn't be duplicated across callers: +//! - wire codec: the Go-compatible MessagePack envelopes +//! (`*_to_msgpack` / `*_from_msgpack`) and the `Vec>` matrix +//! snapshot / shape-validated reconstruction, +//! - validated merge (`*_merge_refs`): dimension / `k` agreement checks, +//! - the non-negative `cms_update` guard, which two callers share. +//! +//! Cross-language byte parity for the underlying `sketches::*` paths is +//! locked in by `asap_sketchlib::tests::sketches_go_parity_probe`. 
+ +use asap_sketchlib::message_pack_format::portable::countminsketch::CountMinSketchWire; +use asap_sketchlib::message_pack_format::portable::kll::KllSketchData; +use asap_sketchlib::sketches::countminsketch::CountMin; +use asap_sketchlib::sketches::kll::KLL; +use asap_sketchlib::{DataInput, DefaultXxHasher, FastPath, Vector2D}; + +// ============================================================================= +// CountMinSketch — sketches::CountMin, FastPath, DefaultXxHasher> +// ============================================================================= + +/// Concrete runtime CMS type used by `CountMinSketchAccumulator`. Same +/// dimensions + hasher choice as `asap_sketchlib`'s wire-format +/// `CountMinSketch` facade, so the on-the-wire byte shape is identical. +pub type RuntimeCountMin = CountMin, FastPath, DefaultXxHasher>; + +pub fn cms_update(sk: &mut RuntimeCountMin, key: &str, value: f64) { + // Count-min sketches model non-negative frequencies: every cell is a + // monotonically increasing counter and `estimate` returns the row-wise + // minimum. A zero update is a no-op, and a negative update would corrupt + // the estimate for `key` *and* for every other key colliding in any of + // its rows (collisions only ever inflate, never deflate, an estimate). + // The accumulator contract upstream only ever feeds counts/durations, + // which are >= 0, so dropping `value <= 0.0` preserves the invariant + // rather than changing intended behavior. See the regression test + // `cms_update_drops_non_positive_values`. + if value <= 0.0 { + return; + } + sk.insert_many(&DataInput::String(key.to_owned()), value); +} + +/// Snapshot the storage as `Vec>` (used for JSON output + wire DTO). 
+pub fn cms_matrix(sk: &RuntimeCountMin) -> Vec> { + let storage = sk.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut out = vec![vec![0.0f64; cols]; rows]; + for (r, row) in out.iter_mut().enumerate() { + for (c, cell) in row.iter_mut().enumerate() { + // `storage.get` only returns `None` for out-of-bounds access; we + // iterate within the dimensions storage just reported, so this is + // a programmer error if it ever fires. + *cell = *storage + .get(r, c) + .expect("cms_matrix indexed within reported storage dimensions"); + } + } + out +} + +/// Build a CountMin from an existing matrix (used by JSON / legacy +/// byte-format decoders). +/// +/// `rows`/`cols` arrive from the envelope while `matrix` is parsed +/// separately, so a malformed payload can disagree. We reject the +/// mismatch instead of silently padding/truncating with `0.0`, which +/// would be invisible corruption. +// Note on error types in this module: the `*_merge_refs` helpers return +// `Box` because their callers feed into +// `AggregateCore::merge_with` (and friends), whose trait signature requires +// the `Send + Sync` bound. The `*_from_*` decoders return plain +// `Box` because the `deserialize_from_*` methods on the +// accumulators (and their callers across the crate) use that shape and +// bumping the bound would ripple through every accumulator's API. 
+pub fn cms_from_matrix( + matrix: Vec>, + rows: usize, + cols: usize, +) -> Result> { + if matrix.len() != rows { + return Err(format!( + "CountMin matrix shape mismatch: envelope declares {rows} rows, matrix has {}", + matrix.len() + ) + .into()); + } + if let Some(bad) = matrix.iter().position(|row| row.len() != cols) { + return Err(format!( + "CountMin matrix shape mismatch: envelope declares {cols} cols, row {bad} has {}", + matrix[bad].len() + ) + .into()); + } + let storage = Vector2D::from_fn(rows, cols, |r, c| matrix[r][c]); + Ok(CountMin::from_storage(storage)) +} + +/// Serialize to the Go-compatible MessagePack envelope. +pub fn cms_to_msgpack(sk: &RuntimeCountMin) -> Vec { + let wire = CountMinSketchWire { + sketch: cms_matrix(sk), + rows: sk.rows(), + cols: sk.cols(), + }; + // A `Vec>` + two `usize`s has no unrepresentable state, so + // failure here is a bug, not bad input. Panic loudly rather than emit + // empty bytes that surface downstream as a misleading "buffer too short". + rmp_serde::to_vec(&wire).expect("CountMinSketchWire msgpack serialization is infallible") +} + +/// Deserialize from the Go-compatible MessagePack envelope. +pub fn cms_from_msgpack(bytes: &[u8]) -> Result> { + let wire: CountMinSketchWire = rmp_serde::from_slice(bytes)?; + cms_from_matrix(wire.sketch, wire.rows, wire.cols) +} + +/// Merge a slice of CMS references into a single new sketch. 
+pub fn cms_merge_refs(
+    sketches: &[&RuntimeCountMin],
+) -> Result<RuntimeCountMin, Box<dyn std::error::Error + Send + Sync>> {
+    let first = *sketches
+        .first()
+        .ok_or("cms_merge_refs called with empty input")?;
+    let rows = first.rows();
+    let cols = first.cols();
+    for s in sketches {
+        if s.rows() != rows || s.cols() != cols {
+            return Err(format!(
+                "CountMin dimension mismatch in merge: expected {rows}x{cols}, got {}x{}",
+                s.rows(),
+                s.cols()
+            )
+            .into());
+        }
+    }
+    let mut merged = CountMin::with_dimensions(rows, cols);
+    for s in sketches {
+        merged.merge(s);
+    }
+    Ok(merged)
+}
+
+// =============================================================================
+// KllSketch — sketches::KLL
+// =============================================================================
+
+/// Concrete runtime KLL type used by `DatasketchesKLLAccumulator`.
+pub type RuntimeKll = KLL;
+
+pub fn kll_quantile(sk: &RuntimeKll, q: f64) -> f64 {
+    if sk.count() == 0 {
+        return 0.0;
+    }
+    sk.quantile(q)
+}
+
+/// Raw msgpack bytes of the KLL backend (sans the `k`-envelope outer
+/// wrapper). Used by JSON output (base64-encoded) and the wire codec.
+///
+/// `serialize_to_bytes` failure is a bug, not bad input: a `KLL`
+/// holding only finite samples has no unrepresentable state. Panic
+/// loudly rather than emit empty bytes that surface downstream as a
+/// misleading "buffer too short".
+pub fn kll_sketch_bytes(sk: &RuntimeKll) -> Vec<u8> {
+    sk.serialize_to_bytes()
+        .expect("KLL serialize_to_bytes is infallible for finite samples")
+}
+
+/// Serialize to the Go-compatible `KllSketchData { k, sketch_bytes }`
+/// MessagePack envelope.
+pub fn kll_to_msgpack(sk: &RuntimeKll) -> Vec<u8> {
+    let wire = KllSketchData {
+        k: u16::try_from(sk.k()).expect("KLL k must fit the u16 `k` field of KllSketchData"),
+        sketch_bytes: kll_sketch_bytes(sk),
+    };
+    // Same reasoning as `cms_to_msgpack`: the wire struct has no
+    // unrepresentable state, so failure is a bug.
+    rmp_serde::to_vec(&wire).expect("KllSketchData msgpack serialization is infallible")
+}
+
+/// Deserialize from the Go-compatible `KllSketchData` envelope.
+pub fn kll_from_msgpack(bytes: &[u8]) -> Result<RuntimeKll, Box<dyn std::error::Error>> {
+    let wire: KllSketchData = rmp_serde::from_slice(bytes)?;
+    Ok(KLL::deserialize_from_bytes(&wire.sketch_bytes)?)
+}
+
+/// Merge a slice of KLL references into a single new sketch. All
+/// inputs must share the same `k`.
+pub fn kll_merge_refs(
+    sketches: &[&RuntimeKll],
+) -> Result<RuntimeKll, Box<dyn std::error::Error + Send + Sync>> {
+    let first = *sketches
+        .first()
+        .ok_or("kll_merge_refs called with empty input")?;
+    let k = first.k();
+    for s in sketches {
+        if s.k() != k {
+            return Err(format!("KLL k mismatch in merge: expected {k}, got {}", s.k()).into());
+        }
+    }
+    let mut merged = KLL::init_kll(k as i32);
+    for s in sketches {
+        merged.merge(s);
+    }
+    Ok(merged)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn cms_update_drops_non_positive_values() {
+        // The `value <= 0.0` guard in `cms_update` must be a no-op: a
+        // count-min sketch only ever accumulates non-negative frequencies,
+        // so zero and negative updates leave the estimate untouched.
+        let mut sk = CountMin::with_dimensions(4, 1000);
+        let estimate = |sk: &RuntimeCountMin| sk.estimate(&DataInput::String("k".to_owned()));
+        cms_update(&mut sk, "k", 0.0);
+        cms_update(&mut sk, "k", -5.0);
+        assert_eq!(estimate(&sk), 0.0);
+
+        // A positive update is still recorded after the dropped ones.
+        cms_update(&mut sk, "k", 3.0);
+        assert_eq!(estimate(&sk), 3.0);
+    }
+
+    #[test]
+    fn kll_merge_refs_rejects_k_mismatch() {
+        let a = KLL::init_kll(200);
+        let b = KLL::init_kll(100);
+        let err = kll_merge_refs(&[&a, &b])
+            .expect_err("merging KLL sketches with different k must error");
+        assert!(
+            err.to_string().contains("KLL k mismatch"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn kll_merge_refs_rejects_empty_input() {
+        let err = kll_merge_refs(&[]).expect_err("empty merge input must error");
+        assert!(err.to_string().contains("empty input"), "unexpected error: {err}");
+    }
+}
diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index b8c6953a..6e26bbc1 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -7,7 +7,7 @@ //! 3. Advances the watermark past the window boundary to close it //! 4. Drains captured outputs and verifies equivalence with wire-format accumulators -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::KllSketch; use asap_types::aggregation_config::AggregationConfig; use asap_types::enums::{AggregationType, WindowType}; use flate2::{write::GzEncoder, Compression}; @@ -265,7 +265,8 @@ async fn e2e_kll_output_matches_arroyo() { // Sketch contents assert_eq!( - handcrafted_acc.inner.k, arroyo_acc.inner.k, + handcrafted_acc.inner.k(), + arroyo_acc.inner.k(), "KLL k mismatch" ); assert_eq!(