|
| 1 | +//! Elasticsearch DSL query language handler for SimpleEngine. |
| 2 | +//! |
| 3 | +//! Contains all Elastic DSL-specific context building and query dispatch. |
| 4 | +
|
| 5 | +use super::SimpleEngine; |
| 6 | +use super::{QueryExecutionContext, QueryMetadata, QueryTimestamps}; |
| 7 | +use crate::engines::query_result::QueryResult; |
| 8 | +use elastic_dsl_utilities::pattern::parse_and_classify; |
| 9 | +use elastic_dsl_utilities::types::{EsDslQueryPattern, GroupBySpec, MetricAggType}; |
| 10 | +use promql_utilities::data_model::KeyByLabelNames; |
| 11 | +use promql_utilities::query_logics::enums::Statistic; |
| 12 | +use std::collections::HashMap; |
| 13 | +use tracing::{debug, warn}; |
| 14 | + |
| 15 | +impl SimpleEngine { |
| 16 | + pub fn handle_query_elastic( |
| 17 | + &self, |
| 18 | + query: String, |
| 19 | + time: f64, |
| 20 | + ) -> Option<(KeyByLabelNames, QueryResult)> { |
| 21 | + let context = self.build_query_execution_context_elastic(query, time)?; |
| 22 | + debug!( |
| 23 | + "Built execution context for ElasticSearch query {:?}", |
| 24 | + context |
| 25 | + ); |
| 26 | + self.execute_context(context, false) |
| 27 | + } |
| 28 | + |
| 29 | + pub fn build_query_execution_context_elastic( |
| 30 | + &self, |
| 31 | + query: String, |
| 32 | + time: f64, |
| 33 | + ) -> Option<QueryExecutionContext> { |
| 34 | + let query_time = Self::convert_query_time_to_data_time(time); |
| 35 | + |
| 36 | + // 1. Parse query DSL somehow. Elasticsearch DSL crate does not support deserializing, but maybe can use Opensearch instead? |
| 37 | + // 2. Determine whether query is supported using some AST representation or hardcoded pattern matching. |
| 38 | + let query_pattern: EsDslQueryPattern = |
| 39 | + parse_and_classify(&query).unwrap_or(EsDslQueryPattern::Unknown); |
| 40 | + match query_pattern { |
| 41 | + EsDslQueryPattern::Unknown => { |
| 42 | + debug!("Could not parse query into known pattern"); |
| 43 | + return None; |
| 44 | + } |
| 45 | + _ => { |
| 46 | + debug!("Parsed query pattern: {:?}", query_pattern); |
| 47 | + } |
| 48 | + } |
| 49 | + |
| 50 | + // 3. Convert parsed query into execution context components (labels, statistic, kwargs, metadata, store query plan, etc.) |
| 51 | + |
| 52 | + // TODO: Figure out how to handle query configuration for ElasticSearch queries. |
| 53 | + let query_config = self.find_query_config(&query)?; |
| 54 | + let agg_info = self |
| 55 | + .get_aggregation_id_info(query_config) |
| 56 | + .map_err(|e| { |
| 57 | + warn!("{}", e); |
| 58 | + e |
| 59 | + }) |
| 60 | + .ok()?; |
| 61 | + |
| 62 | + let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. |
| 63 | + |
| 64 | + let (metric, query_metadata) = self.build_query_metadata_elastic(&query_pattern)?; |
| 65 | + |
| 66 | + let spatial_filter = String::new(); // Placeholder - extract from query if applicable |
| 67 | + |
| 68 | + // TODO: Need way to parse ES DSL "date math". |
| 69 | + let timestamps = self.resolve_query_time_range_elastic(query_time, query_pattern); |
| 70 | + |
| 71 | + let query_plan = self |
| 72 | + .create_store_query_plan(&metric, ×tamps, &agg_info) |
| 73 | + .map_err(|e| { |
| 74 | + warn!("Failed to create store query plan: {}", e); |
| 75 | + e |
| 76 | + }) |
| 77 | + .ok()?; |
| 78 | + |
| 79 | + let grouping_labels = self |
| 80 | + .streaming_config |
| 81 | + .get_aggregation_config(agg_info.aggregation_id_for_value) |
| 82 | + .map(|config| config.grouping_labels.clone()) |
| 83 | + .unwrap_or_else(|| query_metadata.query_output_labels.clone()); |
| 84 | + |
| 85 | + let aggregated_labels = self |
| 86 | + .streaming_config |
| 87 | + .get_aggregation_config(agg_info.aggregation_id_for_key) |
| 88 | + .map(|config| config.aggregated_labels.clone()) |
| 89 | + .unwrap_or_else(KeyByLabelNames::empty); |
| 90 | + |
| 91 | + Some(QueryExecutionContext { |
| 92 | + metric, |
| 93 | + metadata: query_metadata, |
| 94 | + store_plan: query_plan.clone(), |
| 95 | + agg_info: agg_info.clone(), |
| 96 | + do_merge, |
| 97 | + spatial_filter, |
| 98 | + query_time, |
| 99 | + grouping_labels, |
| 100 | + aggregated_labels, |
| 101 | + }) |
| 102 | + } |
| 103 | + |
| 104 | + fn build_query_metadata_elastic( |
| 105 | + &self, |
| 106 | + query_pattern: &EsDslQueryPattern, |
| 107 | + ) -> Option<(String, QueryMetadata)> { |
| 108 | + // Constructs QueryMetadata based on the parsed ES DSL query pattern. This includes determining the |
| 109 | + // metric to query, the statistic to compute, and any relevant query kwargs (e.g. quantile value for percentiles). |
| 110 | + |
| 111 | + // Figure out aggregation type and what labels are included in output. |
| 112 | + // By default, we only include grouping labels in the output for ES DSL. |
| 113 | + |
| 114 | + // Take first aggregation by default since current engine doesn't support multiple aggregations in a single query. |
| 115 | + let aggregation = query_pattern.get_metric_aggs()?.first()?.clone(); |
| 116 | + |
| 117 | + // By default, we only include grouping labels in the output for ES DSL. |
| 118 | + let query_output_labels = match query_pattern.get_groupby_spec() { |
| 119 | + Some(GroupBySpec::Terms { field }) => KeyByLabelNames::new(vec![field.clone()]), |
| 120 | + Some(GroupBySpec::MultiTerms { fields }) => KeyByLabelNames::new(fields.to_vec()), |
| 121 | + None => KeyByLabelNames::empty(), |
| 122 | + }; |
| 123 | + |
| 124 | + let metric = aggregation.field.clone(); |
| 125 | + |
| 126 | + // Map ElasticSearch aggregation types to our internal Statistic enum. |
| 127 | + let statistic_to_compute = match aggregation.agg_type { |
| 128 | + MetricAggType::Percentiles => Statistic::Quantile, |
| 129 | + MetricAggType::Avg => Statistic::Rate, |
| 130 | + MetricAggType::Sum => Statistic::Sum, |
| 131 | + MetricAggType::Min => Statistic::Min, |
| 132 | + MetricAggType::Max => Statistic::Max, |
| 133 | + }; |
| 134 | + |
| 135 | + let mut query_kwargs = HashMap::new(); // Placeholder - build based on query and statistic |
| 136 | + if aggregation.agg_type == MetricAggType::Percentiles { |
| 137 | + // Extract quantile value from aggregation parameters and add to query_kwargs |
| 138 | + if let Some(params) = &aggregation.params { |
| 139 | + if let Some(percents) = params.get("percents") { |
| 140 | + // Get first value from percents array since we only support one quantile argument for now. |
| 141 | + let quantile = percents |
| 142 | + .as_array() |
| 143 | + .and_then(|arr| arr.first()) |
| 144 | + .and_then(|v| v.as_f64()); |
| 145 | + // ES percentiles are specified as values between 0 and 100, but we want to convert to 0-1 range for our internal representation. |
| 146 | + query_kwargs.insert("quantile".to_string(), (quantile? / 100.0).to_string()); |
| 147 | + } |
| 148 | + } |
| 149 | + } |
| 150 | + |
| 151 | + let metadata = QueryMetadata { |
| 152 | + query_output_labels: query_output_labels.clone(), |
| 153 | + statistic_to_compute, |
| 154 | + query_kwargs: query_kwargs.clone(), |
| 155 | + }; |
| 156 | + Some((metric, metadata)) |
| 157 | + } |
| 158 | + |
| 159 | + pub fn resolve_query_time_range_elastic( |
| 160 | + &self, |
| 161 | + query_time: u64, |
| 162 | + query_pattern: EsDslQueryPattern, |
| 163 | + ) -> QueryTimestamps { |
| 164 | + // Resolves the actual start and end timestamps into milliseconds for an ElasticSearch query |
| 165 | + // based on the provided query_time and the time range specified in the ES DSL query pattern (if any). |
| 166 | + // If no time range is specified, default to entire history up to query_time. |
| 167 | + |
| 168 | + let mut start_timestamp: u64 = 0; |
| 169 | + let mut end_timestamp: u64 = query_time; |
| 170 | + |
| 171 | + let time_range = query_pattern.get_time_range(); |
| 172 | + if let Some(tr) = time_range { |
| 173 | + if let Some(resolved_range) = tr.resolve_epoch_millis(query_time as i64) { |
| 174 | + debug!( |
| 175 | + "Parsed time range from query: start={} end={}", |
| 176 | + resolved_range.gte_ms.unwrap_or(0), |
| 177 | + resolved_range.lte_ms.unwrap_or(0) |
| 178 | + ); |
| 179 | + start_timestamp = resolved_range.gte_ms.unwrap_or(0) as u64; |
| 180 | + end_timestamp = resolved_range.lte_ms.unwrap_or(query_time as i64) as u64; |
| 181 | + } else { |
| 182 | + debug!("Failed to resolve time range from query"); |
| 183 | + } |
| 184 | + }; |
| 185 | + |
| 186 | + QueryTimestamps { |
| 187 | + start_timestamp, |
| 188 | + end_timestamp, |
| 189 | + } |
| 190 | + } |
| 191 | +} |
0 commit comments