diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/mod.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/mod.rs index 74fd48f..7820667 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/mod.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/mod.rs @@ -3,6 +3,6 @@ pub mod sqlparser_test; pub mod sqlpattern_matcher; pub mod sqlpattern_parser; -pub use sqlhelper::{SQLSchema, Table}; +pub use sqlhelper::{detect_sql_topk, SQLSchema, SqlTopk, Table, TopkWeighting}; pub use sqlpattern_matcher::*; pub use sqlpattern_parser::*; diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs index 14e3824..7ec90f3 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs @@ -222,3 +222,92 @@ impl SQLQueryData { } } } + +/// How a top-k query weights each observation fed into the heavy-hitter sketch. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TopkWeighting { + /// `COUNT(col)`: every matching row contributes weight 1, so the heap ranks + /// keys by event frequency (`count_events: true`). + Count, + /// `SUM(col)`: every matching row contributes weight = `col`, so the heap + /// ranks keys by summed value (`count_events: false`). + /// + /// Assumes **non-negative** summands: `CountMinSketch` is a frequency sketch + /// and cannot represent negative weights, so a `SUM` over a column that can + /// go negative would produce meaningless estimates. + Sum, +} + +/// A detected SQL top-k query: the `LIMIT k` plus how the sketch is weighted. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SqlTopk { + pub k: u64, + pub weighting: TopkWeighting, +} + +impl SqlTopk { + /// `count_events` flag the backing `CountMinSketchWithHeap` must use: + /// `true` for COUNT (unit weight), `false` for SUM (value weight). + pub fn count_events(&self) -> bool { + matches!(self.weighting, TopkWeighting::Count) + } +} + +/// Detect a SQL top-k query and return its `k` plus sketch weighting. +/// +/// Recognises the heavy-hitter shape that `CountMinSketchWithHeap` serves: +/// +/// ```sql +/// SELECT , COUNT() AS -- or SUM() +/// FROM WHERE <1s window> +/// GROUP BY +/// ORDER BY DESC +/// LIMIT k +/// ``` +/// +/// The grouping key (``) becomes the *aggregated* dimension inside the +/// sketch's heap — not a precompute partition key — so a single sketch per +/// window tracks the top keys by event count (COUNT) or summed value (SUM). +/// +/// The SQL parser only accepts identifier ORDER BY targets, so the descending +/// order must reference the aggregate's alias (e.g. `transfer_events`), not the +/// `COUNT(col)` / `SUM(col)` expression itself. +/// +/// This detection inspects a single SELECT layer only. For nested queries the +/// ORDER BY / LIMIT sit on the outer SELECT, which on its own matches this +/// shape; callers that must exclude nested patterns (e.g. spatial-over-temporal) +/// are responsible for gating before calling this (the query engine gates on +/// query pattern type; the planner rejects nested queries up front). +pub fn detect_sql_topk(query_data: &SQLQueryData) -> Option { + let k = query_data.limit?; + // LIMIT 0 is an empty-result query, not a top-k heavy-hitter request. + if k == 0 { + return None; + } + // Need a GROUP BY key to rank and an ORDER BY to define "top". + if query_data.labels.is_empty() || query_data.order_by.is_empty() { + return None; + } + // CountMinSketchWithHeap tracks heavy hitters by COUNT (unit weight) or + // SUM (value weight). Any other aggregate (MIN/MAX/quantile/...) cannot be + // served by the additive frequency sketch. + let name = query_data.aggregation_info.get_name(); + let weighting = if name.eq_ignore_ascii_case("count") { + TopkWeighting::Count + } else if name.eq_ignore_ascii_case("sum") { + TopkWeighting::Sum + } else { + return None; + }; + // Primary ordering must be the aggregate alias, descending (largest first). + let primary = &query_data.order_by[0]; + if primary.ascending { + return None; + } + // ORDER BY may differ only by identifier case for unquoted aliases. + let alias = query_data.aggregation_alias.as_deref()?; + if !alias.eq_ignore_ascii_case(primary.column.as_str()) { + return None; + } + Some(SqlTopk { k, weighting }) +} diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index 709f390..713218c 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -114,6 +114,7 @@ impl ElasticSingleQueryProcessor { agg_type, agg_sub_type, None, + None, self.sketch_parameters.as_ref(), ) }, diff --git a/asap-planner-rs/src/planner/sketch.rs b/asap-planner-rs/src/planner/sketch.rs index c0acd5c..766365b 100644 --- a/asap-planner-rs/src/planner/sketch.rs +++ b/asap-planner-rs/src/planner/sketch.rs @@ -15,13 +15,16 @@ const DEFAULT_HLL_PRECISION: u64 = 14; /// Shared sketch parameter builder used by both PromQL and SQL paths. /// -/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it -/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces -/// this operator today, so the `None` branch is unreachable in practice). +/// `topk_k` is required for `CountMinSketchWithHeap`. PromQL supplies it from +/// the `topk(k, …)` query argument; SQL supplies it from `LIMIT k`. +/// +/// `topk_count_events` disambiguates COUNT vs SUM SQL top-k (`true` / `false`). +/// PromQL passes `None` and omits the parameter (defaults to count semantics). pub fn build_sketch_parameters( aggregation_type: AggregationType, aggregation_sub_type: &str, topk_k: Option, + topk_count_events: Option, sketch_params: Option<&SketchParameterOverrides>, ) -> Result, String> { match aggregation_type { @@ -77,6 +80,12 @@ pub fn build_sketch_parameters( "heapsize".to_string(), serde_json::Value::Number((k * heap_mult).into()), ); + if let Some(count_events) = topk_count_events { + m.insert( + "count_events".to_string(), + serde_json::Value::Bool(count_events), + ); + } Ok(m) } @@ -158,6 +167,7 @@ pub fn build_sketch_parameters_from_promql( aggregation_type, aggregation_sub_type, topk_k, + None, sketch_params, ) } diff --git a/asap-planner-rs/src/planner/sql.rs b/asap-planner-rs/src/planner/sql.rs index 1c0c7c0..523203f 100644 --- a/asap-planner-rs/src/planner/sql.rs +++ b/asap-planner-rs/src/planner/sql.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use asap_types::enums::{CleanupPolicy, WindowType}; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; -use sql_utilities::ast_matching::sqlhelper::Table; +use sql_utilities::ast_matching::sqlhelper::{detect_sql_topk, Table}; use sql_utilities::ast_matching::sqlpattern_matcher::{QueryType, SQLPatternMatcher}; use sql_utilities::ast_matching::sqlpattern_parser::SQLPatternParser; use sql_utilities::ast_matching::SQLSchema; @@ -105,8 +105,15 @@ impl SQLSingleQueryProcessor { // Label routing let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::>()); + // Top-k needs ORDER BY / LIMIT from the parser; SQLPatternMatcher drops them + // when building `sql_query.query_data[0]`, so use `qdata` not query_data[0]. + let sql_topk = detect_sql_topk(&qdata); let treatment_type = get_sql_treatment_type(agg_info.get_name()); - let statistics = get_sql_statistics(agg_info.get_name())?; + let statistics = if sql_topk.is_some() { + vec![Statistic::Topk] + } else { + get_sql_statistics(agg_info.get_name())? + }; let rollup = if statistics.contains(&Statistic::Cardinality) { // Distinct target is value_column, not a rollup label dimension. KeyByLabelNames::empty() @@ -114,7 +121,10 @@ impl SQLSingleQueryProcessor { all_metadata.difference(&spatial_output) }; - let configs = build_agg_configs_for_statistics( + let topk_k = sql_topk.map(|t| t.k); + let topk_count_events = sql_topk.map(|t| t.count_events()); + + let mut configs = build_agg_configs_for_statistics( &statistics, treatment_type, &spatial_output, @@ -128,13 +138,25 @@ impl SQLSingleQueryProcessor { build_sketch_parameters( agg_type, agg_sub_type, - None, + topk_k, + topk_count_events, self.sketch_parameters.as_ref(), ) }, ) .map_err(ControllerError::SqlParse)?; + if sql_topk.is_some() { + for cfg in &mut configs { + if cfg.aggregation_type == AggregationType::CountMinSketchWithHeap { + // Heap-only self-keyed layout: the GROUP BY column is tracked + // inside the sketch's aggregated dimension, not as a partition key. + cfg.grouping_labels = KeyByLabelNames::empty(); + cfg.aggregated_labels = spatial_output.clone(); + } + } + } + let t_lookback = match query_type { QueryType::Spatial => self.data_ingestion_interval, _ => sql_query.query_data[0].time_info.get_duration() as u64, diff --git a/asap-planner-rs/src/planner_output.rs b/asap-planner-rs/src/planner_output.rs index 71b3ad1..ee739aa 100644 --- a/asap-planner-rs/src/planner_output.rs +++ b/asap-planner-rs/src/planner_output.rs @@ -6,8 +6,8 @@ use asap_types::streaming_config::StreamingConfig; use crate::generator::{ GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, - KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, - KEY_VALUE_COLUMN, KEY_WINDOW_SIZE, + KEY_NUM_AGG_TO_RETAIN, KEY_PARAMETERS, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, + KEY_TABLE_NAME, KEY_VALUE_COLUMN, KEY_WINDOW_SIZE, }; /// Output of the planning process — contains the two YAML configs @@ -227,4 +227,17 @@ impl PlannerOutput { }) .unwrap_or(false) } + + /// Returns a sketch parameter from the first aggregation matching `agg_type`. + pub fn aggregation_parameter(&self, agg_type: &str, key: &str) -> Option { + self.find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_PARAMETERS)) + .and_then(|v| { + if let YamlValue::Mapping(params) = v { + params.get(key).cloned() + } else { + None + } + }) + } } diff --git a/asap-planner-rs/tests/sql_integration.rs b/asap-planner-rs/tests/sql_integration.rs index 242bef8..759c2a9 100644 --- a/asap-planner-rs/tests/sql_integration.rs +++ b/asap-planner-rs/tests/sql_integration.rs @@ -834,3 +834,138 @@ fn t_not_multiple_of_data_ingestion_interval_returns_planner_error() { .generate(); assert!(matches!(result, Err(ControllerError::PlannerError(_)))); } + +// ── SQL top-k (CountMinSketchWithHeap) ──────────────────────────────────────── + +/// COUNT … ORDER BY DESC LIMIT k → single heap-only sketch. +#[test] +fn spatial_count_topk_heap() { + let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY srcip ORDER BY transfer_events DESC LIMIT 10"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketchWithHeap")); + assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(!out.has_aggregation_type("CountMinSketch")); + assert!(out.all_tumbling_window_sizes_eq(1)); + assert_eq!( + out.aggregation_labels("CountMinSketchWithHeap", "grouping"), + Vec::::new() + ); + assert_eq!( + out.aggregation_labels("CountMinSketchWithHeap", "aggregated"), + vec!["srcip".to_string()] + ); + let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup"); + rollup.sort(); + assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]); + assert_eq!( + out.aggregation_parameter("CountMinSketchWithHeap", "heapsize") + .and_then(|v| v.as_u64()), + Some(40) + ); + assert_eq!( + out.aggregation_parameter("CountMinSketchWithHeap", "count_events") + .and_then(|v| v.as_bool()), + Some(true) + ); +} + +/// SUM … ORDER BY DESC LIMIT k → value-weighted heap sketch. +#[test] +fn spatial_sum_topk_heap() { + let q = "SELECT srcip, SUM(pkt_len) AS total_bytes FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY srcip ORDER BY total_bytes DESC LIMIT 10"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketchWithHeap")); + assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(!out.has_aggregation_type("CountMinSketch")); + assert!(out.all_tumbling_window_sizes_eq(1)); + assert_eq!( + out.aggregation_labels("CountMinSketchWithHeap", "grouping"), + Vec::::new() + ); + assert_eq!( + out.aggregation_labels("CountMinSketchWithHeap", "aggregated"), + vec!["srcip".to_string()] + ); + let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup"); + rollup.sort(); + assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]); + assert_eq!( + out.aggregation_parameter("CountMinSketchWithHeap", "heapsize") + .and_then(|v| v.as_u64()), + Some(40) + ); + assert_eq!( + out.aggregation_parameter("CountMinSketchWithHeap", "count_events") + .and_then(|v| v.as_bool()), + Some(false) + ); +} + +/// Plain COUNT without ORDER BY / LIMIT stays on the CMS + DeltaSet path. +#[test] +fn spatial_count_without_order_by_is_not_topk() { + let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY srcip"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(!out.has_aggregation_type("CountMinSketchWithHeap")); +} + +/// ORDER BY aggregate alias ASC (bottom-k) stays on the CMS + DeltaSet path. +#[test] +fn spatial_count_order_by_asc_limit_is_not_topk() { + let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY srcip ORDER BY transfer_events ASC LIMIT 10"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(!out.has_aggregation_type("CountMinSketchWithHeap")); +} + +/// LIMIT 0 is treated as non-top-k and uses the normal CMS + DeltaSet path. +#[test] +fn spatial_count_limit_zero_is_not_topk() { + let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0"; + let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest()) + .unwrap() + .generate() + .unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(!out.has_aggregation_type("CountMinSketchWithHeap")); +} diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 281832b..0e20fe5 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -11,7 +11,9 @@ use asap_types::utils::normalize_spatial_filter; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; use sql_utilities::ast_matching::QueryType; -use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; +use sql_utilities::ast_matching::{ + detect_sql_topk, SQLPatternMatcher, SQLPatternParser, SQLQuery, SqlTopk, TopkWeighting, +}; use sql_utilities::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData}; use sqlparser::dialect::*; use sqlparser::parser::Parser as parser; @@ -137,83 +139,6 @@ fn sort_and_truncate_instant_vector( results } -/// How a top-k query weights each observation fed into the heavy-hitter sketch. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum TopkWeighting { - /// `COUNT(col)`: every matching row contributes weight 1, so the heap ranks - /// keys by event frequency (`count_events: true`). - Count, - /// `SUM(col)`: every matching row contributes weight = `col`, so the heap - /// ranks keys by summed value (`count_events: false`). - /// - /// Assumes **non-negative** summands: `CountMinSketch` is a frequency sketch - /// and cannot represent negative weights, so a `SUM` over a column that can - /// go negative would produce meaningless estimates. - Sum, -} - -/// A detected SQL top-k query: the `LIMIT k` plus how the sketch is weighted. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) struct SqlTopk { - pub k: u64, - pub weighting: TopkWeighting, -} - -impl SqlTopk { - /// `count_events` flag the backing `CountMinSketchWithHeap` must use: - /// `true` for COUNT (unit weight), `false` for SUM (value weight). - pub fn count_events(&self) -> bool { - matches!(self.weighting, TopkWeighting::Count) - } -} - -/// Detect a SQL top-k query and return its `k` plus sketch weighting. -/// -/// Recognises the heavy-hitter shape that `CountMinSketchWithHeap` serves: -/// -/// ```sql -/// SELECT , COUNT() AS -- or SUM() -/// FROM
WHERE <1s window> -/// GROUP BY -/// ORDER BY DESC -/// LIMIT k -/// ``` -/// -/// The grouping key (``) becomes the *aggregated* dimension inside the -/// sketch's heap — not a precompute partition key — so a single sketch per -/// window tracks the top keys by event count (COUNT) or summed value (SUM). -/// -/// The SQL parser only accepts identifier ORDER BY targets, so the descending -/// order must reference the aggregate's alias (e.g. `transfer_events`), not the -/// `COUNT(col)` / `SUM(col)` expression itself. -pub(crate) fn detect_sql_topk(query_data: &SQLQueryData) -> Option { - let k = query_data.limit?; - // Need a GROUP BY key to rank and an ORDER BY to define "top". - if query_data.labels.is_empty() || query_data.order_by.is_empty() { - return None; - } - // CountMinSketchWithHeap tracks heavy hitters by COUNT (unit weight) or - // SUM (value weight). Any other aggregate (MIN/MAX/quantile/...) cannot be - // served by the additive frequency sketch. - let name = query_data.aggregation_info.get_name(); - let weighting = if name.eq_ignore_ascii_case("count") { - TopkWeighting::Count - } else if name.eq_ignore_ascii_case("sum") { - TopkWeighting::Sum - } else { - return None; - }; - // Primary ordering must be the aggregate alias, descending (largest first). - let primary = &query_data.order_by[0]; - if primary.ascending { - return None; - } - if query_data.aggregation_alias.as_deref() != Some(primary.column.as_str()) { - return None; - } - Some(SqlTopk { k, weighting }) -} - impl SimpleEngine { /// Finds the query configuration for a SQL query using structural pattern matching. /// @@ -846,9 +771,10 @@ impl SimpleEngine { #[cfg(test)] mod detect_topk_tests { - use super::{detect_sql_topk, SqlTopk, TopkWeighting}; - use sql_utilities::ast_matching::SQLPatternParser; - use sql_utilities::sqlhelper::{SQLSchema, Table}; + use sql_utilities::ast_matching::{detect_sql_topk, SQLPatternParser, SqlTopk, TopkWeighting}; + use sql_utilities::sqlhelper::{ + AggregationInfo, OrderByItem, SQLQueryData, SQLSchema, Table, TimeInfo, + }; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; use std::collections::HashSet; @@ -912,6 +838,46 @@ mod detect_topk_tests { ); } + #[test] + fn alias_case_mismatch_still_detects_topk() { + // The parser path can normalize/canonicalize identifiers; verify directly on + // SQLQueryData that alias matching in detect_sql_topk is case-insensitive. + let qd = SQLQueryData { + aggregation_info: AggregationInfo::new( + "COUNT".to_string(), + "pkt_len".to_string(), + vec![], + ), + aggregation_alias: Some("transfer_events".to_string()), + metric: "netflow_table".to_string(), + labels: HashSet::from(["srcip".to_string()]), + time_info: TimeInfo::new("time".to_string(), 0.0, 1.0), + subquery: None, + order_by: vec![OrderByItem { + column: "TRANSFER_EVENTS".to_string(), + ascending: false, + }], + limit: Some(10), + }; + assert_eq!( + detect_sql_topk(&qd), + Some(SqlTopk { + k: 10, + weighting: TopkWeighting::Count, + }), + ); + } + + #[test] + fn zero_limit_is_not_topk() { + let sql = format!( + "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table {WINDOW} \ + GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0" + ); + let qd = parse(&sql).expect("query should parse"); + assert_eq!(detect_sql_topk(&qd), None, "LIMIT 0 is not top-k"); + } + #[test] fn missing_limit_is_not_topk() { let sql = format!(