Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 7 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,11 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater {
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
self.acc.inner.update(&key.to_semicolon_str(), value);
crate::precompute_operators::sketchlib_runtime::cms_update(
&mut self.acc.inner,
&key.to_semicolon_str(),
value,
);
}

impl_clone_accumulator_methods!(acc);
Expand Down Expand Up @@ -855,6 +859,6 @@ mod tests {
.as_any()
.downcast_ref::<crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator>()
.expect("should be KLL");
assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param");
assert_eq!(kll.inner.k(), 50, "k should be 50 from capital-K param");
}
}
4 changes: 2 additions & 2 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ mod tests {
use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator;
use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator;
use crate::precompute_operators::sum_accumulator::SumAccumulator;
use asap_sketchlib::sketches::kll::KllSketch;
use asap_sketchlib::KllSketch;
use asap_types::enums::{AggregationType, WindowType};

fn make_agg_config(
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod tests {
handcrafted_output.end_timestamp,
arroyo_output.end_timestamp
);
assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k);
assert_eq!(handcrafted_acc.inner.k(), arroyo_acc.inner.k());
assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count());

for quantile in [0.0, 0.5, 1.0] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,45 @@ use crate::data_model::{
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
MultipleSubpopulationAggregate, SerializableToSink,
};
use asap_sketchlib::sketches::countminsketch::CountMinSketch;
use crate::precompute_operators::sketchlib_runtime::{
cms_from_matrix, cms_from_msgpack, cms_matrix, cms_merge_refs, cms_to_msgpack, RuntimeCountMin,
};
#[cfg(test)]
use crate::precompute_operators::sketchlib_runtime::cms_update;
use asap_sketchlib::DataInput;
use serde_json::Value;
use std::collections::HashMap;

use promql_utilities::query_logics::enums::Statistic;

