Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pub mod sqlparser_test;
pub mod sqlpattern_matcher;
pub mod sqlpattern_parser;

pub use sqlhelper::{SQLSchema, Table};
pub use sqlhelper::{detect_sql_topk, SQLSchema, SqlTopk, Table, TopkWeighting};
pub use sqlpattern_matcher::*;
pub use sqlpattern_parser::*;
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,92 @@ impl SQLQueryData {
}
}
}

/// How a top-k query weights each observation fed into the heavy-hitter sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TopkWeighting {
/// `COUNT(col)`: every matching row contributes weight 1, so the heap ranks
/// keys by event frequency (`count_events: true`).
Count,
/// `SUM(col)`: every matching row contributes weight = `col`, so the heap
/// ranks keys by summed value (`count_events: false`).
///
/// Assumes **non-negative** summands: `CountMinSketch` is a frequency sketch
/// and cannot represent negative weights, so a `SUM` over a column that can
/// go negative would produce meaningless estimates.
Sum,
}

/// A detected SQL top-k query: the `LIMIT k` plus how the sketch is weighted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SqlTopk {
pub k: u64,
pub weighting: TopkWeighting,
}

impl SqlTopk {
/// `count_events` flag the backing `CountMinSketchWithHeap` must use:
/// `true` for COUNT (unit weight), `false` for SUM (value weight).
pub fn count_events(&self) -> bool {
matches!(self.weighting, TopkWeighting::Count)
}
}

/// Detect a SQL top-k query and return its `k` plus sketch weighting.
///
/// Recognises the heavy-hitter shape that `CountMinSketchWithHeap` serves:
///
/// ```sql
/// SELECT <key>, COUNT(<col>) AS <alias> -- or SUM(<col>)
/// FROM <table> WHERE <1s window>
/// GROUP BY <key>
/// ORDER BY <alias> DESC
/// LIMIT k
/// ```
///
/// The grouping key (`<key>`) becomes the *aggregated* dimension inside the
/// sketch's heap — not a precompute partition key — so a single sketch per
/// window tracks the top keys by event count (COUNT) or summed value (SUM).
///
/// The SQL parser only accepts identifier ORDER BY targets, so the descending
/// order must reference the aggregate's alias (e.g. `transfer_events`), not the
/// `COUNT(col)` / `SUM(col)` expression itself.
///
/// This detection inspects a single SELECT layer only. For nested queries the
/// ORDER BY / LIMIT sit on the outer SELECT, which on its own matches this
/// shape; callers that must exclude nested patterns (e.g. spatial-over-temporal)
/// are responsible for gating before calling this (the query engine gates on
/// query pattern type; the planner rejects nested queries up front).
pub fn detect_sql_topk(query_data: &SQLQueryData) -> Option<SqlTopk> {
let k = query_data.limit?;
// LIMIT 0 is an empty-result query, not a top-k heavy-hitter request.
if k == 0 {
return None;
}
// Need a GROUP BY key to rank and an ORDER BY to define "top".
if query_data.labels.is_empty() || query_data.order_by.is_empty() {
return None;
}
// CountMinSketchWithHeap tracks heavy hitters by COUNT (unit weight) or
// SUM (value weight). Any other aggregate (MIN/MAX/quantile/...) cannot be
// served by the additive frequency sketch.
let name = query_data.aggregation_info.get_name();
let weighting = if name.eq_ignore_ascii_case("count") {
TopkWeighting::Count
} else if name.eq_ignore_ascii_case("sum") {
TopkWeighting::Sum
} else {
return None;
};
// Primary ordering must be the aggregate alias, descending (largest first).
let primary = &query_data.order_by[0];
if primary.ascending {
return None;
}
// ORDER BY may differ only by identifier case for unquoted aliases.
let alias = query_data.aggregation_alias.as_deref()?;
if !alias.eq_ignore_ascii_case(primary.column.as_str()) {
return None;
}
Some(SqlTopk { k, weighting })
}
1 change: 1 addition & 0 deletions asap-planner-rs/src/planner/elastic_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl ElasticSingleQueryProcessor {
agg_type,
agg_sub_type,
None,
None,
self.sketch_parameters.as_ref(),
)
},
Expand Down
16 changes: 13 additions & 3 deletions asap-planner-rs/src/planner/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ const DEFAULT_HLL_PRECISION: u64 = 14;

