Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 5 additions & 121 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ asap_types = { path = "asap-common/dependencies/rs/asap_types" }
datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_summary_library" }
elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" }
asap_planner = { path = "asap-planner-rs" }
asap_sketchlib = "0.2.2"
indexmap = { version = "2.0", features = ["serde"] }

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ figment = { version = "0.10", features = ["yaml"] }
arc-swap = "1"
csv = "1"
elastic_dsl_utilities.workspace = true
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
asap_sketchlib.workspace = true

[[bin]]
name = "precompute_engine"
Expand Down
2 changes: 1 addition & 1 deletion asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ mod tests {
use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator;
use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator;
use crate::precompute_operators::sum_accumulator::SumAccumulator;
use asap_sketchlib::sketches::kll::KllSketch;
use asap_sketchlib::KllSketch;
use asap_types::enums::{AggregationType, WindowType};

fn make_agg_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::data_model::{
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
MultipleSubpopulationAggregate, SerializableToSink,
};
use asap_sketchlib::sketches::countminsketch::CountMinSketch;
use asap_sketchlib::{message_pack_format::MessagePackCodec, CountMinSketch};
use serde_json::Value;
use std::collections::HashMap;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl CountMinSketchAccumulator {
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
inner: CountMinSketch::deserialize_msgpack(buffer)
inner: CountMinSketch::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
})
}
Expand Down Expand Up @@ -166,7 +166,7 @@ impl SerializableToSink for CountMinSketchAccumulator {
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
self.inner.to_msgpack().unwrap_or_default()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::data_model::{
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
MultipleSubpopulationAggregate, SerializableToSink,
};
use asap_sketchlib::sketches::countminsketch_topk::{CmsHeapItem, CountMinSketchWithHeap};
use asap_sketchlib::{message_pack_format::MessagePackCodec, CmsHeapItem, CountMinSketchWithHeap};
use serde_json::Value;
use std::collections::HashMap;

Expand All @@ -17,7 +17,7 @@ pub struct CountMinSketchWithHeapAccumulator {
}

// Re-export HeapItem so existing code using CountMinSketchWithHeapAccumulator::HeapItem still works.
pub use asap_sketchlib::sketches::countminsketch_topk::CmsHeapItem as HeapItemReexport;
pub use asap_sketchlib::CmsHeapItem as HeapItemReexport;

impl CountMinSketchWithHeapAccumulator {
pub fn new(row_num: usize, col_num: usize, heap_size: usize) -> Self {
Expand Down Expand Up @@ -85,7 +85,7 @@ impl CountMinSketchWithHeapAccumulator {
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
inner: CountMinSketchWithHeap::deserialize_msgpack(buffer)
inner: CountMinSketchWithHeap::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
})
}
Expand Down Expand Up @@ -131,7 +131,7 @@ impl SerializableToSink for CountMinSketchWithHeapAccumulator {
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
self.inner.to_msgpack().unwrap_or_default()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::data_model::{
AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink,
SingleSubpopulationAggregate,
};
use asap_sketchlib::sketches::kll::KllSketch;
use asap_sketchlib::{message_pack_format::MessagePackCodec, KllSketch};
use base64::{engine::general_purpose, Engine as _};
use serde_json::Value;
use std::collections::HashMap;
Expand Down Expand Up @@ -42,7 +42,7 @@ impl DatasketchesKLLAccumulator {
buffer.len()
);
Ok(Self {
inner: KllSketch::deserialize_msgpack(buffer)
inner: KllSketch::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
})
}
Expand Down Expand Up @@ -113,7 +113,7 @@ impl SerializableToSink for DatasketchesKLLAccumulator {
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
self.inner.to_msgpack().unwrap_or_default()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::data_model::{
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
MultipleSubpopulationAggregate, SerializableToSink,
};
use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack};
use asap_sketchlib::{message_pack_format::MessagePackCodec, DeltaResult};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use tracing::warn;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl DeltaSetAggregatorAccumulator {
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
// Delegate to asap_sketchlib's canonical DeltaResult msgpack format
let delta = deserialize_msgpack(buffer)
let delta = DeltaResult::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?;

let mut added = HashSet::new();
Expand Down Expand Up @@ -204,7 +204,9 @@ impl SerializableToSink for DeltaSetAggregatorAccumulator {
.iter()
.map(|key| key.to_semicolon_str())
.collect();
serialize_msgpack(&added, &removed).unwrap_or_default()
DeltaResult { added, removed }
.to_msgpack()
.unwrap_or_default()
}
}

Expand Down
8 changes: 4 additions & 4 deletions asap-query-engine/src/precompute_operators/hll_accumulator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use asap_sketchlib::sketches::hll::{HllSketch, HllVariant};
use asap_sketchlib::{message_pack_format::MessagePackCodec, HllSketch, HllVariant};
use base64::{engine::general_purpose, Engine as _};
use serde_json::Value;

Expand Down Expand Up @@ -48,7 +48,7 @@ impl HllAccumulator {
pub fn deserialize_from_bytes_arroyo(
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
let inner = HllSketch::deserialize_msgpack(buffer)
let inner = HllSketch::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?;
Ok(Self { inner })
}
Expand All @@ -62,7 +62,7 @@ impl Default for HllAccumulator {

impl SerializableToSink for HllAccumulator {
fn serialize_to_json(&self) -> Value {
let bytes = self.inner.serialize_msgpack().unwrap_or_default();
let bytes = self.inner.to_msgpack().unwrap_or_default();
let b64 = general_purpose::STANDARD.encode(&bytes);
serde_json::json!({
"sketch": b64,
Expand All @@ -72,7 +72,7 @@ impl SerializableToSink for HllAccumulator {
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
self.inner.to_msgpack().unwrap_or_default()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
},
KeyByLabelValues,
};
use asap_sketchlib::sketches::hydra_kll::HydraKllSketch;
use asap_sketchlib::{message_pack_format::MessagePackCodec, HydraKllSketch};
use base64::{engine::general_purpose, Engine as _};
use std::collections::HashMap;

Expand Down Expand Up @@ -38,7 +38,7 @@ impl HydraKllSketchAccumulator {
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
inner: HydraKllSketch::deserialize_msgpack(buffer)
inner: HydraKllSketch::from_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
})
}
Expand All @@ -51,13 +51,13 @@ impl HydraKllSketchAccumulator {
impl SerializableToSink for HydraKllSketchAccumulator {
fn serialize_to_json(&self) -> serde_json::Value {
// Mirror Python implementation: {"sketch": base64_encoded_string}
let sketch_bytes = self.inner.serialize_msgpack().unwrap_or_default();
let sketch_bytes = self.inner.to_msgpack().unwrap_or_default();
let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes);
serde_json::json!({ "sketch": sketch_b64 })
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
self.inner.to_msgpack().unwrap_or_default()
}
}

Expand Down
Loading
Loading