/// Count-Min Sketch accumulator — wraps asap_sketchlib::sketches::CountMinSketch.
/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`.
/// This file retains QE-specific trait impls, legacy deserializers, and JSON output.
/// Count-Min Sketch accumulator — holds `sketches::CountMin<.., FastPath>`
/// directly. Wire format (Go-compatible MessagePack envelope) and
/// matrix-shape conversions live in `sketchlib_runtime`.
#[derive(Debug, Clone)]
pub struct CountMinSketchAccumulator {
pub inner: CountMinSketch,
pub inner: RuntimeCountMin,
}

impl CountMinSketchAccumulator {
pub fn new(row_num: usize, col_num: usize) -> Self {
Self {
inner: CountMinSketch::new(row_num, col_num),
inner: RuntimeCountMin::with_dimensions(row_num, col_num),
}
}

// Marked as _update and kept private; only called internally.
// Test-only helper. Production goes straight through
// `sketchlib_runtime::cms_update` in `CmsAccumulatorUpdater::update_keyed`;
// this wrapper exists so the regression test
// `test_update_and_query_use_same_key_encoding` exercises the same
// `key.to_semicolon_str()` encoding path that production uses.
#[cfg(test)]
fn _update(&mut self, key: &KeyByLabelValues, value: f64) {
self.inner.update(&key.to_semicolon_str(), value);
cms_update(&mut self.inner, &key.to_semicolon_str(), value);
}

pub fn query_key(&self, key: &KeyByLabelValues) -> f64 {
self.inner.estimate(&key.to_semicolon_str())
self.inner
.estimate(&DataInput::String(key.to_semicolon_str()))
}

pub fn deserialize_from_json(data: &Value) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -56,16 +67,15 @@ impl CountMinSketchAccumulator {
}

Ok(Self {
inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num),
inner: cms_from_matrix(sketch, row_num, col_num)?,
})
}

pub fn deserialize_from_bytes_arroyo(
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
inner: CountMinSketch::deserialize_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
inner: cms_from_msgpack(buffer)?,
})
}

Expand Down Expand Up @@ -108,7 +118,7 @@ impl CountMinSketchAccumulator {
}

Ok(Self {
inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num),
inner: cms_from_matrix(sketch, row_num, col_num)?,
})
}

Expand Down Expand Up @@ -136,20 +146,9 @@ impl CountMinSketchAccumulator {
cms_accumulators.push(cms_acc);
}

// Check dimensions are consistent
let rows = cms_accumulators[0].inner.rows();
let cols = cms_accumulators[0].inner.cols();
for acc in &cms_accumulators {
if acc.inner.rows() != rows || acc.inner.cols() != cols {
return Err(
"Cannot merge CountMinSketch accumulators with different dimensions".into(),
);
}
}

let inner_refs: Vec<&CountMinSketch> =
let inner_refs: Vec<&RuntimeCountMin> =
cms_accumulators.iter().map(|acc| &acc.inner).collect();
let merged_inner = CountMinSketch::merge_refs(&inner_refs)?;
let merged_inner = cms_merge_refs(&inner_refs)?;
Ok(Self {
inner: merged_inner,
})
Expand All @@ -161,12 +160,12 @@ impl SerializableToSink for CountMinSketchAccumulator {
serde_json::json!({
"row_num": self.inner.rows(),
"col_num": self.inner.cols(),
"sketch": self.inner.sketch()
"sketch": cms_matrix(&self.inner)
})
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
cms_to_msgpack(&self.inner)
}
}

Expand Down Expand Up @@ -200,7 +199,7 @@ impl AggregateCore for CountMinSketchAccumulator {
.downcast_ref::<CountMinSketchAccumulator>()
.ok_or("Failed to downcast to CountMinSketchAccumulator")?;

let merged_inner = CountMinSketch::merge_refs(&[&self.inner, &other_cms.inner])?;
let merged_inner = cms_merge_refs(&[&self.inner, &other_cms.inner])?;
Ok(Box::new(Self {
inner: merged_inner,
}))
Expand Down Expand Up @@ -250,12 +249,11 @@ impl MergeableAccumulator<CountMinSketchAccumulator> for CountMinSketchAccumulat
if accumulators.is_empty() {
return Err("No accumulators to merge".into());
}
let mut iter = accumulators.into_iter();
let mut merged = iter.next().unwrap();
for acc in iter {
merged.inner.merge(&acc.inner)?;
}
Ok(merged)
let inner_refs: Vec<&RuntimeCountMin> = accumulators.iter().map(|acc| &acc.inner).collect();
let merged_inner = cms_merge_refs(&inner_refs)?;
Ok(Self {
inner: merged_inner,
})
}
}

Expand All @@ -268,7 +266,7 @@ mod tests {
let cms = CountMinSketchAccumulator::new(4, 1000);
assert_eq!(cms.inner.rows(), 4);
assert_eq!(cms.inner.cols(), 1000);
let sketch = cms.inner.sketch();
let sketch = cms_matrix(&cms.inner);
assert_eq!(sketch.len(), 4);
assert_eq!(sketch[0].len(), 1000);

Expand Down Expand Up @@ -300,25 +298,16 @@ mod tests {

#[test]
fn test_count_min_sketch_merge() {
// Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends).
let cms1 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3).unwrap(),
};
let cms2 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3).unwrap(),
};

let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap();

let merged_sketch = merged.inner.sketch();
let merged_sketch = cms_matrix(&merged.inner);
assert_eq!(merged_sketch[0][0], 8.0);
assert_eq!(merged_sketch[0][1], 7.0);
assert_eq!(merged_sketch[1][2], 10.0);
Expand All @@ -335,11 +324,8 @@ mod tests {
#[test]
fn test_count_min_sketch_serialization() {
let cms = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3)
.unwrap(),
};

let bytes = cms.serialize_to_bytes();
Expand All @@ -348,7 +334,7 @@ mod tests {

assert_eq!(deserialized.inner.rows(), 2);
assert_eq!(deserialized.inner.cols(), 3);
let deser_sketch = deserialized.inner.sketch();
let deser_sketch = cms_matrix(&deserialized.inner);
assert_eq!(deser_sketch[0][1], 42.0);
assert_eq!(deser_sketch[1][2], 100.0);
}
Expand Down Expand Up @@ -393,9 +379,9 @@ mod tests {

// Also verify a different key does not interfere.
let other_key = KeyByLabelValues::new_with_labels(vec!["api".to_string()]);
// other_key was never updated; its estimate should be lower than key's.
// other_key was never updated; in a sketch this large there should be no
// collision, so its estimate should be exactly 0.
let other_result = cms.query_key(&other_key);
// In a sketch this large there should be no collision, so other_result == 0.
assert_eq!(
other_result, 0.0,
"unrelated key returned non-zero: {other_result}"
Expand All @@ -418,35 +404,22 @@ mod tests {

#[test]
fn test_count_min_sketch_merge_multiple() {
// Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends).
let cms1 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3).unwrap(),
};
let cms2 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3).unwrap(),
};
let cms3 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3).unwrap(),
};

let boxed_accs: Vec<Box<dyn AggregateCore>> =
vec![Box::new(cms1), Box::new(cms2), Box::new(cms3)];

let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap();

let merged_sketch = merged.inner.sketch();
let merged_sketch = cms_matrix(&merged.inner);
assert_eq!(merged_sketch[0][0], 10.0);
assert_eq!(merged_sketch[0][1], 7.0);
assert_eq!(merged_sketch[1][2], 15.0);
Expand Down
Loading