/// Shared sketch parameter builder used by both PromQL and SQL paths.
///
/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it
/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces
/// this operator today, so the `None` branch is unreachable in practice).
/// `topk_k` is required for `CountMinSketchWithHeap`. PromQL supplies it from
/// the `topk(k, …)` query argument; SQL supplies it from `LIMIT k`.
///
/// `topk_count_events` disambiguates COUNT vs SUM SQL top-k (`true` / `false`).
/// PromQL passes `None` and omits the parameter (defaults to count semantics).
pub fn build_sketch_parameters(
aggregation_type: AggregationType,
aggregation_sub_type: &str,
topk_k: Option<u64>,
topk_count_events: Option<bool>,
sketch_params: Option<&SketchParameterOverrides>,
) -> Result<HashMap<String, serde_json::Value>, String> {
match aggregation_type {
Expand Down Expand Up @@ -77,6 +80,12 @@ pub fn build_sketch_parameters(
"heapsize".to_string(),
serde_json::Value::Number((k * heap_mult).into()),
);
if let Some(count_events) = topk_count_events {
m.insert(
"count_events".to_string(),
serde_json::Value::Bool(count_events),
);
}
Ok(m)
}

Expand Down Expand Up @@ -158,6 +167,7 @@ pub fn build_sketch_parameters_from_promql(
aggregation_type,
aggregation_sub_type,
topk_k,
None,
sketch_params,
)
}
30 changes: 26 additions & 4 deletions asap-planner-rs/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use asap_types::enums::{CleanupPolicy, WindowType};
use promql_utilities::data_model::KeyByLabelNames;
use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic};
use sql_utilities::ast_matching::sqlhelper::Table;
use sql_utilities::ast_matching::sqlhelper::{detect_sql_topk, Table};
use sql_utilities::ast_matching::sqlpattern_matcher::{QueryType, SQLPatternMatcher};
use sql_utilities::ast_matching::sqlpattern_parser::SQLPatternParser;
use sql_utilities::ast_matching::SQLSchema;
Expand Down Expand Up @@ -105,16 +105,26 @@ impl SQLSingleQueryProcessor {

// Label routing
let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::<Vec<_>>());
// Top-k needs ORDER BY / LIMIT from the parser; SQLPatternMatcher drops them
// when building `sql_query.query_data[0]`, so use `qdata` not query_data[0].
let sql_topk = detect_sql_topk(&qdata);
let treatment_type = get_sql_treatment_type(agg_info.get_name());
let statistics = get_sql_statistics(agg_info.get_name())?;
let statistics = if sql_topk.is_some() {
vec![Statistic::Topk]
} else {
get_sql_statistics(agg_info.get_name())?
};
let rollup = if statistics.contains(&Statistic::Cardinality) {
// Distinct target is value_column, not a rollup label dimension.
KeyByLabelNames::empty()
} else {
all_metadata.difference(&spatial_output)
};

let configs = build_agg_configs_for_statistics(
let topk_k = sql_topk.map(|t| t.k);
let topk_count_events = sql_topk.map(|t| t.count_events());

let mut configs = build_agg_configs_for_statistics(
&statistics,
treatment_type,
&spatial_output,
Expand All @@ -128,13 +138,25 @@ impl SQLSingleQueryProcessor {
build_sketch_parameters(
agg_type,
agg_sub_type,
None,
topk_k,
topk_count_events,
self.sketch_parameters.as_ref(),
)
},
)
.map_err(ControllerError::SqlParse)?;

if sql_topk.is_some() {
for cfg in &mut configs {
if cfg.aggregation_type == AggregationType::CountMinSketchWithHeap {
// Heap-only self-keyed layout: the GROUP BY column is tracked
// inside the sketch's aggregated dimension, not as a partition key.
cfg.grouping_labels = KeyByLabelNames::empty();
cfg.aggregated_labels = spatial_output.clone();
}
}
}

let t_lookback = match query_type {
QueryType::Spatial => self.data_ingestion_interval,
_ => sql_query.query_data[0].time_info.get_duration() as u64,
Expand Down
17 changes: 15 additions & 2 deletions asap-planner-rs/src/planner_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use asap_types::streaming_config::StreamingConfig;

use crate::generator::{
GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS,
KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME,
KEY_VALUE_COLUMN, KEY_WINDOW_SIZE,
KEY_NUM_AGG_TO_RETAIN, KEY_PARAMETERS, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD,
KEY_TABLE_NAME, KEY_VALUE_COLUMN, KEY_WINDOW_SIZE,
};

/// Output of the planning process — contains the two YAML configs
Expand Down Expand Up @@ -227,4 +227,17 @@ impl PlannerOutput {
})
.unwrap_or(false)
}

