From da67cb2732c3c2817221fa2a4624cbc6faca5ce0 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 25 May 2026 20:24:37 +0000 Subject: [PATCH 1/3] Adding COUNT(DISTINCT) SQL support with HLL precompute (asap_sketchlib) and CARDINALITY routing --- .../rs/asap_types/src/capability_matching.rs | 50 +++ .../src/query_logics/enums.rs | 34 +- .../src/ast_matching/sqlparser_test.rs | 206 +++++++++ .../src/ast_matching/sqlpattern_matcher.rs | 26 +- .../src/ast_matching/sqlpattern_parser.rs | 29 +- .../src/engines/physical/accumulator_serde.rs | 50 ++- .../precompute_engine/accumulator_factory.rs | 244 +++++++++- .../precompute_operators/hll_accumulator.rs | 424 ++++++++++++++++++ .../src/precompute_operators/mod.rs | 2 + 9 files changed, 1055 insertions(+), 10 deletions(-) create mode 100644 asap-query-engine/src/precompute_operators/hll_accumulator.rs diff --git a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs index 570c0854..fb2b797d 100644 --- a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs +++ b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs @@ -33,6 +33,11 @@ pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] { Statistic::Cardinality => &[ AggregationType::SetAggregator, AggregationType::DeltaSetAggregator, + // HLL is the single-population probabilistic alternative used by + // `COUNT(DISTINCT col) GROUP BY …` queries. It backs the per-bucket + // sketch directly (no paired key aggregation required) — see + // `is_multi_population_value_type`, which excludes HLL. + AggregationType::HLL, ], Statistic::Topk => &[AggregationType::CountMinSketchWithHeap], } @@ -753,6 +758,51 @@ mod tests { assert!(result.is_some()); } + // --- cardinality / HLL --- + + #[test] + fn cardinality_matches_hll_single_population() { + // `COUNT(DISTINCT col)` flows in as `Statistic::Cardinality`. 
An HLL config + // alone must satisfy it without requiring any paired key aggregation — + // HLL is a single-population value type (per grouping key bucket), unlike + // SetAggregator which is a multi-population key tracker. + let configs = single_config(make_config( + 42, "peers", "HLL", "", 1, "tumbling", &["srcip"], "", + )); + let result = find_compatible_aggregation( + &configs, + &req( + "peers", + &[Statistic::Cardinality], + Some(1_000), + &["srcip"], + "", + ), + ); + let info = result.expect("HLL should serve Cardinality"); + assert_eq!(info.aggregation_id_for_value, 42); + assert_eq!(info.aggregation_type_for_value, AggregationType::HLL); + // Single-population: key agg falls through to the value config itself, + // matching the KLL / Sum / MinMax pattern (no separate SetAggregator needed). + assert_eq!(info.aggregation_id_for_key, 42); + assert_eq!(info.aggregation_type_for_key, AggregationType::HLL); + } + + #[test] + fn compatible_agg_types_cardinality_includes_hll() { + // Direct unit test on the capability table: HLL must appear alongside the + // existing exact-cardinality types so the SQL→engine path picks it up + // without any further plumbing changes. + let types = compatible_agg_types(Statistic::Cardinality); + assert!( + types.contains(&AggregationType::HLL), + "compatible_agg_types(Cardinality) must include HLL; got {types:?}", + ); + // Backwards compat: existing exact types stay supported. 
+ assert!(types.contains(&AggregationType::SetAggregator)); + assert!(types.contains(&AggregationType::DeltaSetAggregator)); + } + #[test] fn avg_different_windows_rejected() { let mut configs = HashMap::new(); diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs index 05b34353..d2a2423a 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs @@ -185,6 +185,9 @@ pub enum AggregationOperator { Min, Max, Topk, + /// Distinct-value count. SQL-side normalisation maps `COUNT(DISTINCT col)` to + /// the aggregation name "CARDINALITY", which lands here. + Cardinality, } impl AggregationOperator { @@ -197,6 +200,7 @@ impl AggregationOperator { AggregationOperator::Min => "min", AggregationOperator::Max => "max", AggregationOperator::Topk => "topk", + AggregationOperator::Cardinality => "cardinality", } } @@ -211,11 +215,21 @@ impl AggregationOperator { AggregationOperator::Min => vec![Statistic::Min], AggregationOperator::Max => vec![Statistic::Max], AggregationOperator::Topk => vec![Statistic::Topk], + AggregationOperator::Cardinality => vec![Statistic::Cardinality], } } pub fn as_str_slice() -> &'static [&'static str] { - &["sum", "count", "avg", "quantile", "min", "max", "topk"] + &[ + "sum", + "count", + "avg", + "quantile", + "min", + "max", + "topk", + "cardinality", + ] } } @@ -236,6 +250,7 @@ impl FromStr for AggregationOperator { "min" => Ok(AggregationOperator::Min), "max" => Ok(AggregationOperator::Max), "topk" => Ok(AggregationOperator::Topk), + "cardinality" => Ok(AggregationOperator::Cardinality), other => Err(format!("Unknown aggregation operator: '{other}'")), } } @@ -251,6 +266,7 @@ impl AggregationOperator { | AggregationOperator::Count | AggregationOperator::Avg | AggregationOperator::Topk + | AggregationOperator::Cardinality ) } } @@ -432,4 +448,20 
@@ mod tests { assert_eq!(exact_back, QueryTreatmentType::Exact); assert_eq!(approximate_back, QueryTreatmentType::Approximate); } + + #[test] + fn test_aggregation_operator_cardinality_round_trip() { + // The SQL parser normalises `COUNT(DISTINCT col)` to the aggregation name + // "CARDINALITY"; `parse_single_statistic` then routes it through + // `AggregationOperator::FromStr`. Without a Cardinality variant the lookup + // returns Err and the query is rejected as "Unsupported statistic name". + let op: AggregationOperator = "cardinality".parse().expect("cardinality should parse"); + assert_eq!(op, AggregationOperator::Cardinality); + assert_eq!(op.to_statistics(), vec![Statistic::Cardinality]); + assert_eq!(op.as_str(), "cardinality"); + // Case-insensitive (matches the existing pattern for all other operators). + let op_upper: AggregationOperator = + "CARDINALITY".parse().expect("CARDINALITY should parse"); + assert_eq!(op_upper, AggregationOperator::Cardinality); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index eb76dd5e..30efb0eb 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -595,6 +595,73 @@ mod tests { ); } + /// Regression for three matcher-side gaps that surface together when the + /// simple_engine SQL path runs an HLL `COUNT(DISTINCT)` query end-to-end: + /// + /// 1. The parser correctly normalises `COUNT(DISTINCT col)` to the + /// aggregation name `"CARDINALITY"`, but + /// `SQLPatternMatcher::is_valid_aggregation` never gained `CARDINALITY` + /// in its `legal_aggregations` set, so the validator rejected the query + /// with `IllegalAggregationFn` before pattern matching ran. + /// + /// 2. 
After fixing (1), `flatten_query_info` validates the aggregation's + /// "value column" against `schema.is_valid_value_column`, which only + /// knows table value columns. `COUNT(DISTINCT col)` legitimately targets + /// metadata/label columns (e.g. `COUNT(DISTINCT dstip)`), so the + /// validator rejected it with `InvalidValueCol`. The fix accepts + /// metadata columns *only* for CARDINALITY. + /// + /// 3. With both fixed, the query classifies as `SpatioTemporal` because + /// `GROUP BY` only covers a subset of metadata columns — exactly the + /// shape of the user's real `COUNT(DISTINCT dstip) GROUP BY srcip` + /// query, which selects on `srcip` and aggregates over `dstip` (so + /// labels ⊊ metadata_columns). + /// + /// Observed log line that motivated this test: + /// error: Some(IllegalAggregationFn), + /// msg: Some("attempt to use illegal aggregation function CARDINALITY") + #[test] + fn test_count_distinct_passes_aggregation_allowlist() { + check_query( + "SELECT COUNT(DISTINCT L4) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3", + vec![QueryType::SpatioTemporal], + None, + ); + } + + /// Companion: when `GROUP BY` covers all metadata columns *except* the + /// distinct-target itself, the query is still SpatioTemporal — the + /// distinct-target is the value column, not a grouping label, so labels + /// always form a strict subset of metadata_columns. Guards against future + /// "treat L4 as both label and value" regressions in the classifier. + #[test] + fn test_count_distinct_with_full_remaining_labels_is_spatiotemporal() { + check_query( + "SELECT COUNT(DISTINCT L4) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3", + vec![QueryType::SpatioTemporal], + None, + ); + } + + /// Negative case: `COUNT(DISTINCT not_in_schema)` against a column that's + /// neither a value_column nor a metadata_column must still be rejected as + /// `InvalidValueCol`. 
The CARDINALITY relaxation widens what's *allowed* + /// (metadata columns) but doesn't disable the schema check entirely. + #[test] + fn test_count_distinct_unknown_column_still_rejected() { + check_query( + "SELECT COUNT(DISTINCT bogus_column) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3", + vec![], + Some(QueryError::InvalidValueCol), + ); + } + #[test] fn test_error_spatial_scrape_duration_too_small() { check_query( @@ -1034,4 +1101,143 @@ mod tests { .unwrap(); assert!(incoming.matches_sql_pattern(&template)); } + + // ── COUNT(DISTINCT col) support ────────────────────────────────────────── + // + // `COUNT(DISTINCT col)` must be normalised to a cardinality aggregation + // (`AggregationInfo.name == "CARDINALITY"`) so the engine routes it to a + // distinct-tracking sketch (SetAggregator / HLL) instead of a plain Count + // sketch. Before this normalisation the parser dropped `DISTINCT` silently — a parser-level bug + // that would have dispatched distinct counts as plain totals. + + #[test] + fn test_count_distinct_single_column_maps_to_cardinality() { + // The structural signature of the user's COUNT(DISTINCT) query. + let q = parse_sql_query( + "SELECT L1, COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(DISTINCT col) should parse"); + assert_eq!(q.aggregation_info.get_name(), "CARDINALITY"); + assert_eq!(q.aggregation_info.get_value_column_name(), "L2"); + assert!(q.aggregation_info.get_args().is_empty()); + assert!(q.labels.contains("L1")); + } + + #[test] + fn test_count_distinct_full_user_query_with_order_by_limit() { + // The exact shape of the user's HLL netflow query, ported to the test schema.
+ let q = parse_sql_query( + "SELECT L1, COUNT(DISTINCT L2) AS unique_peers FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY L1 \ + ORDER BY unique_peers DESC LIMIT 20", + ) + .expect("COUNT(DISTINCT col) + ORDER BY + LIMIT should parse"); + assert_eq!(q.aggregation_info.get_name(), "CARDINALITY"); + assert_eq!(q.aggregation_info.get_value_column_name(), "L2"); + assert_eq!(q.aggregation_alias.as_deref(), Some("unique_peers")); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "unique_peers"); + assert!(!q.order_by[0].ascending); + assert_eq!(q.limit, Some(20)); + } + + #[test] + fn test_count_distinct_matches_count_distinct_template() { + // Pattern matching: incoming COUNT(DISTINCT col) with absolute timestamps must + // match a NOW()-relative COUNT(DISTINCT col) template. + let template = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + let incoming = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY L1", + ) + .unwrap(); + assert!(incoming.matches_sql_pattern(&template)); + } + + #[test] + fn test_count_distinct_does_not_match_plain_count_template() { + // CARDINALITY and COUNT are distinct aggregations — a COUNT(DISTINCT col) + // template must not be served by an incoming COUNT(col) query (and vice versa). 
+ let count_distinct = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + let plain_count = parse_sql_query( + "SELECT COUNT(L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + assert!(!plain_count.matches_sql_pattern(&count_distinct)); + assert!(!count_distinct.matches_sql_pattern(&plain_count)); + } + + #[test] + fn test_count_all_treated_as_plain_count() { + // The redundant explicit `ALL` modifier (the SQL default) must NOT switch the + // aggregation to CARDINALITY; only `DISTINCT` triggers cardinality semantics. + let q = parse_sql_query( + "SELECT COUNT(ALL L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(ALL col) should parse as plain COUNT"); + assert_eq!(q.aggregation_info.get_name(), "COUNT"); + } + + #[test] + fn test_count_without_distinct_remains_count() { + // Regression guard: ensure the DISTINCT-aware path doesn't accidentally rewrite + // `COUNT(col)` (without any duplicate_treatment). + let q = parse_sql_query( + "SELECT COUNT(L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(col) should parse"); + assert_eq!(q.aggregation_info.get_name(), "COUNT"); + } + + #[test] + fn test_count_distinct_multiple_columns_rejected() { + // Multi-column DISTINCT (`COUNT(DISTINCT a, b)`) is a compound-key cardinality + // that the structural model can't represent with a single value_column. Reject + // it explicitly rather than silently keeping only the first argument. + assert!(parse_sql_query( + "SELECT COUNT(DISTINCT L1, L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L3", + ) + .is_none()); + } + + #[test] + fn test_distinct_on_non_count_aggregate_rejected() { + // DISTINCT on aggregates other than COUNT (e.g. 
`SUM(DISTINCT v)`, `AVG(DISTINCT v)`) + // is not modelled by any precompute sketch type; reject rather than silently + // dropping the modifier and dispatching to a plain Sum. + assert!(parse_sql_query( + "SELECT SUM(DISTINCT value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + assert!(parse_sql_query( + "SELECT AVG(DISTINCT value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs index c23145b3..8e3b43ae 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs @@ -102,6 +102,12 @@ impl SQLPatternMatcher { legal_aggregations.insert("MIN"); legal_aggregations.insert("MAX"); legal_aggregations.insert("QUANTILE"); + // `COUNT(DISTINCT col)` is normalised by the parser to the aggregation + // name "CARDINALITY" (see `SQLPatternParser::get_aggregation`). Without + // this entry the simple_engine SQL handler rejects every COUNT(DISTINCT) + // query with `IllegalAggregationFn` before pattern matching runs, which + // in turn blocks routing to the precompute engine's HLL accumulator. + legal_aggregations.insert("CARDINALITY"); Self { schema, @@ -176,10 +182,22 @@ impl SQLPatternMatcher { } let value_column_name = query.aggregation_info.get_value_column_name(); - if !self - .schema - .is_valid_value_column(&query.metric, value_column_name) - { + // `COUNT(DISTINCT col)` (normalised to "CARDINALITY") legitimately + // targets metadata/label columns (e.g. `COUNT(DISTINCT dstip)`), + // which the schema lists under metadata_columns rather than + // value_columns. 
Accept either bucket for CARDINALITY; for all + // other aggregations keep the strict value_columns-only check. + let column_is_known = if query.aggregation_info.get_name() == "CARDINALITY" { + self.schema + .is_valid_value_column(&query.metric, value_column_name) + || self.schema.get_metadata_columns(&query.metric).is_some_and( + |cols| cols.contains(value_column_name), + ) + } else { + self.schema + .is_valid_value_column(&query.metric, value_column_name) + }; + if !column_is_known { println!("Returned QueryError::InvalidValueCol"); return Err(( diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 1b98a592..fa3f655b 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -366,6 +366,29 @@ impl SQLPatternParser { let name = func.name.to_string().to_uppercase(); + // DISTINCT handling. 
The structural model tracks at most one value column, + // so we only support DISTINCT in its single-column COUNT form, which we + // normalise to a cardinality aggregation: + // COUNT(DISTINCT col) → name="CARDINALITY", value_column=col + // COUNT(DISTINCT col1, col2) → rejected (compound-key distinct) + // COUNT(ALL col), COUNT(col) → unchanged (plain COUNT) + // SUM/AVG/...(DISTINCT col) → rejected (no sketch backs distinct-sum) + let has_distinct = matches!( + &func.args, + FunctionArguments::List(list) + if list.duplicate_treatment == Some(DuplicateTreatment::Distinct) + ); + if has_distinct { + if name != "COUNT" { + return None; + } + if let FunctionArguments::List(list) = &func.args { + if list.args.len() != 1 { + return None; + } + } + } + let args = self.get_quantile_args(func); // Get the column being aggregated @@ -425,9 +448,13 @@ impl SQLPatternParser { } }; - // Always store PERCENTILE as QUANTILE internally + // Normalisation: + // - PERCENTILE → QUANTILE (legacy alias). + // - COUNT(DISTINCT col) → CARDINALITY (validated above to be single-arg). 
let normalized_name = if name == "PERCENTILE" { "QUANTILE".to_string() + } else if has_distinct { + "CARDINALITY".to_string() } else { name }; diff --git a/asap-query-engine/src/engines/physical/accumulator_serde.rs b/asap-query-engine/src/engines/physical/accumulator_serde.rs index cc7f85ea..f38ce4a0 100644 --- a/asap-query-engine/src/engines/physical/accumulator_serde.rs +++ b/asap-query-engine/src/engines/physical/accumulator_serde.rs @@ -12,7 +12,7 @@ use datafusion_summary_library::SketchType; use crate::data_model::{MultipleSubpopulationAggregate, SingleSubpopulationAggregate}; use crate::precompute_operators::{ CountMinSketchAccumulator, DatasketchesKLLAccumulator, DeltaSetAggregatorAccumulator, - HydraKllSketchAccumulator, MultipleIncreaseAccumulator, MultipleSumAccumulator, + HllAccumulator, HydraKllSketchAccumulator, MultipleIncreaseAccumulator, MultipleSumAccumulator, SetAggregatorAccumulator, SumAccumulator, }; use crate::AggregateCore; @@ -114,6 +114,14 @@ pub fn deserialize_accumulator( Ok(Box::new(acc)) } + // Cardinality sketches + SketchType::HLL => { + let acc = HllAccumulator::deserialize_from_bytes_arroyo(bytes).map_err(|e| { + DataFusionError::Internal(format!("Failed to deserialize HLL: {}", e)) + })?; + Ok(Box::new(acc)) + } + // Sketches that aren't implemented yet _ => Err(DataFusionError::NotImplemented(format!( "Accumulator deserialization not implemented for: {:?}", @@ -183,6 +191,12 @@ pub fn deserialize_single_subpopulation( })?; Ok(Box::new(acc)) } + SketchType::HLL => { + let acc = HllAccumulator::deserialize_from_bytes_arroyo(bytes).map_err(|e| { + DataFusionError::Internal(format!("Failed to deserialize HLL: {}", e)) + })?; + Ok(Box::new(acc)) + } _ => Err(DataFusionError::NotImplemented(format!( "SingleSubpopulationAggregate deserialization not implemented for: {:?}", summary_type @@ -304,10 +318,42 @@ mod tests { assert!(result.is_err()); } + #[test] + fn test_deserialize_hll_round_trip() { + // HLL is now a supported path. 
Build a real accumulator, serialize it, + // then verify deserialize_accumulator and deserialize_single_subpopulation + // both reconstruct a working accumulator that reports the same estimate. + use crate::data_model::SerializableToSink; + use crate::precompute_operators::HllAccumulator; + let mut acc = HllAccumulator::new(14); + for i in 0..500 { + acc.update(i as f64); + } + let bytes = acc.serialize_to_bytes(); + let want = acc.estimate(); + + let core = deserialize_accumulator(&bytes, &SketchType::HLL).expect("HLL deserialize"); + assert_eq!(core.get_accumulator_type(), AggregationType::HLL); + let core_hll = core + .as_any() + .downcast_ref::() + .expect("downcast HllAccumulator"); + assert_eq!(core_hll.estimate(), want); + + let single = + deserialize_single_subpopulation(&bytes, &SketchType::HLL).expect("HLL single-pop"); + let via_query = single + .query(promql_utilities::query_logics::enums::Statistic::Cardinality, None) + .expect("Cardinality query"); + assert_eq!(via_query, want); + } + #[test] fn test_deserialize_unsupported_type() { + // MinMax keys-accumulator path remains unimplemented; ensure the generic + // dispatch still errors out cleanly for sketch types we haven't wired. 
let bytes = vec![1, 2, 3, 4]; - let result = deserialize_accumulator(&bytes, &SketchType::HLL); + let result = deserialize_accumulator(&bytes, &SketchType::MinMax); assert!(result.is_err()); } diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 13c86b07..d6724cbc 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -1,8 +1,8 @@ use crate::data_model::{AggregateCore, AggregationType, KeyByLabelValues, Measurement}; use crate::precompute_operators::{ - CountMinSketchAccumulator, DatasketchesKLLAccumulator, HydraKllSketchAccumulator, + CountMinSketchAccumulator, DatasketchesKLLAccumulator, HllAccumulator, HydraKllSketchAccumulator, IncreaseAccumulator, MinMaxAccumulator, MultipleIncreaseAccumulator, MultipleMinMaxAccumulator, - MultipleSumAccumulator, SumAccumulator, + MultipleSumAccumulator, SumAccumulator, DEFAULT_HLL_PRECISION, }; use asap_types::aggregation_config::AggregationConfig; @@ -266,6 +266,55 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { } } +// --------------------------------------------------------------------------- +// HllAccumulatorUpdater +// --------------------------------------------------------------------------- + +/// Updater for `AggregationType::HLL`. Single-population per grouping key — +/// behaves like `KllAccumulatorUpdater` from the worker's perspective: feed +/// raw f64 values, ignore the key argument. Internally hashes each value's +/// little-endian bytes into the wrapped HLL sketch. 
+pub struct HllAccumulatorUpdater { + acc: HllAccumulator, + precision: u32, +} + +impl HllAccumulatorUpdater { + pub fn new(precision: u32) -> Self { + Self { + acc: HllAccumulator::new(precision), + precision, + } + } +} + +impl AccumulatorUpdater for HllAccumulatorUpdater { + fn update_single(&mut self, value: f64, _timestamp_ms: i64) { + self.acc.update(value); + } + + fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) { + self.update_single(value, timestamp_ms); + } + + impl_clone_accumulator_methods!(acc); + + fn reset(&mut self) { + self.acc = HllAccumulator::new(self.precision); + } + + fn is_keyed(&self) -> bool { + false + } + + fn memory_usage_bytes(&self) -> usize { + // 1 byte per register; register count = 2^precision. Add a small fixed + // overhead for the HllSketch wrapper (variant, precision, HIP fields). + let registers = 1usize << self.precision; + std::mem::size_of::() + registers + } +} + // --------------------------------------------------------------------------- // MultipleSumAccumulatorUpdater // --------------------------------------------------------------------------- @@ -589,6 +638,28 @@ fn hydra_kll_params(config: &AggregationConfig) -> (usize, usize, u16) { (row_num, col_num, kll_k_param(config)) } +/// Extract the HLL `precision` parameter from a config. Falls back to +/// `DEFAULT_HLL_PRECISION` (14) when absent or non-numeric. The valid range is +/// 4..=18 per the underlying `HllSketch` storage; out-of-range values are +/// replaced with the default and warned about so a typo doesn't crash the streaming worker.
+fn hll_precision_param(config: &AggregationConfig) -> u32 { + let raw = config + .parameters + .get("precision") + .and_then(|v| v.as_u64()) + .map(|v| v as u32); + match raw { + Some(p) if (4..=18).contains(&p) => p, + Some(p) => { + tracing::warn!( + "HLL precision {p} is out of range (4..=18); using default {DEFAULT_HLL_PRECISION}" + ); + DEFAULT_HLL_PRECISION + } + None => DEFAULT_HLL_PRECISION, + } +} + // --------------------------------------------------------------------------- // Factory function // --------------------------------------------------------------------------- @@ -656,6 +727,7 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box Box::new(HllAccumulatorUpdater::new(hll_precision_param(config))), other => { tracing::warn!( "Unknown aggregation_type '{:?}', defaulting to SingleSubpopulation Sum", @@ -824,6 +896,174 @@ mod tests { } } + // ── HLL updater ────────────────────────────────────────────────────── + // + // `COUNT(DISTINCT col)` queries flow through `AggregationType::HLL`. The + // factory must produce an `HllAccumulatorUpdater` (not the silent + // `SumAccumulatorUpdater` fallback that the old default arm gave) so the + // streaming layer actually hashes incoming samples into an HLL register + // array rather than summing them. 
+ + #[test] + fn test_hll_updater_via_factory_routes_to_hll_accumulator() { + use std::collections::HashMap; + let config = AggregationConfig::new( + 42, + AggregationType::HLL, + String::new(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + 0, + WindowType::Tumbling, + "m".to_string(), + "m".to_string(), + None, + None, + None, + None, + ); + let mut updater = create_accumulator_updater(&config); + assert!( + !updater.is_keyed(), + "HLL is single-population per grouping key (like KLL), not keyed", + ); + + // Feed 100 distinct values; the resulting accumulator should report an + // estimate near 100 (not a sum of 0+1+…+99 = 4950, which is what the + // old SumAccumulatorUpdater fallback would have produced). + for i in 0..100 { + updater.update_single(i as f64, i * 1000); + } + let acc = updater.take_accumulator(); + assert_eq!(acc.type_name(), "HllAccumulator"); + assert_eq!(acc.get_accumulator_type(), AggregationType::HLL); + + let hll = acc + .as_any() + .downcast_ref::() + .expect("factory must produce HllAccumulator for AggregationType::HLL"); + let est = hll.estimate(); + assert!( + est > 90.0 && est < 110.0, + "100 distinct inserts should yield estimate near 100, got {est}", + ); + } + + #[test] + fn test_hll_updater_precision_param_propagates() { + // `parameters: { precision: 12 }` must flow into the HllAccumulator, not + // be silently dropped. 12-bit precision yields a 4 KiB register array, + // serialising to a noticeably smaller msgpack body than the 16 KiB + // default — but the test pins the property directly by asserting `precision() == 12`.
+ use std::collections::HashMap; + let mut params = HashMap::new(); + params.insert( + "precision".to_string(), + serde_json::Value::from(12_u64), + ); + let config = AggregationConfig::new( + 7, + AggregationType::HLL, + String::new(), + params, + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + 0, + WindowType::Tumbling, + "m".to_string(), + "m".to_string(), + None, + None, + None, + None, + ); + let updater = create_accumulator_updater(&config); + let acc = updater.snapshot_accumulator(); + let hll = acc + .as_any() + .downcast_ref::() + .expect("AggregationType::HLL → HllAccumulator"); + assert_eq!(hll.precision(), 12); + } + + #[test] + fn test_hll_updater_default_precision_is_14() { + // When no `precision` parameter is supplied, the factory must use the + // documented default (14) — not whatever the type default resolves to. 
+ use std::collections::HashMap; + let config = AggregationConfig::new( + 7, + AggregationType::HLL, + String::new(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + 0, + WindowType::Tumbling, + "m".to_string(), + "m".to_string(), + None, + None, + None, + None, + ); + let updater = create_accumulator_updater(&config); + let acc = updater.snapshot_accumulator(); + let hll = acc + .as_any() + .downcast_ref::() + .expect("AggregationType::HLL → HllAccumulator"); + assert_eq!(hll.precision(), 14); + } + + #[test] + fn test_hll_updater_reset_clears_state() { + // After reset(), a freshly-taken accumulator must produce an empty (0.0) + // estimate — otherwise pane reuse across tumbling windows would leak + // distinct values from the previous pane into the next. 
+ use std::collections::HashMap; + let config = AggregationConfig::new( + 7, + AggregationType::HLL, + String::new(), + HashMap::new(), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + promql_utilities::data_model::key_by_label_names::KeyByLabelNames::new(vec![]), + String::new(), + 60, + 0, + WindowType::Tumbling, + "m".to_string(), + "m".to_string(), + None, + None, + None, + None, + ); + let mut updater = create_accumulator_updater(&config); + for i in 0..50 { + updater.update_single(i as f64, 0); + } + updater.reset(); + let acc = updater.take_accumulator(); + let hll = acc + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(hll.estimate(), 0.0); + } + #[test] fn test_kll_k_param_capital_k() { // SingleSubpopulation/KLL with capital "K" param should use it (not default to 200) diff --git a/asap-query-engine/src/precompute_operators/hll_accumulator.rs b/asap-query-engine/src/precompute_operators/hll_accumulator.rs new file mode 100644 index 00000000..c87aa962 --- /dev/null +++ b/asap-query-engine/src/precompute_operators/hll_accumulator.rs @@ -0,0 +1,424 @@ +//! HyperLogLog accumulator for `COUNT(DISTINCT col)` queries. +//! +//! Wraps `asap_sketchlib::HllSketch` (precision-14 `HllVariant::Regular` by default) +//! and exposes it through the precompute-engine accumulator traits so the streaming +//! worker can feed values into it and the query path can dispatch `Cardinality` +//! against the resulting sketch. +//! +//! Mirrors the `DatasketchesKLLAccumulator` layout: a single subpopulation per +//! grouping key, msgpack-encoded byte serialization (round-trippable, unlike the +//! lossy `datafusion_summary_library::physical::hll::HllSketch::to_bytes`). +//! +//! End-to-end pipeline for `COUNT(DISTINCT col)`: +//! 1. SQL parser normalises the call to `name="CARDINALITY", value_column=col`. +//! 2. 
`AggregationOperator::Cardinality.to_statistics()` → `[Statistic::Cardinality]`. +//! 3. Capability matching pairs the query with an `AggregationType::HLL` config. +//! 4. The precompute worker feeds `col`'s f64 samples through +//! `HllAccumulatorUpdater::update_single`, which calls `HllAccumulator::update`. +//! 5. Query path: `plan_builder` maps `Statistic::Cardinality → InferOperation::CountDistinct`; +//! `SummaryInferExec` deserializes the stored sketch and calls +//! `SingleSubpopulationAggregate::query(Cardinality)`, which returns +//! `HllSketch::estimate()`. + +use std::collections::HashMap; + +use asap_sketchlib::sketches::hll::{HllSketch, HllVariant}; +use base64::{engine::general_purpose, Engine as _}; +use serde_json::Value; + +use promql_utilities::query_logics::enums::Statistic; + +use crate::data_model::{ + AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, + SingleSubpopulationAggregate, +}; + +/// Default precision when none is supplied via streaming-config parameters. +/// Matches `datafusion_summary_library::physical::hll::HllSketch` (~0.8% std error, +/// ~16 KiB per sketch). +pub const DEFAULT_HLL_PRECISION: u32 = 14; + +/// HLL sketch accumulator — wraps `asap_sketchlib::HllSketch`. +/// Core insert/merge/serde logic lives in `asap_sketchlib`; this file retains +/// the QE-specific trait impls (`AggregateCore`, `SingleSubpopulationAggregate`, +/// `SerializableToSink`, `MergeableAccumulator`). +#[derive(Debug, Clone)] +pub struct HllAccumulator { + pub inner: HllSketch, +} + +impl HllAccumulator { + pub fn new(precision: u32) -> Self { + Self { + inner: HllSketch::new(HllVariant::Regular, precision), + } + } + + pub fn with_default_precision() -> Self { + Self::new(DEFAULT_HLL_PRECISION) + } + + /// Feed a value into the sketch. 
The streaming layer surfaces all column + /// values as `f64`; we hash their little-endian bytes via `HllSketch::update`, + /// which goes through the canonical sketchlib hash (`hash64_seeded` with + /// `CANONICAL_HASH_SEED`). That makes the on-disk sketch byte-identical to + /// what `sketchlib-go` would produce for the same stream — cross-language + /// parity is locked in by upstream's golden-byte test. + pub fn update(&mut self, value: f64) { + self.inner.update(&value.to_le_bytes()); + } + + /// Current cardinality estimate. Returned as `f64` so callers don't lose the + /// fractional small-range correction; the engine truncates to an integer in + /// the final result column when needed. + pub fn estimate(&self) -> f64 { + self.inner.estimate() + } + + pub fn precision(&self) -> u32 { + self.inner.precision + } + + pub fn deserialize_from_bytes_arroyo( + buffer: &[u8], + ) -> Result> { + let inner = HllSketch::deserialize_msgpack(buffer) + .map_err(|e| -> Box { e.to_string().into() })?; + Ok(Self { inner }) + } +} + +impl Default for HllAccumulator { + fn default() -> Self { + Self::with_default_precision() + } +} + +impl SerializableToSink for HllAccumulator { + fn serialize_to_json(&self) -> Value { + // Mirrors KLL's JSON shape: opaque base64 of the canonical msgpack body so + // downstream consumers can round-trip through any serde-capable channel + // without losing the register array. 
+ let bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let b64 = general_purpose::STANDARD.encode(&bytes); + serde_json::json!({ + "sketch": b64, + "precision": self.inner.precision, + "variant": format!("{:?}", self.inner.variant), + }) + } + + fn serialize_to_bytes(&self) -> Vec { + self.inner.serialize_msgpack().unwrap_or_default() + } +} + +impl AggregateCore for HllAccumulator { + fn clone_boxed_core(&self) -> Box { + Box::new(self.clone()) + } + + fn type_name(&self) -> &'static str { + "HllAccumulator" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn merge_with( + &self, + other: &dyn AggregateCore, + ) -> Result, Box> { + if other.get_accumulator_type() != self.get_accumulator_type() { + return Err(format!( + "Cannot merge HllAccumulator with {}", + other.get_accumulator_type() + ) + .into()); + } + let other_hll = other + .as_any() + .downcast_ref::() + .ok_or("Failed to downcast to HllAccumulator")?; + + let mut merged = self.inner.clone(); + merged.merge(&other_hll.inner)?; + Ok(Box::new(Self { inner: merged })) + } + + fn get_accumulator_type(&self) -> AggregationType { + AggregationType::HLL + } + + fn get_keys(&self) -> Option> { + // HLL is a probabilistic counter, not a set: it estimates |S| without + // retaining the individual members of S. There are no keys to enumerate. + None + } + + fn query_statistic( + &self, + statistic: Statistic, + _key: &Option, + query_kwargs: &HashMap, + ) -> Result> { + // HLL is a single-population accumulator (one sketch per grouping key + // partition), so `key` is unused — same shape as KllAccumulator. 
+ SingleSubpopulationAggregate::query(self, statistic, Some(query_kwargs)) + } +} + +impl SingleSubpopulationAggregate for HllAccumulator { + fn query( + &self, + statistic: Statistic, + _query_kwargs: Option<&HashMap>, + ) -> Result> { + match statistic { + Statistic::Cardinality => Ok(self.estimate()), + other => Err(format!("Unsupported statistic in HllAccumulator: {other:?}").into()), + } + } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } +} + +impl MergeableAccumulator for HllAccumulator { + fn merge_accumulators( + accumulators: Vec, + ) -> Result> { + if accumulators.is_empty() { + return Err("No accumulators to merge".into()); + } + let mut iter = accumulators.into_iter(); + let mut merged = iter.next().unwrap(); + for acc in iter { + merged.inner.merge(&acc.inner)?; + } + Ok(merged) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Helper: insert n unique f64 values and return the accumulator. + fn build_with_n_unique(n: usize, precision: u32) -> HllAccumulator { + let mut acc = HllAccumulator::new(precision); + for i in 0..n { + acc.update(i as f64); + } + acc + } + + #[test] + fn new_with_precision_has_zero_estimate() { + let acc = HllAccumulator::new(14); + assert_eq!(acc.precision(), 14); + // Empty register array → estimate exactly 0.0 by the HLL small-range correction. + assert_eq!(acc.estimate(), 0.0); + } + + #[test] + fn default_uses_documented_precision() { + let acc = HllAccumulator::default(); + assert_eq!(acc.precision(), DEFAULT_HLL_PRECISION); + } + + #[test] + fn update_distinct_values_grows_estimate_within_tolerance() { + // 1000 distinct values at precision 14 should land within the documented + // ~0.8% standard error (allow 10% to keep this test deterministic; the + // accuracy-tightness is upstream's `assert_accuracy` test).
+ let acc = build_with_n_unique(1000, 14); + let est = acc.estimate(); + assert!( + est > 900.0 && est < 1100.0, + "expected estimate near 1000, got {est}" + ); + } + + #[test] + fn duplicates_do_not_increase_estimate() { + let mut acc = HllAccumulator::new(14); + for _ in 0..10_000 { + acc.update(42.0); + } + let est = acc.estimate(); + assert!( + est <= 5.0, + "estimate after 10k duplicates of one value should be ≈ 1, got {est}" + ); + } + + #[test] + fn query_cardinality_returns_estimate() { + let acc = build_with_n_unique(500, 14); + let via_trait = SingleSubpopulationAggregate::query(&acc, Statistic::Cardinality, None) + .expect("Cardinality should be supported"); + assert_eq!(via_trait, acc.estimate()); + } + + #[test] + fn query_other_statistic_errors() { + let acc = HllAccumulator::new(14); + for stat in [ + Statistic::Sum, + Statistic::Count, + Statistic::Min, + Statistic::Max, + Statistic::Quantile, + Statistic::Topk, + Statistic::Rate, + Statistic::Increase, + ] { + assert!( + SingleSubpopulationAggregate::query(&acc, stat, None).is_err(), + "HLL should reject {stat:?}", + ); + } + } + + #[test] + fn merge_two_disjoint_sketches_approximates_union_size() { + // 500 evens + 500 odds = 1000 distinct values total. + let mut left = HllAccumulator::new(14); + let mut right = HllAccumulator::new(14); + for i in 0..500 { + left.update((i * 2) as f64); + right.update((i * 2 + 1) as f64); + } + let merged = HllAccumulator::merge_accumulators(vec![left.clone(), right]) + .expect("merge of two same-precision sketches should succeed"); + let est = merged.estimate(); + assert!( + est > 900.0 && est < 1100.0, + "merged estimate should be ≈1000, got {est}" + ); + // Left should be untouched (merge_accumulators consumed a clone of it; `right` was moved in).
+ let left_est = left.estimate(); + assert!( + left_est > 450.0 && left_est < 550.0, + "left sketch should still report ≈500, got {left_est}" + ); + } + + #[test] + fn merge_via_aggregate_core_trait_returns_same_result() { + // Exercises the AggregateCore::merge_with path used by the query engine + // when collapsing multiple per-pane sketches at infer time. + let left = build_with_n_unique(300, 14); + let mut right = HllAccumulator::new(14); + for i in 300..700 { + right.update(i as f64); + } + let merged_box = left.merge_with(&right).expect("merge_with should succeed"); + assert_eq!( + merged_box.get_accumulator_type(), + AggregationType::HLL, + "merged accumulator must report HLL type", + ); + let merged = merged_box + .as_any() + .downcast_ref::() + .expect("downcast HllAccumulator"); + let est = merged.estimate(); + // 700 distinct values; allow ±50 (~7%) tolerance. + assert!( + est > 650.0 && est < 750.0, + "merged estimate should be ≈700, got {est}" + ); + } + + #[test] + fn merge_with_wrong_type_errors() { + // Cross-type merges must fail rather than silently produce garbage. + // Use any other AggregateCore impl — SetAggregatorAccumulator is the + // sibling distinct-tracking type and the most likely accidental swap. + use crate::precompute_operators::SetAggregatorAccumulator; + let acc = HllAccumulator::new(14); + let other = SetAggregatorAccumulator::new(); + let result = acc.merge_with(&other); + assert!( + result.is_err(), + "merging HLL with non-HLL accumulator must error", + ); + let msg = result.err().unwrap().to_string(); + assert!( + msg.contains("HllAccumulator") || msg.contains("Cannot merge"), + "error message should mention HllAccumulator, got: {msg}", + ); + } + + #[test] + fn msgpack_round_trip_preserves_estimate() { + // Real serialisation: the register array survives encode→decode and the + // estimate is identical (modulo the lossless f64 estimator math).
+ // This is the property that the existing + // `datafusion_summary_library::physical::hll::HllSketch` lacks — its + // to_bytes/from_bytes drop the register state and only persist the count, + // which would corrupt any merge that happens after a store round-trip. + let original = build_with_n_unique(2000, 14); + let bytes = original.serialize_to_bytes(); + assert!(!bytes.is_empty(), "serialize_to_bytes must produce data"); + let restored = + HllAccumulator::deserialize_from_bytes_arroyo(&bytes).expect("msgpack round trip"); + assert_eq!(restored.precision(), original.precision()); + assert_eq!(restored.estimate(), original.estimate()); + // Bytes must be stable across re-encode (canonical form). + assert_eq!(restored.serialize_to_bytes(), bytes); + } + + #[test] + fn round_trip_then_merge_recovers_full_state() { + // Regression guard for the lossy-serialisation footgun: serialize, deserialize, + // then merge new data — if the restored sketch had dropped its register + // state, the post-merge estimate would come out far too low (≈1000, not ≈2000). + let acc_a = build_with_n_unique(1000, 14); + let bytes = acc_a.serialize_to_bytes(); + let restored = + HllAccumulator::deserialize_from_bytes_arroyo(&bytes).expect("msgpack round trip"); + + let mut acc_b = HllAccumulator::new(14); + for i in 1000..2000 { + acc_b.update(i as f64); + } + let merged = restored + .merge_with(&acc_b) + .expect("merge restored + new must succeed"); + let est = merged + .as_any() + .downcast_ref::() + .unwrap() + .estimate(); + // 2000 distinct values; allow 10% tolerance.
+ assert!( + est > 1800.0 && est < 2200.0, + "post-round-trip merge estimate should be ≈2000, got {est}", + ); + } + + #[test] + fn json_serialisation_includes_sketch_blob_and_precision() { + let acc = build_with_n_unique(100, 14); + let json = acc.serialize_to_json(); + assert!(json.get("sketch").and_then(|v| v.as_str()).is_some()); + assert_eq!(json["precision"], 14); + } + + #[test] + fn aggregate_core_query_statistic_dispatches_to_estimate() { + // The engine path goes through AggregateCore::query_statistic; verify the + // dispatch reaches the same code as the trait method. + let acc = build_with_n_unique(500, 14); + let kwargs = HashMap::new(); + let via_core = acc + .query_statistic(Statistic::Cardinality, &None, &kwargs) + .expect("query_statistic should dispatch Cardinality"); + assert_eq!(via_core, acc.estimate()); + } +} diff --git a/asap-query-engine/src/precompute_operators/mod.rs b/asap-query-engine/src/precompute_operators/mod.rs index 76ed652d..36ab865b 100644 --- a/asap-query-engine/src/precompute_operators/mod.rs +++ b/asap-query-engine/src/precompute_operators/mod.rs @@ -3,6 +3,7 @@ pub mod count_min_sketch_with_heap_accumulator; pub mod datasketches_kll_accumulator; pub mod delta_set_aggregator_accumulator; pub mod error; +pub mod hll_accumulator; pub mod hydra_kll_accumulator; pub mod increase_accumulator; pub mod min_max_accumulator; @@ -17,6 +18,7 @@ pub use count_min_sketch_with_heap_accumulator::*; pub use datasketches_kll_accumulator::*; pub use delta_set_aggregator_accumulator::*; pub use error::AccumulatorError; +pub use hll_accumulator::*; pub use hydra_kll_accumulator::*; pub use increase_accumulator::*; pub use min_max_accumulator::*; From 8612022f058f034b39eed262c036799568ca1817 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Tue, 26 May 2026 03:53:15 +0000 Subject: [PATCH 2/3] Refined comments --- .../rs/asap_types/src/capability_matching.rs | 4 - .../src/query_logics/enums.rs | 5 +- .../src/ast_matching/sqlparser_test.rs | 
115 +++++++----------- .../src/ast_matching/sqlpattern_matcher.rs | 6 +- .../precompute_engine/accumulator_factory.rs | 9 +- .../precompute_operators/hll_accumulator.rs | 45 +------ 6 files changed, 52 insertions(+), 132 deletions(-) diff --git a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs index fb2b797d..26bc6975 100644 --- a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs +++ b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs @@ -33,10 +33,6 @@ pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] { Statistic::Cardinality => &[ AggregationType::SetAggregator, AggregationType::DeltaSetAggregator, - // HLL is the single-population probabilistic alternative used by - // `COUNT(DISTINCT col) GROUP BY …` queries. It backs the per-bucket - // sketch directly (no paired key aggregation required) — see - // `is_multi_population_value_type`, which excludes HLL. AggregationType::HLL, ], Statistic::Topk => &[AggregationType::CountMinSketchWithHeap], diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs index d2a2423a..53f17e5c 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs @@ -185,8 +185,6 @@ pub enum AggregationOperator { Min, Max, Topk, - /// Distinct-value count. SQL-side normalisation maps `COUNT(DISTINCT col)` to - /// the aggregation name "CARDINALITY", which lands here. Cardinality, } @@ -453,8 +451,7 @@ mod tests { fn test_aggregation_operator_cardinality_round_trip() { // The SQL parser normalises `COUNT(DISTINCT col)` to the aggregation name // "CARDINALITY"; `parse_single_statistic` then routes it through - // `AggregationOperator::FromStr`. 
Without a Cardinality variant the lookup - // returns Err and the query is rejected as "Unsupported statistic name". + // `AggregationOperator::FromStr`. let op: AggregationOperator = "cardinality".parse().expect("cardinality should parse"); assert_eq!(op, AggregationOperator::Cardinality); assert_eq!(op.to_statistics(), vec![Statistic::Cardinality]); diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index 30efb0eb..96fe4348 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -595,73 +595,6 @@ mod tests { ); } - /// Regression for three matcher-side gaps that surface together when the - /// simple_engine SQL path runs an HLL `COUNT(DISTINCT)` query end-to-end: - /// - /// 1. The parser correctly normalises `COUNT(DISTINCT col)` to the - /// aggregation name `"CARDINALITY"`, but - /// `SQLPatternMatcher::is_valid_aggregation` never gained `CARDINALITY` - /// in its `legal_aggregations` set, so the validator rejected the query - /// with `IllegalAggregationFn` before pattern matching ran. - /// - /// 2. After fixing (1), `flatten_query_info` validates the aggregation's - /// "value column" against `schema.is_valid_value_column`, which only - /// knows table value columns. `COUNT(DISTINCT col)` legitimately targets - /// metadata/label columns (e.g. `COUNT(DISTINCT dstip)`), so the - /// validator rejected it with `InvalidValueCol`. The fix accepts - /// metadata columns *only* for CARDINALITY. - /// - /// 3. With both fixed, the query classifies as `SpatioTemporal` because - /// `GROUP BY` only covers a subset of metadata columns — exactly the - /// shape of the user's real `COUNT(DISTINCT dstip) GROUP BY srcip` - /// query, which selects on `srcip` and aggregates over `dstip` (so - /// labels ⊊ metadata_columns). 
- /// - /// Observed log line that motivated this test: - /// error: Some(IllegalAggregationFn), - /// msg: Some("attempt to use illegal aggregation function CARDINALITY") - #[test] - fn test_count_distinct_passes_aggregation_allowlist() { - check_query( - "SELECT COUNT(DISTINCT L4) FROM cpu_usage \ - WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ - GROUP BY L1, L2, L3", - vec![QueryType::SpatioTemporal], - None, - ); - } - - /// Companion: when `GROUP BY` covers all metadata columns *except* the - /// distinct-target itself, the query is still SpatioTemporal — the - /// distinct-target is the value column, not a grouping label, so labels - /// always form a strict subset of metadata_columns. Guards against future - /// "treat L4 as both label and value" regressions in the classifier. - #[test] - fn test_count_distinct_with_full_remaining_labels_is_spatiotemporal() { - check_query( - "SELECT COUNT(DISTINCT L4) FROM cpu_usage \ - WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ - GROUP BY L1, L2, L3", - vec![QueryType::SpatioTemporal], - None, - ); - } - - /// Negative case: `COUNT(DISTINCT not_in_schema)` against a column that's - /// neither a value_column nor a metadata_column must still be rejected as - /// `InvalidValueCol`. The CARDINALITY relaxation widens what's *allowed* - /// (metadata columns) but doesn't disable the schema check entirely. 
- #[test] - fn test_count_distinct_unknown_column_still_rejected() { - check_query( - "SELECT COUNT(DISTINCT bogus_column) FROM cpu_usage \ - WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ - GROUP BY L1, L2, L3", - vec![], - Some(QueryError::InvalidValueCol), - ); - } - #[test] fn test_error_spatial_scrape_duration_too_small() { check_query( @@ -1107,8 +1040,7 @@ mod tests { // `COUNT(DISTINCT col)` must be normalised to a cardinality aggregation // (`AggregationInfo.name == "CARDINALITY"`) so the engine routes it to a // distinct-tracking sketch (SetAggregator / HLL) instead of a plain Count - // sketch. The parser today drops `DISTINCT` silently — a parser-level bug - // that would dispatch streaming counts as totals. + // sketch. #[test] fn test_count_distinct_single_column_maps_to_cardinality() { @@ -1240,4 +1172,49 @@ mod tests { ) .is_none()); } + + /// Matcher must accept parser-normalised `CARDINALITY` (not `IllegalAggregationFn`), + /// allow distinct targets in metadata_columns (e.g. `dstip`), and classify + /// `COUNT(DISTINCT col) GROUP BY