From e6d99fc385a2488c4b238e75d4505c743e7ca7b8 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Fri, 22 May 2026 22:28:06 +0000 Subject: [PATCH 1/3] feat(sql-utilities): support multiple SELECT projections with one aggregate --- .../src/ast_matching/sqlparser_test.rs | 89 ++++++++++ .../src/ast_matching/sqlpattern_parser.rs | 156 ++++++++++-------- 2 files changed, 173 insertions(+), 72 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index a6f593a1..cc8cdef8 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -802,4 +802,93 @@ mod tests { Some(QueryError::SpatialDurationSmall), ); } + + // ── Multi-projection SELECT (group cols + aggregate) ───────────────────── + // + // ClickHouse and standard SQL allow `SELECT g1, g2, agg(v) FROM t GROUP BY g1, g2` + // (one row per group with the grouping keys included alongside the aggregate). + // The pattern parser must also accept this shape and produce the same structural + // SQLQueryData as the single-projection form `SELECT agg(v) FROM t GROUP BY g1, g2`. + + #[test] + fn test_multi_projection_groupcols_then_aggregate() { + let query = parse_sql_query( + "SELECT L1, L2, L3, L4, SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3, L4", + ) + .expect("multi-projection SELECT with group cols + aggregate should parse"); + assert_eq!(query.metric, "cpu_usage"); + assert_eq!(query.aggregation_info.get_name(), "SUM"); + assert!(query.labels.contains("L1")); + assert!(query.labels.contains("L4")); + } + + #[test] + fn test_multi_projection_aggregate_first() { + let query = parse_sql_query( + "SELECT SUM(value), L1, L2, L3, L4 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3, L4", + ) + .expect("aggregate-first multi-projection SELECT should parse"); + assert_eq!(query.aggregation_info.get_name(), "SUM"); + } + + #[test] + fn test_multi_projection_quantile_clickhouse_syntax() { + // The exact shape of the user's netflow query: ClickHouse parametric quantile + // with grouping columns alongside the aggregate in SELECT. + let query = parse_sql_query( + "SELECT L1, L2, quantile(0.99)(value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY L1, L2", + ) + .expect("multi-projection ClickHouse parametric quantile should parse"); + assert_eq!(query.aggregation_info.get_name(), "QUANTILE"); + assert_eq!(query.aggregation_info.get_args()[0], "0.99"); + } + + #[test] + fn test_multi_projection_matches_single_projection_template() { + // A template registered as single-projection should structurally match an + // incoming query that lists the group cols in SELECT alongside the aggregate. + let template = parse_sql_query( + "SELECT SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2, L3, L4", + ) + .expect("single-projection template should parse"); + let incoming = parse_sql_query( + "SELECT L1, L2, L3, L4, SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY L1, L2, L3, L4", + ) + .expect("multi-projection incoming should parse"); + assert!(incoming.matches_sql_pattern(&template)); + } + + #[test] + fn test_multi_projection_rejects_two_aggregates() { + // Two aggregate functions in the projection list — the parser only tracks one + // statistic so this must be rejected to avoid silently dropping one. + assert!(parse_sql_query( + "SELECT SUM(value), AVG(value), L1 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + } + + #[test] + fn test_multi_projection_rejects_arbitrary_expr() { + // Non-identifier, non-function projection items (computed expressions, literals, …) + // are not supported by the pattern model and must be rejected. + assert!(parse_sql_query( + "SELECT (L1 + 1), SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 4b653ec2..cfadc79a 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -211,88 +211,100 @@ impl SQLPatternParser { } fn get_aggregation(&self, select: &Select) -> Option { - if select.projection.len() != 1 { - return None; + // Find the (single) aggregate function in the projection list. Other + // projection items must be plain column references — these are expected to + // be group-by keys (e.g. `SELECT g1, g2, SUM(v) FROM t GROUP BY g1, g2`). + // Anything else (multiple aggregates, computed expressions, literals, *) + // is rejected since the structural pattern model only tracks one statistic. + let mut agg_func: Option<&Function> = None; + for item in &select.projection { + let expr = match item { + SelectItem::UnnamedExpr(expr) => expr, + SelectItem::ExprWithAlias { expr, .. } => expr, + _ => return None, + }; + match expr { + Expr::Function(f) => { + if agg_func.is_some() { + return None; + } + agg_func = Some(f); + } + Expr::Identifier(_) | Expr::CompoundIdentifier(_) => {} + _ => return None, + } } + let func = agg_func?; - match &select.projection[0] { - SelectItem::UnnamedExpr(Expr::Function(func)) - | SelectItem::ExprWithAlias { - expr: Expr::Function(func), - .. - } => { - let name = func.name.to_string().to_uppercase(); - - let args = self.get_quantile_args(func); - - // Get the column being aggregated - let col = match &func.args { - FunctionArguments::None => return None, - FunctionArguments::Subquery(_) => return None, - FunctionArguments::List(func_args) => { - if name == "QUANTILE" { - if let FunctionArguments::List(params) = &func.parameters { - if !params.args.is_empty() { - // ClickHouse parametric syntax: quantile(0.95)(column) - // Column is the sole argument in func.args. - match func_args.args.first() { - Some(FunctionArg::Unnamed(FunctionArgExpr::Expr( - Expr::Identifier(ident), - ))) => ident.value.clone(), - _ => return None, - } - } else { - return None; - } - } else { - // ASAP syntax: QUANTILE(0.95, value) - column is second argument - if func_args.args.len() < 2 { - return None; - } - match &func_args.args[1] { - FunctionArg::Unnamed(FunctionArgExpr::Expr( - Expr::Identifier(ident), - )) => ident.value.clone(), - _ => return None, - } - } - } else if name == "PERCENTILE" { - // PERCENTILE(value, 95) - column is first argument - if func_args.args.is_empty() { - return None; - } - match &func_args.args[0] { - FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier( - ident, + let name = func.name.to_string().to_uppercase(); + + let args = self.get_quantile_args(func); + + // Get the column being aggregated + let col = match &func.args { + FunctionArguments::None => return None, + FunctionArguments::Subquery(_) => return None, + FunctionArguments::List(func_args) => { + if name == "QUANTILE" { + if let FunctionArguments::List(params) = &func.parameters { + if !params.args.is_empty() { + // ClickHouse parametric syntax: quantile(0.95)(column) + // Column is the sole argument in func.args. + match func_args.args.first() { + Some(FunctionArg::Unnamed(FunctionArgExpr::Expr( + Expr::Identifier(ident), ))) => ident.value.clone(), _ => return None, } } else { - // For other aggregations - column is first argument - if func_args.args.is_empty() { - return None; - } - match &func_args.args[0] { - FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier( - ident, - ))) => ident.value.clone(), - _ => return None, - } + return None; + } + } else { + // ASAP syntax: QUANTILE(0.95, value) - column is second argument + if func_args.args.len() < 2 { + return None; + } + match &func_args.args[1] { + FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier( + ident, + ))) => ident.value.clone(), + _ => return None, } } - }; - - // Always store PERCENTILE as QUANTILE internally - let normalized_name = if name == "PERCENTILE" { - "QUANTILE".to_string() + } else if name == "PERCENTILE" { + // PERCENTILE(value, 95) - column is first argument + if func_args.args.is_empty() { + return None; + } + match &func_args.args[0] { + FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => { + ident.value.clone() + } + _ => return None, + } } else { - name - }; - - Some(AggregationInfo::new(normalized_name, col, args)) + // For other aggregations - column is first argument + if func_args.args.is_empty() { + return None; + } + match &func_args.args[0] { + FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => { + ident.value.clone() + } + _ => return None, + } + } } - _ => None, - } + }; + + // Always store PERCENTILE as QUANTILE internally + let normalized_name = if name == "PERCENTILE" { + "QUANTILE".to_string() + } else { + name + }; + + Some(AggregationInfo::new(normalized_name, col, args)) } fn get_metric(&self, select: &Select) -> Option<(String, bool)> { From 59f77146c13377e443ed993201d32b5c5ba1a5f1 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Fri, 22 May 2026 22:28:06 +0000 Subject: [PATCH 2/3] fix(sql-utilities): reject SELECT identifiers absent from GROUP BY --- .../src/ast_matching/sqlparser_test.rs | 28 +++++++++++++++++++ .../src/ast_matching/sqlpattern_parser.rs | 28 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index cc8cdef8..5843de6c 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -891,4 +891,32 @@ mod tests { ) .is_none()); } + + #[test] + fn test_multi_projection_rejects_select_col_not_in_groupby() { + // L2 is in SELECT but not in GROUP BY. Standard SQL rejects this; we must too, + // otherwise the column would be silently dropped from the output. + assert!(parse_sql_query( + "SELECT L2, SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1", + ) + .is_none()); + } + + #[test] + fn test_multi_projection_accepts_select_subset_of_groupby() { + // SELECT lists a subset of group-by keys (L1) while the GROUP BY uses two + // (L1, L2). Allowed: every SELECT identifier is in GROUP BY; the remaining + // group-by key is just absent from the projection. + let query = parse_sql_query( + "SELECT L1, SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2", + ) + .expect("SELECT subset of GROUP BY should parse"); + assert!(query.labels.contains("L1")); + assert!(query.labels.contains("L2")); + assert_eq!(query.aggregation_info.get_name(), "SUM"); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index cfadc79a..3453b700 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -92,6 +92,10 @@ impl SQLPatternParser { let group_bys = self.get_groupbys(select)?; + if !self.select_identifiers_subset_of(select, &group_bys) { + return None; + } + if !has_subquery { let time_info = self.get_time_info(select, &metric)?; @@ -126,6 +130,9 @@ impl SQLPatternParser { SetExpr::Select(inner_select) => { let inner_aggregation = self.get_aggregation(inner_select)?; let inner_group_bys = self.get_groupbys(inner_select)?; + if !self.select_identifiers_subset_of(inner_select, &inner_group_bys) { + return None; + } let time_info = self.get_time_info(inner_select, &metric)?; Some(Box::new(SQLQueryData { @@ -210,6 +217,27 @@ impl SQLPatternParser { } } + /// Returns true iff every non-aggregate identifier in `select.projection` is + /// also present in `group_bys`. Used to reject queries like + /// `SELECT srcip, SUM(v) FROM t GROUP BY proto`, where standard SQL would + /// require `srcip` to appear in the GROUP BY clause; without this check the + /// pattern parser would silently drop `srcip` from the output. + fn select_identifiers_subset_of(&self, select: &Select, group_bys: &HashSet) -> bool { + for item in &select.projection { + let expr = match item { + SelectItem::UnnamedExpr(expr) => expr, + SelectItem::ExprWithAlias { expr, .. } => expr, + _ => continue, + }; + if let Expr::Identifier(ident) = expr { + if !group_bys.contains(&ident.value) { + return false; + } + } + } + true + } + fn get_aggregation(&self, select: &Select) -> Option { // Find the (single) aggregate function in the projection list. Other // projection items must be plain column references — these are expected to From c5bdda234d855c853f6f410531660cc8c68a487a Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sat, 23 May 2026 00:45:08 +0000 Subject: [PATCH 3/3] feat(sql-utilities,query-engine): add SQL ORDER BY and LIMIT support --- .../src/ast_matching/sqlhelper.rs | 18 + .../src/ast_matching/sqlparser_test.rs | 123 +++++- .../src/ast_matching/sqlpattern_matcher.rs | 3 + .../src/ast_matching/sqlpattern_parser.rs | 129 +++++- .../src/engines/simple_engine/sql.rs | 368 +++++++++++++++++- 5 files changed, 614 insertions(+), 27 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs index a43f3f0f..14e38244 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs @@ -89,10 +89,28 @@ impl SQLSchema { #[derive(Debug, Clone)] pub struct SQLQueryData { pub aggregation_info: AggregationInfo, + /// Alias of the aggregate function in SELECT, e.g. `agg(v) AS p99` → `Some("p99")`. + /// Captured separately from `aggregation_info` because it's presentational only: + /// two queries that differ solely in alias must still match the same template. + pub aggregation_alias: Option, pub metric: String, pub labels: HashSet, pub time_info: TimeInfo, pub subquery: Option>, + /// `ORDER BY` items in source order. Empty when no ORDER BY is present. + /// Excluded from `matches_sql_pattern` since ordering is post-aggregation. + pub order_by: Vec, + /// `LIMIT N`. None when no LIMIT is present. Excluded from `matches_sql_pattern`. + pub limit: Option, +} + +/// Single `ORDER BY` clause item: a column reference plus sort direction. +/// `column` is either a GROUP BY identifier or the aggregate alias. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OrderByItem { + pub column: String, + /// `true` for ASC (the default when neither ASC nor DESC is specified), `false` for DESC. + pub ascending: bool, } #[derive(Debug, Clone)] diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index 5843de6c..eb76dd5e 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -700,10 +700,17 @@ mod tests { } #[test] - fn test_order_by_is_rejected() { - assert!(parse_sql_query( - "SELECT AVG(value) FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2 ORDER BY L1" - ).is_none()); + fn test_order_by_groupby_column_default_ascending() { + // Bare `ORDER BY L1` (no ASC/DESC) defaults to ascending. The order_by item + // must reflect that the column is a GROUP BY key. + let q = parse_sql_query( + "SELECT AVG(value) FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2 ORDER BY L1", + ) + .expect("ORDER BY group-by column should parse"); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "L1"); + assert!(q.order_by[0].ascending); + assert_eq!(q.limit, None); } // ── scrape_interval > 1s regression tests (issue #201) ─────────────────── @@ -919,4 +926,112 @@ mod tests { assert!(query.labels.contains("L2")); assert_eq!(query.aggregation_info.get_name(), "SUM"); } + + // ── ORDER BY / LIMIT support ───────────────────────────────────────────── + // + // The parser must accept ORDER BY (possibly multi-column, with optional ASC/DESC) + // and LIMIT N, capturing them in SQLQueryData for the engine to apply post-aggregation. + // ORDER BY columns must reference either the aggregate alias or a GROUP BY column. + // The aggregate alias is captured separately so `ORDER BY ` can resolve. + + #[test] + fn test_order_by_aggregate_alias_desc_limit_n() { + // Top-N user case: ORDER BY DESC LIMIT 10. + let q = parse_sql_query( + "SELECT L1, L2, QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY L1, L2 \ + ORDER BY p99 DESC LIMIT 10", + ) + .expect("ORDER BY DESC LIMIT N should parse"); + assert_eq!(q.aggregation_alias.as_deref(), Some("p99")); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "p99"); + assert!(!q.order_by[0].ascending); + assert_eq!(q.limit, Some(10)); + } + + #[test] + fn test_order_by_multiple_columns_mixed_directions() { + let q = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -1, NOW()) AND NOW() \ + GROUP BY L1, L2 \ + ORDER BY L1 ASC, p99 DESC", + ) + .expect("multi-column ORDER BY with mixed directions should parse"); + assert_eq!(q.order_by.len(), 2); + assert_eq!(q.order_by[0].column, "L1"); + assert!(q.order_by[0].ascending); + assert_eq!(q.order_by[1].column, "p99"); + assert!(!q.order_by[1].ascending); + } + + #[test] + fn test_limit_only_no_orderby() { + let q = parse_sql_query( + "SELECT SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + LIMIT 5", + ) + .expect("LIMIT without ORDER BY should parse"); + assert!(q.order_by.is_empty()); + assert_eq!(q.limit, Some(5)); + } + + #[test] + fn test_order_by_unknown_column_rejected() { + // mystery_col is neither the aggregate alias nor a GROUP BY column. + assert!(parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + ORDER BY mystery_col", + ) + .is_none()); + } + + #[test] + fn test_order_by_expression_rejected() { + assert!(parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + ORDER BY p99 + 1", + ) + .is_none()); + } + + #[test] + fn test_limit_with_offset_rejected() { + // OFFSET is not supported (no pagination semantics in the precompute model). + assert!(parse_sql_query( + "SELECT SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + LIMIT 5 OFFSET 3", + ) + .is_none()); + } + + #[test] + fn test_matches_template_ignores_order_by_and_limit() { + // A registered template without ORDER BY / LIMIT must still match an incoming + // query that adds them — they're presentational, not structural. + let template = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2", + ) + .unwrap(); + let incoming = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS top FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY L1, L2 \ + ORDER BY top DESC LIMIT 25", + ) + .unwrap(); + assert!(incoming.matches_sql_pattern(&template)); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs index 65433f8b..c23145b3 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs @@ -53,10 +53,13 @@ impl SQLQuery { let query_data = SQLQueryData { aggregation_info: aggregation, + aggregation_alias: None, metric, labels, time_info: time, subquery: None, + order_by: Vec::new(), + limit: None, }; self.query_data.push(query_data); diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 3453b700..7cc8ac17 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -1,5 +1,5 @@ use crate::sqlhelper::SQLSchema; -use crate::sqlhelper::{AggregationInfo, SQLQueryData, TimeInfo}; +use crate::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData, TimeInfo}; use sqlparser::ast::*; use std::collections::HashSet; @@ -36,20 +36,106 @@ impl SQLPatternParser { } fn parse_query_node(&self, query: &Query) -> Option { - if query.order_by.is_some() { - println!("ORDER BY is not supported"); - return None; - } + // Parse ORDER BY / LIMIT before walking into the SELECT body. Both are properties + // of the outer Query, not of the inner Select. Any unsupported sub-shape (positional + // refs, expressions, OFFSET, NULLS FIRST/LAST, ClickHouse `ORDER BY ALL`, etc.) + // bails the whole query rather than silently dropping it. + let order_by_items = self.parse_order_by_items(query)?; + let limit = self.parse_limit_value(query)?; // Convert CTE to subquery if present let query = self.cte_to_subquery(query); - match &query.body.as_ref() { - SetExpr::Select(select) => self.parse_select(select), + let mut data = match &query.body.as_ref() { + SetExpr::Select(select) => self.parse_select(select)?, _ => { println!("Not a SELECT statement"); - None + return None; + } + }; + + // ORDER BY columns must reference either the aggregate alias or a group-by key. + // Anything else (e.g. `ORDER BY some_other_column`) is rejected to avoid the + // engine returning an arbitrary order in cases where the user assumed the + // column would resolve. + for item in &order_by_items { + let valid = data.aggregation_alias.as_deref() == Some(item.column.as_str()) + || data.labels.contains(&item.column); + if !valid { + return None; + } + } + + data.order_by = order_by_items; + data.limit = limit; + Some(data) + } + + /// Convert `query.order_by` into a flat `Vec`. + /// Returns `Some(vec![])` when no ORDER BY is present. + /// Returns `None` for any unsupported shape (positional refs, expressions, + /// `WITH FILL`, `NULLS FIRST/LAST`, ClickHouse `ORDER BY ALL`, `INTERPOLATE`). + fn parse_order_by_items(&self, query: &Query) -> Option> { + let order_by = match &query.order_by { + None => return Some(Vec::new()), + Some(ob) => ob, + }; + if order_by.interpolate.is_some() { + return None; + } + let exprs = match &order_by.kind { + OrderByKind::Expressions(e) => e, + // `ORDER BY ALL` (DuckDB / ClickHouse extension) is not supported. + OrderByKind::All(_) => return None, + }; + let mut items = Vec::with_capacity(exprs.len()); + for ob in exprs { + if ob.with_fill.is_some() || ob.options.nulls_first.is_some() { + return None; } + let column = match &ob.expr { + Expr::Identifier(ident) => ident.value.clone(), + _ => return None, + }; + // Default direction is ASC when neither ASC nor DESC is written. + let ascending = ob.options.asc.unwrap_or(true); + items.push(OrderByItem { column, ascending }); + } + Some(items) + } + + /// Convert `query.limit_clause` into an `Option`. + /// Returns `Some(None)` when no LIMIT is present. + /// Returns `None` for any unsupported shape (OFFSET, `LIMIT BY`, MySQL `LIMIT a, b`, + /// non-literal expressions, `LIMIT ALL`). + fn parse_limit_value(&self, query: &Query) -> Option> { + let clause = match &query.limit_clause { + None => return Some(None), + Some(c) => c, + }; + let limit_expr = match clause { + // MySQL-style `LIMIT a, b` (offset-comma-limit) is not supported. + LimitClause::OffsetCommaLimit { .. } => return None, + LimitClause::LimitOffset { + limit, + offset, + limit_by, + } => { + if offset.is_some() || !limit_by.is_empty() { + return None; + } + match limit { + None => return Some(None), // `LIMIT ALL` or no LIMIT + Some(e) => e, + } + } + }; + match limit_expr { + Expr::Value(ValueWithSpan { + value: Value::Number(n, _), + .. + }) => n.parse::().ok().map(Some), + _ => None, } } @@ -88,7 +174,7 @@ impl SQLPatternParser { fn parse_select(&self, select: &Select) -> Option { let (metric, has_subquery) = self.get_metric(select)?; - let aggregation = self.get_aggregation(select)?; + let (aggregation, aggregation_alias) = self.get_aggregation(select)?; let group_bys = self.get_groupbys(select)?; @@ -118,17 +204,20 @@ impl SQLPatternParser { Some(SQLQueryData { aggregation_info: aggregation, + aggregation_alias, metric, labels: group_bys, time_info, subquery: None, + order_by: Vec::new(), + limit: None, }) } else { // Parse subquery let subquery = match &select.from[0].relation { TableFactor::Derived { subquery, .. } => match subquery.body.as_ref() { SetExpr::Select(inner_select) => { - let inner_aggregation = self.get_aggregation(inner_select)?; + let (inner_aggregation, inner_alias) = self.get_aggregation(inner_select)?; let inner_group_bys = self.get_groupbys(inner_select)?; if !self.select_identifiers_subset_of(inner_select, &inner_group_bys) { return None; @@ -137,10 +226,13 @@ impl SQLPatternParser { Some(Box::new(SQLQueryData { aggregation_info: inner_aggregation, + aggregation_alias: inner_alias, metric: metric.clone(), labels: inner_group_bys, time_info, subquery: None, + order_by: Vec::new(), + limit: None, })) } _ => None, @@ -150,10 +242,13 @@ impl SQLPatternParser { Some(SQLQueryData { aggregation_info: aggregation, + aggregation_alias, metric, labels: group_bys, time_info: TimeInfo::new("UNUSED".to_string(), -1.0, -1_f64), subquery: Some(subquery), + order_by: Vec::new(), + limit: None, }) } } @@ -238,17 +333,20 @@ impl SQLPatternParser { true } - fn get_aggregation(&self, select: &Select) -> Option { + fn get_aggregation(&self, select: &Select) -> Option<(AggregationInfo, Option)> { // Find the (single) aggregate function in the projection list. Other // projection items must be plain column references — these are expected to // be group-by keys (e.g. `SELECT g1, g2, SUM(v) FROM t GROUP BY g1, g2`). // Anything else (multiple aggregates, computed expressions, literals, *) // is rejected since the structural pattern model only tracks one statistic. + // Also captures the aggregate's alias if the SELECT writes `agg(v) AS `, + // so `ORDER BY ` can resolve later. let mut agg_func: Option<&Function> = None; + let mut agg_alias: Option = None; for item in &select.projection { - let expr = match item { - SelectItem::UnnamedExpr(expr) => expr, - SelectItem::ExprWithAlias { expr, .. } => expr, + let (expr, alias) = match item { + SelectItem::UnnamedExpr(expr) => (expr, None), + SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())), _ => return None, }; match expr { @@ -257,6 +355,7 @@ impl SQLPatternParser { return None; } agg_func = Some(f); + agg_alias = alias; } Expr::Identifier(_) | Expr::CompoundIdentifier(_) => {} _ => return None, @@ -332,7 +431,7 @@ impl SQLPatternParser { name }; - Some(AggregationInfo::new(normalized_name, col, args)) + Some((AggregationInfo::new(normalized_name, col, args), agg_alias)) } fn get_metric(&self, select: &Select) -> Option<(String, bool)> { diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index eb299013..66ba31fb 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -5,19 +5,138 @@ use super::SimpleEngine; use super::{QueryExecutionContext, QueryMetadata, QueryTimestamps}; use crate::data_model::{AggregationIdInfo, QueryConfig, SchemaConfig}; -use crate::engines::query_result::QueryResult; +use crate::engines::query_result::{InstantVector, InstantVectorElement, QueryResult}; use asap_types::query_requirements::QueryRequirements; use asap_types::utils::normalize_spatial_filter; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; use sql_utilities::ast_matching::QueryType; use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; -use sql_utilities::sqlhelper::{AggregationInfo, SQLQueryData}; +use sql_utilities::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData}; use sqlparser::dialect::*; use sqlparser::parser::Parser as parser; use std::collections::HashMap; use tracing::{debug, warn}; +/// SQL-only post-processing produced alongside a `QueryExecutionContext`: +/// rules for ordering and truncating the final result vector. +/// +/// Lives outside `QueryExecutionContext` so that PromQL/Elastic engines — +/// which share that context but have no SQL-level ORDER BY / LIMIT — never +/// have to know about these fields. +#[derive(Debug, Clone, Default)] +pub struct SqlPostProcessing { + /// Alias of the aggregate function in SELECT, e.g. `agg(v) AS p99`. + /// Used so `ORDER BY p99` resolves to `element.value`. + pub aggregation_alias: Option, + /// `ORDER BY` items in source order. Empty when no ORDER BY clause is present. + pub order_by: Vec, + /// `LIMIT N`. None when no LIMIT clause is present. + pub limit: Option, +} + +impl SqlPostProcessing { + fn from_query_data(query_data: &SQLQueryData) -> Self { + Self { + aggregation_alias: query_data.aggregation_alias.clone(), + order_by: query_data.order_by.clone(), + limit: query_data.limit, + } + } + + /// Returns `true` when there's no ordering or truncation to apply, so the + /// caller can short-circuit any deconstruction of the result. + fn is_noop(&self) -> bool { + self.order_by.is_empty() && self.limit.is_none() + } + + /// Apply ORDER BY + LIMIT to a `QueryResult`. Only `QueryResult::Vector` + /// is rewritten; matrices pass through unchanged (range queries don't + /// flow through `handle_query_sql`). + pub fn apply(&self, output_labels: &KeyByLabelNames, result: QueryResult) -> QueryResult { + if self.is_noop() { + return result; + } + match result { + QueryResult::Vector(InstantVector { values, timestamp }) => { + let values = sort_and_truncate_instant_vector( + values, + &output_labels.labels, + self.aggregation_alias.as_deref(), + &self.order_by, + self.limit, + ); + QueryResult::Vector(InstantVector { values, timestamp }) + } + other => other, + } + } +} + +/// Sort and truncate a `Vec` per `ORDER BY` / `LIMIT`. +/// +/// Each `OrderByItem.column` resolves to either: +/// * the aggregate alias → compare by `element.value` +/// * a `label_names` entry → compare lexicographically by `element.labels[idx]` +/// +/// Items that don't match either category are silently skipped (the SQL parser +/// already rejects unknown identifiers, so reaching this branch indicates only +/// a mismatch between schema config and runtime labels). When `order_by` is +/// empty and `limit` is `None`, the result vector is returned unchanged. +fn sort_and_truncate_instant_vector( + mut results: Vec, + label_names: &[String], + aggregation_alias: Option<&str>, + order_by: &[OrderByItem], + limit: Option, +) -> Vec { + if !order_by.is_empty() { + // Pre-resolve each ORDER BY key once. KeyByLabelNames::new sorts the names + // alphabetically and InstantVectorElement.labels is parallel to that vector, + // so positional indexing is sound. + let resolved: Vec<(Option, bool)> = order_by + .iter() + .filter_map(|item| { + if aggregation_alias == Some(item.column.as_str()) { + Some((None, item.ascending)) + } else { + label_names + .iter() + .position(|n| n == &item.column) + .map(|i| (Some(i), item.ascending)) + } + }) + .collect(); + + results.sort_by(|a, b| { + for &(target, asc) in &resolved { + let ord = match target { + None => a + .value + .partial_cmp(&b.value) + .unwrap_or(std::cmp::Ordering::Equal), + Some(idx) => { + let av = a.labels.labels.get(idx).map(String::as_str).unwrap_or(""); + let bv = b.labels.labels.get(idx).map(String::as_str).unwrap_or(""); + av.cmp(bv) + } + }; + let ord = if asc { ord } else { ord.reverse() }; + if ord != std::cmp::Ordering::Equal { + return ord; + } + } + std::cmp::Ordering::Equal + }); + } + + if let Some(limit) = limit { + results.truncate(limit as usize); + } + + results +} + impl SimpleEngine { /// Finds the query configuration for a SQL query using structural pattern matching. /// @@ -202,15 +321,33 @@ impl SimpleEngine { query: String, time: f64, ) -> Option<(KeyByLabelNames, QueryResult)> { - let context = self.build_query_execution_context_sql(query, time)?; - self.execute_context(context, false) + let (context, post) = self.build_sql_query_pieces(query, time)?; + let (output_labels, result) = self.execute_context(context, false)?; + let result = post.apply(&output_labels, result); + Some((output_labels, result)) } + /// Public entry point retained for tests that only need the execution + /// context (e.g. assertions on `agg_info` or `metadata`). Discards the + /// SQL post-processing side-channel since it isn't applied without a + /// `QueryResult` to operate on. pub fn build_query_execution_context_sql( &self, query: String, time: f64, ) -> Option { + self.build_sql_query_pieces(query, time) + .map(|(ctx, _)| ctx) + } + + /// Internal: parses + plans a SQL query and returns both the execution + /// context (shared with PromQL/Elastic engines) and the SQL-only + /// post-processing rules (ORDER BY / LIMIT / alias resolution). + fn build_sql_query_pieces( + &self, + query: String, + time: f64, + ) -> Option<(QueryExecutionContext, SqlPostProcessing)> { // Get SQL schema from inference config let schema = match &self.inference_config.read().unwrap().schema { SchemaConfig::SQL(sql_schema) => sql_schema.clone(), @@ -243,12 +380,19 @@ impl SimpleEngine { return None; } + // ORDER BY / LIMIT / aggregate alias are presentational and SQL-specific. + // They live alongside the (engine-shared) `QueryExecutionContext` rather than + // inside it. Built once here from the parsed `query_data` and returned with + // every successful path below. + let post = SqlPostProcessing::from_query_data(&query_data); + // Handle SpatioTemporal queries separately - they bypass QueryPatternType mapping if match_result.query_type == vec![QueryType::SpatioTemporal] { let query_time = Self::convert_query_time_to_data_time( query_data.time_info.get_start() + query_data.time_info.get_duration(), ); - return self.build_spatiotemporal_context(&match_result, query_time, &query_data); + let ctx = self.build_spatiotemporal_context(&match_result, query_time, &query_data)?; + return Some((ctx, post)); } let query_pattern_type = match &match_result.query_type[..] { @@ -415,7 +559,7 @@ impl SimpleEngine { let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - self.build_sql_execution_context_tail( + let ctx = self.build_sql_execution_context_tail( metric, ×tamps, metadata, @@ -423,7 +567,8 @@ impl SimpleEngine { do_merge, spatial_filter, query_time, - ) + )?; + Some((ctx, post)) } /// Shared context-building tail for both SQL context builders. @@ -432,7 +577,6 @@ impl SimpleEngine { /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. /// Builds the query plan, derives grouping/aggregated labels, and returns the /// final `QueryExecutionContext`. - #[allow(clippy::too_many_arguments)] fn build_sql_execution_context_tail( &self, metric: &str, @@ -562,3 +706,211 @@ impl SimpleEngine { ) } } + +#[cfg(test)] +mod sort_and_truncate_tests { + use super::sort_and_truncate_instant_vector; + use super::SqlPostProcessing; + use crate::data_model::KeyByLabelValues; + use crate::engines::query_result::{InstantVector, InstantVectorElement, QueryResult}; + use promql_utilities::data_model::KeyByLabelNames; + use sql_utilities::sqlhelper::OrderByItem; + + fn elem(labels: &[&str], value: f64) -> InstantVectorElement { + InstantVectorElement { + labels: KeyByLabelValues::new_with_labels( + labels.iter().map(|s| s.to_string()).collect(), + ), + value, + } + } + + fn label_names(names: &[&str]) -> Vec { + names.iter().map(|s| s.to_string()).collect() + } + + #[test] + fn no_orderby_no_limit_returns_unchanged() { + let input = vec![elem(&["a"], 1.0), elem(&["b"], 2.0)]; + let result = + sort_and_truncate_instant_vector(input.clone(), &label_names(&["L"]), None, &[], None); + assert_eq!(result.len(), 2); + assert_eq!(result[0].labels.labels, input[0].labels.labels); + assert_eq!(result[1].labels.labels, input[1].labels.labels); + } + + #[test] + fn order_by_aggregate_desc_with_limit() { + // Mirrors the user's netflow query shape: ORDER BY DESC LIMIT N. + // Build 5 rows with values 1..=5 and assert top-3 in descending order. + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 5.0), + elem(&["c"], 3.0), + elem(&["d"], 2.0), + elem(&["e"], 4.0), + ]; + let order_by = vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L"]), + Some("p99"), + &order_by, + Some(3), + ); + assert_eq!(result.len(), 3); + let values: Vec = result.iter().map(|e| e.value).collect(); + assert_eq!(values, vec![5.0, 4.0, 3.0]); + } + + #[test] + fn order_by_label_ascending_default() { + // ORDER BY with no ASC/DESC defaults to ascending. + let input = vec![ + elem(&["c"], 1.0), + elem(&["a"], 2.0), + elem(&["b"], 3.0), + ]; + let order_by = vec![OrderByItem { + column: "L".to_string(), + ascending: true, + }]; + let result = + sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &order_by, None); + let labels: Vec<&str> = result + .iter() + .map(|e| e.labels.labels[0].as_str()) + .collect(); + assert_eq!(labels, vec!["a", "b", "c"]); + } + + #[test] + fn order_by_multi_key_uses_secondary_for_ties() { + // Primary: L1 ASC. Secondary: value DESC. Tied L1 values should be + // broken by descending value. labels are [L1, L2] alphabetical ⇒ index 0 = L1. + let input = vec![ + elem(&["x", "i"], 1.0), + elem(&["x", "j"], 5.0), + elem(&["a", "k"], 3.0), + elem(&["a", "l"], 7.0), + ]; + let order_by = vec![ + OrderByItem { + column: "L1".to_string(), + ascending: true, + }, + OrderByItem { + column: "p99".to_string(), + ascending: false, + }, + ]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L1", "L2"]), + Some("p99"), + &order_by, + None, + ); + let expected: Vec<(&str, f64)> = vec![("a", 7.0), ("a", 3.0), ("x", 5.0), ("x", 1.0)]; + let actual: Vec<(&str, f64)> = result + .iter() + .map(|e| (e.labels.labels[0].as_str(), e.value)) + .collect(); + assert_eq!(actual, expected); + } + + #[test] + fn limit_only_no_orderby_truncates_in_place() { + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 2.0), + elem(&["c"], 3.0), + ]; + let result = + sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &[], Some(2)); + assert_eq!(result.len(), 2); + assert_eq!(result[0].labels.labels[0], "a"); + assert_eq!(result[1].labels.labels[0], "b"); + } + + #[test] + fn nan_values_do_not_panic() { + // partial_cmp returns None for NaN; we map to Equal to keep the comparator total. + let input = vec![ + elem(&["a"], f64::NAN), + elem(&["b"], 1.0), + elem(&["c"], 2.0), + ]; + let order_by = vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L"]), + Some("p99"), + &order_by, + None, + ); + assert_eq!(result.len(), 3); + } + + #[test] + fn sql_post_processing_default_is_noop() { + // Default == no ORDER BY, no LIMIT, no alias. apply() must hand back the + // exact QueryResult unchanged (no allocation, no reorder). + let post = SqlPostProcessing::default(); + let input = vec![elem(&["c"], 3.0), elem(&["a"], 1.0), elem(&["b"], 2.0)]; + let labels = KeyByLabelNames::new(vec!["L".to_string()]); + let result = QueryResult::Vector(InstantVector { + values: input.clone(), + timestamp: 1234, + }); + let out = post.apply(&labels, result); + let QueryResult::Vector(v) = out else { + panic!("expected vector"); + }; + let values: Vec<&str> = v + .values + .iter() + .map(|e| e.labels.labels[0].as_str()) + .collect(); + assert_eq!(values, vec!["c", "a", "b"]); + assert_eq!(v.timestamp, 1234); + } + + #[test] + fn sql_post_processing_applies_orderby_desc_limit() { + // End-to-end check at the SqlPostProcessing layer: the wrapper unpacks + // the vector, sorts and truncates, and re-wraps preserving timestamp. + let post = SqlPostProcessing { + aggregation_alias: Some("p99".to_string()), + order_by: vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }], + limit: Some(2), + }; + let labels = KeyByLabelNames::new(vec!["L".to_string()]); + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 5.0), + elem(&["c"], 3.0), + elem(&["d"], 2.0), + ]; + let result = QueryResult::Vector(InstantVector { + values: input, + timestamp: 9999, + }); + let out = post.apply(&labels, result); + let QueryResult::Vector(v) = out else { + panic!("expected vector"); + }; + assert_eq!(v.timestamp, 9999); + let values: Vec = v.values.iter().map(|e| e.value).collect(); + assert_eq!(values, vec![5.0, 3.0]); + } +}