diff --git a/Cargo.lock b/Cargo.lock index 8e95d34..615266f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,13 +412,12 @@ dependencies = [ [[package]] name = "asap_sketchlib" -version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib#81c3436dde44cc587c098d42bf42db77acdb4fa5" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e22a03cabb3fbbde6078a6ce493d4c21d007b895ad1d32ae986cfa7b1ac65" dependencies = [ "bytes", "prost", - "prost-build", - "protoc-bin-vendored", "rand 0.9.4", "rmp-serde", "serde", @@ -1437,7 +1436,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "petgraph 0.6.5", + "petgraph", ] [[package]] @@ -1677,12 +1676,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flatbuffers" version = "24.12.23" @@ -2755,12 +2748,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "multimap" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" - [[package]] name = "native-tls" version = "0.2.18" @@ -3154,17 +3141,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset 0.4.2", - "indexmap 2.14.0", -] - -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "indexmap 2.14.0", ] @@ -3419,26 +3396,6 @@ dependencies = [ "prost-derive", ] -[[package]] -name = "prost-build" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" -dependencies = [ - "heck 0.5.0", - "itertools 0.14.0", - "log", - "multimap", - "once_cell", - "petgraph 0.7.1", - "prettyplease", - "prost", - "prost-types", - "regex", - "syn 2.0.117", - "tempfile", -] - [[package]] name = "prost-derive" version = "0.13.5" @@ -3452,85 +3409,12 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "prost-types" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" -dependencies = [ - "prost", -] - [[package]] name = "protobuf" version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" -[[package]] -name = "protoc-bin-vendored" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" -dependencies = [ - "protoc-bin-vendored-linux-aarch_64", - "protoc-bin-vendored-linux-ppcle_64", - "protoc-bin-vendored-linux-s390_64", - "protoc-bin-vendored-linux-x86_32", - "protoc-bin-vendored-linux-x86_64", - "protoc-bin-vendored-macos-aarch_64", - "protoc-bin-vendored-macos-x86_64", - "protoc-bin-vendored-win32", -] - -[[package]] -name = "protoc-bin-vendored-linux-aarch_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" - -[[package]] -name = "protoc-bin-vendored-linux-ppcle_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" - -[[package]] -name = "protoc-bin-vendored-linux-s390_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" - -[[package]] -name = "protoc-bin-vendored-linux-x86_32" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" - -[[package]] -name = "protoc-bin-vendored-linux-x86_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" - -[[package]] -name = "protoc-bin-vendored-macos-aarch_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" - -[[package]] -name = "protoc-bin-vendored-macos-x86_64" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" - -[[package]] -name = "protoc-bin-vendored-win32" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" - [[package]] name = "psm" version = "0.1.31" diff --git a/Cargo.toml b/Cargo.toml index b4059ad..46d36bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ asap_types = { path = "asap-common/dependencies/rs/asap_types" } datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_summary_library" } elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" } asap_planner = { path = "asap-planner-rs" } +asap_sketchlib = "0.2.2" indexmap = { version = "2.0", features = ["serde"] } [profile.release] diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index d2886c3..f14eb40 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -59,7 +59,7 @@ figment = { version = "0.10", features = ["yaml"] } arc-swap = "1" csv = "1" elastic_dsl_utilities.workspace = true -asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } +asap_sketchlib.workspace = true [[bin]] name = "precompute_engine" diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 02c330b..37fc8f3 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -776,7 +776,7 @@ mod tests { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; use crate::precompute_operators::sum_accumulator::SumAccumulator; - use asap_sketchlib::sketches::kll::KllSketch; + use asap_sketchlib::KllSketch; use asap_types::enums::{AggregationType, WindowType}; fn make_agg_config( diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 6840cb5..fbab5ed 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch::CountMinSketch; +use asap_sketchlib::{message_pack_format::MessagePackCodec, CountMinSketch}; use serde_json::Value; use std::collections::HashMap; @@ -64,7 +64,7 @@ impl CountMinSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketch::deserialize_msgpack(buffer) + inner: CountMinSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -166,7 +166,7 @@ impl SerializableToSink for CountMinSketchAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index c96cde9..e5cd65f 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch_topk::{CmsHeapItem, CountMinSketchWithHeap}; +use asap_sketchlib::{message_pack_format::MessagePackCodec, CmsHeapItem, CountMinSketchWithHeap}; use serde_json::Value; use std::collections::HashMap; @@ -17,7 +17,7 @@ pub struct CountMinSketchWithHeapAccumulator { } // Re-export HeapItem so existing code using CountMinSketchWithHeapAccumulator::HeapItem still works. -pub use asap_sketchlib::sketches::countminsketch_topk::CmsHeapItem as HeapItemReexport; +pub use asap_sketchlib::CmsHeapItem as HeapItemReexport; impl CountMinSketchWithHeapAccumulator { pub fn new(row_num: usize, col_num: usize, heap_size: usize) -> Self { @@ -85,7 +85,7 @@ impl CountMinSketchWithHeapAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketchWithHeap::deserialize_msgpack(buffer) + inner: CountMinSketchWithHeap::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -131,7 +131,7 @@ impl SerializableToSink for CountMinSketchWithHeapAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 33e085d..ea98d28 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, SingleSubpopulationAggregate, }; -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::{message_pack_format::MessagePackCodec, KllSketch}; use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use std::collections::HashMap; @@ -42,7 +42,7 @@ impl DatasketchesKLLAccumulator { buffer.len() ); Ok(Self { - inner: KllSketch::deserialize_msgpack(buffer) + inner: KllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -113,7 +113,7 @@ impl SerializableToSink for DatasketchesKLLAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index cc9a858..7cde7ca 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack}; +use asap_sketchlib::{message_pack_format::MessagePackCodec, DeltaResult}; use serde_json::Value; use std::collections::{HashMap, HashSet}; use tracing::warn; @@ -154,7 +154,7 @@ impl DeltaSetAggregatorAccumulator { buffer: &[u8], ) -> Result> { // Delegate to sketch-core canonical DeltaResult msgpack format - let delta = deserialize_msgpack(buffer) + let delta = DeltaResult::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let mut added = HashSet::new(); @@ -204,7 +204,9 @@ impl SerializableToSink for DeltaSetAggregatorAccumulator { .iter() .map(|key| key.to_semicolon_str()) .collect(); - serialize_msgpack(&added, &removed).unwrap_or_default() + DeltaResult { added, removed } + .to_msgpack() + .unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hll_accumulator.rs b/asap-query-engine/src/precompute_operators/hll_accumulator.rs index b55c07d..393cf58 100644 --- a/asap-query-engine/src/precompute_operators/hll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hll_accumulator.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use asap_sketchlib::sketches::hll::{HllSketch, HllVariant}; +use asap_sketchlib::{message_pack_format::MessagePackCodec, HllSketch, HllVariant}; use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; @@ -48,7 +48,7 @@ impl HllAccumulator { pub fn deserialize_from_bytes_arroyo( buffer: &[u8], ) -> Result> { - let inner = HllSketch::deserialize_msgpack(buffer) + let inner = HllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; Ok(Self { inner }) } @@ -62,7 +62,7 @@ impl Default for HllAccumulator { impl SerializableToSink for HllAccumulator { fn serialize_to_json(&self) -> Value { - let bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let bytes = self.inner.to_msgpack().unwrap_or_default(); let b64 = general_purpose::STANDARD.encode(&bytes); serde_json::json!({ "sketch": b64, @@ -72,7 +72,7 @@ impl SerializableToSink for HllAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index f33012d..6ce69fa 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -5,7 +5,7 @@ use crate::{ }, KeyByLabelValues, }; -use asap_sketchlib::sketches::hydra_kll::HydraKllSketch; +use asap_sketchlib::{message_pack_format::MessagePackCodec, HydraKllSketch}; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; @@ -38,7 +38,7 @@ impl HydraKllSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: HydraKllSketch::deserialize_msgpack(buffer) + inner: HydraKllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -51,13 +51,13 @@ impl HydraKllSketchAccumulator { impl SerializableToSink for HydraKllSketchAccumulator { fn serialize_to_json(&self) -> serde_json::Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let sketch_bytes = self.inner.to_msgpack().unwrap_or_default(); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 45b74d5..68c2327 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::set_aggregator::SetAggregator; +use asap_sketchlib::{message_pack_format::MessagePackCodec, SetAggregator}; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -92,7 +92,7 @@ impl SetAggregatorAccumulator { pub fn deserialize_from_bytes_arroyo( buffer: &[u8], ) -> Result> { - let sa = SetAggregator::deserialize_msgpack(buffer) + let sa = SetAggregator::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let added = sa .values @@ -109,7 +109,7 @@ impl SetAggregatorAccumulator { for key in &self.added { sa.update(&key.to_semicolon_str()); } - sa.serialize_msgpack().unwrap_or_default() + sa.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index b8c6953..888ef5b 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -7,7 +7,7 @@ //! 3. Advances the watermark past the window boundary to close it //! 4. Drains captured outputs and verifies equivalence with wire-format accumulators -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::KllSketch; use asap_types::aggregation_config::AggregationConfig; use asap_types::enums::{AggregationType, WindowType}; use flate2::{write::GzEncoder, Compression};