Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion asap-query-engine/src/engines/simple_engine/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl SimpleEngine {
"Built execution context for ElasticSearch query {:?}",
context
);
self.execute_context(context, false)
self.execute_context(context, false, false)
}

pub fn build_query_execution_context_elastic(
Expand Down
115 changes: 60 additions & 55 deletions asap-query-engine/src/engines/simple_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,27 +626,32 @@ impl SimpleEngine {
merged_keys: Option<&HashMap<Option<KeyByLabelValues>, Box<dyn AggregateCore>>>,
statistic: &Statistic,
query_kwargs: &HashMap<String, String>,
enable_topk_limiting: bool,
) -> Result<HashMap<Option<KeyByLabelValues>, f64>, String> {
if let Some(keys_map) = merged_keys {
// Separate keys and values
self.collect_results_separate_keys(merged_values, keys_map, statistic, query_kwargs)
} else {
// Same aggregation for keys and values
self.collect_results_same_aggregation(
merged_values,
statistic,
query_kwargs,
enable_topk_limiting,
)
self.collect_results_same_aggregation(merged_values, statistic, query_kwargs)
}
}

/// Executes the complete query pipeline: plan, execute, collect, and format
/// Executes the complete query pipeline: plan, execute, collect, and format.
///
/// The two top-k flags are deliberately separate because the two engines
/// need different halves of the behaviour:
/// * `enable_topk_limiting` — enumerate candidate keys from the sketch
/// heap and truncate to `k` during collection. Used by both PromQL and
/// SQL top-k (it's what makes the heap actually drive the result set).
/// * `enable_topk_formatting` — sort by value descending AND prepend the
/// metric name to each key's labels. This is PromQL `topk(...)` output
/// shape only; SQL returns bare `(group-by columns, value)` rows and
/// applies its own ORDER BY / LIMIT, so SQL leaves this `false`.
pub fn execute_query_pipeline(
&self,
context: &QueryExecutionContext,
enable_topk: bool,
enable_topk_limiting: bool,
enable_topk_formatting: bool,
) -> Result<Vec<InstantVectorElement>, String> {
// Step 1: Execute the query plan (already created in context.store_plan)
let (merged_values, merged_keys) = self.execute_and_merge_store_queries(
Expand All @@ -662,7 +667,6 @@ impl SimpleEngine {
merged_keys.as_ref(),
&context.metadata.statistic_to_compute,
&context.metadata.query_kwargs,
enable_topk, // SQL=false, PromQL=true
)?;
debug!(
"[LATENCY] Unformatted results collection: {:.2}ms",
Expand All @@ -671,12 +675,24 @@ impl SimpleEngine {

// Step 3: Format results
let results_start_time = Instant::now();
let results = self.format_final_results(
let mut results = self.format_final_results(
unformatted_results,
&context.metadata.statistic_to_compute,
&context.metric,
enable_topk, // SQL=false, PromQL=true
enable_topk_formatting,
);
// Truncate to k when limiting is active (heap may carry heap_size > k
// candidates; the query only asked for the top k).
if enable_topk_limiting {
if let Some(k) = context
.metadata
.query_kwargs
.get("k")
.and_then(|s| s.parse::<usize>().ok())
{
results.truncate(k);
}
}
debug!(
"[LATENCY] Results collection: {}ms",
results_start_time.elapsed().as_millis()
Expand Down Expand Up @@ -841,22 +857,29 @@ impl SimpleEngine {
Ok(self.format_final_results(all_results, statistic, metric, false))
}

/// Formats unformatted results into final InstantVectorElement format
/// For topk queries (when enabled), sorts by value and prepends metric name to keys
/// Formats unformatted results into final InstantVectorElement format.
///
/// For top-k queries the rows are always sorted by value descending (that's
/// the semantics of top-k, and lets the caller truncate to `k` correctly
/// regardless of HashMap iteration order). `enable_topk_formatting`
/// additionally prepends the metric name to each key's labels — this is the
/// PromQL `topk(...)` output shape only; SQL leaves it `false` so rows stay
/// as bare `(group-by columns, value)`.
fn format_final_results(
&self,
unformatted_results: HashMap<Option<KeyByLabelValues>, f64>,
statistic: &Statistic,
metric: &str,
enable_topk_formatting: bool,
) -> Vec<InstantVectorElement> {
let sorted_results: Vec<(Option<KeyByLabelValues>, f64)> =
if *statistic == Statistic::Topk && enable_topk_formatting {
// Sort by value descending for topk
let mut sorted: Vec<_> = unformatted_results.into_iter().collect();
sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
let sorted_results: Vec<(Option<KeyByLabelValues>, f64)> = if *statistic == Statistic::Topk
{
// Sort by value descending for topk (independent of output formatting).
let mut sorted: Vec<_> = unformatted_results.into_iter().collect();
sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

// Prepend metric name to each key's label values
if enable_topk_formatting {
// Prepend metric name to each key's label values (PromQL shape).
sorted
.into_iter()
.map(|(key_opt, value)| {
Expand All @@ -870,8 +893,11 @@ impl SimpleEngine {
})
.collect()
} else {
unformatted_results.into_iter().collect()
};
sorted
}
} else {
unformatted_results.into_iter().collect()
};

sorted_results
.into_iter()
Expand Down Expand Up @@ -990,10 +1016,11 @@ impl SimpleEngine {
fn execute_context(
&self,
context: QueryExecutionContext,
enable_topk: bool,
enable_topk_limiting: bool,
enable_topk_formatting: bool,
) -> Option<(KeyByLabelNames, QueryResult)> {
let results = self
.execute_query_pipeline(&context, enable_topk)
.execute_query_pipeline(&context, enable_topk_limiting, enable_topk_formatting)
.map_err(|e| {
warn!("Query execution failed: {}", e);
e
Expand Down Expand Up @@ -1191,25 +1218,25 @@ impl SimpleEngine {
Ok(unformatted_results)
}

/// Collects results when key and value use same aggregation
/// Collects results when key and value use same aggregation.
///
/// For keyed accumulators (incl. `CountMinSketchWithHeap`) this enumerates
/// every candidate key the accumulator exposes. Top-k ordering/truncation is
/// applied later (sort in `format_final_results`, truncate-to-k in
/// `execute_query_pipeline`) so we must NOT pre-truncate here — the sketch
/// heap can hold more than `k` candidates and is not value-sorted, so
/// dropping keys now could discard a true top-k member.
fn collect_results_same_aggregation(
&self,
merged_outputs: &HashMap<Option<KeyByLabelValues>, Box<dyn AggregateCore>>,
statistic: &Statistic,
query_kwargs: &HashMap<String, String>,
enable_topk_limiting: bool,
) -> Result<HashMap<Option<KeyByLabelValues>, f64>, String> {
let mut unformatted_results = HashMap::new();

for (key, precompute) in merged_outputs {
if let Some(unwrapped_keys) = precompute.get_keys() {
let keys_to_process = if enable_topk_limiting {
self.limit_keys_for_topk(unwrapped_keys, statistic, query_kwargs)?
} else {
unwrapped_keys
};

for key_for_this_precompute in keys_to_process {
for key_for_this_precompute in unwrapped_keys {
let value = self
.query_precompute_for_statistic(
precompute.as_ref(),
Expand Down Expand Up @@ -1238,28 +1265,6 @@ impl SimpleEngine {
Ok(unformatted_results)
}

/// Restricts the candidate key list when a top-k statistic is requested.
///
/// For any statistic other than `Statistic::Topk` the keys are handed back
/// unchanged. For top-k, the `"k"` entry of `query_kwargs` is parsed as a
/// `usize` and only the first `k` keys are kept, in the order they were
/// supplied (no sorting by value happens here).
///
/// # Errors
///
/// Returns an error string when the statistic is top-k and `"k"` is either
/// absent from `query_kwargs` or cannot be parsed as a `usize`.
fn limit_keys_for_topk(
    &self,
    keys: Vec<KeyByLabelValues>,
    statistic: &Statistic,
    query_kwargs: &HashMap<String, String>,
) -> Result<Vec<KeyByLabelValues>, String> {
    if *statistic == Statistic::Topk {
        // The limit must be present in the query kwargs for top-k queries.
        let raw_limit = match query_kwargs.get("k") {
            Some(raw) => raw,
            None => return Err("Missing k parameter for topk".to_string()),
        };

        let limit: usize = match raw_limit.parse() {
            Ok(parsed) => parsed,
            Err(_) => return Err(format!("Failed to parse k: '{}'", raw_limit)),
        };

        // Keep the leading `limit` keys; a shorter list is left as-is.
        let mut kept = keys;
        kept.truncate(limit);
        Ok(kept)
    } else {
        // Non-top-k statistics are never limited.
        Ok(keys)
    }
}

fn query_precompute_for_statistic(
&self,
precompute: &dyn AggregateCore,
Expand Down
Loading
Loading