Skip to content

Commit da67cb2

Browse files
Adding COUNT(DISTINCT) SQL support with HLL precompute (asap_sketchlib) and CARDINALITY routing
1 parent e9f764b commit da67cb2

9 files changed

Lines changed: 1055 additions & 10 deletions

File tree

asap-common/dependencies/rs/asap_types/src/capability_matching.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] {
3333
Statistic::Cardinality => &[
3434
AggregationType::SetAggregator,
3535
AggregationType::DeltaSetAggregator,
36+
// HLL is the single-population probabilistic alternative used by
37+
// `COUNT(DISTINCT col) GROUP BY …` queries. It backs the per-bucket
38+
// sketch directly (no paired key aggregation required) — see
39+
// `is_multi_population_value_type`, which excludes HLL.
40+
AggregationType::HLL,
3641
],
3742
Statistic::Topk => &[AggregationType::CountMinSketchWithHeap],
3843
}
@@ -753,6 +758,51 @@ mod tests {
753758
assert!(result.is_some());
754759
}
755760

761+
// --- cardinality / HLL ---
762+
763+
#[test]
764+
fn cardinality_matches_hll_single_population() {
765+
// `COUNT(DISTINCT col)` flows in as `Statistic::Cardinality`. An HLL config
766+
// alone must satisfy it without requiring any paired key aggregation —
767+
// HLL is a single-population value type (per grouping key bucket), unlike
768+
// SetAggregator which is a multi-population key tracker.
769+
let configs = single_config(make_config(
770+
42, "peers", "HLL", "", 1, "tumbling", &["srcip"], "",
771+
));
772+
let result = find_compatible_aggregation(
773+
&configs,
774+
&req(
775+
"peers",
776+
&[Statistic::Cardinality],
777+
Some(1_000),
778+
&["srcip"],
779+
"",
780+
),
781+
);
782+
let info = result.expect("HLL should serve Cardinality");
783+
assert_eq!(info.aggregation_id_for_value, 42);
784+
assert_eq!(info.aggregation_type_for_value, AggregationType::HLL);
785+
// Single-population: key agg falls through to the value config itself,
786+
// matching the KLL / Sum / MinMax pattern (no separate SetAggregator needed).
787+
assert_eq!(info.aggregation_id_for_key, 42);
788+
assert_eq!(info.aggregation_type_for_key, AggregationType::HLL);
789+
}
790+
791+
#[test]
792+
fn compatible_agg_types_cardinality_includes_hll() {
793+
// Direct unit test on the capability table: HLL must appear alongside the
794+
// existing exact-cardinality types so the SQL→engine path picks it up
795+
// without any further plumbing changes.
796+
let types = compatible_agg_types(Statistic::Cardinality);
797+
assert!(
798+
types.contains(&AggregationType::HLL),
799+
"compatible_agg_types(Cardinality) must include HLL; got {types:?}",
800+
);
801+
// Backwards compat: existing exact types stay supported.
802+
assert!(types.contains(&AggregationType::SetAggregator));
803+
assert!(types.contains(&AggregationType::DeltaSetAggregator));
804+
}
805+
756806
#[test]
757807
fn avg_different_windows_rejected() {
758808
let mut configs = HashMap::new();

asap-common/dependencies/rs/promql_utilities/src/query_logics/enums.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ pub enum AggregationOperator {
185185
Min,
186186
Max,
187187
Topk,
188+
/// Distinct-value count. SQL-side normalisation maps `COUNT(DISTINCT col)` to
189+
/// the aggregation name "CARDINALITY", which lands here.
190+
Cardinality,
188191
}
189192

190193
impl AggregationOperator {
@@ -197,6 +200,7 @@ impl AggregationOperator {
197200
AggregationOperator::Min => "min",
198201
AggregationOperator::Max => "max",
199202
AggregationOperator::Topk => "topk",
203+
AggregationOperator::Cardinality => "cardinality",
200204
}
201205
}
202206

@@ -211,11 +215,21 @@ impl AggregationOperator {
211215
AggregationOperator::Min => vec![Statistic::Min],
212216
AggregationOperator::Max => vec![Statistic::Max],
213217
AggregationOperator::Topk => vec![Statistic::Topk],
218+
AggregationOperator::Cardinality => vec![Statistic::Cardinality],
214219
}
215220
}
216221

217222
pub fn as_str_slice() -> &'static [&'static str] {
218-
&["sum", "count", "avg", "quantile", "min", "max", "topk"]
223+
&[
224+
"sum",
225+
"count",
226+
"avg",
227+
"quantile",
228+
"min",
229+
"max",
230+
"topk",
231+
"cardinality",
232+
]
219233
}
220234
}
221235

