From 35e3cf0837c862e1f88b24ecc8306318f481058d Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Mon, 4 May 2026 22:43:04 -0400 Subject: [PATCH 01/10] Initial Elastic DSL path in planner. Logic almost 1-1 of SQL since we support semantically similar queries as of now. --- Cargo.lock | 1 + asap-planner-rs/Cargo.toml | 1 + asap-planner-rs/src/config/input.rs | 16 ++ asap-planner-rs/src/error.rs | 4 + asap-planner-rs/src/lib.rs | 47 +++++ asap-planner-rs/src/main.rs | 14 +- .../src/output/elastic_generator.rs | 140 +++++++++++++++ asap-planner-rs/src/output/mod.rs | 1 + .../src/planner/elastic_single_query.rs | 170 ++++++++++++++++++ asap-planner-rs/src/planner/mod.rs | 1 + 10 files changed, 393 insertions(+), 2 deletions(-) create mode 100644 asap-planner-rs/src/output/elastic_generator.rs create mode 100644 asap-planner-rs/src/planner/elastic_single_query.rs diff --git a/Cargo.lock b/Cargo.lock index cbbdde05..2da594de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,7 @@ dependencies = [ "asap_types", "chrono", "clap 4.6.0", + "elastic_dsl_utilities", "indexmap 2.13.1", "pretty_assertions", "promql-parser", diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml index abeaf025..53a32fc7 100644 --- a/asap-planner-rs/Cargo.toml +++ b/asap-planner-rs/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" asap_types.workspace = true promql_utilities.workspace = true sql_utilities.workspace = true +elastic_dsl_utilities.workspace = true sqlparser = "0.59.0" serde.workspace = true serde_json.workspace = true diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 55422bc3..75c599a7 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -137,3 +137,19 @@ pub struct TableDefinition { pub value_columns: Vec, pub metadata_columns: Vec, } + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLControllerConfig { + pub query_groups: Vec, + pub index: String, + pub sketch_parameters: Option, + pub aggregate_cleanup: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLQueryGroup { + pub id: Option, + pub queries: Vec, + pub repetition_delay: u64, + pub controller_options: ControllerOptions, +} diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs index 02748805..873c25c7 100644 --- a/asap-planner-rs/src/error.rs +++ b/asap-planner-rs/src/error.rs @@ -20,4 +20,8 @@ pub enum ControllerError { UnknownTable(String), #[error("Prometheus client error: {0}")] PrometheusClient(String), + #[error("Elasticsearch DSL parse error: {0}")] + ElasticDSLParse(String), + #[error("Unsupported Elasticsearch DSL query: {0}")] + UnsupportedElasticDSLQuery(String), } diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 5aa3196b..0c6c2145 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -16,6 +16,7 @@ use tracing::debug; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; +pub use config::input::ElasticDSLControllerConfig; pub use error::ControllerError; pub use output::generator::{GeneratorOutput, PuntedQuery}; use output::generator::{ @@ -24,6 +25,7 @@ use output::generator::{ KEY_WINDOW_SIZE, }; pub use output::sql_generator::SQLRuntimeOptions; +pub use output::elastic_generator::ElasticRuntimeOptions; pub use prometheus_client::build_schema_from_prometheus; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -294,6 +296,51 @@ impl SQLController { } } +pub struct ElasticController { + config: ElasticDSLControllerConfig, + options: ElasticRuntimeOptions, +} + +impl ElasticController { + pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result { + let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = output::elastic_generator::generate_elastic_plan(&self.config, &self.options)?; + Ok(PlannerOutput { + punted_queries: output.punted_queries, + streaming_yaml: output.streaming_yaml, + inference_yaml: output.inference_yaml, + aggregation_count: output.aggregation_count, + query_count: output.query_count, + }) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?; + let inference_str = serde_yaml::to_string(&output.inference_yaml)?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} + impl Controller { pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self { Self { diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 863f4d4e..3b88abd5 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,4 +1,4 @@ -use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine}; +use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, ElasticController, ElasticRuntimeOptions, StreamingEngine}; use asap_types::enums::QueryLanguage; use clap::Parser; use std::path::PathBuf; @@ -126,7 +126,17 @@ fn main() -> anyhow::Result<()> { SQLController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } QueryLanguage::elastic_querydsl => { - anyhow::bail!("ElasticQueryDSL is not yet supported"); + let interval = args.data_ingestion_interval.ok_or_else(|| { + anyhow::anyhow!("--data-ingestion-interval is required for Elasticsearch DSL mode") + })?; + let config_path = args + .input_config + .ok_or_else(|| anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode"))?; + let opts = ElasticRuntimeOptions { + streaming_engine: engine, + data_ingestion_interval: interval, + }; + ElasticController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } } diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/output/elastic_generator.rs new file mode 100644 index 00000000..8cc9b678 --- /dev/null +++ b/asap-planner-rs/src/output/elastic_generator.rs @@ -0,0 +1,140 @@ +use asap_types::enums::CleanupPolicy; +use indexmap::IndexMap; +use serde_yaml::Value as YamlValue; +use std::collections::HashMap; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::error::ControllerError; +use crate::output::generator::{ + build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, + KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, +}; +use crate::planner::single_query::IntermediateAggConfig; +use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; +use crate::StreamingEngine; + +pub struct ElasticRuntimeOptions { + pub streaming_engine: StreamingEngine, + pub data_ingestion_interval: u64, +} + +pub fn generate_elastic_plan( + config: &ElasticDSLControllerConfig, + opts: &ElasticRuntimeOptions, +) -> Result { + let cleanup_policy = config + .aggregate_cleanup + .as_ref() + .and_then(|c| c.policy) + .unwrap_or(CleanupPolicy::ReadBased); + + // Validate T % data_ingestion_interval == 0 + for qg in &config.query_groups { + if qg.repetition_delay % opts.data_ingestion_interval != 0 { + return Err(ControllerError::PlannerError(format!( + "repetition_delay {} is not a multiple of data_ingestion_interval {}", + qg.repetition_delay, opts.data_ingestion_interval + ))); + } + } + + // Check for duplicate queries + let mut seen_queries = std::collections::HashSet::new(); + for qg in &config.query_groups { + for q in &qg.queries { + if !seen_queries.insert(q.clone()) { + return Err(ControllerError::DuplicateQuery(q.clone())); + } + } + } + + // Dedup map: identifying_key -> IntermediateAggConfig + let mut dedup_map: IndexMap = IndexMap::new(); + // query_string -> Vec<(key, cleanup_param)> + let mut query_keys_map: IndexMap)>> = IndexMap::new(); + + for qg in &config.query_groups { + for query_string in &qg.queries { + let processor = ElasticSingleQueryProcessor::new( + query_string.clone(), + qg.repetition_delay, + opts.data_ingestion_interval, + config.index.clone(), + opts.streaming_engine, + config.sketch_parameters.clone(), + cleanup_policy, + ); + + let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?; + + let mut keys_for_query = Vec::new(); + for config_item in configs { + let key = config_item.identifying_key(); + keys_for_query.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config_item); + } + query_keys_map.insert(query_string.clone(), keys_for_query); + } + } + + // Assign sequential IDs + let mut id_map: HashMap = HashMap::new(); + for (idx, key) in dedup_map.keys().enumerate() { + id_map.insert(key.clone(), idx as u32 + 1); + } + + let streaming_yaml = build_elastic_streaming_yaml(config, &dedup_map, &id_map)?; + let inference_yaml = build_elastic_inference_yaml(config, cleanup_policy, &query_keys_map, &id_map)?; + + Ok(GeneratorOutput { + punted_queries: Vec::new(), + streaming_yaml, + inference_yaml, + aggregation_count: dedup_map.len(), + query_count: query_keys_map.len(), + }) +} + +fn build_elastic_streaming_yaml( + _config: &ElasticDSLControllerConfig, + dedup_map: &IndexMap, + id_map: &HashMap, +) -> Result { + let aggregations: Vec = dedup_map + .iter() + .map(|(key, cfg)| build_aggregation_entry(id_map[key], cfg)) + .collect(); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_AGGREGATIONS.to_string()), + YamlValue::Sequence(aggregations), + ); + + Ok(YamlValue::Mapping(root)) +} + +fn build_elastic_inference_yaml( + _config: &ElasticDSLControllerConfig, + cleanup_policy: CleanupPolicy, + query_keys_map: &IndexMap)>>, + id_map: &HashMap, +) -> Result { + let mut cleanup_map = serde_yaml::Mapping::new(); + cleanup_map.insert( + YamlValue::String(KEY_NAME.to_string()), + YamlValue::String(cleanup_policy.to_string()), + ); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_CLEANUP_POLICY.to_string()), + YamlValue::Mapping(cleanup_map), + ); + root.insert( + YamlValue::String(KEY_QUERIES.to_string()), + YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), + ); + + Ok(YamlValue::Mapping(root)) +} diff --git a/asap-planner-rs/src/output/mod.rs b/asap-planner-rs/src/output/mod.rs index 63c14cbb..773c921f 100644 --- a/asap-planner-rs/src/output/mod.rs +++ b/asap-planner-rs/src/output/mod.rs @@ -1,3 +1,4 @@ pub mod generator; pub mod sql_generator; +pub mod elastic_generator; pub use generator::*; diff --git a/asap-planner-rs/src/planner/elastic_single_query.rs b/asap-planner-rs/src/planner/elastic_single_query.rs new file mode 100644 index 00000000..db258909 --- /dev/null +++ b/asap-planner-rs/src/planner/elastic_single_query.rs @@ -0,0 +1,170 @@ +use asap_types::enums::{CleanupPolicy, WindowType}; +use elastic_dsl_utilities::ast_parsing::{ + extract_query_info, AggregationType as ElasticAggregationType, +}; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; + +use crate::config::input::SketchParameterOverrides; +use crate::error::ControllerError; +use crate::planner::logics::{build_sketch_parameters, get_sql_cleanup_param, IntermediateWindowConfig}; +use crate::planner::single_query::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::StreamingEngine; + +pub struct ElasticSingleQueryProcessor { + query_string: String, + t_repeat: u64, + #[allow(dead_code)] + data_ingestion_interval: u64, + index: String, + #[allow(dead_code)] + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, +} + +impl ElasticSingleQueryProcessor { + #[allow(clippy::too_many_arguments)] + pub fn new( + query_string: String, + t_repeat: u64, + data_ingestion_interval: u64, + index: String, + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, + ) -> Self { + Self { + query_string, + t_repeat, + data_ingestion_interval, + index, + streaming_engine, + sketch_parameters, + cleanup_policy, + } + } + + pub fn get_streaming_aggregation_configs( + &self, + ) -> Result<(Vec, Option), ControllerError> { + // Parse and extract query info using utilities + let query_info = extract_query_info(&self.query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + self.query_string + )) + })?; + + // Get aggregation type and statistics + let (treatment_type, statistics) = get_elastic_statistics(&query_info.aggregation)?; + + // Build window config (always tumbling for Elasticsearch queries) + let window_cfg = IntermediateWindowConfig { + window_size: self.t_repeat, + slide_interval: self.t_repeat, + window_type: WindowType::Tumbling, + }; + + // Extract target field and group by information + let target_field = query_info.target_field.clone(); + + // Determine spatial routing from group_by_buckets + let (spatial_output, rollup) = match &query_info.group_by_buckets { + Some(bucket_spec) => { + let group_fields = get_group_by_fields(bucket_spec); + let spatial = KeyByLabelNames::new(group_fields); + // For Elasticsearch, all potentially available fields become rollup + // when they're not in the group by + (spatial.clone(), spatial) + } + None => (KeyByLabelNames::empty(), KeyByLabelNames::empty()), + }; + + let configs = build_agg_configs_for_statistics( + &statistics, + treatment_type, + &spatial_output, + &rollup, + &window_cfg, + &query_info.target_field, + Some(&self.index), + Some(&target_field), + "", // Elasticsearch doesn't have spatial filters like SQL + |agg_type: AggregationType, agg_sub_type: &str| { + build_sketch_parameters( + agg_type, + agg_sub_type, + None, + self.sketch_parameters.as_ref(), + ) + }, + ) + .map_err(ControllerError::ElasticDSLParse)?; + + // Calculate cleanup param based on query's time window + let t_lookback = self.t_repeat; // Default to repetition delay + let cleanup_param = if self.cleanup_policy == CleanupPolicy::NoCleanup { + None + } else { + Some( + get_sql_cleanup_param(self.cleanup_policy, t_lookback, self.t_repeat) + .map_err(ControllerError::PlannerError)?, + ) + }; + + Ok((configs, cleanup_param)) + } +} + +/// Map Elasticsearch aggregation types to statistics and treatment types +fn get_elastic_statistics( + agg_type: &ElasticAggregationType, +) -> Result<(QueryTreatmentType, Vec), ControllerError> { + match agg_type { + ElasticAggregationType::Avg => { + // AVG requires SUM and COUNT + Ok((QueryTreatmentType::Exact, vec![Statistic::Sum, Statistic::Count])) + } + ElasticAggregationType::Sum => Ok((QueryTreatmentType::Approximate, vec![Statistic::Sum])), + ElasticAggregationType::Min => Ok((QueryTreatmentType::Exact, vec![Statistic::Min])), + ElasticAggregationType::Max => Ok((QueryTreatmentType::Exact, vec![Statistic::Max])), + ElasticAggregationType::Percentiles(percents) => { + // For percentiles, we use quantile statistic + // Check that we have valid percentiles + if percents.is_empty() { + return Err(ControllerError::UnsupportedElasticDSLQuery( + "Percentiles aggregation must specify percentile values".to_string(), + )); + } + Ok((QueryTreatmentType::Approximate, vec![Statistic::Quantile])) + } + } +} + +/// Extract field names from group by specification +fn get_group_by_fields(bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec) -> Vec { + use elastic_dsl_utilities::ast_parsing::GroupBySpec; + match bucket_spec { + GroupBySpec::Fields(fields) => fields.clone(), + GroupBySpec::Filters(predicates) => { + // For filter-based grouping, we extract field names from predicates + let mut fields = Vec::new(); + for predicate in predicates { + match predicate { + elastic_dsl_utilities::ast_parsing::Predicate::Term { field, .. } => { + if !fields.contains(field) { + fields.push(field.clone()); + } + } + elastic_dsl_utilities::ast_parsing::Predicate::Range { field, .. } => { + if !fields.contains(field) { + fields.push(field.clone()); + } + } + } + } + fields + } + } +} diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index 44475bec..cc763854 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -2,4 +2,5 @@ pub mod logics; pub mod patterns; pub mod single_query; pub mod sql_single_query; +pub mod elastic_single_query; pub use single_query::*; From 90e24e50de70bd5ea75359541ccf5aa92cda2e56 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Fri, 8 May 2026 20:48:10 -0400 Subject: [PATCH 02/10] Add index specific mappings (schemas) for Elastic DSL in planner (similar to SQLSchema). --- Cargo.lock | 1 + .../dependencies/rs/asap_types/Cargo.toml | 1 + .../rs/asap_types/src/inference_config.rs | 59 +++++- .../rs/asap_types/src/streaming_config.rs | 2 +- .../src/ast_parsing/extract_info.rs | 30 +++ .../src/ast_parsing/query_info.rs | 3 + .../rs/elastic_dsl_utilities/src/lib.rs | 68 +++++++ asap-planner-rs/src/config/input.rs | 3 +- .../src/output/elastic_generator.rs | 173 +++++++++++++++++- .../tests/elastic_dsl_integration.rs | 30 +++ asap-planner-rs/tests/elastic_example.yaml | 34 ++++ .../src/engines/simple_engine/sql.rs | 2 +- .../tests/test_utilities/config_builders.rs | 2 +- 13 files changed, 399 insertions(+), 9 deletions(-) create mode 100644 asap-planner-rs/tests/elastic_dsl_integration.rs create mode 100644 asap-planner-rs/tests/elastic_example.yaml diff --git a/Cargo.lock b/Cargo.lock index 2da594de..d6a70d2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,6 +434,7 @@ version = "0.3.0" dependencies = [ "anyhow", "clap 4.6.0", + "elastic_dsl_utilities", "promql_utilities", "serde", "serde_json", diff --git a/asap-common/dependencies/rs/asap_types/Cargo.toml b/asap-common/dependencies/rs/asap_types/Cargo.toml index 6ca70998..5d22cdaa 100644 --- a/asap-common/dependencies/rs/asap_types/Cargo.toml +++ b/asap-common/dependencies/rs/asap_types/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] promql_utilities.workspace = true +elastic_dsl_utilities.workspace = true tracing.workspace = true sql_utilities.workspace = true serde.workspace = true diff --git a/asap-common/dependencies/rs/asap_types/src/inference_config.rs b/asap-common/dependencies/rs/asap_types/src/inference_config.rs index 8ddb3ca6..ff8da7de 100644 --- a/asap-common/dependencies/rs/asap_types/src/inference_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/inference_config.rs @@ -10,13 +10,14 @@ use crate::promql_schema::PromQLSchema; use crate::query_config::QueryConfig; use promql_utilities::data_model::KeyByLabelNames; use sql_utilities::sqlhelper::{SQLSchema, Table}; +use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; /// Schema configuration that can be either PromQL or SQL format #[derive(Debug, Clone)] pub enum SchemaConfig { PromQL(PromQLSchema), SQL(SQLSchema), - ElasticQueryDSL, + ElasticQueryDSL(ElasticMappingSchema), ElasticSQL(SQLSchema), } @@ -32,7 +33,9 @@ impl InferenceConfig { let schema = match query_language { QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()), QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())), - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + SchemaConfig::ElasticQueryDSL(ElasticMappingSchema::new(Vec::new())) + } QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())), }; Self { @@ -60,7 +63,10 @@ impl InferenceConfig { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) } - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + let elastic_schema = Self::parse_elastic_querydsl_schema(data)?; + SchemaConfig::ElasticQueryDSL(elastic_schema) + } QueryLanguage::elastic_sql => { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) @@ -153,6 +159,53 @@ impl InferenceConfig { Ok(SQLSchema::new(tables)) } + /// Parse Elasticsearch mapping schema from YAML data (indices: key at top level). + fn parse_elastic_querydsl_schema(data: &Value) -> Result { + let Some(indices_data) = data.get("indices").and_then(|v| v.as_sequence()) else { + return Ok(ElasticMappingSchema::new(Vec::new())); + }; + + let mut indices = Vec::new(); + for index_data in indices_data { + let name = index_data + .get("name") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing name field in elastic index"))? + .to_string(); + + let time_field = index_data + .get("time_field") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing time_field field in elastic index {name}"))? + .to_string(); + + let metric_columns: HashSet = index_data + .get("metric_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| anyhow::anyhow!("Missing metric_columns field in elastic index {name}"))? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + let metadata_columns: HashSet = index_data + .get("metadata_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| anyhow::anyhow!("Missing metadata_columns field in elastic index {name}"))? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + indices.push(( + name, + ElasticIndexSchema::new(time_field, metric_columns, metadata_columns), + )); + } + + Ok(ElasticMappingSchema::new(indices)) + } + /// Parse cleanup policy from YAML data. Errors if not specified. fn parse_cleanup_policy(data: &Value) -> Result { let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| { diff --git a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs index 5a7b800c..6833f81d 100644 --- a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs @@ -72,7 +72,7 @@ impl StreamingConfig { .map(|ic| match &ic.schema { SchemaConfig::PromQL(_) => QueryLanguage::promql, SchemaConfig::SQL(_) => QueryLanguage::sql, - SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, + SchemaConfig::ElasticQueryDSL(_) => QueryLanguage::elastic_querydsl, SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, }) .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 299af3b1..5a4f7743 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -1,6 +1,7 @@ use crate::ast_parsing::query_info::{ AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue, }; +use crate::datemath::TimeRange; use crate::helpers::strip_keyword_suffix; use elasticsearch_dsl_ast::{self as dsl}; use serde_json; @@ -36,8 +37,10 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option Option { } } +fn infer_time_field(predicates: &[Predicate]) -> FieldName { + for predicate in predicates { + if let Predicate::Range { field, gte, lte } = predicate { + let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| { + matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some()) + }); + let looks_like_time_field = field == "@timestamp" + || field.contains("time") + || field.contains("timestamp") + || bound_is_time_like; + + if looks_like_time_field { + return field.clone(); + } + } + } + + for predicate in predicates { + if let Predicate::Range { field, .. } = predicate { + return field.clone(); + } + } + + "@timestamp".to_string() +} + #[cfg(test)] mod tests { use super::*; @@ -350,6 +379,7 @@ mod tests { let info = walk_ast_and_extract_info(&ast).expect("info should parse"); assert_eq!(info.target_field, "latency_ms"); + assert_eq!(info.time_field, "@timestamp"); assert_eq!(info.aggregation, AggregationType::Max); assert_eq!(info.predicates.len(), 2); assert_eq!( diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs index ce82d705..7905bf48 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -6,6 +6,7 @@ pub type FieldName = String; pub struct ElasticDSLQueryInfo { // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. pub target_field: FieldName, // List of metrics being queried + pub time_field: FieldName, // Time field used by the query's range filter pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) pub group_by_buckets: Option, // Grouping specification if the query includes a group by clause pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) @@ -16,12 +17,14 @@ impl ElasticDSLQueryInfo { pub fn new( target_field: FieldName, + time_field: FieldName, predicates: Vec, group_by_buckets: Option, aggregation: AggregationType, ) -> Self { Self { target_field, + time_field, predicates, group_by_buckets, aggregation, diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 4a86ba8b..695610f7 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -5,3 +5,71 @@ pub mod helpers; pub use ast_parsing::*; pub use datemath::*; pub use helpers::*; + +use std::collections::{HashMap, HashSet}; + + +#[derive(Debug, Clone)] +pub struct ElasticIndexSchema { + pub time_field: String, + pub metric_columns: HashSet, + pub metadata_columns: HashSet, +} + +impl ElasticIndexSchema { + pub fn new( + time_field: String, + metric_columns: HashSet, + metadata_columns: HashSet, + ) -> Self { + Self { + time_field, + metric_columns, + metadata_columns, + } + } +} + +#[derive(Debug, Clone)] +pub struct ElasticMappingSchema { + pub config: HashMap, +} + +impl ElasticMappingSchema { + pub fn new(indexes: Vec<(String, ElasticIndexSchema)>) -> Self { + let mut config = HashMap::new(); + for (index_name, index_schema) in indexes { + config.insert(index_name, index_schema); + } + Self { config } + } + + pub fn add_index(mut self, index: String, schema: ElasticIndexSchema) -> Self { + self.config.insert(index, schema); + self + } + + pub fn get_time_field(&self, index: &str) -> Option<&String> { + self.config.get(index).map(|schema| &schema.time_field) + } + + pub fn get_metric_columns(&self, index: &str) -> Option<&HashSet> { + self.config.get(index).map(|schema| &schema.metric_columns) + } + + pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet> { + self.config.get(index).map(|schema| &schema.metadata_columns) + } + + pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool { + self.get_metric_columns(index) + .map(|columns| columns.contains(metric_column)) + .unwrap_or(false) + } + + pub fn are_valid_metadata_columns(&self, index: &str, columns: &HashSet) -> bool { + self.get_metadata_columns(index) + .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c))) + .unwrap_or(false) + } +} \ No newline at end of file diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 75c599a7..12369088 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -141,7 +141,7 @@ pub struct TableDefinition { #[derive(Debug, Clone, Deserialize)] pub struct ElasticDSLControllerConfig { pub query_groups: Vec, - pub index: String, + pub index: Option, pub sketch_parameters: Option, pub aggregate_cleanup: Option, } @@ -151,5 +151,6 @@ pub struct ElasticDSLQueryGroup { pub id: Option, pub queries: Vec, pub repetition_delay: u64, + pub index: Option, pub controller_options: ControllerOptions, } diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/output/elastic_generator.rs index 8cc9b678..85788eca 100644 --- a/asap-planner-rs/src/output/elastic_generator.rs +++ b/asap-planner-rs/src/output/elastic_generator.rs @@ -1,5 +1,6 @@ use asap_types::enums::CleanupPolicy; use indexmap::IndexMap; +use indexmap::IndexSet; use serde_yaml::Value as YamlValue; use std::collections::HashMap; @@ -9,10 +10,44 @@ use crate::output::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; +use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; use crate::planner::single_query::IntermediateAggConfig; use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; use crate::StreamingEngine; +#[derive(Default)] +struct ElasticIndexSchemaBuilder { + time_field: Option, + metric_columns: IndexSet, + metadata_columns: IndexSet, +} + +impl ElasticIndexSchemaBuilder { + fn update_from_query_info( + &mut self, + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, + ) -> Result<(), ControllerError> { + match &self.time_field { + Some(existing) if existing != &query_info.time_field => { + return Err(ControllerError::PlannerError(format!( + "conflicting time fields for Elasticsearch index: '{}' vs '{}'", + existing, query_info.time_field + ))); + } + None => self.time_field = Some(query_info.time_field.clone()), + _ => {} + } + + self.metric_columns.insert(query_info.target_field.clone()); + + for field in collect_elastic_metadata_fields(query_info) { + self.metadata_columns.insert(field); + } + + Ok(()) + } +} + pub struct ElasticRuntimeOptions { pub streaming_engine: StreamingEngine, pub data_ingestion_interval: u64, @@ -52,14 +87,29 @@ pub fn generate_elastic_plan( let mut dedup_map: IndexMap = IndexMap::new(); // query_string -> Vec<(key, cleanup_param)> let mut query_keys_map: IndexMap)>> = IndexMap::new(); + // index -> schema builder derived from the queries targeting that index + let mut index_schema_builders: IndexMap = IndexMap::new(); for qg in &config.query_groups { + let index = resolve_elastic_index(config, qg)?; for query_string in &qg.queries { + let query_info = extract_query_info(query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + query_string + )) + })?; + + index_schema_builders + .entry(index.clone()) + .or_default() + .update_from_query_info(&query_info)?; + let processor = ElasticSingleQueryProcessor::new( query_string.clone(), qg.repetition_delay, opts.data_ingestion_interval, - config.index.clone(), + index.clone(), opts.streaming_engine, config.sketch_parameters.clone(), cleanup_policy, @@ -84,7 +134,13 @@ pub fn generate_elastic_plan( } let streaming_yaml = build_elastic_streaming_yaml(config, &dedup_map, &id_map)?; - let inference_yaml = build_elastic_inference_yaml(config, cleanup_policy, &query_keys_map, &id_map)?; + let inference_yaml = build_elastic_inference_yaml( + config, + cleanup_policy, + &query_keys_map, + &id_map, + &index_schema_builders, + )?; Ok(GeneratorOutput { punted_queries: Vec::new(), @@ -119,6 +175,7 @@ fn build_elastic_inference_yaml( cleanup_policy: CleanupPolicy, query_keys_map: &IndexMap)>>, id_map: &HashMap, + index_schema_builders: &IndexMap, ) -> Result { let mut cleanup_map = serde_yaml::Mapping::new(); cleanup_map.insert( @@ -135,6 +192,118 @@ fn build_elastic_inference_yaml( YamlValue::String(KEY_QUERIES.to_string()), YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), ); + root.insert( + YamlValue::String("indices".to_string()), + YamlValue::Sequence( + index_schema_builders + .iter() + .map(|(index_name, builder)| build_elastic_index_yaml(index_name, builder)) + .collect(), + ), + ); Ok(YamlValue::Mapping(root)) } + +fn build_elastic_index_yaml(index_name: &str, builder: &ElasticIndexSchemaBuilder) -> YamlValue { + let mut map = serde_yaml::Mapping::new(); + map.insert( + YamlValue::String("name".to_string()), + YamlValue::String(index_name.to_string()), + ); + map.insert( + YamlValue::String("time_field".to_string()), + YamlValue::String( + builder + .time_field + .clone() + .unwrap_or_else(|| "@timestamp".to_string()), + ), + ); + map.insert( + YamlValue::String("metric_columns".to_string()), + YamlValue::Sequence( + builder + .metric_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + map.insert( + YamlValue::String("metadata_columns".to_string()), + YamlValue::Sequence( + builder + .metadata_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + + YamlValue::Mapping(map) +} + +fn resolve_elastic_index( + config: &ElasticDSLControllerConfig, + query_group: &crate::config::input::ElasticDSLQueryGroup, +) -> Result { + query_group + .index + .clone() + .or_else(|| config.index.clone()) + .ok_or_else(|| { + ControllerError::PlannerError( + "each Elasticsearch query group must specify an index (or inherit one from the controller config)" + .to_string(), + ) + }) +} + +fn collect_elastic_metadata_fields( + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, +) -> IndexSet { + let mut fields = IndexSet::new(); + + for predicate in &query_info.predicates { + match predicate { + Predicate::Term { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + Predicate::Range { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + } + + if let Some(group_by_buckets) = &query_info.group_by_buckets { + match group_by_buckets { + GroupBySpec::Fields(group_fields) => { + for field in group_fields { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + GroupBySpec::Filters(predicates) => { + for predicate in predicates { + match predicate { + Predicate::Term { field, .. } | Predicate::Range { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + } + } + } + } + + fields +} diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs new file mode 100644 index 00000000..2dea8a2f --- /dev/null +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -0,0 +1,30 @@ +use asap_planner::{ + ElasticController, ElasticRuntimeOptions, StreamingEngine, +}; +use asap_types::{QueryLanguage, SchemaConfig}; +use std::path::Path; + + +#[test] +fn elastic_querydsl_emits_index_schema() { + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + let c = ElasticController::from_file(Path::new("tests/elastic_example.yaml"), opts).unwrap(); + let out = c.generate().unwrap(); + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.is_empty()); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} \ No newline at end of file diff --git a/asap-planner-rs/tests/elastic_example.yaml b/asap-planner-rs/tests/elastic_example.yaml new file mode 100644 index 00000000..9e431d90 --- /dev/null +++ b/asap-planner-rs/tests/elastic_example.yaml @@ -0,0 +1,34 @@ +query_groups: + - id: 1 + index: metrics + queries: + - | + { + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index e84f2a5f..26ff945c 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -218,7 +218,7 @@ impl SimpleEngine { warn!("SQL query requested but config has PromQL schema"); return None; } - &SchemaConfig::ElasticQueryDSL => todo!(), + &SchemaConfig::ElasticQueryDSL(_) => todo!(), SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), }; diff --git a/asap-query-engine/src/tests/test_utilities/config_builders.rs b/asap-query-engine/src/tests/test_utilities/config_builders.rs index ec691272..4bd68ade 100644 --- a/asap-query-engine/src/tests/test_utilities/config_builders.rs +++ b/asap-query-engine/src/tests/test_utilities/config_builders.rs @@ -306,7 +306,7 @@ mod tests { assert!(promql_schema.get_labels("cpu_usage").is_some()); } SchemaConfig::SQL(_) => panic!("Expected PromQL schema"), - SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"), + SchemaConfig::ElasticQueryDSL(_) => panic!("Expected PromQL schema"), SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"), } From 26e66d96e781b77a7cb96d9dc4c741c7bdf46bb3 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:19:59 -0400 Subject: [PATCH 03/10] Add more integration tests (ES DSL planner). --- .../tests/elastic_dsl_integration.rs | 391 +++++++++++++++++- 1 file changed, 388 insertions(+), 3 deletions(-) diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs index 2dea8a2f..a46c8d08 100644 --- a/asap-planner-rs/tests/elastic_dsl_integration.rs +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -1,8 +1,81 @@ -use asap_planner::{ - ElasticController, ElasticRuntimeOptions, StreamingEngine, -}; +use asap_planner::{ElasticController, ElasticRuntimeOptions, PlannerOutput, StreamingEngine}; use asap_types::{QueryLanguage, SchemaConfig}; +use std::io::Write; use std::path::Path; +use tempfile::NamedTempFile; + +fn indent_block(text: &str, indent: usize) -> String { + let padding = " ".repeat(indent); + text.trim() + .lines() + .map(|line| format!("{}{}", padding, line)) + .collect::>() + .join("\n") +} + +fn elastic_yaml(index: &str, query: &str, t_repeat: u64) -> String { + format!( + r#" +query_groups: + - id: 1 + index: {index} + queries: + - | +{query} + repetition_delay: {t_repeat} + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#, + index = index, + query = indent_block(query, 8), + t_repeat = t_repeat, + ) +} + +fn elastic_output(index: &str, query: &str, t_repeat: u64) -> PlannerOutput { + let yaml = elastic_yaml(index, query, t_repeat); + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + ElasticController::from_file(Path::new(file.path()), opts) + .unwrap() + .generate() + .unwrap() +} + +fn assert_index_schema( + out: &PlannerOutput, + index: &str, + metric_column: &str, + metadata_columns: &[&str], +) { + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field(index), Some(&"@timestamp".to_string())); + + let metric_columns = schema.get_metric_columns(index).unwrap(); + assert!(metric_columns.contains(metric_column)); + + let actual_metadata = schema.get_metadata_columns(index).unwrap(); + for column in metadata_columns { + assert!(actual_metadata.contains(*column)); + } + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} #[test] @@ -27,4 +100,316 @@ fn elastic_querydsl_emits_index_schema() { } other => panic!("expected elastic querydsl schema, got {:?}", other), } +} + +#[test] +fn elastic_sum_produces_basic_plan_and_schema() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "sum_cpu": { + "sum": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("CountMinSketch"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("CountMinSketch"), Some("cpu_usage".to_string())); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_avg_produces_three_configs() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("MultipleSum"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("MultipleSum"), Some("cpu_usage".to_string())); + + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_min_produces_exact_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "min_cpu": { + "min": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleMinMax")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("MultipleMinMax"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("MultipleMinMax"), Some("cpu_usage".to_string())); + assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); +} + +#[test] +fn elastic_percentiles_produce_kll_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "latency_percentiles": { + "percentiles": { + "field": "latency_ms", + "percents": [50.0, 95.0] + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("DatasketchesKLL"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("DatasketchesKLL"), Some("latency_ms".to_string())); + assert_index_schema(&out, "metrics", "latency_ms", &["service"]); +} + +#[test] +fn elastic_multi_index_schema_inference() { + let yaml = r#" +query_groups: + - id: 1 + index: metrics + queries: + - | + { + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 + - id: 2 + index: other_metrics + queries: + - | + { + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "avg_mem": { + "avg": { + "field": "memory_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); + let out = c.generate().unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 4); + assert_eq!(out.inference_query_count(), 2); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.contains("datacenter")); + + assert_eq!( + schema.get_time_field("other_metrics"), + Some(&"@timestamp".to_string()) + ); + let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); + assert!(other_metric_columns.contains("memory_usage")); + let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); + assert!(other_metadata_columns.contains("service")); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } } \ No newline at end of file From 3bca6c3c1988caa96f27f0fa06ab4830a12ed0d6 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:48:19 -0400 Subject: [PATCH 04/10] Refactor/rearrange Elastic DSL changes to match new crate layout. --- Cargo.lock | 4 +- asap-planner-rs/src/elastic_dsl/controller.rs | 46 +++++++++++++++++++ .../generator.rs} | 6 +-- asap-planner-rs/src/elastic_dsl/mod.rs | 4 ++ asap-planner-rs/src/lib.rs | 3 ++ ...elastic_single_query.rs => elastic_dsl.rs} | 6 ++- asap-planner-rs/src/planner/mod.rs | 1 + 7 files changed, 63 insertions(+), 7 deletions(-) create mode 100644 asap-planner-rs/src/elastic_dsl/controller.rs rename asap-planner-rs/src/{output/elastic_generator.rs => elastic_dsl/generator.rs} (98%) create mode 100644 asap-planner-rs/src/elastic_dsl/mod.rs rename asap-planner-rs/src/planner/{elastic_single_query.rs => elastic_dsl.rs} (96%) diff --git a/Cargo.lock b/Cargo.lock index b79abe1f..8e95d34a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,9 +392,9 @@ dependencies = [ "anyhow", "asap_types", "chrono", - "clap 4.6.0", + "clap 4.6.1", "elastic_dsl_utilities", - "indexmap 2.13.1", + "indexmap 2.14.0", "pretty_assertions", "promql-parser", "promql_utilities", diff --git a/asap-planner-rs/src/elastic_dsl/controller.rs b/asap-planner-rs/src/elastic_dsl/controller.rs new file mode 100644 index 00000000..ae1bab3f --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/controller.rs @@ -0,0 +1,46 @@ + +use std::path::Path; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::error::ControllerError; +use crate::planner_output::PlannerOutput; +use crate::elastic_dsl::generator::{ElasticRuntimeOptions, generate_elastic_plan}; + +pub struct ElasticController { + config: ElasticDSLControllerConfig, + options: ElasticRuntimeOptions, +} + +impl ElasticController { + pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result { + let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = generate_elastic_plan(&self.config, &self.options)?; + Ok(PlannerOutput::from_output(output)) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(output.streaming_yaml())?; + let inference_str = serde_yaml::to_string(output.inference_yaml())?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} \ No newline at end of file diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs similarity index 98% rename from asap-planner-rs/src/output/elastic_generator.rs rename to asap-planner-rs/src/elastic_dsl/generator.rs index 85788eca..62e05a78 100644 --- a/asap-planner-rs/src/output/elastic_generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -6,13 +6,13 @@ use std::collections::HashMap; use crate::config::input::ElasticDSLControllerConfig; use crate::error::ControllerError; -use crate::output::generator::{ +use crate::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; -use crate::planner::single_query::IntermediateAggConfig; -use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; use crate::StreamingEngine; #[derive(Default)] diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs new file mode 100644 index 00000000..626dbd05 --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -0,0 +1,4 @@ +pub mod controller; +pub mod generator; +pub use controller::ElasticController; +pub use generator::ElasticRuntimeOptions; \ No newline at end of file diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index ad02317e..56faff3a 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -7,6 +7,7 @@ pub mod prometheus_client; pub mod promql; pub mod query_log; pub mod sql; +pub mod elastic_dsl; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; @@ -19,6 +20,8 @@ pub use prometheus_client::build_schema_from_prometheus; pub use promql::Controller; pub use sql::SQLController; pub use sql::SQLRuntimeOptions; +pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticRuntimeOptions; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { diff --git a/asap-planner-rs/src/planner/elastic_single_query.rs b/asap-planner-rs/src/planner/elastic_dsl.rs similarity index 96% rename from asap-planner-rs/src/planner/elastic_single_query.rs rename to asap-planner-rs/src/planner/elastic_dsl.rs index db258909..ff763dd8 100644 --- a/asap-planner-rs/src/planner/elastic_single_query.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -7,8 +7,10 @@ use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, use crate::config::input::SketchParameterOverrides; use crate::error::ControllerError; -use crate::planner::logics::{build_sketch_parameters, get_sql_cleanup_param, IntermediateWindowConfig}; -use crate::planner::single_query::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; +use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::cleanup::get_sql_cleanup_param; use crate::StreamingEngine; pub struct ElasticSingleQueryProcessor { diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index ce9b172c..2e6fbb19 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -6,5 +6,6 @@ pub mod promql; pub mod sketch; pub mod sql; pub mod window; +pub mod elastic_dsl; pub use agg_config::*; pub use promql::*; From 018fc81cc2c8e77ab78431c01ffb1ba3d8fb8f58 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:50:57 -0400 Subject: [PATCH 05/10] Cargo fmt and clippy. --- .../rs/asap_types/src/inference_config.rs | 10 +- .../rs/elastic_dsl_utilities/src/lib.rs | 7 +- asap-planner-rs/src/elastic_dsl/controller.rs | 5 +- asap-planner-rs/src/elastic_dsl/generator.rs | 2 +- asap-planner-rs/src/elastic_dsl/mod.rs | 2 +- asap-planner-rs/src/lib.rs | 8 +- asap-planner-rs/src/main.rs | 11 +- asap-planner-rs/src/planner/elastic_dsl.rs | 13 +- asap-planner-rs/src/planner/mod.rs | 2 +- .../tests/elastic_dsl_integration.rs | 188 ++++++++++-------- 10 files changed, 146 insertions(+), 102 deletions(-) diff --git a/asap-common/dependencies/rs/asap_types/src/inference_config.rs b/asap-common/dependencies/rs/asap_types/src/inference_config.rs index ff8da7de..6bd9a530 100644 --- a/asap-common/dependencies/rs/asap_types/src/inference_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/inference_config.rs @@ -8,9 +8,9 @@ use crate::aggregation_reference::AggregationReference; use crate::enums::{CleanupPolicy, QueryLanguage}; use crate::promql_schema::PromQLSchema; use crate::query_config::QueryConfig; +use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; use promql_utilities::data_model::KeyByLabelNames; use sql_utilities::sqlhelper::{SQLSchema, Table}; -use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; /// Schema configuration that can be either PromQL or SQL format #[derive(Debug, Clone)] @@ -182,7 +182,9 @@ impl InferenceConfig { let metric_columns: HashSet = index_data .get("metric_columns") .and_then(|v| v.as_sequence()) - .ok_or_else(|| anyhow::anyhow!("Missing metric_columns field in elastic index {name}"))? + .ok_or_else(|| { + anyhow::anyhow!("Missing metric_columns field in elastic index {name}") + })? .iter() .filter_map(|v| v.as_str()) .map(|s| s.to_string()) @@ -191,7 +193,9 @@ impl InferenceConfig { let metadata_columns: HashSet = index_data .get("metadata_columns") .and_then(|v| v.as_sequence()) - .ok_or_else(|| anyhow::anyhow!("Missing metadata_columns field in elastic index {name}"))? + .ok_or_else(|| { + anyhow::anyhow!("Missing metadata_columns field in elastic index {name}") + })? .iter() .filter_map(|v| v.as_str()) .map(|s| s.to_string()) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 695610f7..b8f2649f 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -8,7 +8,6 @@ pub use helpers::*; use std::collections::{HashMap, HashSet}; - #[derive(Debug, Clone)] pub struct ElasticIndexSchema { pub time_field: String, @@ -58,7 +57,9 @@ impl ElasticMappingSchema { } pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet> { - self.config.get(index).map(|schema| &schema.metadata_columns) + self.config + .get(index) + .map(|schema| &schema.metadata_columns) } pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool { @@ -72,4 +73,4 @@ impl ElasticMappingSchema { .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c))) .unwrap_or(false) } -} \ No newline at end of file +} diff --git a/asap-planner-rs/src/elastic_dsl/controller.rs b/asap-planner-rs/src/elastic_dsl/controller.rs index ae1bab3f..22df6872 100644 --- a/asap-planner-rs/src/elastic_dsl/controller.rs +++ b/asap-planner-rs/src/elastic_dsl/controller.rs @@ -1,10 +1,9 @@ - use std::path::Path; use crate::config::input::ElasticDSLControllerConfig; +use crate::elastic_dsl::generator::{generate_elastic_plan, ElasticRuntimeOptions}; use crate::error::ControllerError; use crate::planner_output::PlannerOutput; -use crate::elastic_dsl::generator::{ElasticRuntimeOptions, generate_elastic_plan}; pub struct ElasticController { config: ElasticDSLControllerConfig, @@ -43,4 +42,4 @@ impl ElasticController { std::fs::write(dir.join("inference_config.yaml"), inference_str)?; Ok(output) } -} \ No newline at end of file +} diff --git a/asap-planner-rs/src/elastic_dsl/generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs index 62e05a78..7cbe7c07 100644 --- a/asap-planner-rs/src/elastic_dsl/generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -10,10 +10,10 @@ use crate::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; -use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; use crate::planner::agg_config::IntermediateAggConfig; use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; use crate::StreamingEngine; +use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; #[derive(Default)] struct ElasticIndexSchemaBuilder { diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs index 626dbd05..78350ee4 100644 --- a/asap-planner-rs/src/elastic_dsl/mod.rs +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -1,4 +1,4 @@ pub mod controller; pub mod generator; pub use controller::ElasticController; -pub use generator::ElasticRuntimeOptions; \ No newline at end of file +pub use generator::ElasticRuntimeOptions; diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 56faff3a..243b1821 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod elastic_dsl; pub mod error; pub mod generator; pub mod planner; @@ -7,12 +8,13 @@ pub mod prometheus_client; pub mod promql; pub mod query_log; pub mod sql; -pub mod elastic_dsl; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; -pub use config::input::SQLControllerConfig; pub use config::input::ElasticDSLControllerConfig; +pub use config::input::SQLControllerConfig; +pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticRuntimeOptions; pub use error::ControllerError; pub use generator::{GeneratorOutput, PuntedQuery}; pub use planner_output::PlannerOutput; @@ -20,8 +22,6 @@ pub use prometheus_client::build_schema_from_prometheus; pub use promql::Controller; pub use sql::SQLController; pub use sql::SQLRuntimeOptions; -pub use elastic_dsl::ElasticController; -pub use elastic_dsl::ElasticRuntimeOptions; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 3b88abd5..2035dada 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,4 +1,7 @@ -use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, ElasticController, ElasticRuntimeOptions, StreamingEngine}; +use asap_planner::{ + Controller, ElasticController, ElasticRuntimeOptions, RuntimeOptions, SQLController, + SQLRuntimeOptions, StreamingEngine, +}; use asap_types::enums::QueryLanguage; use clap::Parser; use std::path::PathBuf; @@ -129,9 +132,9 @@ fn main() -> anyhow::Result<()> { let interval = args.data_ingestion_interval.ok_or_else(|| { anyhow::anyhow!("--data-ingestion-interval is required for Elasticsearch DSL mode") })?; - let config_path = args - .input_config - .ok_or_else(|| anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode"))?; + let config_path = args.input_config.ok_or_else(|| { + anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode") + })?; let opts = ElasticRuntimeOptions { streaming_engine: engine, data_ingestion_interval: interval, diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index ff763dd8..317f8567 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -7,10 +7,10 @@ use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, use crate::config::input::SketchParameterOverrides; use crate::error::ControllerError; -use crate::planner::sketch::build_sketch_parameters; -use crate::planner::window::IntermediateWindowConfig; use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; use crate::planner::cleanup::get_sql_cleanup_param; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; use crate::StreamingEngine; pub struct ElasticSingleQueryProcessor { @@ -126,7 +126,10 @@ fn get_elastic_statistics( match agg_type { ElasticAggregationType::Avg => { // AVG requires SUM and COUNT - Ok((QueryTreatmentType::Exact, vec![Statistic::Sum, Statistic::Count])) + Ok(( + QueryTreatmentType::Exact, + vec![Statistic::Sum, Statistic::Count], + )) } ElasticAggregationType::Sum => Ok((QueryTreatmentType::Approximate, vec![Statistic::Sum])), ElasticAggregationType::Min => Ok((QueryTreatmentType::Exact, vec![Statistic::Min])), @@ -145,7 +148,9 @@ fn get_elastic_statistics( } /// Extract field names from group by specification -fn get_group_by_fields(bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec) -> Vec { +fn get_group_by_fields( + bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec, +) -> Vec { use elastic_dsl_utilities::ast_parsing::GroupBySpec; match bucket_spec { GroupBySpec::Fields(fields) => fields.clone(), diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index 2e6fbb19..41a38081 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -1,11 +1,11 @@ pub mod agg_config; pub mod cleanup; +pub mod elastic_dsl; pub mod labels; pub mod patterns; pub mod promql; pub mod sketch; pub mod sql; pub mod window; -pub mod elastic_dsl; pub use agg_config::*; pub use promql::*; diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs index a46c8d08..5e02813f 100644 --- a/asap-planner-rs/tests/elastic_dsl_integration.rs +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -63,7 +63,10 @@ fn assert_index_schema( match inference_config.schema { SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field(index), Some(&"@timestamp".to_string())); + assert_eq!( + schema.get_time_field(index), + Some(&"@timestamp".to_string()) + ); let metric_columns = schema.get_metric_columns(index).unwrap(); assert!(metric_columns.contains(metric_column)); @@ -77,7 +80,6 @@ fn assert_index_schema( } } - #[test] fn elastic_querydsl_emits_index_schema() { let opts = ElasticRuntimeOptions { @@ -92,7 +94,10 @@ fn elastic_querydsl_emits_index_schema() { match inference_config.schema { SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); let metric_columns = schema.get_metric_columns("metrics").unwrap(); assert!(metric_columns.contains("cpu_usage")); let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); @@ -104,7 +109,7 @@ fn elastic_querydsl_emits_index_schema() { #[test] fn elastic_sum_produces_basic_plan_and_schema() { - let query = r#" + let query = r#" { "aggs": { "by_datacenter": { @@ -136,21 +141,27 @@ fn elastic_sum_produces_basic_plan_and_schema() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 2); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("CountMinSketch")); - assert!(out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("CountMinSketch"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("CountMinSketch"), Some("cpu_usage".to_string())); - assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("CountMinSketch"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("CountMinSketch"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); } #[test] fn elastic_avg_produces_three_configs() { - let query = r#" + let query = r#" { "aggs": { "by_datacenter": { @@ -182,22 +193,28 @@ fn elastic_avg_produces_three_configs() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 2); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("MultipleSum")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("MultipleSum"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("MultipleSum"), Some("cpu_usage".to_string())); + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleSum"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleSum"), + Some("cpu_usage".to_string()) + ); - assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); } #[test] fn elastic_min_produces_exact_plan() { - let query = r#" + let query = r#" { "aggs": { "by_service": { @@ -229,21 +246,27 @@ fn elastic_min_produces_exact_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 1); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("MultipleMinMax")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("MultipleMinMax"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("MultipleMinMax"), Some("cpu_usage".to_string())); - assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleMinMax")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleMinMax"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleMinMax"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); } #[test] fn elastic_percentiles_produce_kll_plan() { - let query = r#" + let query = r#" { "aggs": { "by_service": { @@ -276,21 +299,27 @@ fn elastic_percentiles_produce_kll_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 1); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("DatasketchesKLL")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("DatasketchesKLL"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("DatasketchesKLL"), Some("latency_ms".to_string())); - assert_index_schema(&out, "metrics", "latency_ms", &["service"]); + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("DatasketchesKLL"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("DatasketchesKLL"), + Some("latency_ms".to_string()) + ); + assert_index_schema(&out, "metrics", "latency_ms", &["service"]); } #[test] fn elastic_multi_index_schema_inference() { - let yaml = r#" + let yaml = r#" query_groups: - id: 1 index: metrics @@ -372,44 +401,47 @@ aggregate_cleanup: policy: read_based "#; - let mut file = NamedTempFile::new().unwrap(); - file.write_all(yaml.as_bytes()).unwrap(); + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); - let opts = ElasticRuntimeOptions { - streaming_engine: StreamingEngine::Arroyo, - data_ingestion_interval: 15, - }; + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; - let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); - let out = c.generate().unwrap(); + let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); + let out = c.generate().unwrap(); - assert_eq!(out.streaming_aggregation_count(), 4); - assert_eq!(out.inference_query_count(), 2); - assert!(out.has_aggregation_type("MultipleSum")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.streaming_aggregation_count(), 4); + assert_eq!(out.inference_query_count(), 2); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); - let inference_config = out - .to_inference_config(QueryLanguage::elastic_querydsl) - .unwrap(); + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); - match inference_config.schema { - SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); - let metric_columns = schema.get_metric_columns("metrics").unwrap(); - assert!(metric_columns.contains("cpu_usage")); - let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); - assert!(metadata_columns.contains("datacenter")); + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.contains("datacenter")); - assert_eq!( - schema.get_time_field("other_metrics"), - Some(&"@timestamp".to_string()) - ); - let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); - assert!(other_metric_columns.contains("memory_usage")); - let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); - assert!(other_metadata_columns.contains("service")); - } - other => panic!("expected elastic querydsl schema, got {:?}", other), + assert_eq!( + schema.get_time_field("other_metrics"), + Some(&"@timestamp".to_string()) + ); + let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); + assert!(other_metric_columns.contains("memory_usage")); + let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); + assert!(other_metadata_columns.contains("service")); } -} \ No newline at end of file + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} From 14aad8be1c4f0bee76b1e868b87b542359467169 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 19 May 2026 13:24:05 -0400 Subject: [PATCH 06/10] - Make criteria for inferring time field stricter: fieldname must either be "@timestamp", "timestamp", or end with "_time", and the range predicate must be datetime like. - Fix issue where lookback duration was not using time range specified by parsed ES DSL query. - Fix issue where rollup labels were same as group by labels for agg config -> should be all non group by (non metric) labels. --- .../src/ast_parsing/extract_info.rs | 7 ++-- .../rs/elastic_dsl_utilities/src/datemath.rs | 10 +++++ asap-planner-rs/src/elastic_dsl/generator.rs | 36 ++++++++++++------ asap-planner-rs/src/elastic_dsl/mod.rs | 1 + asap-planner-rs/src/lib.rs | 1 + asap-planner-rs/src/planner/elastic_dsl.rs | 37 ++++++++++++++----- 6 files changed, 68 insertions(+), 24 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 5a4f7743..97ff91f0 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -214,10 +214,9 @@ fn infer_time_field(predicates: &[Predicate]) -> FieldName { let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| { matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some()) }); - let looks_like_time_field = field == "@timestamp" - || field.contains("time") - || field.contains("timestamp") - || bound_is_time_like; + let looks_like_time_field = + (field == "@timestamp" || field == "timestamp" || field.ends_with("_time")) + && bound_is_time_like; if looks_like_time_field { return field.clone(); diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs index 64f2cf87..ed049bc4 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs @@ -9,6 +9,16 @@ pub struct ResolvedTimeRange { pub lte_ms: Option, } +impl ResolvedTimeRange { + /// Calculate the duration of the time range in milliseconds, if both bounds are present. + pub fn duration_ms(&self) -> Option { + match (self.gte_ms, self.lte_ms) { + (Some(gte), Some(lte)) if lte >= gte => Some((lte - gte) as u64), + _ => None, + } + } +} + /// An optional time range applied to a timestamp field. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TimeRange { diff --git a/asap-planner-rs/src/elastic_dsl/generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs index 7cbe7c07..871d3949 100644 --- a/asap-planner-rs/src/elastic_dsl/generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -15,14 +15,24 @@ use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; use crate::StreamingEngine; use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; -#[derive(Default)] -struct ElasticIndexSchemaBuilder { - time_field: Option, - metric_columns: IndexSet, - metadata_columns: IndexSet, +#[derive(Default, Clone)] +pub struct ElasticIndexSchemaBuilder { + pub index: String, + pub time_field: Option, + pub metric_columns: IndexSet, + pub metadata_columns: IndexSet, } impl ElasticIndexSchemaBuilder { + fn new(index: String) -> Self { + Self { + index, + time_field: None, + metric_columns: IndexSet::new(), + metadata_columns: IndexSet::new(), + } + } + fn update_from_query_info( &mut self, query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, @@ -90,6 +100,7 @@ pub fn generate_elastic_plan( // index -> schema builder derived from the queries targeting that index let mut index_schema_builders: IndexMap = IndexMap::new(); + // First pass to build index schema builders from query info. for qg in &config.query_groups { let index = resolve_elastic_index(config, qg)?; for query_string in &qg.queries { @@ -102,14 +113,20 @@ pub fn generate_elastic_plan( index_schema_builders .entry(index.clone()) - .or_default() + .or_insert_with(|| ElasticIndexSchemaBuilder::new(index.clone())) .update_from_query_info(&query_info)?; + } + } + // Second pass to build aggregation configs and query mappings. + for qg in &config.query_groups { + let index = resolve_elastic_index(config, qg)?; + for query_string in &qg.queries { let processor = ElasticSingleQueryProcessor::new( query_string.clone(), qg.repetition_delay, opts.data_ingestion_interval, - index.clone(), + index_schema_builders[&index].clone(), opts.streaming_engine, config.sketch_parameters.clone(), cleanup_policy, @@ -133,9 +150,8 @@ pub fn generate_elastic_plan( id_map.insert(key.clone(), idx as u32 + 1); } - let streaming_yaml = build_elastic_streaming_yaml(config, &dedup_map, &id_map)?; + let streaming_yaml = build_elastic_streaming_yaml(&dedup_map, &id_map)?; let inference_yaml = build_elastic_inference_yaml( - config, cleanup_policy, &query_keys_map, &id_map, @@ -152,7 +168,6 @@ pub fn generate_elastic_plan( } fn build_elastic_streaming_yaml( - _config: &ElasticDSLControllerConfig, dedup_map: &IndexMap, id_map: &HashMap, ) -> Result { @@ -171,7 +186,6 @@ fn build_elastic_streaming_yaml( } fn build_elastic_inference_yaml( - _config: &ElasticDSLControllerConfig, cleanup_policy: CleanupPolicy, query_keys_map: &IndexMap)>>, id_map: &HashMap, diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs index 78350ee4..6e19846a 100644 --- a/asap-planner-rs/src/elastic_dsl/mod.rs +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -1,4 +1,5 @@ pub mod controller; pub mod generator; pub use controller::ElasticController; +pub use generator::ElasticIndexSchemaBuilder; pub use generator::ElasticRuntimeOptions; diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 243b1821..1adbce23 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -14,6 +14,7 @@ pub use config::input::ControllerConfig; pub use config::input::ElasticDSLControllerConfig; pub use config::input::SQLControllerConfig; pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticIndexSchemaBuilder; pub use elastic_dsl::ElasticRuntimeOptions; pub use error::ControllerError; pub use generator::{GeneratorOutput, PuntedQuery}; diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index 317f8567..f4bf100a 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -1,7 +1,9 @@ +use crate::ElasticIndexSchemaBuilder; use asap_types::enums::{CleanupPolicy, WindowType}; use elastic_dsl_utilities::ast_parsing::{ extract_query_info, AggregationType as ElasticAggregationType, }; +use elastic_dsl_utilities::range_query_to_time_range; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; @@ -18,7 +20,7 @@ pub struct ElasticSingleQueryProcessor { t_repeat: u64, #[allow(dead_code)] data_ingestion_interval: u64, - index: String, + index_schema: ElasticIndexSchemaBuilder, #[allow(dead_code)] streaming_engine: StreamingEngine, sketch_parameters: Option, @@ -31,7 +33,7 @@ impl ElasticSingleQueryProcessor { query_string: String, t_repeat: u64, data_ingestion_interval: u64, - index: String, + index_schema: ElasticIndexSchemaBuilder, streaming_engine: StreamingEngine, sketch_parameters: Option, cleanup_policy: CleanupPolicy, @@ -40,7 +42,7 @@ impl ElasticSingleQueryProcessor { query_string, t_repeat, data_ingestion_interval, - index, + index_schema, streaming_engine, sketch_parameters, cleanup_policy, @@ -75,10 +77,19 @@ impl ElasticSingleQueryProcessor { let (spatial_output, rollup) = match &query_info.group_by_buckets { Some(bucket_spec) => { let group_fields = get_group_by_fields(bucket_spec); - let spatial = KeyByLabelNames::new(group_fields); - // For Elasticsearch, all potentially available fields become rollup - // when they're not in the group by - (spatial.clone(), spatial) + let temp: indexmap::IndexSet = group_fields.clone().into_iter().collect(); + let rollup = self + .index_schema + .metadata_columns + .difference(&temp) + .cloned() + .collect(); + ( + KeyByLabelNames { + labels: group_fields, + }, + KeyByLabelNames { labels: rollup }, + ) } None => (KeyByLabelNames::empty(), KeyByLabelNames::empty()), }; @@ -90,7 +101,7 @@ impl ElasticSingleQueryProcessor { &rollup, &window_cfg, &query_info.target_field, - Some(&self.index), + Some(&self.index_schema.index), Some(&target_field), "", // Elasticsearch doesn't have spatial filters like SQL |agg_type: AggregationType, agg_sub_type: &str| { @@ -104,8 +115,16 @@ impl ElasticSingleQueryProcessor { ) .map_err(ControllerError::ElasticDSLParse)?; + let time_range = query_info + .predicates + .first() + .and_then(|p| range_query_to_time_range(p, 0)); + let t_lookback = match time_range { + Some(tr) => tr.duration_ms().unwrap_or(self.t_repeat), + None => self.t_repeat, // Default to repetition delay if no time range found + }; + // Calculate cleanup param based on query's time window - let t_lookback = self.t_repeat; // Default to repetition delay let cleanup_param = if self.cleanup_policy == CleanupPolicy::NoCleanup { None } else { From 63016771939bb9a8e43bb552ae48e0456126e2ab Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 19 May 2026 13:46:32 -0400 Subject: [PATCH 07/10] Change ElasticMappingSchema add_index to take mut ref instead of consuming to reflect expected behavior. --- asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index b8f2649f..9eaccecc 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -43,9 +43,8 @@ impl ElasticMappingSchema { Self { config } } - pub fn add_index(mut self, index: String, schema: ElasticIndexSchema) -> Self { + pub fn add_index(&mut self, index: String, schema: ElasticIndexSchema) { self.config.insert(index, schema); - self } pub fn get_time_field(&self, index: &str) -> Option<&String> { From 6f6ea0c274517cafafa98154e2bd9b72d390ee98 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 19 May 2026 14:05:25 -0400 Subject: [PATCH 08/10] Fix infer time heuristic check. --- .../src/ast_parsing/extract_info.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 97ff91f0..3a697c0c 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -214,9 +214,10 @@ fn infer_time_field(predicates: &[Predicate]) -> FieldName { let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| { matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some()) }); - let looks_like_time_field = - (field == "@timestamp" || field == "timestamp" || field.ends_with("_time")) - && bound_is_time_like; + let looks_like_time_field = field == "@timestamp" + || field == "timestamp" + || field.ends_with("_time") + || bound_is_time_like; if looks_like_time_field { return field.clone(); @@ -224,12 +225,6 @@ fn infer_time_field(predicates: &[Predicate]) -> FieldName { } } - for predicate in predicates { - if let Predicate::Range { field, .. } = predicate { - return field.clone(); - } - } - "@timestamp".to_string() } From 6014eda17d952dfae609467b074a31b79b9375c7 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Wed, 20 May 2026 11:36:43 -0400 Subject: [PATCH 09/10] Fix planner only extracting time range if it is the first predicate. Reject queries that use filter group by (not supported right now). --- asap-planner-rs/src/output/mod.rs | 4 --- asap-planner-rs/src/planner/elastic_dsl.rs | 40 ++++++++-------------- 2 files changed, 14 insertions(+), 30 deletions(-) delete mode 100644 asap-planner-rs/src/output/mod.rs diff --git a/asap-planner-rs/src/output/mod.rs b/asap-planner-rs/src/output/mod.rs deleted file mode 100644 index 773c921f..00000000 --- a/asap-planner-rs/src/output/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod generator; -pub mod sql_generator; -pub mod elastic_generator; -pub use generator::*; diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index f4bf100a..c48adf66 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -15,6 +15,8 @@ use crate::planner::sketch::build_sketch_parameters; use crate::planner::window::IntermediateWindowConfig; use crate::StreamingEngine; +use indexmap::IndexSet; + pub struct ElasticSingleQueryProcessor { query_string: String, t_repeat: u64, @@ -76,8 +78,11 @@ impl ElasticSingleQueryProcessor { // Determine spatial routing from group_by_buckets let (spatial_output, rollup) = match &query_info.group_by_buckets { Some(bucket_spec) => { - let group_fields = get_group_by_fields(bucket_spec); - let temp: indexmap::IndexSet = group_fields.clone().into_iter().collect(); + let group_fields = + get_group_by_fields(bucket_spec).ok_or(ControllerError::ElasticDSLParse( + "Only field-based grouping is supported in Elasticsearch DSL".to_string(), + ))?; + let temp: IndexSet = group_fields.clone().into_iter().collect(); let rollup = self .index_schema .metadata_columns @@ -100,7 +105,7 @@ impl ElasticSingleQueryProcessor { &spatial_output, &rollup, &window_cfg, - &query_info.target_field, + &target_field, Some(&self.index_schema.index), Some(&target_field), "", // Elasticsearch doesn't have spatial filters like SQL @@ -117,8 +122,9 @@ impl ElasticSingleQueryProcessor { let time_range = query_info .predicates - .first() - .and_then(|p| range_query_to_time_range(p, 0)); + .iter() + .filter_map(|p| range_query_to_time_range(p, 0)) + .next(); let t_lookback = match time_range { Some(tr) => tr.duration_ms().unwrap_or(self.t_repeat), None => self.t_repeat, // Default to repetition delay if no time range found @@ -169,28 +175,10 @@ fn get_elastic_statistics( /// Extract field names from group by specification fn get_group_by_fields( bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec, -) -> Vec { +) -> Option> { use elastic_dsl_utilities::ast_parsing::GroupBySpec; match bucket_spec { - GroupBySpec::Fields(fields) => fields.clone(), - GroupBySpec::Filters(predicates) => { - // For filter-based grouping, we extract field names from predicates - let mut fields = Vec::new(); - for predicate in predicates { - match predicate { - elastic_dsl_utilities::ast_parsing::Predicate::Term { field, .. } => { - if !fields.contains(field) { - fields.push(field.clone()); - } - } - elastic_dsl_utilities::ast_parsing::Predicate::Range { field, .. } => { - if !fields.contains(field) { - fields.push(field.clone()); - } - } - } - } - fields - } + GroupBySpec::Fields(fields) => Some(fields.clone()), + GroupBySpec::Filters(_) => None, // We don't support filter-based group by in ES DSL for now, so return None to indicate unsupported query pattern. } } From 74c3865213246d28b06adacd4af6bcd3b16205b8 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 21 May 2026 15:47:58 -0400 Subject: [PATCH 10/10] Require explicit specification of time field for each query group (Elasticsearch index) in asap-planner. --- .../src/ast_parsing/extract_info.rs | 24 ------- .../src/ast_parsing/query_info.rs | 3 - asap-planner-rs/src/config/input.rs | 4 +- asap-planner-rs/src/elastic_dsl/generator.rs | 63 +++++-------------- asap-planner-rs/src/planner/elastic_dsl.rs | 10 ++- .../tests/elastic_dsl_integration.rs | 22 ++++--- asap-planner-rs/tests/elastic_example.yaml | 1 + 7 files changed, 38 insertions(+), 89 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 3a697c0c..299af3b1 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -1,7 +1,6 @@ use crate::ast_parsing::query_info::{ AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue, }; -use crate::datemath::TimeRange; use crate::helpers::strip_keyword_suffix; use elasticsearch_dsl_ast::{self as dsl}; use serde_json; @@ -37,10 +36,8 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option Option { } } -fn infer_time_field(predicates: &[Predicate]) -> FieldName { - for predicate in predicates { - if let Predicate::Range { field, gte, lte } = predicate { - let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| { - matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some()) - }); - let looks_like_time_field = field == "@timestamp" - || field == "timestamp" - || field.ends_with("_time") - || bound_is_time_like; - - if looks_like_time_field { - return field.clone(); - } - } - } - - "@timestamp".to_string() -} - #[cfg(test)] mod tests { use super::*; @@ -373,7 +350,6 @@ mod tests { let info = walk_ast_and_extract_info(&ast).expect("info should parse"); assert_eq!(info.target_field, "latency_ms"); - assert_eq!(info.time_field, "@timestamp"); assert_eq!(info.aggregation, AggregationType::Max); assert_eq!(info.predicates.len(), 2); assert_eq!( diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs index 7905bf48..ce82d705 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -6,7 +6,6 @@ pub type FieldName = String; pub struct ElasticDSLQueryInfo { // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. pub target_field: FieldName, // List of metrics being queried - pub time_field: FieldName, // Time field used by the query's range filter pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) pub group_by_buckets: Option, // Grouping specification if the query includes a group by clause pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) @@ -17,14 +16,12 @@ impl ElasticDSLQueryInfo { pub fn new( target_field: FieldName, - time_field: FieldName, predicates: Vec, group_by_buckets: Option, aggregation: AggregationType, ) -> Self { Self { target_field, - time_field, predicates, group_by_buckets, aggregation, diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 12369088..509b11b1 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -141,7 +141,6 @@ pub struct TableDefinition { #[derive(Debug, Clone, Deserialize)] pub struct ElasticDSLControllerConfig { pub query_groups: Vec, - pub index: Option, pub sketch_parameters: Option, pub aggregate_cleanup: Option, } @@ -151,6 +150,7 @@ pub struct ElasticDSLQueryGroup { pub id: Option, pub queries: Vec, pub repetition_delay: u64, - pub index: Option, + pub index: String, + pub time_field: String, pub controller_options: ControllerOptions, } diff --git a/asap-planner-rs/src/elastic_dsl/generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs index 871d3949..51e0c846 100644 --- a/asap-planner-rs/src/elastic_dsl/generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -18,16 +18,16 @@ use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predic #[derive(Default, Clone)] pub struct ElasticIndexSchemaBuilder { pub index: String, - pub time_field: Option, + pub time_field: String, pub metric_columns: IndexSet, pub metadata_columns: IndexSet, } impl ElasticIndexSchemaBuilder { - fn new(index: String) -> Self { + fn new(index: String, time_field: String) -> Self { Self { index, - time_field: None, + time_field, metric_columns: IndexSet::new(), metadata_columns: IndexSet::new(), } @@ -37,23 +37,10 @@ impl ElasticIndexSchemaBuilder { &mut self, query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, ) -> Result<(), ControllerError> { - match &self.time_field { - Some(existing) if existing != &query_info.time_field => { - return Err(ControllerError::PlannerError(format!( - "conflicting time fields for Elasticsearch index: '{}' vs '{}'", - existing, query_info.time_field - ))); - } - None => self.time_field = Some(query_info.time_field.clone()), - _ => {} - } - self.metric_columns.insert(query_info.target_field.clone()); - - for field in collect_elastic_metadata_fields(query_info) { + for field in collect_elastic_metadata_fields(query_info, &self.time_field) { self.metadata_columns.insert(field); } - Ok(()) } } @@ -102,7 +89,6 @@ pub fn generate_elastic_plan( // First pass to build index schema builders from query info. for qg in &config.query_groups { - let index = resolve_elastic_index(config, qg)?; for query_string in &qg.queries { let query_info = extract_query_info(query_string).ok_or_else(|| { ControllerError::ElasticDSLParse(format!( @@ -112,21 +98,22 @@ pub fn generate_elastic_plan( })?; index_schema_builders - .entry(index.clone()) - .or_insert_with(|| ElasticIndexSchemaBuilder::new(index.clone())) + .entry(qg.index.clone()) + .or_insert_with(|| { + ElasticIndexSchemaBuilder::new(qg.index.clone(), qg.time_field.clone()) + }) .update_from_query_info(&query_info)?; } } // Second pass to build aggregation configs and query mappings. for qg in &config.query_groups { - let index = resolve_elastic_index(config, qg)?; for query_string in &qg.queries { let processor = ElasticSingleQueryProcessor::new( query_string.clone(), qg.repetition_delay, opts.data_ingestion_interval, - index_schema_builders[&index].clone(), + index_schema_builders[&qg.index].clone(), opts.streaming_engine, config.sketch_parameters.clone(), cleanup_policy, @@ -227,12 +214,7 @@ fn build_elastic_index_yaml(index_name: &str, builder: &ElasticIndexSchemaBuilde ); map.insert( YamlValue::String("time_field".to_string()), - YamlValue::String( - builder - .time_field - .clone() - .unwrap_or_else(|| "@timestamp".to_string()), - ), + YamlValue::String(builder.time_field.clone()), ); map.insert( YamlValue::String("metric_columns".to_string()), @@ -260,36 +242,21 @@ fn build_elastic_index_yaml(index_name: &str, builder: &ElasticIndexSchemaBuilde YamlValue::Mapping(map) } -fn resolve_elastic_index( - config: &ElasticDSLControllerConfig, - query_group: &crate::config::input::ElasticDSLQueryGroup, -) -> Result { - query_group - .index - .clone() - .or_else(|| config.index.clone()) - .ok_or_else(|| { - ControllerError::PlannerError( - "each Elasticsearch query group must specify an index (or inherit one from the controller config)" - .to_string(), - ) - }) -} - fn collect_elastic_metadata_fields( query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, + time_field: &str, ) -> IndexSet { let mut fields = IndexSet::new(); for predicate in &query_info.predicates { match predicate { Predicate::Term { field, .. } => { - if field != &query_info.time_field { + if field != time_field { fields.insert(field.clone()); } } Predicate::Range { field, .. } => { - if field != &query_info.time_field { + if field != time_field { fields.insert(field.clone()); } } @@ -300,7 +267,7 @@ fn collect_elastic_metadata_fields( match group_by_buckets { GroupBySpec::Fields(group_fields) => { for field in group_fields { - if field != &query_info.time_field { + if field != time_field { fields.insert(field.clone()); } } @@ -309,7 +276,7 @@ fn collect_elastic_metadata_fields( for predicate in predicates { match predicate { Predicate::Term { field, .. } | Predicate::Range { field, .. } => { - if field != &query_info.time_field { + if field != time_field { fields.insert(field.clone()); } } diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index c48adf66..709f3907 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -3,7 +3,7 @@ use asap_types::enums::{CleanupPolicy, WindowType}; use elastic_dsl_utilities::ast_parsing::{ extract_query_info, AggregationType as ElasticAggregationType, }; -use elastic_dsl_utilities::range_query_to_time_range; +use elastic_dsl_utilities::{range_query_to_time_range, Predicate}; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; @@ -120,11 +120,15 @@ impl ElasticSingleQueryProcessor { ) .map_err(ControllerError::ElasticDSLParse)?; + let time_field = self.index_schema.time_field.clone(); let time_range = query_info .predicates .iter() - .filter_map(|p| range_query_to_time_range(p, 0)) - .next(); + .find(|p| match p { + Predicate::Range { field, .. } => field == &time_field, + _ => false, + }) + .and_then(|p| range_query_to_time_range(p, 0)); let t_lookback = match time_range { Some(tr) => tr.duration_ms().unwrap_or(self.t_repeat), None => self.t_repeat, // Default to repetition delay if no time range found diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs index 5e02813f..d46cdf49 100644 --- a/asap-planner-rs/tests/elastic_dsl_integration.rs +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -13,12 +13,13 @@ fn indent_block(text: &str, indent: usize) -> String { .join("\n") } -fn elastic_yaml(index: &str, query: &str, t_repeat: u64) -> String { +fn elastic_yaml(index: &str, time_field: &str, query: &str, t_repeat: u64) -> String { format!( r#" query_groups: - id: 1 index: {index} + time_field: {time_field} queries: - | {query} @@ -30,13 +31,14 @@ aggregate_cleanup: policy: read_based "#, index = index, + time_field = time_field, query = indent_block(query, 8), t_repeat = t_repeat, ) } -fn elastic_output(index: &str, query: &str, t_repeat: u64) -> PlannerOutput { - let yaml = elastic_yaml(index, query, t_repeat); +fn elastic_output(index: &str, time_field: &str, query: &str, t_repeat: u64) -> PlannerOutput { + let yaml = elastic_yaml(index, time_field, query, t_repeat); let mut file = NamedTempFile::new().unwrap(); file.write_all(yaml.as_bytes()).unwrap(); @@ -141,7 +143,7 @@ fn elastic_sum_produces_basic_plan_and_schema() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); assert_eq!(out.streaming_aggregation_count(), 2); assert_eq!(out.inference_query_count(), 1); @@ -193,7 +195,7 @@ fn elastic_avg_produces_three_configs() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); assert_eq!(out.streaming_aggregation_count(), 2); assert_eq!(out.inference_query_count(), 1); @@ -246,7 +248,7 @@ fn elastic_min_produces_exact_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); assert_eq!(out.streaming_aggregation_count(), 1); assert_eq!(out.inference_query_count(), 1); @@ -299,7 +301,7 @@ fn elastic_percentiles_produce_kll_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); assert_eq!(out.streaming_aggregation_count(), 1); assert_eq!(out.inference_query_count(), 1); @@ -323,6 +325,7 @@ fn elastic_multi_index_schema_inference() { query_groups: - id: 1 index: metrics + time_field: "@timestamp" queries: - | { @@ -361,6 +364,7 @@ query_groups: latency_sla: 1.0 - id: 2 index: other_metrics + time_field: "timestamp" queries: - | { @@ -383,7 +387,7 @@ query_groups: "filter": [ { "range": { - "@timestamp": { + "timestamp": { "gte": "now-5m", "lte": "now" } @@ -435,7 +439,7 @@ aggregate_cleanup: assert_eq!( schema.get_time_field("other_metrics"), - Some(&"@timestamp".to_string()) + Some(&"timestamp".to_string()) ); let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); assert!(other_metric_columns.contains("memory_usage")); diff --git a/asap-planner-rs/tests/elastic_example.yaml b/asap-planner-rs/tests/elastic_example.yaml index 9e431d90..ac311de7 100644 --- a/asap-planner-rs/tests/elastic_example.yaml +++ b/asap-planner-rs/tests/elastic_example.yaml @@ -1,6 +1,7 @@ query_groups: - id: 1 index: metrics + time_field: "@timestamp" queries: - | {