/// Returns a sketch parameter from the first aggregation matching `agg_type`.
pub fn aggregation_parameter(&self, agg_type: &str, key: &str) -> Option<YamlValue> {
self.find_aggregation_by_type(agg_type)
.and_then(|m| m.get(KEY_PARAMETERS))
.and_then(|v| {
if let YamlValue::Mapping(params) = v {
params.get(key).cloned()
} else {
None
}
})
}
}
135 changes: 135 additions & 0 deletions asap-planner-rs/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,138 @@ fn t_not_multiple_of_data_ingestion_interval_returns_planner_error() {
.generate();
assert!(matches!(result, Err(ControllerError::PlannerError(_))));
}

// ── SQL top-k (CountMinSketchWithHeap) ────────────────────────────────────────

/// COUNT … ORDER BY <alias> DESC LIMIT k → single heap-only sketch.
#[test]
fn spatial_count_topk_heap() {
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 10";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 1);
assert_eq!(out.inference_query_count(), 1);
assert!(out.has_aggregation_type("CountMinSketchWithHeap"));
assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk"));
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
assert!(!out.has_aggregation_type("CountMinSketch"));
assert!(out.all_tumbling_window_sizes_eq(1));
assert_eq!(
out.aggregation_labels("CountMinSketchWithHeap", "grouping"),
Vec::<String>::new()
);
assert_eq!(
out.aggregation_labels("CountMinSketchWithHeap", "aggregated"),
vec!["srcip".to_string()]
);
let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup");
rollup.sort();
assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]);
assert_eq!(
out.aggregation_parameter("CountMinSketchWithHeap", "heapsize")
.and_then(|v| v.as_u64()),
Some(40)
);
assert_eq!(
out.aggregation_parameter("CountMinSketchWithHeap", "count_events")
.and_then(|v| v.as_bool()),
Some(true)
);
}

/// SUM … ORDER BY <alias> DESC LIMIT k → value-weighted heap sketch.
#[test]
fn spatial_sum_topk_heap() {
let q = "SELECT srcip, SUM(pkt_len) AS total_bytes FROM netflow_table \
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
GROUP BY srcip ORDER BY total_bytes DESC LIMIT 10";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 1);
assert_eq!(out.inference_query_count(), 1);
assert!(out.has_aggregation_type("CountMinSketchWithHeap"));
assert!(out.has_aggregation_type_and_sub_type("CountMinSketchWithHeap", "topk"));
assert!(!out.has_aggregation_type("DeltaSetAggregator"));
assert!(!out.has_aggregation_type("CountMinSketch"));
assert!(out.all_tumbling_window_sizes_eq(1));
assert_eq!(
out.aggregation_labels("CountMinSketchWithHeap", "grouping"),
Vec::<String>::new()
);
assert_eq!(
out.aggregation_labels("CountMinSketchWithHeap", "aggregated"),
vec!["srcip".to_string()]
);
let mut rollup = out.aggregation_labels("CountMinSketchWithHeap", "rollup");
rollup.sort();
assert_eq!(rollup, vec!["dstip".to_string(), "proto".to_string()]);
assert_eq!(
out.aggregation_parameter("CountMinSketchWithHeap", "heapsize")
.and_then(|v| v.as_u64()),
Some(40)
);
assert_eq!(
out.aggregation_parameter("CountMinSketchWithHeap", "count_events")
.and_then(|v| v.as_bool()),
Some(false)
);
}

/// Plain COUNT without ORDER BY / LIMIT stays on the CMS + DeltaSet path.
#[test]
fn spatial_count_without_order_by_is_not_topk() {
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
GROUP BY srcip";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 2);
assert!(out.has_aggregation_type("CountMinSketch"));
assert!(out.has_aggregation_type("DeltaSetAggregator"));
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
}

/// ORDER BY aggregate alias ASC (bottom-k) stays on the CMS + DeltaSet path.
#[test]
fn spatial_count_order_by_asc_limit_is_not_topk() {
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
GROUP BY srcip ORDER BY transfer_events ASC LIMIT 10";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 2);
assert!(out.has_aggregation_type("CountMinSketch"));
assert!(out.has_aggregation_type("DeltaSetAggregator"));
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
}

/// LIMIT 0 is treated as non-top-k and uses the normal CMS + DeltaSet path.
#[test]
fn spatial_count_limit_zero_is_not_topk() {
let q = "SELECT srcip, COUNT(pkt_len) AS transfer_events FROM netflow_table \
WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
GROUP BY srcip ORDER BY transfer_events DESC LIMIT 0";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 2);
assert!(out.has_aggregation_type("CountMinSketch"));
assert!(out.has_aggregation_type("DeltaSetAggregator"));
assert!(!out.has_aggregation_type("CountMinSketchWithHeap"));
}
Loading
Loading