@@ -236,6 +250,7 @@ impl FromStr for AggregationOperator {
236250
"min" => Ok(AggregationOperator::Min),
237251
"max" => Ok(AggregationOperator::Max),
238252
"topk" => Ok(AggregationOperator::Topk),
253+
"cardinality" => Ok(AggregationOperator::Cardinality),
239254
other => Err(format!("Unknown aggregation operator: '{other}'")),
240255
}
241256
}
@@ -251,6 +266,7 @@ impl AggregationOperator {
251266
| AggregationOperator::Count
252267
| AggregationOperator::Avg
253268
| AggregationOperator::Topk
269+
| AggregationOperator::Cardinality
254270
)
255271
}
256272
}
@@ -432,4 +448,20 @@ mod tests {
432448
assert_eq!(exact_back, QueryTreatmentType::Exact);
433449
assert_eq!(approximate_back, QueryTreatmentType::Approximate);
434450
}
451+
452+
#[test]
453+
fn test_aggregation_operator_cardinality_round_trip() {
454+
// The SQL parser normalises `COUNT(DISTINCT col)` to the aggregation name
455+
// "CARDINALITY"; `parse_single_statistic` then routes it through
456+
// `AggregationOperator::FromStr`. Without a Cardinality variant the lookup
457+
// returns Err and the query is rejected as "Unsupported statistic name".
458+
let op: AggregationOperator = "cardinality".parse().expect("cardinality should parse");
459+
assert_eq!(op, AggregationOperator::Cardinality);
460+
assert_eq!(op.to_statistics(), vec![Statistic::Cardinality]);
461+
assert_eq!(op.as_str(), "cardinality");
462+
// Case-insensitive (matches the existing pattern for all other operators).
463+
let op_upper: AggregationOperator =
464+
"CARDINALITY".parse().expect("CARDINALITY should parse");
465+
assert_eq!(op_upper, AggregationOperator::Cardinality);
466+
}
435467
}

asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,73 @@ mod tests {
595595
);
596596
}
597597

