From a189712332adcb37c1a44422c7ab5d115fa7bc80 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sun, 31 May 2026 18:13:29 +0000 Subject: [PATCH 1/7] Add support for HLL-Cardinality inference/streaming config generation --- .../src/query_logics/logics.rs | 13 ++++ asap-planner-rs/src/config/input.rs | 7 +++ asap-planner-rs/src/planner/sketch.rs | 14 +++++ asap-planner-rs/src/planner/sql.rs | 8 ++- asap-planner-rs/tests/sql_integration.rs | 63 +++++++++++++++++++ 5 files changed, 104 insertions(+), 1 deletion(-) diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs index d9eb4cad..79189a65 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs @@ -50,6 +50,7 @@ pub fn map_statistic_to_precompute_operator( Ok((AggregationType::MultipleIncrease, "".to_string())) } Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())), + Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())), _ => Err(format!("Statistic {statistic:?} not supported")), } } @@ -82,6 +83,8 @@ pub fn does_precompute_operator_support_subpopulations( // CountMinSketchWithHeap is only supported for Topk — does not support subpopulations AggregationType::CountMinSketchWithHeap if matches!(statistic, Statistic::Topk) => false, + AggregationType::HLL => false, + // Default: not supported _ => panic!("Unexpected precompute operator: {}", precompute_operator), } @@ -169,6 +172,16 @@ mod tests { )); } + #[test] + fn test_cardinality_maps_to_hll() { + let result = map_statistic_to_precompute_operator( + Statistic::Cardinality, + QueryTreatmentType::Approximate, + ) + .unwrap(); + assert_eq!(result, (AggregationType::HLL, "".to_string())); + } + #[test] fn test_topk_maps_to_count_min_sketch_with_heap() { let result = diff --git 
a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 509b11b1..bba96cab 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -86,6 +86,8 @@ pub struct SketchParameterOverrides { pub datasketches_kll: Option, #[serde(rename = "HydraKLL")] pub hydra_kll: Option, + #[serde(rename = "HLL")] + pub hll: Option, } #[derive(Debug, Clone, Deserialize)] @@ -114,6 +116,11 @@ pub struct HydraParams { pub k: u64, } +#[derive(Debug, Clone, Deserialize)] +pub struct HllParams { + pub precision: u64, +} + #[derive(Debug, Clone, Deserialize)] pub struct SQLControllerConfig { pub query_groups: Vec, diff --git a/asap-planner-rs/src/planner/sketch.rs b/asap-planner-rs/src/planner/sketch.rs index 40310bc7..c0acd5c9 100644 --- a/asap-planner-rs/src/planner/sketch.rs +++ b/asap-planner-rs/src/planner/sketch.rs @@ -11,6 +11,7 @@ const DEFAULT_KLL_K: u64 = 500; const DEFAULT_HYDRA_ROW: u64 = 3; const DEFAULT_HYDRA_COL: u64 = 1024; const DEFAULT_HYDRA_K: u64 = 20; +const DEFAULT_HLL_PRECISION: u64 = 14; /// Shared sketch parameter builder used by both PromQL and SQL paths. 
/// @@ -89,6 +90,19 @@ pub fn build_sketch_parameters( Ok(m) } + AggregationType::HLL => { + let precision = sketch_params + .and_then(|p| p.hll.as_ref()) + .map(|p| p.precision) + .unwrap_or(DEFAULT_HLL_PRECISION); + let mut m = HashMap::new(); + m.insert( + "precision".to_string(), + serde_json::Value::Number(precision.into()), + ); + Ok(m) + } + AggregationType::HydraKLL => { let row_num = sketch_params .and_then(|p| p.hydra_kll.as_ref()) diff --git a/asap-planner-rs/src/planner/sql.rs b/asap-planner-rs/src/planner/sql.rs index 4bcb2b95..72ff999f 100644 --- a/asap-planner-rs/src/planner/sql.rs +++ b/asap-planner-rs/src/planner/sql.rs @@ -105,7 +105,12 @@ impl SQLSingleQueryProcessor { // Label routing let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::>()); - let rollup = all_metadata.difference(&spatial_output); + let rollup = if agg_info.get_name().eq_ignore_ascii_case("CARDINALITY") { + // Distinct target is value_column, not a rollup label dimension. + KeyByLabelNames::empty() + } else { + all_metadata.difference(&spatial_output) + }; let treatment_type = get_sql_treatment_type(agg_info.get_name()); let statistics = get_sql_statistics(agg_info.get_name())?; @@ -179,6 +184,7 @@ fn get_sql_statistics(name: &str) -> Result, ControllerError> { "AVG" => Ok(vec![Statistic::Sum, Statistic::Count]), "MIN" => Ok(vec![Statistic::Min]), "MAX" => Ok(vec![Statistic::Max]), + "CARDINALITY" => Ok(vec![Statistic::Cardinality]), other => Err(ControllerError::SqlParse(format!( "Unsupported aggregation: {}", other diff --git a/asap-planner-rs/tests/sql_integration.rs b/asap-planner-rs/tests/sql_integration.rs index 2f48f1b4..72dab0fe 100644 --- a/asap-planner-rs/tests/sql_integration.rs +++ b/asap-planner-rs/tests/sql_integration.rs @@ -407,6 +407,69 @@ fn temporal_quantile_cast_datetime_bounds() { assert_eq!(out.inference_cleanup_param(q), Some(1)); } +// ── COUNT(DISTINCT) / HLL (spatial 1 s) ─────────────────────────────────────── + +fn 
netflow_one_query_config(query: &str, t_repeat: u64) -> String { + format!( + r#" +tables: + - name: netflow_table + time_column: time + value_columns: [pkt_len, dstip] + metadata_columns: [srcip, dstip, proto] +query_groups: + - id: 1 + repetition_delay: {t_repeat} + controller_options: + accuracy_sla: 0.95 + latency_sla: 100.0 + queries: + - >- + {query} +aggregate_cleanup: + policy: no_cleanup +"# + ) +} + +fn sql_opts_1s_ingest() -> SQLRuntimeOptions { + SQLRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + query_evaluation_time: Some(1_000_000.0), + data_ingestion_interval: 1, + } +} + +/// COUNT(DISTINCT dstip) GROUP BY srcip, 1 s window → HLL, grouping [srcip], empty rollup. +#[test] +fn spatial_count_distinct_hll() { + let q = "SELECT srcip, COUNT(DISTINCT dstip) AS unique_peers FROM netflow_table WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) GROUP BY srcip"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("HLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(1)); + assert_eq!( + out.aggregation_table_name("HLL"), + Some("netflow_table".to_string()) + ); + assert_eq!( + out.aggregation_value_column("HLL"), + Some("dstip".to_string()) + ); + assert_eq!( + out.aggregation_labels("HLL", "grouping"), + vec!["srcip".to_string()] + ); + assert_eq!(out.aggregation_labels("HLL", "rollup"), Vec::::new()); + assert_eq!(out.aggregation_labels("HLL", "aggregated"), Vec::::new()); +} + // ── T-value variants for SUM (range = 300 s fixed) ─────────────────────────── // // These three tests use the same query and differ only in repetition_delay (T). 
From 826ab636c28c1742531181b242ce2cad5f7f48c7 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sun, 31 May 2026 18:34:34 +0000 Subject: [PATCH 2/7] Formatting changes --- asap-planner-rs/tests/sql_integration.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/asap-planner-rs/tests/sql_integration.rs b/asap-planner-rs/tests/sql_integration.rs index 72dab0fe..242bef86 100644 --- a/asap-planner-rs/tests/sql_integration.rs +++ b/asap-planner-rs/tests/sql_integration.rs @@ -466,8 +466,14 @@ fn spatial_count_distinct_hll() { out.aggregation_labels("HLL", "grouping"), vec!["srcip".to_string()] ); - assert_eq!(out.aggregation_labels("HLL", "rollup"), Vec::::new()); - assert_eq!(out.aggregation_labels("HLL", "aggregated"), Vec::::new()); + assert_eq!( + out.aggregation_labels("HLL", "rollup"), + Vec::::new() + ); + assert_eq!( + out.aggregation_labels("HLL", "aggregated"), + Vec::::new() + ); } // ── T-value variants for SUM (range = 300 s fixed) ─────────────────────────── From 6de8be6683b9988d97c4825c5e591b9132223ca9 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sun, 31 May 2026 18:39:04 +0000 Subject: [PATCH 3/7] Fixed clippy warnings --- .../dependencies/rs/promql_utilities/src/query_logics/logics.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs index 79189a65..7199f52b 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs @@ -51,7 +51,6 @@ pub fn map_statistic_to_precompute_operator( } Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())), Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())), - _ => Err(format!("Statistic {statistic:?} not supported")), } } From 2a60de71509117eda6b46ab0e4fedffe327bc048 Mon Sep 17 00:00:00 2001 
From: Akanksha Akkihal Date: Mon, 1 Jun 2026 13:18:59 +0000 Subject: [PATCH 4/7] Preserved Error condition --- .../dependencies/rs/promql_utilities/src/query_logics/logics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs index 7199f52b..79189a65 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs @@ -51,6 +51,7 @@ pub fn map_statistic_to_precompute_operator( } Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())), Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())), + _ => Err(format!("Statistic {statistic:?} not supported")), } } From ed427f22b02403a6674346f903998d4e53bbd2e9 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 1 Jun 2026 13:24:10 +0000 Subject: [PATCH 5/7] Preserving error condition --- .../dependencies/rs/promql_utilities/src/query_logics/logics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs index 79189a65..c821e8a6 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs @@ -51,6 +51,7 @@ pub fn map_statistic_to_precompute_operator( } Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())), Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())), + #[allow(unreachable_patterns)] _ => Err(format!("Statistic {statistic:?} not supported")), } } From 662e1d854f0ae083725859ae7ec7021b0a072062 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 1 Jun 2026 13:34:24 +0000 Subject: [PATCH 6/7] addressing PR comments --- 
asap-planner-rs/src/planner/sql.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/asap-planner-rs/src/planner/sql.rs b/asap-planner-rs/src/planner/sql.rs index 72ff999f..1c0c7c0c 100644 --- a/asap-planner-rs/src/planner/sql.rs +++ b/asap-planner-rs/src/planner/sql.rs @@ -105,16 +105,15 @@ impl SQLSingleQueryProcessor { // Label routing let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::>()); - let rollup = if agg_info.get_name().eq_ignore_ascii_case("CARDINALITY") { + let treatment_type = get_sql_treatment_type(agg_info.get_name()); + let statistics = get_sql_statistics(agg_info.get_name())?; + let rollup = if statistics.contains(&Statistic::Cardinality) { // Distinct target is value_column, not a rollup label dimension. KeyByLabelNames::empty() } else { all_metadata.difference(&spatial_output) }; - let treatment_type = get_sql_treatment_type(agg_info.get_name()); - let statistics = get_sql_statistics(agg_info.get_name())?; - let configs = build_agg_configs_for_statistics( &statistics, treatment_type, From 434e794cd38561f33ffaabd62f79ae8298447983 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 1 Jun 2026 13:38:05 +0000 Subject: [PATCH 7/7] Removing unreachable code --- .../dependencies/rs/promql_utilities/src/query_logics/logics.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs index c821e8a6..7199f52b 100644 --- a/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs +++ b/asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs @@ -51,8 +51,6 @@ pub fn map_statistic_to_precompute_operator( } Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())), Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())), - #[allow(unreachable_patterns)] - _ => Err(format!("Statistic {statistic:?} not 
supported")), } }