From 704375339ae0bf3d9465babbbc5cb2e532096f67 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 8 Jun 2026 01:56:19 +0000 Subject: [PATCH 1/5] Support for SUM Top-k queries using CountMinSketchWithHeap --- .../rs/asap_types/src/capability_matching.rs | 157 +++++++++++- .../rs/asap_types/src/query_requirements.rs | 9 + .../src/engines/simple_engine/promql.rs | 3 + .../src/engines/simple_engine/sql.rs | 228 +++++++++++++++--- 4 files changed, 369 insertions(+), 28 deletions(-) diff --git a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs index 8a64f1fd..4763c47d 100644 --- a/asap-common/dependencies/rs/asap_types/src/capability_matching.rs +++ b/asap-common/dependencies/rs/asap_types/src/capability_matching.rs @@ -101,6 +101,39 @@ pub fn spatial_filter_compatible(config_filter: &str, req_filter: &str) -> bool config_norm == req_norm } +/// Reads the `count_events` parameter from a `CountMinSketchWithHeap` config. +/// Defaults to `true` (COUNT semantics) so existing count top-k configs that +/// omit the flag keep matching COUNT top-k queries. +fn config_count_events(config: &AggregationConfig) -> bool { + config + .parameters + .get("count_events") + .and_then(|v| v.as_bool()) + .unwrap_or(true) +} + +/// Top-k weighting compatibility. Only constrains `Statistic::Topk` candidates; +/// every other statistic passes unconditionally. +/// +/// A COUNT top-k query (`Some(true)`) must be served by a `count_events: true` +/// sketch and a SUM top-k query (`Some(false)`) by a `count_events: false` +/// (value-weighted) sketch. This is what tells two `CountMinSketchWithHeap` +/// configs on the same metric apart. `None` (non-top-k, or PromQL top-k which +/// does not pin the weighting) imposes no constraint. +pub fn topk_weighting_compatible( + stat: Statistic, + config: &AggregationConfig, + req_count_events: Option, +) -> bool { + if stat != Statistic::Topk { + return true; + } + match req_count_events { + Some(want) => config_count_events(config) == want, + None => true, + } +} + /// Aggregation priority comparator: prefer larger `window_size` (descending). /// This is a separate function so callers can swap the policy without touching matching logic. pub fn aggregation_priority(a: &AggregationConfig, b: &AggregationConfig) -> Ordering { @@ -157,7 +190,8 @@ pub fn find_compatible_aggregation( && spatial_filter_compatible( &c.spatial_filter_normalized, &requirements.spatial_filter_normalized, - ); + ) + && topk_weighting_compatible(stat, c, requirements.topk_count_events); if !ok { debug!( agg_id = c.aggregation_id, @@ -216,6 +250,10 @@ pub fn find_compatible_aggregation( } // If value type is multi-population, find the paired key aggregation. + // Top-k (CountMinSketchWithHeap) follows the same path as plain COUNT: the + // self-keyed case is expressed via the query_config path (a single + // aggregation reference), while the capability-matching fallback resolves a + // separate key aggregation just like any other multi-population value type. let key_agg: &AggregationConfig = if is_multi_population_value_type(value_agg.aggregation_type) { let ka = configs @@ -310,6 +348,7 @@ mod tests { data_range_ms, grouping_labels: KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()), spatial_filter_normalized: normalize_spatial_filter(spatial_filter), + topk_count_events: None, } } @@ -839,4 +878,120 @@ mod tests { ); assert!(result.is_none()); } + + // --- top-k count vs sum weighting --- + // + // Top-k follows the same capability-matching path as plain COUNT: a + // CountMinSketchWithHeap is a multi-population value type, so the fallback + // pairs it with a key aggregation. These tests therefore always provision a + // DeltaSetAggregator and focus on which *value* heap (count- vs sum-weighted) + // is selected via the count_events discriminator. + + /// Paired key aggregation required by the multi-population fallback. + fn make_key_agg(id: u64, metric: &str) -> AggregationConfig { + make_config(id, metric, "DeltaSetAggregator", "", 1, "tumbling", &[], "") + } + + /// `CountMinSketchWithHeap` config with an explicit `count_events` parameter. + fn make_topk_config(id: u64, metric: &str, count_events: bool) -> AggregationConfig { + let mut c = make_config( + id, + metric, + "CountMinSketchWithHeap", + "", + 1, + "tumbling", + &[], + "", + ); + c.parameters.insert( + "count_events".to_string(), + serde_json::Value::Bool(count_events), + ); + c + } + + fn topk_req(metric: &str, count_events: Option) -> QueryRequirements { + let mut r = req(metric, &[Statistic::Topk], Some(1_000), &[], ""); + r.topk_count_events = count_events; + r + } + + #[test] + fn topk_count_query_picks_count_events_sketch() { + // Two heap sketches on the same metric: one count-weighted, one + // sum-weighted. A COUNT top-k query must resolve to the count one. + let mut configs = HashMap::new(); + configs.insert(1, make_topk_config(1, "netflow_table", true)); + configs.insert(2, make_topk_config(2, "netflow_table", false)); + configs.insert(9, make_key_agg(9, "netflow_table")); + + let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(true))) + .expect("COUNT top-k should match the count_events sketch"); + assert_eq!(result.aggregation_id_for_value, 1); + assert_eq!(result.aggregation_id_for_key, 9); + } + + #[test] + fn topk_sum_query_picks_value_weighted_sketch() { + let mut configs = HashMap::new(); + configs.insert(1, make_topk_config(1, "netflow_table", true)); + configs.insert(2, make_topk_config(2, "netflow_table", false)); + configs.insert(9, make_key_agg(9, "netflow_table")); + + let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(false))) + .expect("SUM top-k should match the count_events: false sketch"); + assert_eq!(result.aggregation_id_for_value, 2); + assert_eq!(result.aggregation_id_for_key, 9); + } + + #[test] + fn topk_sum_query_rejects_count_only_sketch() { + // Only a count-weighted sketch exists; a SUM top-k query cannot be served + // even with a key agg available. + let mut configs = HashMap::new(); + configs.insert(1, make_topk_config(1, "netflow_table", true)); + configs.insert(9, make_key_agg(9, "netflow_table")); + let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(false))); + assert!( + result.is_none(), + "SUM top-k must not fall back to a count_events: true sketch", + ); + } + + #[test] + fn topk_count_query_matches_sketch_without_explicit_flag() { + // Configs that omit `count_events` default to count semantics, so a + // COUNT top-k query still matches (backwards compatibility). + let mut configs = HashMap::new(); + configs.insert( + 7, + make_config( + 7, + "netflow_table", + "CountMinSketchWithHeap", + "", + 1, + "tumbling", + &[], + "", + ), + ); + configs.insert(9, make_key_agg(9, "netflow_table")); + let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(true))) + .expect("default (no flag) sketch should serve COUNT top-k"); + assert_eq!(result.aggregation_id_for_value, 7); + } + + #[test] + fn topk_unconstrained_weighting_matches_any_sketch() { + // `topk_count_events: None` (e.g. PromQL top-k) imposes no weighting + // constraint, so any heap sketch on the metric matches. + let mut configs = HashMap::new(); + configs.insert(3, make_topk_config(3, "netflow_table", false)); + configs.insert(9, make_key_agg(9, "netflow_table")); + let result = find_compatible_aggregation(&configs, &topk_req("netflow_table", None)) + .expect("unconstrained top-k should match regardless of count_events"); + assert_eq!(result.aggregation_id_for_value, 3); + } } diff --git a/asap-common/dependencies/rs/asap_types/src/query_requirements.rs b/asap-common/dependencies/rs/asap_types/src/query_requirements.rs index 218d94d5..56b2b766 100644 --- a/asap-common/dependencies/rs/asap_types/src/query_requirements.rs +++ b/asap-common/dependencies/rs/asap_types/src/query_requirements.rs @@ -18,4 +18,13 @@ pub struct QueryRequirements { pub grouping_labels: KeyByLabelNames, /// Normalized label filter (produced by normalize_spatial_filter). pub spatial_filter_normalized: String, + /// For `Statistic::Topk` requirements, the heavy-hitter weighting the query + /// needs, used to disambiguate two `CountMinSketchWithHeap` configs on the + /// same metric: + /// * `Some(true)` → COUNT semantics (`count_events: true`, weight 1/event), + /// * `Some(false)` → SUM semantics (`count_events: false`, weight = value). + /// + /// `None` for non-top-k requirements (and for PromQL top-k, which does not + /// constrain the sketch weighting); matching ignores it when `None`. + pub topk_count_events: Option, } diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs index 25ee689e..2eae222d 100644 --- a/asap-query-engine/src/engines/simple_engine/promql.rs +++ b/asap-query-engine/src/engines/simple_engine/promql.rs @@ -648,6 +648,9 @@ impl SimpleEngine { data_range_ms, grouping_labels, spatial_filter_normalized: normalize_spatial_filter(&spatial_filter), + // PromQL top-k does not constrain the sketch weighting; leave the + // count/sum discriminator unset so matching does not over-filter. + topk_count_events: None, } } diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index bf2eb490..f51ed679 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -137,12 +137,42 @@ fn sort_and_truncate_instant_vector( results } -/// Detect a SQL top-k query and return its `k`. +/// How a top-k query weights each observation fed into the heavy-hitter sketch. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum TopkWeighting { + /// `COUNT(col)`: every matching row contributes weight 1, so the heap ranks + /// keys by event frequency (`count_events: true`). + Count, + /// `SUM(col)`: every matching row contributes weight = `col`, so the heap + /// ranks keys by summed value (`count_events: false`). + /// + /// Assumes **non-negative** summands: `CountMinSketch` is a frequency sketch + /// and cannot represent negative weights, so a `SUM` over a column that can + /// go negative would produce meaningless estimates. + Sum, +} + +/// A detected SQL top-k query: the `LIMIT k` plus how the sketch is weighted. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct SqlTopk { + pub k: u64, + pub weighting: TopkWeighting, +} + +impl SqlTopk { + /// `count_events` flag the backing `CountMinSketchWithHeap` must use: + /// `true` for COUNT (unit weight), `false` for SUM (value weight). + pub fn count_events(&self) -> bool { + matches!(self.weighting, TopkWeighting::Count) + } +} + +/// Detect a SQL top-k query and return its `k` plus sketch weighting. /// /// Recognises the heavy-hitter shape that `CountMinSketchWithHeap` serves: /// /// ```sql -/// SELECT , COUNT() AS +/// SELECT , COUNT() AS -- or SUM() /// FROM WHERE <1s window> /// GROUP BY /// ORDER BY DESC @@ -151,25 +181,28 @@ fn sort_and_truncate_instant_vector( /// /// The grouping key (``) becomes the *aggregated* dimension inside the /// sketch's heap — not a precompute partition key — so a single sketch per -/// window tracks the top keys by event count. +/// window tracks the top keys by event count (COUNT) or summed value (SUM). /// /// The SQL parser only accepts identifier ORDER BY targets, so the descending /// order must reference the aggregate's alias (e.g. `transfer_events`), not the -/// `COUNT(col)` expression itself. -pub(crate) fn detect_sql_topk(query_data: &SQLQueryData) -> Option { +/// `COUNT(col)` / `SUM(col)` expression itself. +pub(crate) fn detect_sql_topk(query_data: &SQLQueryData) -> Option { let k = query_data.limit?; // Need a GROUP BY key to rank and an ORDER BY to define "top". if query_data.labels.is_empty() || query_data.order_by.is_empty() { return None; } - // CountMinSketchWithHeap tracks heavy hitters by COUNT. - if !query_data - .aggregation_info - .get_name() - .eq_ignore_ascii_case("count") - { + // CountMinSketchWithHeap tracks heavy hitters by COUNT (unit weight) or + // SUM (value weight). Any other aggregate (MIN/MAX/quantile/...) cannot be + // served by the additive frequency sketch. + let name = query_data.aggregation_info.get_name(); + let weighting = if name.eq_ignore_ascii_case("count") { + TopkWeighting::Count + } else if name.eq_ignore_ascii_case("sum") { + TopkWeighting::Sum + } else { return None; - } + }; // Primary ordering must be the aggregate alias, descending (largest first). let primary = &query_data.order_by[0]; if primary.ascending { @@ -178,7 +211,7 @@ pub(crate) fn detect_sql_topk(query_data: &SQLQueryData) -> Option { if query_data.aggregation_alias.as_deref() != Some(primary.column.as_str()) { return None; } - Some(k) + Some(SqlTopk { k, weighting }) } impl SimpleEngine { @@ -312,7 +345,7 @@ impl SimpleEngine { &self, match_result: &SQLQuery, query_pattern_type: QueryPatternType, - topk_k: Option, + topk: Option, ) -> QueryRequirements { let query_data = match_result .outer_data() @@ -333,7 +366,7 @@ impl SimpleEngine { // and the grouping is empty: the GROUP BY column is the sketch's // *aggregated* (heavy-hitter) dimension, held inside one sketch per // window, not a precompute partition key. - let is_topk = topk_k.is_some(); + let is_topk = topk.is_some(); let statistics: Vec = if is_topk { vec![Statistic::Topk] } else { @@ -371,6 +404,10 @@ impl SimpleEngine { data_range_ms, grouping_labels, spatial_filter_normalized: normalize_spatial_filter(""), + // COUNT top-k needs a `count_events: true` sketch; SUM top-k needs a + // `count_events: false` (value-weighted) one. This disambiguates two + // CountMinSketchWithHeap configs on the same metric during matching. + topk_count_events: topk.map(|t| t.count_events()), } } @@ -571,9 +608,11 @@ impl SimpleEngine { // Top-k detection takes precedence: `... ORDER BY DESC LIMIT k` // is served by CountMinSketchWithHeap (Statistic::Topk) rather than the - // plain COUNT path, so the sketch heap drives the result set. - let topk_k = detect_sql_topk(&query_data); - let statistic_to_compute = if topk_k.is_some() { + // plain COUNT/SUM path, so the sketch heap drives the result set. Both + // COUNT(col) and SUM(col) map to Topk; they differ only in how the + // backing sketch is weighted (see TopkWeighting). + let topk = detect_sql_topk(&query_data); + let statistic_to_compute = if topk.is_some() { Statistic::Topk } else { Self::parse_single_statistic(&statistic_name)? @@ -586,8 +625,8 @@ impl SimpleEngine { e }) .ok()?; - if let Some(k) = topk_k { - query_kwargs.insert("k".to_string(), k.to_string()); + if let Some(topk) = topk { + query_kwargs.insert("k".to_string(), topk.k.to_string()); } // Create query metadata @@ -613,7 +652,7 @@ impl SimpleEngine { } else { warn!("No query_config entry for SQL query. Attempting capability-based matching."); let requirements = - self.build_query_requirements_sql(&match_result, query_pattern_type, topk_k); + self.build_query_requirements_sql(&match_result, query_pattern_type, topk); self.streaming_config .read() .unwrap() @@ -792,7 +831,7 @@ impl SimpleEngine { #[cfg(test)] mod detect_topk_tests { - use super::detect_sql_topk; + use super::{detect_sql_topk, SqlTopk, TopkWeighting}; use sql_utilities::ast_matching::SQLPatternParser; use sql_utilities::sqlhelper::{SQLSchema, Table}; use sqlparser::dialect::GenericDialect; @@ -828,7 +867,34 @@ mod detect_topk_tests { GROUP BY srcip ORDER BY transfer_events DESC LIMIT 10" ); let qd = parse(&sql).expect("valid topk query should parse"); - assert_eq!(detect_sql_topk(&qd), Some(10)); + assert_eq!( + detect_sql_topk(&qd), + Some(SqlTopk { + k: 10, + weighting: TopkWeighting::Count, + }), + "COUNT top-k must use unit (count_events) weighting", + ); + } + + #[test] + fn sum_order_by_alias_desc_limit_is_topk() { + let sql = format!( + "SELECT srcip, SUM(pkt_len) AS total FROM netflow_table {WINDOW} \ + GROUP BY srcip ORDER BY total DESC LIMIT 10" + ); + let qd = parse(&sql).expect("valid sum top-k query should parse"); + let detected = detect_sql_topk(&qd).expect("SUM ORDER BY DESC LIMIT is top-k"); + assert_eq!(detected.k, 10); + assert_eq!( + detected.weighting, + TopkWeighting::Sum, + "SUM top-k must use value (count_events=false) weighting", + ); + assert!( + !detected.count_events(), + "SUM top-k maps to a count_events: false sketch", + ); } #[test] @@ -870,16 +936,18 @@ mod detect_topk_tests { } #[test] - fn sum_aggregate_is_not_topk() { + fn min_aggregate_is_not_topk() { + // Only the additive sketch-friendly aggregates (COUNT/SUM) are top-k; + // MIN/MAX/quantile cannot be served by CountMinSketchWithHeap. let sql = format!( - "SELECT srcip, SUM(pkt_len) AS total FROM netflow_table {WINDOW} \ - GROUP BY srcip ORDER BY total DESC LIMIT 10" + "SELECT srcip, MIN(pkt_len) AS smallest FROM netflow_table {WINDOW} \ + GROUP BY srcip ORDER BY smallest DESC LIMIT 10" ); let qd = parse(&sql).expect("query should parse"); assert_eq!( detect_sql_topk(&qd), None, - "only COUNT maps to CMS-with-heap top-k" + "only COUNT/SUM map to CMS-with-heap top-k" ); } @@ -1208,6 +1276,112 @@ mod topk_pipeline_tests { ) } + /// Incoming SUM top-k query over a 1-second absolute window. + fn sum_topk_query(limit: u64) -> String { + format!( + "SELECT srcip, SUM(pkt_len) AS total_bytes FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -1, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY srcip ORDER BY total_bytes DESC LIMIT {limit}" + ) + } + + /// Build a SQL engine whose only aggregation is a self-keyed, value-weighted + /// (`count_events: false`) `CountMinSketchWithHeap` over `netflow_table`, + /// referenced by a single-aggregation `SUM(pkt_len)` query_config. Mirrors + /// `build_topk_engine` but for SUM top-k, so the engine resolves it + /// self-keyed via the query_config path (the same path COUNT uses). + fn build_sum_topk_engine() -> SimpleEngine { + let template = "SELECT srcip, SUM(pkt_len) FROM netflow_table \ + WHERE time BETWEEN DATEADD(s, -1, NOW()) AND NOW() GROUP BY srcip"; + + let value_cols: HashSet = ["pkt_len"].iter().map(|s| s.to_string()).collect(); + let labels: HashSet = ["srcip", "dstip", "proto"] + .iter() + .map(|s| s.to_string()) + .collect(); + let table = Table::new(METRIC.to_string(), "time".to_string(), value_cols, labels); + let sql_schema = SQLSchema::new(vec![table]); + + let query_config = QueryConfig::new(template.to_string()) + .add_aggregation(AggregationReference::new(AGG_ID, None)); + + let inference_config = InferenceConfig { + schema: SchemaConfig::SQL(sql_schema), + query_configs: vec![query_config], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + // count_events: false ⇒ the heap is weighted by the summed value rather + // than the event count (SUM semantics). + let mut parameters = HashMap::new(); + parameters.insert("count_events".to_string(), serde_json::json!(false)); + let agg_config = AggregationConfig { + aggregation_id: AGG_ID, + aggregation_type: AggregationType::CountMinSketchWithHeap, + aggregation_sub_type: String::new(), + parameters, + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::new(vec!["srcip".to_string()]), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: WindowType::Tumbling, + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: METRIC.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + }; + + let mut agg_configs = HashMap::new(); + agg_configs.insert(AGG_ID, agg_config); + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs: agg_configs, + }); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::sql, + ) + } + + #[test] + fn sum_topk_resolves_self_keyed_heap() { + // SUM(col) ORDER BY DESC LIMIT k is a top-k query and, like COUNT, + // resolves self-keyed through the single-aggregation query_config path. + let engine = build_sum_topk_engine(); + let context = engine + .build_query_execution_context_sql(sum_topk_query(5), QUERY_TIME) + .expect("SUM top-k should build a context via the query_config path"); + + assert_eq!( + context.metadata.statistic_to_compute, + Statistic::Topk, + "SUM ... ORDER BY DESC LIMIT n must be promoted to Topk", + ); + assert_eq!( + context.metadata.query_kwargs.get("k").map(String::as_str), + Some("5"), + ); + // Self-keyed: heap supplies both keys and values, so key id == value id + // and no separate keys query is planned. + assert_eq!( + context.agg_info.aggregation_id_for_key, + context.agg_info.aggregation_id_for_value, + ); + assert_eq!(context.agg_info.aggregation_id_for_value, AGG_ID); + assert!(context.store_plan.keys_query.is_none()); + } + #[test] fn detects_topk_and_resolves_self_keyed_heap() { let (engine, _store) = build_topk_engine(); From 7376ef39ed4574809c8eddd71dcef62bfac9e0e5 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 8 Jun 2026 18:32:56 +0000 Subject: [PATCH 2/5] Restrict SQL top-k detection to OnlyTemporal queries --- .../src/engines/simple_engine/sql.rs | 34 +++++++++++++++---- .../src/tests/sql_pattern_matching_tests.rs | 21 ++++++++++++ 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index f51ed679..550f8982 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -606,12 +606,16 @@ impl SimpleEngine { } }; - // Top-k detection takes precedence: `... ORDER BY DESC LIMIT k` - // is served by CountMinSketchWithHeap (Statistic::Topk) rather than the - // plain COUNT/SUM path, so the sketch heap drives the result set. Both - // COUNT(col) and SUM(col) map to Topk; they differ only in how the - // backing sketch is weighted (see TopkWeighting). - let topk = detect_sql_topk(&query_data); + // Top-k (CountMinSketchWithHeap) is defined only for flat temporal queries: + // one window, one GROUP BY key, COUNT/SUM ... ORDER BY DESC LIMIT k. + // Nested patterns attach ORDER BY / LIMIT to the outer SELECT; `query_data` from + // parse is the outer layer, while the temporal aggregate lives in `inner_data` + // for OneTemporalOneSpatial. Running detect_sql_topk on the outer layer would + // mis-classify spatial rollups as top-k. + let topk = match query_pattern_type { + QueryPatternType::OnlyTemporal => detect_sql_topk(&query_data), + QueryPatternType::OnlySpatial | QueryPatternType::OneTemporalOneSpatial => None, + }; let statistic_to_compute = if topk.is_some() { Statistic::Topk } else { @@ -961,6 +965,24 @@ mod detect_topk_tests { let qd = parse(&sql).expect("query should parse"); assert_eq!(detect_sql_topk(&qd), None); } + + #[test] + fn nested_outer_layer_would_match_detect_sql_topk() { + // Spatial-over-temporal: ORDER BY / LIMIT sit on the outer SELECT, so the + // parsed top-level `query_data` looks like SUM top-k even though the temporal + // aggregate is in the subquery. The engine must not promote this to Topk. + let sql = format!( + "SELECT srcip, SUM(bytes) AS rollup FROM ( \ + SELECT srcip, dstip, SUM(pkt_len) AS bytes FROM netflow_table {WINDOW} \ + GROUP BY srcip, dstip \ + ) sub GROUP BY srcip ORDER BY rollup DESC LIMIT 10" + ); + let qd = parse(&sql).expect("nested query should parse"); + assert!( + detect_sql_topk(&qd).is_some(), + "outer SELECT alone matches the top-k shape (this is why OnlyTemporal is gated)", + ); + } } #[cfg(test)] diff --git a/asap-query-engine/src/tests/sql_pattern_matching_tests.rs b/asap-query-engine/src/tests/sql_pattern_matching_tests.rs index 44db4fc2..602493f8 100644 --- a/asap-query-engine/src/tests/sql_pattern_matching_tests.rs +++ b/asap-query-engine/src/tests/sql_pattern_matching_tests.rs @@ -12,6 +12,7 @@ mod tests { use crate::engines::simple_engine::SimpleEngine; use crate::stores::simple_map_store::SimpleMapStore; use promql_utilities::data_model::KeyByLabelNames; + use promql_utilities::query_logics::enums::Statistic; use sql_utilities::sqlhelper::{SQLSchema, Table}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -176,6 +177,26 @@ mod tests { ); } + #[test] + fn nested_order_by_limit_is_not_topk() { + // Outer ORDER BY + LIMIT on a spatial-over-temporal query must not be routed + // through CountMinSketchWithHeap; the inner temporal SUM is not a flat top-k. + let template = "SELECT L1, SUM(result) FROM (SELECT SUM(value) AS result FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2, L3, L4) sub GROUP BY L1"; + let engine = build_sql_engine(template, 1, 10); + + let incoming = "SELECT L1, SUM(result) AS rollup FROM (SELECT SUM(value) AS result FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' GROUP BY L1, L2, L3, L4) sub GROUP BY L1 ORDER BY rollup DESC LIMIT 10"; + let query_time = 1727740810.0_f64; + + let context = engine + .build_query_execution_context_sql(incoming.to_string(), query_time) + .expect("nested spatial-of-temporal query should build a context"); + assert_ne!( + context.metadata.statistic_to_compute, + Statistic::Topk, + "top-k detection is OnlyTemporal-only; nested outer ORDER BY LIMIT must stay on the plain SUM path", + ); + } + #[test] fn test_no_match_returns_none() { // Engine has a SUM template; incoming uses AVG — should never match From f8e25aad9171fc448ae8895ad6a54a6d83a4db12 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 8 Jun 2026 18:41:36 +0000 Subject: [PATCH 3/5] Restore top-k detection for OnlySpatial queries and warn on SUM negatives --- .../src/engines/simple_engine/sql.rs | 29 +++++++++++++------ .../src/tests/sql_pattern_matching_tests.rs | 2 +- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 550f8982..50cf4eca 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -606,16 +606,27 @@ impl SimpleEngine { } }; - // Top-k (CountMinSketchWithHeap) is defined only for flat temporal queries: - // one window, one GROUP BY key, COUNT/SUM ... ORDER BY DESC LIMIT k. - // Nested patterns attach ORDER BY / LIMIT to the outer SELECT; `query_data` from - // parse is the outer layer, while the temporal aggregate lives in `inner_data` - // for OneTemporalOneSpatial. Running detect_sql_topk on the outer layer would - // mis-classify spatial rollups as top-k. + // Top-k (CountMinSketchWithHeap) applies to flat single-layer queries: + // COUNT/SUM ... GROUP BY ORDER BY DESC LIMIT k. + // Nested patterns attach ORDER BY / LIMIT to the outer SELECT; `query_data` + // from parse is the outer layer, while the temporal aggregate lives in + // `inner_data` for OneTemporalOneSpatial. Running detect_sql_topk on the + // outer layer would mis-classify spatial rollups as top-k. + // + // Single-interval windows (duration == scrape interval) classify as + // `OnlySpatial` in the pattern matcher even though they are flat temporal + // reads, so both `OnlyTemporal` and `OnlySpatial` must run detection. let topk = match query_pattern_type { - QueryPatternType::OnlyTemporal => detect_sql_topk(&query_data), - QueryPatternType::OnlySpatial | QueryPatternType::OneTemporalOneSpatial => None, + QueryPatternType::OnlyTemporal | QueryPatternType::OnlySpatial => { + detect_sql_topk(&query_data) + } + QueryPatternType::OneTemporalOneSpatial => None, }; + if topk.is_some_and(|t| t.weighting == TopkWeighting::Sum) { + warn!( + "SUM top-k assumes non-negative values; results are undefined for columns with negative entries" + ); + } let statistic_to_compute = if topk.is_some() { Statistic::Topk } else { @@ -980,7 +991,7 @@ mod detect_topk_tests { let qd = parse(&sql).expect("nested query should parse"); assert!( detect_sql_topk(&qd).is_some(), - "outer SELECT alone matches the top-k shape (this is why OnlyTemporal is gated)", + "outer SELECT alone matches the top-k shape (this is why OneTemporalOneSpatial is gated)", ); } } diff --git a/asap-query-engine/src/tests/sql_pattern_matching_tests.rs b/asap-query-engine/src/tests/sql_pattern_matching_tests.rs index 602493f8..9db601dc 100644 --- a/asap-query-engine/src/tests/sql_pattern_matching_tests.rs +++ b/asap-query-engine/src/tests/sql_pattern_matching_tests.rs @@ -193,7 +193,7 @@ mod tests { assert_ne!( context.metadata.statistic_to_compute, Statistic::Topk, - "top-k detection is OnlyTemporal-only; nested outer ORDER BY LIMIT must stay on the plain SUM path", + "top-k detection skips nested OneTemporalOneSpatial; outer ORDER BY LIMIT must stay on the plain SUM path", ); } From 16e68e15dbec8f9b48bb2dc0121594ad958453ab Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 8 Jun 2026 19:27:30 +0000 Subject: [PATCH 4/5] add SQL top-k capability-matching fallback integration tests --- .../src/engines/simple_engine/sql.rs | 220 +++++++++++++++++- 1 file changed, 212 insertions(+), 8 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 50cf4eca..367d946b 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -1189,15 +1189,21 @@ mod sort_and_truncate_tests { } } -/// End-to-end tests for SQL top-k queries served by `CountMinSketchWithHeap`. +/// End-to-end SQL top-k pipeline tests for `CountMinSketchWithHeap`. /// -/// Exercises the full path for `SELECT srcip, COUNT(pkt_len) AS k FROM -/// netflow_table WHERE <1s window> GROUP BY srcip ORDER BY k DESC LIMIT n`: -/// * SQL detection promotes it to `Statistic::Topk`. -/// * The single `CountMinSketchWithHeap` aggregation resolves self-keyed -/// (key id == value id), so the sketch heap enumerates candidate `srcip`s. -/// * The pipeline sorts by count descending and truncates to `n`, without -/// PromQL-style metric-name prefixing (rows stay bare `(srcip, count)`). +/// Covers both resolution paths: +/// * **query_config** — self-keyed single-aggregation reference +/// * **capability matching** — heap + paired `DeltaSetAggregator`, no query_config +/// +/// Example query shape: +/// ```sql +/// SELECT srcip, COUNT(pkt_len) AS transfer_events +/// FROM netflow_table WHERE <1s window> GROUP BY srcip ORDER BY transfer_events DESC LIMIT n +/// ``` +/// SQL detection promotes it to `Statistic::Topk`. On the query_config path the +/// heap is self-keyed; on the capability path a separate key aggregation is paired. +/// The pipeline sorts by value descending and truncates to `n`, without PromQL-style +/// metric-name prefixing (rows stay bare `(srcip, count)`). /// /// Lives here alongside `detect_topk_tests` / `sort_and_truncate_tests` so all /// SQL top-k coverage is co-located in the SQL handler. Unlike those pure-fn @@ -1387,6 +1393,104 @@ mod topk_pipeline_tests { ) } + const HEAP_COUNT_ID: u64 = 111; + const HEAP_SUM_ID: u64 = 112; + const HEAP_DEFAULT_ID: u64 = 113; + const KEY_AGG_ID: u64 = 211; + + fn netflow_sql_schema() -> SQLSchema { + let value_cols: HashSet = ["pkt_len"].iter().map(|s| s.to_string()).collect(); + let labels: HashSet = ["srcip", "dstip", "proto"] + .iter() + .map(|s| s.to_string()) + .collect(); + let table = Table::new(METRIC.to_string(), "time".to_string(), value_cols, labels); + SQLSchema::new(vec![table]) + } + + /// `CountMinSketchWithHeap` for capability-matching tests. When `count_events` + /// is `None`, the parameter is omitted so the config relies on the default + /// (`count_events: true`). + fn make_heap_agg(id: u64, count_events: Option) -> AggregationConfig { + let mut parameters = HashMap::new(); + if let Some(count_events) = count_events { + parameters.insert("count_events".to_string(), serde_json::json!(count_events)); + } + AggregationConfig { + aggregation_id: id, + aggregation_type: AggregationType::CountMinSketchWithHeap, + aggregation_sub_type: String::new(), + parameters, + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::new(vec!["srcip".to_string()]), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: WindowType::Tumbling, + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: METRIC.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + } + } + + fn make_delta_set_key_agg(id: u64) -> AggregationConfig { + AggregationConfig { + aggregation_id: id, + aggregation_type: AggregationType::DeltaSetAggregator, + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::empty(), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: WindowType::Tumbling, + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: METRIC.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + } + } + + /// Engine with **no** query_configs so top-k resolves via capability matching. + /// Always provisions a paired `DeltaSetAggregator` key aggregation. + fn build_capability_fallback_engine(heap_configs: Vec) -> SimpleEngine { + let mut agg_configs = HashMap::new(); + for heap in &heap_configs { + agg_configs.insert(heap.aggregation_id, heap.clone()); + } + agg_configs.insert(KEY_AGG_ID, make_delta_set_key_agg(KEY_AGG_ID)); + + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs: agg_configs, + }); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + let inference_config = InferenceConfig { + schema: SchemaConfig::SQL(netflow_sql_schema()), + query_configs: vec![], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + SimpleEngine::new( + store, + inference_config, + streaming_config, + 1, + QueryLanguage::sql, + ) + } + #[test] fn sum_topk_resolves_self_keyed_heap() { // SUM(col) ORDER BY DESC LIMIT k is a top-k query and, like COUNT, @@ -1502,4 +1606,104 @@ mod topk_pipeline_tests { let expected: HashSet = (6..=15u64).map(|i| format!("10.0.0.{i}")).collect(); assert_eq!(returned, expected); } + + #[test] + fn count_topk_capability_fallback_pairs_heap_with_key_agg() { + let engine = build_capability_fallback_engine(vec![make_heap_agg( + HEAP_COUNT_ID, + Some(true), + )]); + let context = engine + .build_query_execution_context_sql(topk_query(10), QUERY_TIME) + .expect("COUNT top-k should resolve via capability matching"); + + assert_eq!(context.metadata.statistic_to_compute, Statistic::Topk); + assert_eq!( + context.agg_info.aggregation_id_for_value, + HEAP_COUNT_ID, + "count-weighted heap must be the value aggregation", + ); + assert_eq!( + context.agg_info.aggregation_id_for_key, KEY_AGG_ID, + "multi-population top-k must pair heap with DeltaSetAggregator", + ); + assert_ne!( + context.agg_info.aggregation_id_for_key, + context.agg_info.aggregation_id_for_value, + ); + assert!( + context.store_plan.keys_query.is_some(), + "capability fallback plans a separate keys query", + ); + } + + #[test] + fn count_topk_capability_fallback_picks_count_weighted_when_both_heaps_exist() { + let engine = build_capability_fallback_engine(vec![ + make_heap_agg(HEAP_COUNT_ID, Some(true)), + make_heap_agg(HEAP_SUM_ID, Some(false)), + ]); + let context = engine + .build_query_execution_context_sql(topk_query(10), QUERY_TIME) + .expect("COUNT top-k should pick the count_events: true sketch"); + + assert_eq!( + context.agg_info.aggregation_id_for_value, + HEAP_COUNT_ID, + "COUNT top-k must not pick the sum-weighted sketch when both exist", + ); + } + + #[test] + fn count_topk_capability_fallback_defaults_count_events_true() { + // Heap omits `count_events`; matcher treats that as count semantics. + let engine = build_capability_fallback_engine(vec![make_heap_agg( + HEAP_DEFAULT_ID, + None, + )]); + let context = engine + .build_query_execution_context_sql(topk_query(10), QUERY_TIME) + .expect("COUNT top-k should match a sketch with default count_events"); + + assert_eq!( + context.agg_info.aggregation_id_for_value, + HEAP_DEFAULT_ID, + "default (no flag) heap must serve COUNT top-k", + ); + } + + #[test] + fn sum_topk_capability_fallback_picks_value_weighted_heap() { + let engine = build_capability_fallback_engine(vec![ + make_heap_agg(HEAP_COUNT_ID, Some(true)), + make_heap_agg(HEAP_SUM_ID, Some(false)), + ]); + let context = engine + .build_query_execution_context_sql(sum_topk_query(5), QUERY_TIME) + .expect("SUM top-k should resolve via capability matching"); + + assert_eq!(context.metadata.statistic_to_compute, Statistic::Topk); + assert_eq!( + context.agg_info.aggregation_id_for_value, + HEAP_SUM_ID, + "SUM top-k must pick the count_events: false sketch", + ); + assert_eq!(context.agg_info.aggregation_id_for_key, KEY_AGG_ID); + assert!(context.store_plan.keys_query.is_some()); + } + + #[test] + fn sum_topk_capability_fallback_rejects_count_only_default_heap() { + // Only a default (count-weighted) sketch exists; SUM top-k cannot be served. + let engine = build_capability_fallback_engine(vec![make_heap_agg( + HEAP_DEFAULT_ID, + None, + )]); + assert!( + engine + .build_query_execution_context_sql(sum_topk_query(5), QUERY_TIME) + .is_none(), + "SUM top-k must not fall back to a count_events-default sketch", + ); + } } From 9ba4fc5207281232344a74ea2cbec5da9969aa87 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 8 Jun 2026 19:30:50 +0000 Subject: [PATCH 5/5] formatting --- .../src/engines/simple_engine/sql.rs | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 367d946b..281832b5 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -1609,18 +1609,15 @@ mod topk_pipeline_tests { #[test] fn count_topk_capability_fallback_pairs_heap_with_key_agg() { - let engine = build_capability_fallback_engine(vec![make_heap_agg( - HEAP_COUNT_ID, - Some(true), - )]); + let engine = + build_capability_fallback_engine(vec![make_heap_agg(HEAP_COUNT_ID, Some(true))]); let context = engine .build_query_execution_context_sql(topk_query(10), QUERY_TIME) .expect("COUNT top-k should resolve via capability matching"); assert_eq!(context.metadata.statistic_to_compute, Statistic::Topk); assert_eq!( - context.agg_info.aggregation_id_for_value, - HEAP_COUNT_ID, + context.agg_info.aggregation_id_for_value, HEAP_COUNT_ID, "count-weighted heap must be the value aggregation", ); assert_eq!( @@ -1648,8 +1645,7 @@ mod topk_pipeline_tests { .expect("COUNT top-k should pick the count_events: true sketch"); assert_eq!( - context.agg_info.aggregation_id_for_value, - HEAP_COUNT_ID, + context.agg_info.aggregation_id_for_value, HEAP_COUNT_ID, "COUNT top-k must not pick the sum-weighted sketch when both exist", ); } @@ -1657,17 +1653,13 @@ mod topk_pipeline_tests { #[test] fn count_topk_capability_fallback_defaults_count_events_true() { // Heap omits `count_events`; matcher treats that as count semantics. - let engine = build_capability_fallback_engine(vec![make_heap_agg( - HEAP_DEFAULT_ID, - None, - )]); + let engine = build_capability_fallback_engine(vec![make_heap_agg(HEAP_DEFAULT_ID, None)]); let context = engine .build_query_execution_context_sql(topk_query(10), QUERY_TIME) .expect("COUNT top-k should match a sketch with default count_events"); assert_eq!( - context.agg_info.aggregation_id_for_value, - HEAP_DEFAULT_ID, + context.agg_info.aggregation_id_for_value, HEAP_DEFAULT_ID, "default (no flag) heap must serve COUNT top-k", ); } @@ -1684,8 +1676,7 @@ mod topk_pipeline_tests { assert_eq!(context.metadata.statistic_to_compute, Statistic::Topk); assert_eq!( - context.agg_info.aggregation_id_for_value, - HEAP_SUM_ID, + context.agg_info.aggregation_id_for_value, HEAP_SUM_ID, "SUM top-k must pick the count_events: false sketch", ); assert_eq!(context.agg_info.aggregation_id_for_key, KEY_AGG_ID); @@ -1695,10 +1686,7 @@ mod topk_pipeline_tests { #[test] fn sum_topk_capability_fallback_rejects_count_only_default_heap() { // Only a default (count-weighted) sketch exists; SUM top-k cannot be served. - let engine = build_capability_fallback_engine(vec![make_heap_agg( - HEAP_DEFAULT_ID, - None, - )]); + let engine = build_capability_fallback_engine(vec![make_heap_agg(HEAP_DEFAULT_ID, None)]); assert!( engine .build_query_execution_context_sql(sum_topk_query(5), QUERY_TIME)