598+
/// Regression for three matcher-side gaps that surface together when the
599+
/// simple_engine SQL path runs an HLL `COUNT(DISTINCT)` query end-to-end:
600+
///
601+
/// 1. The parser correctly normalises `COUNT(DISTINCT col)` to the
602+
/// aggregation name `"CARDINALITY"`, but
603+
/// `SQLPatternMatcher::is_valid_aggregation` never gained `CARDINALITY`
604+
/// in its `legal_aggregations` set, so the validator rejected the query
605+
/// with `IllegalAggregationFn` before pattern matching ran.
606+
///
607+
/// 2. After fixing (1), `flatten_query_info` validates the aggregation's
608+
/// "value column" against `schema.is_valid_value_column`, which only
609+
/// knows table value columns. `COUNT(DISTINCT col)` legitimately targets
610+
/// metadata/label columns (e.g. `COUNT(DISTINCT dstip)`), so the
611+
/// validator rejected it with `InvalidValueCol`. The fix accepts
612+
/// metadata columns *only* for CARDINALITY.
613+
///
614+
/// 3. With both fixed, the query classifies as `SpatioTemporal` because
615+
/// `GROUP BY` only covers a subset of metadata columns — exactly the
616+
/// shape of the user's real `COUNT(DISTINCT dstip) GROUP BY srcip`
617+
/// query, which selects on `srcip` and aggregates over `dstip` (so
618+
/// labels ⊊ metadata_columns).
619+
///
620+
/// Observed log line that motivated this test:
621+
/// error: Some(IllegalAggregationFn),
622+
/// msg: Some("attempt to use illegal aggregation function CARDINALITY")
623+
#[test]
624+
fn test_count_distinct_passes_aggregation_allowlist() {
625+
check_query(
626+
"SELECT COUNT(DISTINCT L4) FROM cpu_usage \
627+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
628+
GROUP BY L1, L2, L3",
629+
vec![QueryType::SpatioTemporal],
630+
None,
631+
);
632+
}
633+
634+
/// Companion: when `GROUP BY` covers all metadata columns *except* the
635+
/// distinct-target itself, the query is still SpatioTemporal — the
636+
/// distinct-target is the value column, not a grouping label, so labels
637+
/// always form a strict subset of metadata_columns. Guards against future
638+
/// "treat L4 as both label and value" regressions in the classifier.
639+
#[test]
640+
fn test_count_distinct_with_full_remaining_labels_is_spatiotemporal() {
641+
check_query(
642+
"SELECT COUNT(DISTINCT L4) FROM cpu_usage \
643+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
644+
GROUP BY L1, L2, L3",
645+
vec![QueryType::SpatioTemporal],
646+
None,
647+
);
648+
}
649+
650+
/// Negative case: `COUNT(DISTINCT not_in_schema)` against a column that's
651+
/// neither a value_column nor a metadata_column must still be rejected as
652+
/// `InvalidValueCol`. The CARDINALITY relaxation widens what's *allowed*
653+
/// (metadata columns) but doesn't disable the schema check entirely.
654+
#[test]
655+
fn test_count_distinct_unknown_column_still_rejected() {
656+
check_query(
657+
"SELECT COUNT(DISTINCT bogus_column) FROM cpu_usage \
658+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
659+
GROUP BY L1, L2, L3",
660+
vec![],
661+
Some(QueryError::InvalidValueCol),
662+
);
663+
}
664+
598665
#[test]
599666
fn test_error_spatial_scrape_duration_too_small() {
600667
check_query(
@@ -1034,4 +1101,143 @@ mod tests {
10341101
.unwrap();
10351102
assert!(incoming.matches_sql_pattern(&template));
10361103
}
1104+
1105+
// ── COUNT(DISTINCT col) support ──────────────────────────────────────────
1106+
//
1107+
// `COUNT(DISTINCT col)` must be normalised to a cardinality aggregation
1108+
// (`AggregationInfo.name == "CARDINALITY"`) so the engine routes it to a
1109+
// distinct-tracking sketch (SetAggregator / HLL) instead of a plain Count
1110+
// sketch. The parser today drops `DISTINCT` silently — a parser-level bug
1111+
// that would dispatch streaming counts as totals.
1112+
1113+
#[test]
1114+
fn test_count_distinct_single_column_maps_to_cardinality() {
1115+
// The structural signature of the user's COUNT(DISTINCT) query.
1116+
let q = parse_sql_query(
1117+
"SELECT L1, COUNT(DISTINCT L2) FROM cpu_usage \
1118+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1119+
GROUP BY L1",
1120+
)
1121+
.expect("COUNT(DISTINCT col) should parse");
1122+
assert_eq!(q.aggregation_info.get_name(), "CARDINALITY");
1123+
assert_eq!(q.aggregation_info.get_value_column_name(), "L2");
1124+
assert!(q.aggregation_info.get_args().is_empty());
1125+
assert!(q.labels.contains("L1"));
1126+
}
1127+
1128+
#[test]
1129+
fn test_count_distinct_full_user_query_with_order_by_limit() {
1130+
// The exact shape of the user's HLL netflow query, ported to the test schema.
1131+
let q = parse_sql_query(
1132+
"SELECT L1, COUNT(DISTINCT L2) AS unique_peers FROM cpu_usage \
1133+
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
1134+
GROUP BY L1 \
1135+
ORDER BY unique_peers DESC LIMIT 20",
1136+
)
1137+
.expect("COUNT(DISTINCT col) + ORDER BY + LIMIT should parse");
1138+
assert_eq!(q.aggregation_info.get_name(), "CARDINALITY");
1139+
assert_eq!(q.aggregation_info.get_value_column_name(), "L2");
1140+
assert_eq!(q.aggregation_alias.as_deref(), Some("unique_peers"));
1141+
assert_eq!(q.order_by.len(), 1);
1142+
assert_eq!(q.order_by[0].column, "unique_peers");
1143+
assert!(!q.order_by[0].ascending);
1144+
assert_eq!(q.limit, Some(20));
1145+
}
1146+
1147+
#[test]
1148+
fn test_count_distinct_matches_count_distinct_template() {
1149+
// Pattern matching: incoming COUNT(DISTINCT col) with absolute timestamps must
1150+
// match a NOW()-relative COUNT(DISTINCT col) template.
1151+
let template = parse_sql_query(
1152+
"SELECT COUNT(DISTINCT L2) FROM cpu_usage \
1153+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1154+
GROUP BY L1",
1155+
)
1156+
.unwrap();
1157+
let incoming = parse_sql_query(
1158+
"SELECT COUNT(DISTINCT L2) FROM cpu_usage \
1159+
WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \
1160+
GROUP BY L1",
1161+
)
1162+
.unwrap();
1163+
assert!(incoming.matches_sql_pattern(&template));
1164+
}
1165+
1166+
#[test]
1167+
fn test_count_distinct_does_not_match_plain_count_template() {
1168+
// CARDINALITY and COUNT are distinct aggregations — a COUNT(DISTINCT col)
1169+
// template must not be served by an incoming COUNT(col) query (and vice versa).
1170+
let count_distinct = parse_sql_query(
1171+
"SELECT COUNT(DISTINCT L2) FROM cpu_usage \
1172+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1173+
GROUP BY L1",
1174+
)
1175+
.unwrap();
1176+
let plain_count = parse_sql_query(
1177+
"SELECT COUNT(L2) FROM cpu_usage \
1178+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1179+
GROUP BY L1",
1180+
)
1181+
.unwrap();
1182+
assert!(!plain_count.matches_sql_pattern(&count_distinct));
1183+
assert!(!count_distinct.matches_sql_pattern(&plain_count));
1184+
}
1185+
1186+
#[test]
1187+
fn test_count_all_treated_as_plain_count() {
1188+
// The redundant explicit `ALL` modifier (the SQL default) must NOT switch the
1189+
// aggregation to CARDINALITY; only `DISTINCT` triggers cardinality semantics.
1190+
let q = parse_sql_query(
1191+
"SELECT COUNT(ALL L2) FROM cpu_usage \
1192+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1193+
GROUP BY L1",
1194+
)
1195+
.expect("COUNT(ALL col) should parse as plain COUNT");
1196+
assert_eq!(q.aggregation_info.get_name(), "COUNT");
1197+
}
1198+
1199+
#[test]
1200+
fn test_count_without_distinct_remains_count() {
1201+
// Regression guard: ensure the DISTINCT-aware path doesn't accidentally rewrite
1202+
// `COUNT(col)` (without any duplicate_treatment).
1203+
let q = parse_sql_query(
1204+
"SELECT COUNT(L2) FROM cpu_usage \
1205+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1206+
GROUP BY L1",
1207+
)
1208+
.expect("COUNT(col) should parse");
1209+
assert_eq!(q.aggregation_info.get_name(), "COUNT");
1210+
}
1211+
1212+
#[test]
1213+
fn test_count_distinct_multiple_columns_rejected() {
1214+
// Multi-column DISTINCT (`COUNT(DISTINCT a, b)`) is a compound-key cardinality
1215+
// that the structural model can't represent with a single value_column. Reject
1216+
// it explicitly rather than silently keeping only the first argument.
1217+
assert!(parse_sql_query(
1218+
"SELECT COUNT(DISTINCT L1, L2) FROM cpu_usage \
1219+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1220+
GROUP BY L3",
1221+
)
1222+
.is_none());
1223+
}
1224+
1225+
#[test]
1226+
fn test_distinct_on_non_count_aggregate_rejected() {
1227+
// DISTINCT on aggregates other than COUNT (e.g. `SUM(DISTINCT v)`, `AVG(DISTINCT v)`)
1228+
// is not modelled by any precompute sketch type; reject rather than silently
1229+
// dropping the modifier and dispatching to a plain Sum.
1230+
assert!(parse_sql_query(
1231+
"SELECT SUM(DISTINCT value) FROM cpu_usage \
1232+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1233+
GROUP BY L1",
1234+
)
1235+
.is_none());
1236+
assert!(parse_sql_query(
1237+
"SELECT AVG(DISTINCT value) FROM cpu_usage \
1238+
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
1239+
GROUP BY L1",
1240+
)
1241+
.is_none());
1242+
}
10371243
}

asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ impl SQLPatternMatcher {
102102
legal_aggregations.insert("MIN");
103103
legal_aggregations.insert("MAX");
104104
legal_aggregations.insert("QUANTILE");
105+
// `COUNT(DISTINCT col)` is normalised by the parser to the aggregation
106+
// name "CARDINALITY" (see `SQLPatternParser::get_aggregation`). Without
107+
// this entry the simple_engine SQL handler rejects every COUNT(DISTINCT)
108+
// query with `IllegalAggregationFn` before pattern matching runs, which
109+
// in turn blocks routing to the precompute engine's HLL accumulator.
110+
legal_aggregations.insert("CARDINALITY");
105111

106112
Self {
107113
schema,
@@ -176,10 +182,22 @@ impl SQLPatternMatcher {
176182
}
177183

178184
let value_column_name = query.aggregation_info.get_value_column_name();
179-
if !self
180-
.schema
181-
.is_valid_value_column(&query.metric, value_column_name)
182-
{
185+
// `COUNT(DISTINCT col)` (normalised to "CARDINALITY") legitimately
186+
// targets metadata/label columns (e.g. `COUNT(DISTINCT dstip)`),
187+
// which the schema lists under metadata_columns rather than
188+
// value_columns. Accept either bucket for CARDINALITY; for all
189+
// other aggregations keep the strict value_columns-only check.
190+
let column_is_known = if query.aggregation_info.get_name() == "CARDINALITY" {
191+
self.schema
192+
.is_valid_value_column(&query.metric, value_column_name)
193+
|| self.schema.get_metadata_columns(&query.metric).is_some_and(
194+
|cols| cols.contains(value_column_name),
195+
)
196+
} else {
197+
self.schema
198+
.is_valid_value_column(&query.metric, value_column_name)
199+
};
200+
if !column_is_known {
183201
println!("Returned QueryError::InvalidValueCol");
184202

185203
return Err((

0 commit comments

Comments
 (0)