From b35b7bfffdfb058eafc824dbe4c94743fdba4833 Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Fri, 5 Jun 2026 22:19:13 +0000 Subject: [PATCH 1/3] split enable_topk into limiting vs formatting flags --- .../src/engines/simple_engine/elastic.rs | 2 +- .../src/engines/simple_engine/mod.rs | 115 +++++++++--------- .../src/engines/simple_engine/promql.rs | 2 +- .../src/engines/simple_engine/sql.rs | 2 +- .../tests/test_utilities/engine_factories.rs | 2 +- 5 files changed, 64 insertions(+), 59 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/elastic.rs b/asap-query-engine/src/engines/simple_engine/elastic.rs index 572840b9..1c427eb7 100644 --- a/asap-query-engine/src/engines/simple_engine/elastic.rs +++ b/asap-query-engine/src/engines/simple_engine/elastic.rs @@ -25,7 +25,7 @@ impl SimpleEngine { "Built execution context for ElasticSearch query {:?}", context ); - self.execute_context(context, false) + self.execute_context(context, false, false) } pub fn build_query_execution_context_elastic( diff --git a/asap-query-engine/src/engines/simple_engine/mod.rs b/asap-query-engine/src/engines/simple_engine/mod.rs index ff01e98e..3cd069f1 100644 --- a/asap-query-engine/src/engines/simple_engine/mod.rs +++ b/asap-query-engine/src/engines/simple_engine/mod.rs @@ -626,27 +626,32 @@ impl SimpleEngine { merged_keys: Option<&HashMap, Box>>, statistic: &Statistic, query_kwargs: &HashMap, - enable_topk_limiting: bool, ) -> Result, f64>, String> { if let Some(keys_map) = merged_keys { // Separate keys and values self.collect_results_separate_keys(merged_values, keys_map, statistic, query_kwargs) } else { // Same aggregation for keys and values - self.collect_results_same_aggregation( - merged_values, - statistic, - query_kwargs, - enable_topk_limiting, - ) + self.collect_results_same_aggregation(merged_values, statistic, query_kwargs) } } - /// Executes the complete query pipeline: plan, execute, collect, and format + /// Executes the complete query pipeline: 
plan, execute, collect, and format. + /// + /// The two top-k flags are deliberately separate because the two engines + /// need different halves of the behaviour: + /// * `enable_topk_limiting` — truncate the formatted rows to the `k` query + /// kwarg. Truncation runs after formatting, not during key collection. + /// In this patch PromQL passes `true`; SQL and ElasticSearch pass `false`. + /// * `enable_topk_formatting` — prepend the metric name to each key's + /// labels. Sorting by value descending is applied to every `Topk` result + /// regardless of this flag. This is PromQL `topk(...)` output shape only; + /// SQL applies its own ORDER BY / LIMIT, so SQL leaves this `false`. pub fn execute_query_pipeline( &self, context: &QueryExecutionContext, - enable_topk: bool, + enable_topk_limiting: bool, + enable_topk_formatting: bool, ) -> Result, String> { // Step 1: Execute the query plan (already created in context.store_plan) let (merged_values, merged_keys) = self.execute_and_merge_store_queries( @@ -662,7 +667,6 @@ impl SimpleEngine { merged_keys.as_ref(), &context.metadata.statistic_to_compute, &context.metadata.query_kwargs, - enable_topk, // SQL=false, PromQL=true )?; debug!( "[LATENCY] Unformatted results collection: {:.2}ms", @@ -671,12 +675,24 @@ impl SimpleEngine { // Step 3: Format results let results_start_time = Instant::now(); - let results = self.format_final_results( + let mut results = self.format_final_results( unformatted_results, &context.metadata.statistic_to_compute, &context.metric, - enable_topk, // SQL=false, PromQL=true + enable_topk_formatting, ); + // Truncate to k when limiting is active (heap may carry heap_size > k + // candidates; the query only asked for the top k).
+ if enable_topk_limiting { + if let Some(k) = context + .metadata + .query_kwargs + .get("k") + .and_then(|s| s.parse::().ok()) + { + results.truncate(k); + } + } debug!( "[LATENCY] Results collection: {}ms", results_start_time.elapsed().as_millis() @@ -841,8 +857,14 @@ impl SimpleEngine { Ok(self.format_final_results(all_results, statistic, metric, false)) } - /// Formats unformatted results into final InstantVectorElement format - /// For topk queries (when enabled), sorts by value and prepends metric name to keys + /// Formats unformatted results into final InstantVectorElement format. + /// + /// For top-k queries the rows are always sorted by value descending (that's + /// the semantics of top-k, and lets the caller truncate to `k` correctly + /// regardless of HashMap iteration order). `enable_topk_formatting` + /// additionally prepends the metric name to each key's labels — this is the + /// PromQL `topk(...)` output shape only; SQL leaves it `false` so rows stay + /// as bare `(group-by columns, value)`. fn format_final_results( &self, unformatted_results: HashMap, f64>, @@ -850,13 +872,14 @@ impl SimpleEngine { metric: &str, enable_topk_formatting: bool, ) -> Vec { - let sorted_results: Vec<(Option, f64)> = - if *statistic == Statistic::Topk && enable_topk_formatting { - // Sort by value descending for topk - let mut sorted: Vec<_> = unformatted_results.into_iter().collect(); - sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + let sorted_results: Vec<(Option, f64)> = if *statistic == Statistic::Topk + { + // Sort by value descending for topk (independent of output formatting). + let mut sorted: Vec<_> = unformatted_results.into_iter().collect(); + sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); - // Prepend metric name to each key's label values + if enable_topk_formatting { + // Prepend metric name to each key's label values (PromQL shape). 
sorted .into_iter() .map(|(key_opt, value)| { @@ -870,8 +893,11 @@ impl SimpleEngine { }) .collect() } else { - unformatted_results.into_iter().collect() - }; + sorted + } + } else { + unformatted_results.into_iter().collect() + }; sorted_results .into_iter() @@ -990,10 +1016,11 @@ impl SimpleEngine { fn execute_context( &self, context: QueryExecutionContext, - enable_topk: bool, + enable_topk_limiting: bool, + enable_topk_formatting: bool, ) -> Option<(KeyByLabelNames, QueryResult)> { let results = self - .execute_query_pipeline(&context, enable_topk) + .execute_query_pipeline(&context, enable_topk_limiting, enable_topk_formatting) .map_err(|e| { warn!("Query execution failed: {}", e); e @@ -1191,25 +1218,25 @@ impl SimpleEngine { Ok(unformatted_results) } - /// Collects results when key and value use same aggregation + /// Collects results when key and value use same aggregation. + /// + /// For keyed accumulators (incl. `CountMinSketchWithHeap`) this enumerates + /// every candidate key the accumulator exposes. Top-k ordering/truncation is + /// applied later (sort in `format_final_results`, truncate-to-k in + /// `execute_query_pipeline`) so we must NOT pre-truncate here — the sketch + /// heap can hold more than `k` candidates and is not value-sorted, so + /// dropping keys now could discard a true top-k member. fn collect_results_same_aggregation( &self, merged_outputs: &HashMap, Box>, statistic: &Statistic, query_kwargs: &HashMap, - enable_topk_limiting: bool, ) -> Result, f64>, String> { let mut unformatted_results = HashMap::new(); for (key, precompute) in merged_outputs { if let Some(unwrapped_keys) = precompute.get_keys() { - let keys_to_process = if enable_topk_limiting { - self.limit_keys_for_topk(unwrapped_keys, statistic, query_kwargs)? 
- } else { - unwrapped_keys - }; - - for key_for_this_precompute in keys_to_process { + for key_for_this_precompute in unwrapped_keys { let value = self .query_precompute_for_statistic( precompute.as_ref(), @@ -1238,28 +1265,6 @@ impl SimpleEngine { Ok(unformatted_results) } - /// Limits keys for topk queries - fn limit_keys_for_topk( - &self, - keys: Vec, - statistic: &Statistic, - query_kwargs: &HashMap, - ) -> Result, String> { - if *statistic != Statistic::Topk { - return Ok(keys); - } - - let k_str = query_kwargs - .get("k") - .ok_or_else(|| "Missing k parameter for topk".to_string())?; - - let k = k_str - .parse::() - .map_err(|_| format!("Failed to parse k: '{}'", k_str))?; - - Ok(keys.into_iter().take(k).collect()) - } - fn query_precompute_for_statistic( &self, precompute: &dyn AggregateCore, diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs index 7c062201..f7cd8de9 100644 --- a/asap-query-engine/src/engines/simple_engine/promql.rs +++ b/asap-query-engine/src/engines/simple_engine/promql.rs @@ -919,7 +919,7 @@ impl SimpleEngine { context.store_plan.values_query.end_timestamp ); - let result = self.execute_context(context, true); + let result = self.execute_context(context, true, true); // Determine query routing order based on function type. 
// USampling functions prefer the precomputed path first (sketch fallback), diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 564e826b..d9fe8640 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -323,7 +323,7 @@ impl SimpleEngine { ) -> Option<(KeyByLabelNames, QueryResult)> { let (context, post) = self.build_query_execution_context_sql_with_post_processing(query, time)?; - let (output_labels, result) = self.execute_context(context, false)?; + let (output_labels, result) = self.execute_context(context, false, false)?; let result = post.apply(&output_labels, result); Some((output_labels, result)) } diff --git a/asap-query-engine/src/tests/test_utilities/engine_factories.rs b/asap-query-engine/src/tests/test_utilities/engine_factories.rs index 25947214..53848f5f 100644 --- a/asap-query-engine/src/tests/test_utilities/engine_factories.rs +++ b/asap-query-engine/src/tests/test_utilities/engine_factories.rs @@ -638,7 +638,7 @@ pub async fn assert_old_new_match(engine: &SimpleEngine, query: &str, query_time .expect("Failed to build context"); let old_results = engine - .execute_query_pipeline(&context, false) + .execute_query_pipeline(&context, false, false) .expect("Old pipeline failed"); let new_results = engine From c98b248fc9e4dc3521056317a288272670b4913a Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Fri, 5 Jun 2026 22:56:53 +0000 Subject: [PATCH 2/3] add PromQL topk pipeline and Prometheus wire format tests --- .../src/engines/simple_engine/promql.rs | 211 ++++++++++++++++++ 1 file changed, 211 insertions(+) diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs index f7cd8de9..0bea7706 100644 --- a/asap-query-engine/src/engines/simple_engine/promql.rs +++ b/asap-query-engine/src/engines/simple_engine/promql.rs @@ -1260,3 +1260,214 @@ impl 
SimpleEngine { )) } } + +/// End-to-end tests for PromQL `topk(k, …)` served by `CountMinSketchWithHeap`. +/// +/// Exercises the PromQL half of the top-k flag split: +/// `execute_query_pipeline(ctx, true, true)`. Unlike SQL (`false, false`), PromQL +/// enables limiting and formatting, so rows are truncated to `k` and each row +/// carries the metric name as the first label value (Prometheus series shape). +#[cfg(test)] +mod topk_pipeline_tests { + use super::SimpleEngine; + use crate::data_model::{ + AggregationConfig, AggregationReference, AggregationType, CleanupPolicy, InferenceConfig, + PrecomputedOutput, PromQLSchema, QueryConfig, QueryLanguage, SchemaConfig, StreamingConfig, + WindowType, + }; + use crate::engines::QueryResult; + use crate::precompute_operators::CountMinSketchWithHeapAccumulator; + use crate::stores::simple_map_store::SimpleMapStore; + use crate::stores::Store; + use crate::utils::http::convert_query_result_to_prometheus; + use promql_utilities::data_model::KeyByLabelNames; + use promql_utilities::query_logics::enums::Statistic; + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + + const AGG_ID: u64 = 101; + const METRIC: &str = "transfer_events"; + // Aligned to the 1s scrape interval (multiple of 1000ms).
+ const QUERY_TIME: f64 = 1_759_276_810.0; + const TOPK_QUERY: &str = "topk(10, transfer_events)"; + + fn build_topk_engine() -> (SimpleEngine, Arc) { + let promql_schema = PromQLSchema::new().add_metric( + METRIC.to_string(), + KeyByLabelNames::new(vec!["srcip".to_string()]), + ); + + let query_config = QueryConfig::new(TOPK_QUERY.to_string()) + .add_aggregation(AggregationReference::new(AGG_ID, None)); + + let inference_config = InferenceConfig { + schema: SchemaConfig::PromQL(promql_schema), + query_configs: vec![query_config], + cleanup_policy: CleanupPolicy::NoCleanup, + }; + + let agg_config = AggregationConfig { + aggregation_id: AGG_ID, + aggregation_type: AggregationType::CountMinSketchWithHeap, + aggregation_sub_type: String::new(), + parameters: HashMap::new(), + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::new(vec!["srcip".to_string()]), + rollup_labels: KeyByLabelNames::empty(), + original_yaml: String::new(), + window_size: 1, + slide_interval: 1, + window_type: WindowType::Tumbling, + spatial_filter: String::new(), + spatial_filter_normalized: String::new(), + metric: METRIC.to_string(), + num_aggregates_to_retain: None, + read_count_threshold: None, + table_name: None, + value_column: None, + }; + + let mut agg_configs = HashMap::new(); + agg_configs.insert(AGG_ID, agg_config); + let streaming_config = Arc::new(StreamingConfig { + aggregation_configs: agg_configs, + }); + + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + + let engine = SimpleEngine::new( + store.clone(), + inference_config, + streaming_config, + 1, + QueryLanguage::promql, + ); + (engine, store) + } + + #[test] + fn detects_topk_and_resolves_self_keyed_heap() { + let (engine, _store) = build_topk_engine(); + let context = engine + .build_query_execution_context_promql(TOPK_QUERY.to_string(), QUERY_TIME) + .expect("topk(k, metric) should build a context via the query_config path"); + + 
assert_eq!( + context.metadata.statistic_to_compute, + Statistic::Topk, + "topk(...) must resolve to Statistic::Topk", + ); + assert_eq!( + context.metadata.query_kwargs.get("k").map(String::as_str), + Some("10"), + "the topk k argument should be threaded through as the `k` kwarg", + ); + assert_eq!( + context.agg_info.aggregation_id_for_key, + context.agg_info.aggregation_id_for_value, + ); + assert!(context.store_plan.keys_query.is_none()); + assert_eq!( + context.metadata.query_output_labels.labels, + vec!["__name__".to_string(), "srcip".to_string()], + "topk PromQL rows zip to {{ __name__, srcip }} in the wire format", + ); + } + + #[test] + fn returns_top_k_srcips_sorted_descending_with_metric_prefix() { + let (engine, store) = build_topk_engine(); + + let context = engine + .build_query_execution_context_promql(TOPK_QUERY.to_string(), QUERY_TIME) + .expect("context should build"); + let window = &context.store_plan.values_query; + + let mut sketch = CountMinSketchWithHeapAccumulator::new(3, 1024, 32); + for i in 1..=15u64 { + let srcip = format!("10.0.0.{i}"); + sketch.inner.update(&srcip, (i * 10) as f64); + } + + let output = + PrecomputedOutput::new(window.start_timestamp, window.end_timestamp, None, AGG_ID); + store + .insert_precomputed_output(output, Box::new(sketch)) + .expect("insert should succeed"); + + let results = engine + .execute_query_pipeline(&context, true, true) + .expect("pipeline should produce results"); + + assert_eq!(results.len(), 10, "topk(10, ...) 
must truncate to 10 rows"); + + for pair in results.windows(2) { + assert!( + pair[0].value >= pair[1].value, + "results must be sorted by count descending: {} then {}", + pair[0].value, + pair[1].value, + ); + } + + assert_eq!( + results[0].labels.labels, + vec![METRIC.to_string(), "10.0.0.15".to_string()], + ); + assert_eq!(results[0].value, 150.0); + for element in &results { + assert_eq!( + element.labels.labels.len(), + 2, + "PromQL top-k rows carry the metric-name prefix plus the srcip", + ); + assert_eq!( + element.labels.labels[0], METRIC, + "first label value must be the metric name (PromQL formatting)", + ); + } + + let returned: HashSet = + results.iter().map(|e| e.labels.labels[1].clone()).collect(); + let expected: HashSet = (6..=15u64).map(|i| format!("10.0.0.{i}")).collect(); + assert_eq!(returned, expected); + + // Wire format: zip label names with values into Prometheus instant-vector JSON. + let output_labels = context.metadata.query_output_labels.clone(); + let query_result = QueryResult::vector(results, context.query_time); + let prometheus_data = + convert_query_result_to_prometheus(&query_result, &output_labels).expect( + "pipeline output should convert to Prometheus instant-vector JSON", + ); + + assert_eq!(prometheus_data["resultType"], "vector"); + let wire_rows = prometheus_data["result"] + .as_array() + .expect("result must be an array"); + assert_eq!(wire_rows.len(), 10); + + let top_row = &wire_rows[0]; + assert_eq!(top_row["metric"]["__name__"], METRIC); + assert_eq!(top_row["metric"]["srcip"], "10.0.0.15"); + assert_eq!(top_row["value"][0], QUERY_TIME); + assert_eq!(top_row["value"][1], "150"); + + for row in wire_rows { + assert_eq!(row["metric"]["__name__"], METRIC); + assert!(row["metric"]["srcip"].is_string()); + assert!(row["value"][1].is_string()); + } + + // Descending order is preserved in the wire format. 
+ let wire_values: Vec = wire_rows + .iter() + .map(|row| row["value"][1].as_str().unwrap().parse::().unwrap()) + .collect(); + for pair in wire_values.windows(2) { + assert!(pair[0] >= pair[1]); + } + } +} From 58dec026c73a5c22eff83e5bef6992d471f775db Mon Sep 17 00:00:00 2001 From: Akanksha Akkihal Date: Fri, 5 Jun 2026 22:59:48 +0000 Subject: [PATCH 3/3] formatting --- asap-query-engine/src/engines/simple_engine/promql.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs index 0bea7706..25ee689e 100644 --- a/asap-query-engine/src/engines/simple_engine/promql.rs +++ b/asap-query-engine/src/engines/simple_engine/promql.rs @@ -1438,10 +1438,8 @@ mod topk_pipeline_tests { // Wire format: zip label names with values into Prometheus instant-vector JSON. let output_labels = context.metadata.query_output_labels.clone(); let query_result = QueryResult::vector(results, context.query_time); - let prometheus_data = - convert_query_result_to_prometheus(&query_result, &output_labels).expect( - "pipeline output should convert to Prometheus instant-vector JSON", - ); + let prometheus_data = convert_query_result_to_prometheus(&query_result, &output_labels) + .expect("pipeline output should convert to Prometheus instant-vector JSON"); assert_eq!(prometheus_data["resultType"], "vector"); let wire_rows = prometheus_data["result"]