diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs
index d9eb4cad..7199f52b 100644
--- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs
+++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs
@@ -50,7 +50,7 @@ pub fn map_statistic_to_precompute_operator(
             Ok((AggregationType::MultipleIncrease, "".to_string()))
         }
         Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())),
-        _ => Err(format!("Statistic {statistic:?} not supported")),
+        Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())),
     }
 }
 
@@ -82,6 +82,8 @@ pub fn does_precompute_operator_support_subpopulations(
         // CountMinSketchWithHeap is only supported for Topk — does not support subpopulations
         AggregationType::CountMinSketchWithHeap if matches!(statistic, Statistic::Topk) => false,
 
+        AggregationType::HLL => false,
+
         // Default: not supported
         _ => panic!("Unexpected precompute operator: {}", precompute_operator),
     }
@@ -169,6 +171,16 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn test_cardinality_maps_to_hll() {
+        let result = map_statistic_to_precompute_operator(
+            Statistic::Cardinality,
+            QueryTreatmentType::Approximate,
+        )
+        .unwrap();
+        assert_eq!(result, (AggregationType::HLL, "".to_string()));
+    }
+
     #[test]
     fn test_topk_maps_to_count_min_sketch_with_heap() {
         let result =
diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs
index 509b11b1..bba96cab 100644
--- a/asap-planner-rs/src/config/input.rs
+++ b/asap-planner-rs/src/config/input.rs
@@ -86,6 +86,8 @@ pub struct SketchParameterOverrides {
     pub datasketches_kll: Option,
     #[serde(rename = "HydraKLL")]
     pub hydra_kll: Option,
+    #[serde(rename = "HLL")]
+    pub hll: Option<HllParams>,
 }
 
 #[derive(Debug, Clone, Deserialize)]
@@ -114,6 +116,11 @@ pub struct HydraParams {
     pub k: u64,
 }
 
+#[derive(Debug, Clone, Deserialize)]
+pub struct HllParams {
+    pub precision: u64,
+}
+
 #[derive(Debug, Clone, Deserialize)]
 pub struct SQLControllerConfig {
     pub query_groups: Vec,
diff --git a/asap-planner-rs/src/planner/sketch.rs b/asap-planner-rs/src/planner/sketch.rs
index 40310bc7..c0acd5c9 100644
--- a/asap-planner-rs/src/planner/sketch.rs
+++ b/asap-planner-rs/src/planner/sketch.rs
@@ -11,6 +11,7 @@ const DEFAULT_KLL_K: u64 = 500;
 const DEFAULT_HYDRA_ROW: u64 = 3;
 const DEFAULT_HYDRA_COL: u64 = 1024;
 const DEFAULT_HYDRA_K: u64 = 20;
+const DEFAULT_HLL_PRECISION: u64 = 14;
 
 /// Shared sketch parameter builder used by both PromQL and SQL paths.
 ///
@@ -89,6 +90,19 @@ pub fn build_sketch_parameters(
             Ok(m)
         }
 
+        AggregationType::HLL => {
+            let precision = sketch_params
+                .and_then(|p| p.hll.as_ref())
+                .map(|p| p.precision)
+                .unwrap_or(DEFAULT_HLL_PRECISION);
+            let mut m = HashMap::new();
+            m.insert(
+                "precision".to_string(),
+                serde_json::Value::Number(precision.into()),
+            );
+            Ok(m)
+        }
+
         AggregationType::HydraKLL => {
             let row_num = sketch_params
                 .and_then(|p| p.hydra_kll.as_ref())
diff --git a/asap-planner-rs/src/planner/sql.rs b/asap-planner-rs/src/planner/sql.rs
index 4bcb2b95..1c0c7c0c 100644
--- a/asap-planner-rs/src/planner/sql.rs
+++ b/asap-planner-rs/src/planner/sql.rs
@@ -105,10 +105,14 @@ impl SQLSingleQueryProcessor {
 
         // Label routing
         let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::>());
-        let rollup = all_metadata.difference(&spatial_output);
-
         let treatment_type = get_sql_treatment_type(agg_info.get_name());
         let statistics = get_sql_statistics(agg_info.get_name())?;
+        let rollup = if statistics.contains(&Statistic::Cardinality) {
+            // Distinct target is value_column, not a rollup label dimension.
+            KeyByLabelNames::empty()
+        } else {
+            all_metadata.difference(&spatial_output)
+        };
 
         let configs = build_agg_configs_for_statistics(
             &statistics,
@@ -179,6 +183,7 @@ fn get_sql_statistics(name: &str) -> Result<Vec<Statistic>, ControllerError> {
         "AVG" => Ok(vec![Statistic::Sum, Statistic::Count]),
         "MIN" => Ok(vec![Statistic::Min]),
         "MAX" => Ok(vec![Statistic::Max]),
+        "CARDINALITY" => Ok(vec![Statistic::Cardinality]),
         other => Err(ControllerError::SqlParse(format!(
             "Unsupported aggregation: {}",
             other
diff --git a/asap-planner-rs/tests/sql_integration.rs b/asap-planner-rs/tests/sql_integration.rs
index 2f48f1b4..242bef86 100644
--- a/asap-planner-rs/tests/sql_integration.rs
+++ b/asap-planner-rs/tests/sql_integration.rs
@@ -407,6 +407,75 @@ fn temporal_quantile_cast_datetime_bounds() {
     assert_eq!(out.inference_cleanup_param(q), Some(1));
 }
 
+// ── COUNT(DISTINCT) / HLL (spatial 1 s) ───────────────────────────────────────
+
+fn netflow_one_query_config(query: &str, t_repeat: u64) -> String {
+    format!(
+        r#"
+tables:
+  - name: netflow_table
+    time_column: time
+    value_columns: [pkt_len, dstip]
+    metadata_columns: [srcip, dstip, proto]
+query_groups:
+  - id: 1
+    repetition_delay: {t_repeat}
+    controller_options:
+      accuracy_sla: 0.95
+      latency_sla: 100.0
+    queries:
+      - >-
+        {query}
+aggregate_cleanup:
+  policy: no_cleanup
+"#
+    )
+}
+
+fn sql_opts_1s_ingest() -> SQLRuntimeOptions {
+    SQLRuntimeOptions {
+        streaming_engine: StreamingEngine::Arroyo,
+        query_evaluation_time: Some(1_000_000.0),
+        data_ingestion_interval: 1,
+    }
+}
+
+/// COUNT(DISTINCT dstip) GROUP BY srcip, 1 s window → HLL, grouping [srcip], empty rollup.
+#[test]
+fn spatial_count_distinct_hll() {
+    let q = "SELECT srcip, COUNT(DISTINCT dstip) AS unique_peers FROM netflow_table WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) GROUP BY srcip";
+    let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
+        .unwrap()
+        .generate()
+        .unwrap();
+
+    assert_eq!(out.streaming_aggregation_count(), 1);
+    assert_eq!(out.inference_query_count(), 1);
+    assert!(out.has_aggregation_type("HLL"));
+    assert!(!out.has_aggregation_type("DeltaSetAggregator"));
+    assert!(out.all_tumbling_window_sizes_eq(1));
+    assert_eq!(
+        out.aggregation_table_name("HLL"),
+        Some("netflow_table".to_string())
+    );
+    assert_eq!(
+        out.aggregation_value_column("HLL"),
+        Some("dstip".to_string())
+    );
+    assert_eq!(
+        out.aggregation_labels("HLL", "grouping"),
+        vec!["srcip".to_string()]
+    );
+    assert_eq!(
+        out.aggregation_labels("HLL", "rollup"),
+        Vec::<String>::new()
+    );
+    assert_eq!(
+        out.aggregation_labels("HLL", "aggregated"),
+        Vec::<String>::new()
+    );
+}
+
 // ── T-value variants for SUM (range = 300 s fixed) ───────────────────────────
 //
 // These three tests use the same query and differ only in repetition_delay (T).