From 8140ea78dca78010d0f9671ab6fd60fa20afd51e Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sat, 23 May 2026 00:45:08 +0000 Subject: [PATCH 1/4] feat(sql-utilities,query-engine): add SQL ORDER BY and LIMIT support --- .../src/ast_matching/sqlhelper.rs | 18 + .../src/ast_matching/sqlparser_test.rs | 123 +++++- .../src/ast_matching/sqlpattern_matcher.rs | 3 + .../src/ast_matching/sqlpattern_parser.rs | 129 +++++- .../src/engines/simple_engine/sql.rs | 368 +++++++++++++++++- 5 files changed, 614 insertions(+), 27 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs index a43f3f0f..14e38244 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlhelper.rs @@ -89,10 +89,28 @@ impl SQLSchema { #[derive(Debug, Clone)] pub struct SQLQueryData { pub aggregation_info: AggregationInfo, + /// Alias of the aggregate function in SELECT, e.g. `agg(v) AS p99` → `Some("p99")`. + /// Captured separately from `aggregation_info` because it's presentational only: + /// two queries that differ solely in alias must still match the same template. + pub aggregation_alias: Option, pub metric: String, pub labels: HashSet, pub time_info: TimeInfo, pub subquery: Option>, + /// `ORDER BY` items in source order. Empty when no ORDER BY is present. + /// Excluded from `matches_sql_pattern` since ordering is post-aggregation. + pub order_by: Vec, + /// `LIMIT N`. None when no LIMIT is present. Excluded from `matches_sql_pattern`. + pub limit: Option, +} + +/// Single `ORDER BY` clause item: a column reference plus sort direction. +/// `column` is either a GROUP BY identifier or the aggregate alias. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OrderByItem { + pub column: String, + /// `true` for ASC (the default when neither ASC nor DESC is specified), `false` for DESC. + pub ascending: bool, } #[derive(Debug, Clone)] diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs index 5843de6c..eb76dd5e 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlparser_test.rs @@ -700,10 +700,17 @@ mod tests { } #[test] - fn test_order_by_is_rejected() { - assert!(parse_sql_query( - "SELECT AVG(value) FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2 ORDER BY L1" - ).is_none()); + fn test_order_by_groupby_column_default_ascending() { + // Bare `ORDER BY L1` (no ASC/DESC) defaults to ascending. The order_by item + // must reflect that the column is a GROUP BY key. + let q = parse_sql_query( + "SELECT AVG(value) FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1, L2 ORDER BY L1", + ) + .expect("ORDER BY group-by column should parse"); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "L1"); + assert!(q.order_by[0].ascending); + assert_eq!(q.limit, None); } // ── scrape_interval > 1s regression tests (issue #201) ─────────────────── @@ -919,4 +926,112 @@ mod tests { assert!(query.labels.contains("L2")); assert_eq!(query.aggregation_info.get_name(), "SUM"); } + + // ── ORDER BY / LIMIT support ───────────────────────────────────────────── + // + // The parser must accept ORDER BY (possibly multi-column, with optional ASC/DESC) + // and LIMIT N, capturing them in SQLQueryData for the engine to apply post-aggregation. + // ORDER BY columns must reference either the aggregate alias or a GROUP BY column. + // The aggregate alias is captured separately so `ORDER BY ` can resolve. + + #[test] + fn test_order_by_aggregate_alias_desc_limit_n() { + // Top-N user case: ORDER BY DESC LIMIT 10. + let q = parse_sql_query( + "SELECT L1, L2, QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \ + GROUP BY L1, L2 \ + ORDER BY p99 DESC LIMIT 10", + ) + .expect("ORDER BY DESC LIMIT N should parse"); + assert_eq!(q.aggregation_alias.as_deref(), Some("p99")); + assert_eq!(q.order_by.len(), 1); + assert_eq!(q.order_by[0].column, "p99"); + assert!(!q.order_by[0].ascending); + assert_eq!(q.limit, Some(10)); + } + + #[test] + fn test_order_by_multiple_columns_mixed_directions() { + let q = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -1, NOW()) AND NOW() \ + GROUP BY L1, L2 \ + ORDER BY L1 ASC, p99 DESC", + ) + .expect("multi-column ORDER BY with mixed directions should parse"); + assert_eq!(q.order_by.len(), 2); + assert_eq!(q.order_by[0].column, "L1"); + assert!(q.order_by[0].ascending); + assert_eq!(q.order_by[1].column, "p99"); + assert!(!q.order_by[1].ascending); + } + + #[test] + fn test_limit_only_no_orderby() { + let q = parse_sql_query( + "SELECT SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + LIMIT 5", + ) + .expect("LIMIT without ORDER BY should parse"); + assert!(q.order_by.is_empty()); + assert_eq!(q.limit, Some(5)); + } + + #[test] + fn test_order_by_unknown_column_rejected() { + // mystery_col is neither the aggregate alias nor a GROUP BY column. + assert!(parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + ORDER BY mystery_col", + ) + .is_none()); + } + + #[test] + fn test_order_by_expression_rejected() { + assert!(parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + ORDER BY p99 + 1", + ) + .is_none()); + } + + #[test] + fn test_limit_with_offset_rejected() { + // OFFSET is not supported (no pagination semantics in the precompute model). + assert!(parse_sql_query( + "SELECT SUM(value) FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1 \ + LIMIT 5 OFFSET 3", + ) + .is_none()); + } + + #[test] + fn test_matches_template_ignores_order_by_and_limit() { + // A registered template without ORDER BY / LIMIT must still match an incoming + // query that adds them — they're presentational, not structural. + let template = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS p99 FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \ + GROUP BY L1, L2", + ) + .unwrap(); + let incoming = parse_sql_query( + "SELECT QUANTILE(0.99, value) AS top FROM cpu_usage \ + WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \ + GROUP BY L1, L2 \ + ORDER BY top DESC LIMIT 25", + ) + .unwrap(); + assert!(incoming.matches_sql_pattern(&template)); + } } diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs index 65433f8b..c23145b3 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs @@ -53,10 +53,13 @@ impl SQLQuery { let query_data = SQLQueryData { aggregation_info: aggregation, + aggregation_alias: None, metric, labels, time_info: time, subquery: None, + order_by: Vec::new(), + limit: None, }; self.query_data.push(query_data); diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 3453b700..7cc8ac17 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -1,5 +1,5 @@ use crate::sqlhelper::SQLSchema; -use crate::sqlhelper::{AggregationInfo, SQLQueryData, TimeInfo}; +use crate::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData, TimeInfo}; use sqlparser::ast::*; use std::collections::HashSet; @@ -36,20 +36,106 @@ impl SQLPatternParser { } fn parse_query_node(&self, query: &Query) -> Option { - if query.order_by.is_some() { - println!("ORDER BY is not supported"); - return None; - } + // Parse ORDER BY / LIMIT before walking into the SELECT body. Both are properties + // of the outer Query, not of the inner Select. Any unsupported sub-shape (positional + // refs, expressions, OFFSET, NULLS FIRST/LAST, ClickHouse `ORDER BY ALL`, etc.) + // bails the whole query rather than silently dropping it. + let order_by_items = self.parse_order_by_items(query)?; + let limit = self.parse_limit_value(query)?; // Convert CTE to subquery if present let query = self.cte_to_subquery(query); - match &query.body.as_ref() { - SetExpr::Select(select) => self.parse_select(select), + let mut data = match &query.body.as_ref() { + SetExpr::Select(select) => self.parse_select(select)?, _ => { println!("Not a SELECT statement"); - None + return None; + } + }; + + // ORDER BY columns must reference either the aggregate alias or a group-by key. + // Anything else (e.g. `ORDER BY some_other_column`) is rejected to avoid the + // engine returning an arbitrary order in cases where the user assumed the + // column would resolve. + for item in &order_by_items { + let valid = data.aggregation_alias.as_deref() == Some(item.column.as_str()) + || data.labels.contains(&item.column); + if !valid { + return None; + } + } + + data.order_by = order_by_items; + data.limit = limit; + Some(data) + } + + /// Convert `query.order_by` into a flat `Vec`. + /// Returns `Some(vec![])` when no ORDER BY is present. + /// Returns `None` for any unsupported shape (positional refs, expressions, + /// `WITH FILL`, `NULLS FIRST/LAST`, ClickHouse `ORDER BY ALL`, `INTERPOLATE`). + fn parse_order_by_items(&self, query: &Query) -> Option> { + let order_by = match &query.order_by { + None => return Some(Vec::new()), + Some(ob) => ob, + }; + if order_by.interpolate.is_some() { + return None; + } + let exprs = match &order_by.kind { + OrderByKind::Expressions(e) => e, + // `ORDER BY ALL` (DuckDB / ClickHouse extension) is not supported. + OrderByKind::All(_) => return None, + }; + let mut items = Vec::with_capacity(exprs.len()); + for ob in exprs { + if ob.with_fill.is_some() || ob.options.nulls_first.is_some() { + return None; } + let column = match &ob.expr { + Expr::Identifier(ident) => ident.value.clone(), + _ => return None, + }; + // Default direction is ASC when neither ASC nor DESC is written. + let ascending = ob.options.asc.unwrap_or(true); + items.push(OrderByItem { column, ascending }); + } + Some(items) + } + + /// Convert `query.limit_clause` into an `Option`. + /// Returns `Some(None)` when no LIMIT is present. + /// Returns `None` for any unsupported shape (OFFSET, `LIMIT BY`, MySQL `LIMIT a, b`, + /// non-literal expressions, `LIMIT ALL`). + fn parse_limit_value(&self, query: &Query) -> Option> { + let clause = match &query.limit_clause { + None => return Some(None), + Some(c) => c, + }; + let limit_expr = match clause { + // MySQL-style `LIMIT a, b` (offset-comma-limit) is not supported. + LimitClause::OffsetCommaLimit { .. } => return None, + LimitClause::LimitOffset { + limit, + offset, + limit_by, + } => { + if offset.is_some() || !limit_by.is_empty() { + return None; + } + match limit { + None => return Some(None), // `LIMIT ALL` or no LIMIT + Some(e) => e, + } + } + }; + match limit_expr { + Expr::Value(ValueWithSpan { + value: Value::Number(n, _), + .. + }) => n.parse::().ok().map(Some), + _ => None, } } @@ -88,7 +174,7 @@ impl SQLPatternParser { fn parse_select(&self, select: &Select) -> Option { let (metric, has_subquery) = self.get_metric(select)?; - let aggregation = self.get_aggregation(select)?; + let (aggregation, aggregation_alias) = self.get_aggregation(select)?; let group_bys = self.get_groupbys(select)?; @@ -118,17 +204,20 @@ impl SQLPatternParser { Some(SQLQueryData { aggregation_info: aggregation, + aggregation_alias, metric, labels: group_bys, time_info, subquery: None, + order_by: Vec::new(), + limit: None, }) } else { // Parse subquery let subquery = match &select.from[0].relation { TableFactor::Derived { subquery, .. } => match subquery.body.as_ref() { SetExpr::Select(inner_select) => { - let inner_aggregation = self.get_aggregation(inner_select)?; + let (inner_aggregation, inner_alias) = self.get_aggregation(inner_select)?; let inner_group_bys = self.get_groupbys(inner_select)?; if !self.select_identifiers_subset_of(inner_select, &inner_group_bys) { return None; @@ -137,10 +226,13 @@ impl SQLPatternParser { Some(Box::new(SQLQueryData { aggregation_info: inner_aggregation, + aggregation_alias: inner_alias, metric: metric.clone(), labels: inner_group_bys, time_info, subquery: None, + order_by: Vec::new(), + limit: None, })) } _ => None, @@ -150,10 +242,13 @@ impl SQLPatternParser { Some(SQLQueryData { aggregation_info: aggregation, + aggregation_alias, metric, labels: group_bys, time_info: TimeInfo::new("UNUSED".to_string(), -1.0, -1_f64), subquery: Some(subquery), + order_by: Vec::new(), + limit: None, }) } } @@ -238,17 +333,20 @@ impl SQLPatternParser { true } - fn get_aggregation(&self, select: &Select) -> Option { + fn get_aggregation(&self, select: &Select) -> Option<(AggregationInfo, Option)> { // Find the (single) aggregate function in the projection list. Other // projection items must be plain column references — these are expected to // be group-by keys (e.g. `SELECT g1, g2, SUM(v) FROM t GROUP BY g1, g2`). // Anything else (multiple aggregates, computed expressions, literals, *) // is rejected since the structural pattern model only tracks one statistic. + // Also captures the aggregate's alias if the SELECT writes `agg(v) AS `, + // so `ORDER BY ` can resolve later. let mut agg_func: Option<&Function> = None; + let mut agg_alias: Option = None; for item in &select.projection { - let expr = match item { - SelectItem::UnnamedExpr(expr) => expr, - SelectItem::ExprWithAlias { expr, .. } => expr, + let (expr, alias) = match item { + SelectItem::UnnamedExpr(expr) => (expr, None), + SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())), _ => return None, }; match expr { @@ -257,6 +355,7 @@ impl SQLPatternParser { return None; } agg_func = Some(f); + agg_alias = alias; } Expr::Identifier(_) | Expr::CompoundIdentifier(_) => {} _ => return None, @@ -332,7 +431,7 @@ impl SQLPatternParser { name }; - Some(AggregationInfo::new(normalized_name, col, args)) + Some((AggregationInfo::new(normalized_name, col, args), agg_alias)) } fn get_metric(&self, select: &Select) -> Option<(String, bool)> { diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 51528cbb..08508f36 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -5,19 +5,138 @@ use super::SimpleEngine; use super::{QueryExecutionContext, QueryMetadata, QueryTimestamps}; use crate::data_model::{AggregationIdInfo, QueryConfig, SchemaConfig}; -use crate::engines::query_result::QueryResult; +use crate::engines::query_result::{InstantVector, InstantVectorElement, QueryResult}; use asap_types::query_requirements::QueryRequirements; use asap_types::utils::normalize_spatial_filter; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; use sql_utilities::ast_matching::QueryType; use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; -use sql_utilities::sqlhelper::{AggregationInfo, SQLQueryData}; +use sql_utilities::sqlhelper::{AggregationInfo, OrderByItem, SQLQueryData}; use sqlparser::dialect::*; use sqlparser::parser::Parser as parser; use std::collections::HashMap; use tracing::{debug, warn}; +/// SQL-only post-processing produced alongside a `QueryExecutionContext`: +/// rules for ordering and truncating the final result vector. +/// +/// Lives outside `QueryExecutionContext` so that PromQL/Elastic engines — +/// which share that context but have no SQL-level ORDER BY / LIMIT — never +/// have to know about these fields. +#[derive(Debug, Clone, Default)] +pub struct SqlPostProcessing { + /// Alias of the aggregate function in SELECT, e.g. `agg(v) AS p99`. + /// Used so `ORDER BY p99` resolves to `element.value`. + pub aggregation_alias: Option, + /// `ORDER BY` items in source order. Empty when no ORDER BY clause is present. + pub order_by: Vec, + /// `LIMIT N`. None when no LIMIT clause is present. + pub limit: Option, +} + +impl SqlPostProcessing { + fn from_query_data(query_data: &SQLQueryData) -> Self { + Self { + aggregation_alias: query_data.aggregation_alias.clone(), + order_by: query_data.order_by.clone(), + limit: query_data.limit, + } + } + + /// Returns `true` when there's no ordering or truncation to apply, so the + /// caller can short-circuit any deconstruction of the result. + fn is_noop(&self) -> bool { + self.order_by.is_empty() && self.limit.is_none() + } + + /// Apply ORDER BY + LIMIT to a `QueryResult`. Only `QueryResult::Vector` + /// is rewritten; matrices pass through unchanged (range queries don't + /// flow through `handle_query_sql`). + pub fn apply(&self, output_labels: &KeyByLabelNames, result: QueryResult) -> QueryResult { + if self.is_noop() { + return result; + } + match result { + QueryResult::Vector(InstantVector { values, timestamp }) => { + let values = sort_and_truncate_instant_vector( + values, + &output_labels.labels, + self.aggregation_alias.as_deref(), + &self.order_by, + self.limit, + ); + QueryResult::Vector(InstantVector { values, timestamp }) + } + other => other, + } + } +} + +/// Sort and truncate a `Vec` per `ORDER BY` / `LIMIT`. +/// +/// Each `OrderByItem.column` resolves to either: +/// * the aggregate alias → compare by `element.value` +/// * a `label_names` entry → compare lexicographically by `element.labels[idx]` +/// +/// Items that don't match either category are silently skipped (the SQL parser +/// already rejects unknown identifiers, so reaching this branch indicates only +/// a mismatch between schema config and runtime labels). When `order_by` is +/// empty and `limit` is `None`, the result vector is returned unchanged. +fn sort_and_truncate_instant_vector( + mut results: Vec, + label_names: &[String], + aggregation_alias: Option<&str>, + order_by: &[OrderByItem], + limit: Option, +) -> Vec { + if !order_by.is_empty() { + // Pre-resolve each ORDER BY key once. KeyByLabelNames::new sorts the names + // alphabetically and InstantVectorElement.labels is parallel to that vector, + // so positional indexing is sound. + let resolved: Vec<(Option, bool)> = order_by + .iter() + .filter_map(|item| { + if aggregation_alias == Some(item.column.as_str()) { + Some((None, item.ascending)) + } else { + label_names + .iter() + .position(|n| n == &item.column) + .map(|i| (Some(i), item.ascending)) + } + }) + .collect(); + + results.sort_by(|a, b| { + for &(target, asc) in &resolved { + let ord = match target { + None => a + .value + .partial_cmp(&b.value) + .unwrap_or(std::cmp::Ordering::Equal), + Some(idx) => { + let av = a.labels.labels.get(idx).map(String::as_str).unwrap_or(""); + let bv = b.labels.labels.get(idx).map(String::as_str).unwrap_or(""); + av.cmp(bv) + } + }; + let ord = if asc { ord } else { ord.reverse() }; + if ord != std::cmp::Ordering::Equal { + return ord; + } + } + std::cmp::Ordering::Equal + }); + } + + if let Some(limit) = limit { + results.truncate(limit as usize); + } + + results +} + impl SimpleEngine { /// Finds the query configuration for a SQL query using structural pattern matching. /// @@ -202,15 +321,33 @@ impl SimpleEngine { query: String, time: f64, ) -> Option<(KeyByLabelNames, QueryResult)> { - let context = self.build_query_execution_context_sql(query, time)?; - self.execute_context(context, false) + let (context, post) = self.build_sql_query_pieces(query, time)?; + let (output_labels, result) = self.execute_context(context, false)?; + let result = post.apply(&output_labels, result); + Some((output_labels, result)) } + /// Public entry point retained for tests that only need the execution + /// context (e.g. assertions on `agg_info` or `metadata`). Discards the + /// SQL post-processing side-channel since it isn't applied without a + /// `QueryResult` to operate on. pub fn build_query_execution_context_sql( &self, query: String, time: f64, ) -> Option { + self.build_sql_query_pieces(query, time) + .map(|(ctx, _)| ctx) + } + + /// Internal: parses + plans a SQL query and returns both the execution + /// context (shared with PromQL/Elastic engines) and the SQL-only + /// post-processing rules (ORDER BY / LIMIT / alias resolution). + fn build_sql_query_pieces( + &self, + query: String, + time: f64, + ) -> Option<(QueryExecutionContext, SqlPostProcessing)> { // Get SQL schema from inference config let schema = match &self.inference_config.read().unwrap().schema { SchemaConfig::SQL(sql_schema) => sql_schema.clone(), @@ -243,12 +380,19 @@ impl SimpleEngine { return None; } + // ORDER BY / LIMIT / aggregate alias are presentational and SQL-specific. + // They live alongside the (engine-shared) `QueryExecutionContext` rather than + // inside it. Built once here from the parsed `query_data` and returned with + // every successful path below. + let post = SqlPostProcessing::from_query_data(&query_data); + // Handle SpatioTemporal queries separately - they bypass QueryPatternType mapping if match_result.query_type == vec![QueryType::SpatioTemporal] { let query_time = Self::convert_query_time_to_data_time( query_data.time_info.get_start() + query_data.time_info.get_duration(), ); - return self.build_spatiotemporal_context(&match_result, query_time, &query_data); + let ctx = self.build_spatiotemporal_context(&match_result, query_time, &query_data)?; + return Some((ctx, post)); } let query_pattern_type = match &match_result.query_type[..] { @@ -415,7 +559,7 @@ impl SimpleEngine { let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - self.build_sql_execution_context_tail( + let ctx = self.build_sql_execution_context_tail( metric, ×tamps, metadata, @@ -423,7 +567,8 @@ impl SimpleEngine { do_merge, spatial_filter, query_time, - ) + )?; + Some((ctx, post)) } /// Shared context-building tail for both SQL context builders. @@ -432,7 +577,6 @@ impl SimpleEngine { /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. /// Builds the query plan, derives grouping/aggregated labels, and returns the /// final `QueryExecutionContext`. - #[allow(clippy::too_many_arguments)] fn build_sql_execution_context_tail( &self, metric: &str, @@ -562,3 +706,211 @@ impl SimpleEngine { ) } } + +#[cfg(test)] +mod sort_and_truncate_tests { + use super::sort_and_truncate_instant_vector; + use super::SqlPostProcessing; + use crate::data_model::KeyByLabelValues; + use crate::engines::query_result::{InstantVector, InstantVectorElement, QueryResult}; + use promql_utilities::data_model::KeyByLabelNames; + use sql_utilities::sqlhelper::OrderByItem; + + fn elem(labels: &[&str], value: f64) -> InstantVectorElement { + InstantVectorElement { + labels: KeyByLabelValues::new_with_labels( + labels.iter().map(|s| s.to_string()).collect(), + ), + value, + } + } + + fn label_names(names: &[&str]) -> Vec { + names.iter().map(|s| s.to_string()).collect() + } + + #[test] + fn no_orderby_no_limit_returns_unchanged() { + let input = vec![elem(&["a"], 1.0), elem(&["b"], 2.0)]; + let result = + sort_and_truncate_instant_vector(input.clone(), &label_names(&["L"]), None, &[], None); + assert_eq!(result.len(), 2); + assert_eq!(result[0].labels.labels, input[0].labels.labels); + assert_eq!(result[1].labels.labels, input[1].labels.labels); + } + + #[test] + fn order_by_aggregate_desc_with_limit() { + // Mirrors the user's netflow query shape: ORDER BY DESC LIMIT N. + // Build 5 rows with values 1..=5 and assert top-3 in descending order. + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 5.0), + elem(&["c"], 3.0), + elem(&["d"], 2.0), + elem(&["e"], 4.0), + ]; + let order_by = vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L"]), + Some("p99"), + &order_by, + Some(3), + ); + assert_eq!(result.len(), 3); + let values: Vec = result.iter().map(|e| e.value).collect(); + assert_eq!(values, vec![5.0, 4.0, 3.0]); + } + + #[test] + fn order_by_label_ascending_default() { + // ORDER BY with no ASC/DESC defaults to ascending. + let input = vec![ + elem(&["c"], 1.0), + elem(&["a"], 2.0), + elem(&["b"], 3.0), + ]; + let order_by = vec![OrderByItem { + column: "L".to_string(), + ascending: true, + }]; + let result = + sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &order_by, None); + let labels: Vec<&str> = result + .iter() + .map(|e| e.labels.labels[0].as_str()) + .collect(); + assert_eq!(labels, vec!["a", "b", "c"]); + } + + #[test] + fn order_by_multi_key_uses_secondary_for_ties() { + // Primary: L1 ASC. Secondary: value DESC. Tied L1 values should be + // broken by descending value. labels are [L1, L2] alphabetical ⇒ index 0 = L1. + let input = vec![ + elem(&["x", "i"], 1.0), + elem(&["x", "j"], 5.0), + elem(&["a", "k"], 3.0), + elem(&["a", "l"], 7.0), + ]; + let order_by = vec![ + OrderByItem { + column: "L1".to_string(), + ascending: true, + }, + OrderByItem { + column: "p99".to_string(), + ascending: false, + }, + ]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L1", "L2"]), + Some("p99"), + &order_by, + None, + ); + let expected: Vec<(&str, f64)> = vec![("a", 7.0), ("a", 3.0), ("x", 5.0), ("x", 1.0)]; + let actual: Vec<(&str, f64)> = result + .iter() + .map(|e| (e.labels.labels[0].as_str(), e.value)) + .collect(); + assert_eq!(actual, expected); + } + + #[test] + fn limit_only_no_orderby_truncates_in_place() { + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 2.0), + elem(&["c"], 3.0), + ]; + let result = + sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &[], Some(2)); + assert_eq!(result.len(), 2); + assert_eq!(result[0].labels.labels[0], "a"); + assert_eq!(result[1].labels.labels[0], "b"); + } + + #[test] + fn nan_values_do_not_panic() { + // partial_cmp returns None for NaN; we map to Equal to keep the comparator total. + let input = vec![ + elem(&["a"], f64::NAN), + elem(&["b"], 1.0), + elem(&["c"], 2.0), + ]; + let order_by = vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }]; + let result = sort_and_truncate_instant_vector( + input, + &label_names(&["L"]), + Some("p99"), + &order_by, + None, + ); + assert_eq!(result.len(), 3); + } + + #[test] + fn sql_post_processing_default_is_noop() { + // Default == no ORDER BY, no LIMIT, no alias. apply() must hand back the + // exact QueryResult unchanged (no allocation, no reorder). + let post = SqlPostProcessing::default(); + let input = vec![elem(&["c"], 3.0), elem(&["a"], 1.0), elem(&["b"], 2.0)]; + let labels = KeyByLabelNames::new(vec!["L".to_string()]); + let result = QueryResult::Vector(InstantVector { + values: input.clone(), + timestamp: 1234, + }); + let out = post.apply(&labels, result); + let QueryResult::Vector(v) = out else { + panic!("expected vector"); + }; + let values: Vec<&str> = v + .values + .iter() + .map(|e| e.labels.labels[0].as_str()) + .collect(); + assert_eq!(values, vec!["c", "a", "b"]); + assert_eq!(v.timestamp, 1234); + } + + #[test] + fn sql_post_processing_applies_orderby_desc_limit() { + // End-to-end check at the SqlPostProcessing layer: the wrapper unpacks + // the vector, sorts and truncates, and re-wraps preserving timestamp. + let post = SqlPostProcessing { + aggregation_alias: Some("p99".to_string()), + order_by: vec![OrderByItem { + column: "p99".to_string(), + ascending: false, + }], + limit: Some(2), + }; + let labels = KeyByLabelNames::new(vec!["L".to_string()]); + let input = vec![ + elem(&["a"], 1.0), + elem(&["b"], 5.0), + elem(&["c"], 3.0), + elem(&["d"], 2.0), + ]; + let result = QueryResult::Vector(InstantVector { + values: input, + timestamp: 9999, + }); + let out = post.apply(&labels, result); + let QueryResult::Vector(v) = out else { + panic!("expected vector"); + }; + assert_eq!(v.timestamp, 9999); + let values: Vec = v.values.iter().map(|e| e.value).collect(); + assert_eq!(values, vec![5.0, 3.0]); + } +} From eba3fde9b9838599cf488e26d1908e49f8a7e0d8 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sun, 24 May 2026 20:14:19 +0000 Subject: [PATCH 2/4] formatting --- .../src/ast_matching/sqlpattern_parser.rs | 3 ++- .../src/engines/simple_engine/sql.rs | 26 ++++--------------- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs index 7cc8ac17..1b98a592 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_parser.rs @@ -217,7 +217,8 @@ impl SQLPatternParser { let subquery = match &select.from[0].relation { TableFactor::Derived { subquery, .. } => match subquery.body.as_ref() { SetExpr::Select(inner_select) => { - let (inner_aggregation, inner_alias) = self.get_aggregation(inner_select)?; + let (inner_aggregation, inner_alias) = + self.get_aggregation(inner_select)?; let inner_group_bys = self.get_groupbys(inner_select)?; if !self.select_identifiers_subset_of(inner_select, &inner_group_bys) { return None; diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 08508f36..3201a450 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -336,8 +336,7 @@ impl SimpleEngine { query: String, time: f64, ) -> Option { - self.build_sql_query_pieces(query, time) - .map(|(ctx, _)| ctx) + self.build_sql_query_pieces(query, time).map(|(ctx, _)| ctx) } /// Internal: parses + plans a SQL query and returns both the execution @@ -769,21 +768,14 @@ mod sort_and_truncate_tests { #[test] fn order_by_label_ascending_default() { // ORDER BY with no ASC/DESC defaults to ascending. - let input = vec![ - elem(&["c"], 1.0), - elem(&["a"], 2.0), - elem(&["b"], 3.0), - ]; + let input = vec![elem(&["c"], 1.0), elem(&["a"], 2.0), elem(&["b"], 3.0)]; let order_by = vec![OrderByItem { column: "L".to_string(), ascending: true, }]; let result = sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &order_by, None); - let labels: Vec<&str> = result - .iter() - .map(|e| e.labels.labels[0].as_str()) - .collect(); + let labels: Vec<&str> = result.iter().map(|e| e.labels.labels[0].as_str()).collect(); assert_eq!(labels, vec!["a", "b", "c"]); } @@ -824,11 +816,7 @@ mod sort_and_truncate_tests { #[test] fn limit_only_no_orderby_truncates_in_place() { - let input = vec![ - elem(&["a"], 1.0), - elem(&["b"], 2.0), - elem(&["c"], 3.0), - ]; + let input = vec![elem(&["a"], 1.0), elem(&["b"], 2.0), elem(&["c"], 3.0)]; let result = sort_and_truncate_instant_vector(input, &label_names(&["L"]), None, &[], Some(2)); assert_eq!(result.len(), 2); @@ -839,11 +827,7 @@ mod sort_and_truncate_tests { #[test] fn nan_values_do_not_panic() { // partial_cmp returns None for NaN; we map to Equal to keep the comparator total. - let input = vec![ - elem(&["a"], f64::NAN), - elem(&["b"], 1.0), - elem(&["c"], 2.0), - ]; + let input = vec![elem(&["a"], f64::NAN), elem(&["b"], 1.0), elem(&["c"], 2.0)]; let order_by = vec![OrderByItem { column: "p99".to_string(), ascending: false, From 8e3f4fe6d3998686c170d27789ffc111537881ed Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Sun, 24 May 2026 20:19:14 +0000 Subject: [PATCH 3/4] restore #[allow(clippy::too_many_arguments)] on build_sql_execution_context_tail --- asap-query-engine/src/engines/simple_engine/sql.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 3201a450..fb7b431e 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -576,6 +576,7 @@ impl SimpleEngine { /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. /// Builds the query plan, derives grouping/aggregated labels, and returns the /// final `QueryExecutionContext`. + #[allow(clippy::too_many_arguments)] fn build_sql_execution_context_tail( &self, metric: &str, From ab8279e7c43bae78afd5ef2a711920f3b4a5a60e Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Mon, 25 May 2026 14:58:25 +0000 Subject: [PATCH 4/4] rename build_sql_query_pieces to build_query_execution_context_sql_with_post_processing --- asap-query-engine/src/engines/simple_engine/sql.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index fb7b431e..564e826b 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -321,7 +321,8 @@ impl SimpleEngine { query: String, time: f64, ) -> Option<(KeyByLabelNames, QueryResult)> { - let (context, post) = self.build_sql_query_pieces(query, time)?; + let (context, post) = + self.build_query_execution_context_sql_with_post_processing(query, time)?; let (output_labels, result) = self.execute_context(context, false)?; let result = post.apply(&output_labels, result); Some((output_labels, result)) @@ -336,13 +337,14 @@ impl SimpleEngine { query: String, time: f64, ) -> Option { - self.build_sql_query_pieces(query, time).map(|(ctx, _)| ctx) + self.build_query_execution_context_sql_with_post_processing(query, time) + .map(|(ctx, _)| ctx) } /// Internal: parses + plans a SQL query and returns both the execution /// context (shared with PromQL/Elastic engines) and the SQL-only /// post-processing rules (ORDER BY / LIMIT / alias resolution). - fn build_sql_query_pieces( + fn build_query_execution_context_sql_with_post_processing( &self, query: String, time: f64,