diff --git a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs index 570c0854..8a64f1fd 100644 --- a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs +++ b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs @@ -33,6 +33,7 @@ pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] { Statistic::Cardinality => &[ AggregationType::SetAggregator, AggregationType::DeltaSetAggregator, + AggregationType::HLL, ], Statistic::Topk => &[AggregationType::CountMinSketchWithHeap], } @@ -753,6 +754,58 @@ mod tests { assert!(result.is_some()); } + // --- cardinality / HLL --- + + #[test] + fn cardinality_matches_hll_single_population() { + // `COUNT(DISTINCT col)` flows in as `Statistic::Cardinality`. An HLL config + // alone must satisfy it without requiring any paired key aggregation — + // HLL is a single-population value type (per grouping key bucket), unlike + // SetAggregator which is a multi-population key tracker. + let configs = single_config(make_config( + 42, + "peers", + "HLL", + "", + 1, + "tumbling", + &["srcip"], + "", + )); + let result = find_compatible_aggregation( + &configs, + &req( + "peers", + &[Statistic::Cardinality], + Some(1_000), + &["srcip"], + "", + ), + ); + let info = result.expect("HLL should serve Cardinality"); + assert_eq!(info.aggregation_id_for_value, 42); + assert_eq!(info.aggregation_type_for_value, AggregationType::HLL); + // Single-population: key agg falls through to the value config itself, + // matching the KLL / Sum / MinMax pattern (no separate SetAggregator needed). + assert_eq!(info.aggregation_id_for_key, 42); + assert_eq!(info.aggregation_type_for_key, AggregationType::HLL); + } + + #[test] + fn compatible_agg_types_cardinality_includes_hll() { + // Direct unit test on the capability table: HLL must appear alongside the + // existing exact-cardinality types so the SQL→engine path picks it up + // without any further plumbing changes. + let types = compatible_agg_types(Statistic::Cardinality); + assert!( + types.contains(&AggregationType::HLL), + "compatible_agg_types(Cardinality) must include HLL; got {types:?}", + ); + // Backwards compat: existing exact types stay supported. + assert!(types.contains(&AggregationType::SetAggregator)); + assert!(types.contains(&AggregationType::DeltaSetAggregator)); + } + #[test] fn avg_different_windows_rejected() { let mut configs = HashMap::new(); diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs index 05b34353..977dfd47 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs @@ -185,6 +185,7 @@ pub enum AggregationOperator { Min, Max, Topk, + Cardinality, } impl AggregationOperator { @@ -197,6 +198,7 @@ impl AggregationOperator { AggregationOperator::Min => "min", AggregationOperator::Max => "max", AggregationOperator::Topk => "topk", + AggregationOperator::Cardinality => "cardinality", } } @@ -211,11 +213,21 @@ impl AggregationOperator { AggregationOperator::Min => vec![Statistic::Min], AggregationOperator::Max => vec![Statistic::Max], AggregationOperator::Topk => vec![Statistic::Topk], + AggregationOperator::Cardinality => vec![Statistic::Cardinality], } } pub fn as_str_slice() -> &'static [&'static str] { - &["sum", "count", "avg", "quantile", "min", "max", "topk"] + &[ + "sum", + "count", + "avg", + "quantile", + "min", + "max", + "topk", + "cardinality", + ] } } @@ -236,6 +248,7 @@ impl FromStr for AggregationOperator { "min" => Ok(AggregationOperator::Min), "max" => Ok(AggregationOperator::Max), "topk" => Ok(AggregationOperator::Topk), + "cardinality" => Ok(AggregationOperator::Cardinality), other => Err(format!("Unknown aggregation operator: '{other}'")), } } @@ -251,6 +264,7 @@ impl AggregationOperator { | AggregationOperator::Count | AggregationOperator::Avg | AggregationOperator::Topk + | AggregationOperator::Cardinality ) } } @@ -432,4 +446,19 @@ mod tests { assert_eq!(exact_back, QueryTreatmentType::Exact); assert_eq!(approximate_back, QueryTreatmentType::Approximate); } + + #[test] + fn test_aggregation_operator_cardinality_round_trip() { + // The SQL parser normalises `COUNT(DISTINCT col)` to the aggregation name + // "CARDINALITY"; `parse_single_statistic` then routes it through + // `AggregationOperator::FromStr`. + let op: AggregationOperator = "cardinality".parse().expect("cardinality should parse"); + assert_eq!(op, AggregationOperator::Cardinality); + assert_eq!(op.to_statistics(), vec![Statistic::Cardinality]); + assert_eq!(op.as_str(), "cardinality"); + // Case-insensitive (matches the existing pattern for all other operators). + let op_upper: AggregationOperator = + "CARDINALITY".parse().expect("CARDINALITY should parse"); + assert_eq!(op_upper, AggregationOperator::Cardinality); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index eb76dd5e..6a293d84 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -1034,4 +1034,187 @@ mod tests { .unwrap(); assert!(incoming.matches_sql_pattern(&template)); } + + // ── COUNT(DISTINCT col) support ────────────────────────────────────────── + // + // `COUNT(DISTINCT col)` must be normalised to a cardinality aggregation + // (`AggregationInfo.name == "CARDINALITY"`) so the engine routes it to a + // distinct-tracking sketch (SetAggregator / HLL) instead of a plain Count + // sketch. + + #[test] + fn test_count_distinct_single_column_maps_to_cardinality() { + // The structural signature of the user's COUNT(DISTINCT) query. + let q = parse_sql_query( + "SELECT L1, COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(DISTINCT col) should parse"); + assert_eq!(q.aggregation_info.get_name(), "CARDINALITY"); + assert_eq!(q.aggregation_info.get_value_column_name(), "L2"); + assert!(q.aggregation_info.get_args().is_empty()); + assert!(q.labels.contains("L1")); + } + + #[test] + fn test_count_distinct_full_user_query_with_order_by_limit() { + // The exact shape of the user's HLL netflow query, ported to the test schema. + let q = parse_sql_query( + "SELECT L1, COUNT(DISTINCT L2) AS unique_peers FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY L1 \ + ORDER BY unique_peers DESC LIMIT 20", + ) + .expect("COUNT(DISTINCT col) + ORDER BY + LIMIT should parse"); + assert_eq!(q.aggregation_info.get_name(), "CARDINALITY"); + assert_eq!(q.aggregation_info.get_value_column_name(), "L2"); + assert_eq!(q.aggregation_alias.as_deref(), Some("unique_peers")); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "unique_peers"); + assert!(!q.order_by[0].ascending); + assert_eq!(q.limit, Some(20)); + } + + #[test] + fn test_count_distinct_matches_count_distinct_template() { + // Pattern matching: incoming COUNT(DISTINCT col) with absolute timestamps must + // match a NOW()-relative COUNT(DISTINCT col) template. + let template = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + let incoming = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY L1", + ) + .unwrap(); + assert!(incoming.matches_sql_pattern(&template)); + } + + #[test] + fn test_count_distinct_does_not_match_plain_count_template() { + // CARDINALITY and COUNT are distinct aggregations — a COUNT(DISTINCT col) + // template must not be served by an incoming COUNT(col) query (and vice versa). + let count_distinct = parse_sql_query( + "SELECT COUNT(DISTINCT L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + let plain_count = parse_sql_query( + "SELECT COUNT(L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .unwrap(); + assert!(!plain_count.matches_sql_pattern(&count_distinct)); + assert!(!count_distinct.matches_sql_pattern(&plain_count)); + } + + #[test] + fn test_count_all_treated_as_plain_count() { + // The redundant explicit `ALL` modifier (the SQL default) must NOT switch the + // aggregation to CARDINALITY; only `DISTINCT` triggers cardinality semantics. + let q = parse_sql_query( + "SELECT COUNT(ALL L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(ALL col) should parse as plain COUNT"); + assert_eq!(q.aggregation_info.get_name(), "COUNT"); + } + + #[test] + fn test_count_without_distinct_remains_count() { + // Regression guard: ensure the DISTINCT-aware path doesn't accidentally rewrite + // `COUNT(col)` (without any duplicate_treatment). + let q = parse_sql_query( + "SELECT COUNT(L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .expect("COUNT(col) should parse"); + assert_eq!(q.aggregation_info.get_name(), "COUNT"); + } + + #[test] + fn test_count_distinct_multiple_columns_rejected() { + // Multi-column DISTINCT (`COUNT(DISTINCT a, b)`) is a compound-key cardinality + // that the structural model can't represent with a single value_column. Reject + // it explicitly rather than silently keeping only the first argument. + assert!(parse_sql_query( + "SELECT COUNT(DISTINCT L1, L2) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L3", + ) + .is_none()); + } + + #[test] + fn test_distinct_on_non_count_aggregate_rejected() { + // DISTINCT on aggregates other than COUNT (e.g. `SUM(DISTINCT v)`, `AVG(DISTINCT v)`) + // is not modelled by any precompute sketch type; reject rather than silently + // dropping the modifier and dispatching to a plain Sum. + assert!(parse_sql_query( + "SELECT SUM(DISTINCT value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + assert!(parse_sql_query( + "SELECT AVG(DISTINCT value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + } + + /// Matcher must accept parser-normalised `CARDINALITY` (not `IllegalAggregationFn`), + /// allow distinct targets in metadata_columns (e.g. `dstip`), and classify + /// `COUNT(DISTINCT col) GROUP BY