diff --git a/Cargo.lock b/Cargo.lock index e6f69253..8e95d34a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,7 @@ dependencies = [ "asap_types", "chrono", "clap 4.6.1", + "elastic_dsl_utilities", "indexmap 2.14.0", "pretty_assertions", "promql-parser", @@ -433,6 +434,7 @@ version = "0.3.0" dependencies = [ "anyhow", "clap 4.6.1", + "elastic_dsl_utilities", "promql_utilities", "serde", "serde_json", diff --git a/asap-common/dependencies/rs/asap_types/Cargo.toml b/asap-common/dependencies/rs/asap_types/Cargo.toml index 6ca70998..5d22cdaa 100644 --- a/asap-common/dependencies/rs/asap_types/Cargo.toml +++ b/asap-common/dependencies/rs/asap_types/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] promql_utilities.workspace = true +elastic_dsl_utilities.workspace = true tracing.workspace = true sql_utilities.workspace = true serde.workspace = true diff --git a/asap-common/dependencies/rs/asap_types/src/inference_config.rs b/asap-common/dependencies/rs/asap_types/src/inference_config.rs index 8ddb3ca6..6bd9a530 100644 --- a/asap-common/dependencies/rs/asap_types/src/inference_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/inference_config.rs @@ -8,6 +8,7 @@ use crate::aggregation_reference::AggregationReference; use crate::enums::{CleanupPolicy, QueryLanguage}; use crate::promql_schema::PromQLSchema; use crate::query_config::QueryConfig; +use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; use promql_utilities::data_model::KeyByLabelNames; use sql_utilities::sqlhelper::{SQLSchema, Table}; @@ -16,7 +17,7 @@ use sql_utilities::sqlhelper::{SQLSchema, Table}; pub enum SchemaConfig { PromQL(PromQLSchema), SQL(SQLSchema), - ElasticQueryDSL, + ElasticQueryDSL(ElasticMappingSchema), ElasticSQL(SQLSchema), } @@ -32,7 +33,9 @@ impl InferenceConfig { let schema = match query_language { QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()), QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())), - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + SchemaConfig::ElasticQueryDSL(ElasticMappingSchema::new(Vec::new())) + } QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())), }; Self { @@ -60,7 +63,10 @@ impl InferenceConfig { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) } - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + let elastic_schema = Self::parse_elastic_querydsl_schema(data)?; + SchemaConfig::ElasticQueryDSL(elastic_schema) + } QueryLanguage::elastic_sql => { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) @@ -153,6 +159,57 @@ impl InferenceConfig { Ok(SQLSchema::new(tables)) } + /// Parse Elasticsearch mapping schema from YAML data (indices: key at top level). + fn parse_elastic_querydsl_schema(data: &Value) -> Result { + let Some(indices_data) = data.get("indices").and_then(|v| v.as_sequence()) else { + return Ok(ElasticMappingSchema::new(Vec::new())); + }; + + let mut indices = Vec::new(); + for index_data in indices_data { + let name = index_data + .get("name") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing name field in elastic index"))? + .to_string(); + + let time_field = index_data + .get("time_field") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing time_field field in elastic index {name}"))? + .to_string(); + + let metric_columns: HashSet = index_data + .get("metric_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| { + anyhow::anyhow!("Missing metric_columns field in elastic index {name}") + })? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + let metadata_columns: HashSet = index_data + .get("metadata_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| { + anyhow::anyhow!("Missing metadata_columns field in elastic index {name}") + })? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + indices.push(( + name, + ElasticIndexSchema::new(time_field, metric_columns, metadata_columns), + )); + } + + Ok(ElasticMappingSchema::new(indices)) + } + /// Parse cleanup policy from YAML data. Errors if not specified. fn parse_cleanup_policy(data: &Value) -> Result { let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| { diff --git a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs index 5a7b800c..6833f81d 100644 --- a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs @@ -72,7 +72,7 @@ impl StreamingConfig { .map(|ic| match &ic.schema { SchemaConfig::PromQL(_) => QueryLanguage::promql, SchemaConfig::SQL(_) => QueryLanguage::sql, - SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, + SchemaConfig::ElasticQueryDSL(_) => QueryLanguage::elastic_querydsl, SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, }) .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs index 64f2cf87..ed049bc4 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs @@ -9,6 +9,16 @@ pub struct ResolvedTimeRange { pub lte_ms: Option, } +impl ResolvedTimeRange { + /// Calculate the duration of the time range in milliseconds, if both bounds are present. + pub fn duration_ms(&self) -> Option { + match (self.gte_ms, self.lte_ms) { + (Some(gte), Some(lte)) if lte >= gte => Some((lte - gte) as u64), + _ => None, + } + } +} + /// An optional time range applied to a timestamp field. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TimeRange { diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 4a86ba8b..9eaccecc 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -5,3 +5,71 @@ pub mod helpers; pub use ast_parsing::*; pub use datemath::*; pub use helpers::*; + +use std::collections::{HashMap, HashSet}; + +#[derive(Debug, Clone)] +pub struct ElasticIndexSchema { + pub time_field: String, + pub metric_columns: HashSet, + pub metadata_columns: HashSet, +} + +impl ElasticIndexSchema { + pub fn new( + time_field: String, + metric_columns: HashSet, + metadata_columns: HashSet, + ) -> Self { + Self { + time_field, + metric_columns, + metadata_columns, + } + } +} + +#[derive(Debug, Clone)] +pub struct ElasticMappingSchema { + pub config: HashMap, +} + +impl ElasticMappingSchema { + pub fn new(indexes: Vec<(String, ElasticIndexSchema)>) -> Self { + let mut config = HashMap::new(); + for (index_name, index_schema) in indexes { + config.insert(index_name, index_schema); + } + Self { config } + } + + pub fn add_index(&mut self, index: String, schema: ElasticIndexSchema) { + self.config.insert(index, schema); + } + + pub fn get_time_field(&self, index: &str) -> Option<&String> { + self.config.get(index).map(|schema| &schema.time_field) + } + + pub fn get_metric_columns(&self, index: &str) -> Option<&HashSet> { + self.config.get(index).map(|schema| &schema.metric_columns) + } + + pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet> { + self.config + .get(index) + .map(|schema| &schema.metadata_columns) + } + + pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool { + self.get_metric_columns(index) + .map(|columns| columns.contains(metric_column)) + .unwrap_or(false) + } + + pub fn are_valid_metadata_columns(&self, index: &str, columns: &HashSet) -> bool { + self.get_metadata_columns(index) + .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c))) + .unwrap_or(false) + } +} diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml index abeaf025..53a32fc7 100644 --- a/asap-planner-rs/Cargo.toml +++ b/asap-planner-rs/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" asap_types.workspace = true promql_utilities.workspace = true sql_utilities.workspace = true +elastic_dsl_utilities.workspace = true sqlparser = "0.59.0" serde.workspace = true serde_json.workspace = true diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 55422bc3..509b11b1 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -137,3 +137,20 @@ pub struct TableDefinition { pub value_columns: Vec, pub metadata_columns: Vec, } + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLControllerConfig { + pub query_groups: Vec, + pub sketch_parameters: Option, + pub aggregate_cleanup: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLQueryGroup { + pub id: Option, + pub queries: Vec, + pub repetition_delay: u64, + pub index: String, + pub time_field: String, + pub controller_options: ControllerOptions, +} diff --git a/asap-planner-rs/src/elastic_dsl/controller.rs b/asap-planner-rs/src/elastic_dsl/controller.rs new file mode 100644 index 00000000..22df6872 --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/controller.rs @@ -0,0 +1,45 @@ +use std::path::Path; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::elastic_dsl::generator::{generate_elastic_plan, ElasticRuntimeOptions}; +use crate::error::ControllerError; +use crate::planner_output::PlannerOutput; + +pub struct ElasticController { + config: ElasticDSLControllerConfig, + options: ElasticRuntimeOptions, +} + +impl ElasticController { + pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result { + let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = generate_elastic_plan(&self.config, &self.options)?; + Ok(PlannerOutput::from_output(output)) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(output.streaming_yaml())?; + let inference_str = serde_yaml::to_string(output.inference_yaml())?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} diff --git a/asap-planner-rs/src/elastic_dsl/generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs new file mode 100644 index 00000000..51e0c846 --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -0,0 +1,290 @@ +use asap_types::enums::CleanupPolicy; +use indexmap::IndexMap; +use indexmap::IndexSet; +use serde_yaml::Value as YamlValue; +use std::collections::HashMap; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::error::ControllerError; +use crate::generator::{ + build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, + KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, +}; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; +use crate::StreamingEngine; +use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; + +#[derive(Default, Clone)] +pub struct ElasticIndexSchemaBuilder { + pub index: String, + pub time_field: String, + pub metric_columns: IndexSet, + pub metadata_columns: IndexSet, +} + +impl ElasticIndexSchemaBuilder { + fn new(index: String, time_field: String) -> Self { + Self { + index, + time_field, + metric_columns: IndexSet::new(), + metadata_columns: IndexSet::new(), + } + } + + fn update_from_query_info( + &mut self, + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, + ) -> Result<(), ControllerError> { + self.metric_columns.insert(query_info.target_field.clone()); + for field in collect_elastic_metadata_fields(query_info, &self.time_field) { + self.metadata_columns.insert(field); + } + Ok(()) + } +} + +pub struct ElasticRuntimeOptions { + pub streaming_engine: StreamingEngine, + pub data_ingestion_interval: u64, +} + +pub fn generate_elastic_plan( + config: &ElasticDSLControllerConfig, + opts: &ElasticRuntimeOptions, +) -> Result { + let cleanup_policy = config + .aggregate_cleanup + .as_ref() + .and_then(|c| c.policy) + .unwrap_or(CleanupPolicy::ReadBased); + + // Validate T % data_ingestion_interval == 0 + for qg in &config.query_groups { + if qg.repetition_delay % opts.data_ingestion_interval != 0 { + return Err(ControllerError::PlannerError(format!( + "repetition_delay {} is not a multiple of data_ingestion_interval {}", + qg.repetition_delay, opts.data_ingestion_interval + ))); + } + } + + // Check for duplicate queries + let mut seen_queries = std::collections::HashSet::new(); + for qg in &config.query_groups { + for q in &qg.queries { + if !seen_queries.insert(q.clone()) { + return Err(ControllerError::DuplicateQuery(q.clone())); + } + } + } + + // Dedup map: identifying_key -> IntermediateAggConfig + let mut dedup_map: IndexMap = IndexMap::new(); + // query_string -> Vec<(key, cleanup_param)> + let mut query_keys_map: IndexMap)>> = IndexMap::new(); + // index -> schema builder derived from the queries targeting that index + let mut index_schema_builders: IndexMap = IndexMap::new(); + + // First pass to build index schema builders from query info. + for qg in &config.query_groups { + for query_string in &qg.queries { + let query_info = extract_query_info(query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + query_string + )) + })?; + + index_schema_builders + .entry(qg.index.clone()) + .or_insert_with(|| { + ElasticIndexSchemaBuilder::new(qg.index.clone(), qg.time_field.clone()) + }) + .update_from_query_info(&query_info)?; + } + } + + // Second pass to build aggregation configs and query mappings. + for qg in &config.query_groups { + for query_string in &qg.queries { + let processor = ElasticSingleQueryProcessor::new( + query_string.clone(), + qg.repetition_delay, + opts.data_ingestion_interval, + index_schema_builders[&qg.index].clone(), + opts.streaming_engine, + config.sketch_parameters.clone(), + cleanup_policy, + ); + + let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?; + + let mut keys_for_query = Vec::new(); + for config_item in configs { + let key = config_item.identifying_key(); + keys_for_query.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config_item); + } + query_keys_map.insert(query_string.clone(), keys_for_query); + } + } + + // Assign sequential IDs + let mut id_map: HashMap = HashMap::new(); + for (idx, key) in dedup_map.keys().enumerate() { + id_map.insert(key.clone(), idx as u32 + 1); + } + + let streaming_yaml = build_elastic_streaming_yaml(&dedup_map, &id_map)?; + let inference_yaml = build_elastic_inference_yaml( + cleanup_policy, + &query_keys_map, + &id_map, + &index_schema_builders, + )?; + + Ok(GeneratorOutput { + punted_queries: Vec::new(), + streaming_yaml, + inference_yaml, + aggregation_count: dedup_map.len(), + query_count: query_keys_map.len(), + }) +} + +fn build_elastic_streaming_yaml( + dedup_map: &IndexMap, + id_map: &HashMap, +) -> Result { + let aggregations: Vec = dedup_map + .iter() + .map(|(key, cfg)| build_aggregation_entry(id_map[key], cfg)) + .collect(); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_AGGREGATIONS.to_string()), + YamlValue::Sequence(aggregations), + ); + + Ok(YamlValue::Mapping(root)) +} + +fn build_elastic_inference_yaml( + cleanup_policy: CleanupPolicy, + query_keys_map: &IndexMap)>>, + id_map: &HashMap, + index_schema_builders: &IndexMap, +) -> Result { + let mut cleanup_map = serde_yaml::Mapping::new(); + cleanup_map.insert( + YamlValue::String(KEY_NAME.to_string()), + YamlValue::String(cleanup_policy.to_string()), + ); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_CLEANUP_POLICY.to_string()), + YamlValue::Mapping(cleanup_map), + ); + root.insert( + YamlValue::String(KEY_QUERIES.to_string()), + YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), + ); + root.insert( + YamlValue::String("indices".to_string()), + YamlValue::Sequence( + index_schema_builders + .iter() + .map(|(index_name, builder)| build_elastic_index_yaml(index_name, builder)) + .collect(), + ), + ); + + Ok(YamlValue::Mapping(root)) +} + +fn build_elastic_index_yaml(index_name: &str, builder: &ElasticIndexSchemaBuilder) -> YamlValue { + let mut map = serde_yaml::Mapping::new(); + map.insert( + YamlValue::String("name".to_string()), + YamlValue::String(index_name.to_string()), + ); + map.insert( + YamlValue::String("time_field".to_string()), + YamlValue::String(builder.time_field.clone()), + ); + map.insert( + YamlValue::String("metric_columns".to_string()), + YamlValue::Sequence( + builder + .metric_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + map.insert( + YamlValue::String("metadata_columns".to_string()), + YamlValue::Sequence( + builder + .metadata_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + + YamlValue::Mapping(map) +} + +fn collect_elastic_metadata_fields( + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, + time_field: &str, +) -> IndexSet { + let mut fields = IndexSet::new(); + + for predicate in &query_info.predicates { + match predicate { + Predicate::Term { field, .. } => { + if field != time_field { + fields.insert(field.clone()); + } + } + Predicate::Range { field, .. } => { + if field != time_field { + fields.insert(field.clone()); + } + } + } + } + + if let Some(group_by_buckets) = &query_info.group_by_buckets { + match group_by_buckets { + GroupBySpec::Fields(group_fields) => { + for field in group_fields { + if field != time_field { + fields.insert(field.clone()); + } + } + } + GroupBySpec::Filters(predicates) => { + for predicate in predicates { + match predicate { + Predicate::Term { field, .. } | Predicate::Range { field, .. } => { + if field != time_field { + fields.insert(field.clone()); + } + } + } + } + } + } + } + + fields +} diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs new file mode 100644 index 00000000..6e19846a --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -0,0 +1,5 @@ +pub mod controller; +pub mod generator; +pub use controller::ElasticController; +pub use generator::ElasticIndexSchemaBuilder; +pub use generator::ElasticRuntimeOptions; diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs index 02748805..873c25c7 100644 --- a/asap-planner-rs/src/error.rs +++ b/asap-planner-rs/src/error.rs @@ -20,4 +20,8 @@ pub enum ControllerError { UnknownTable(String), #[error("Prometheus client error: {0}")] PrometheusClient(String), + #[error("Elasticsearch DSL parse error: {0}")] + ElasticDSLParse(String), + #[error("Unsupported Elasticsearch DSL query: {0}")] + UnsupportedElasticDSLQuery(String), } diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index c73dfbde..1adbce23 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod elastic_dsl; pub mod error; pub mod generator; pub mod planner; @@ -10,7 +11,11 @@ pub mod sql; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; +pub use config::input::ElasticDSLControllerConfig; pub use config::input::SQLControllerConfig; +pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticIndexSchemaBuilder; +pub use elastic_dsl::ElasticRuntimeOptions; pub use error::ControllerError; pub use generator::{GeneratorOutput, PuntedQuery}; pub use planner_output::PlannerOutput; diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 863f4d4e..2035dada 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,4 +1,7 @@ -use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine}; +use asap_planner::{ + Controller, ElasticController, ElasticRuntimeOptions, RuntimeOptions, SQLController, + SQLRuntimeOptions, StreamingEngine, +}; use asap_types::enums::QueryLanguage; use clap::Parser; use std::path::PathBuf; @@ -126,7 +129,17 @@ fn main() -> anyhow::Result<()> { SQLController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } QueryLanguage::elastic_querydsl => { - anyhow::bail!("ElasticQueryDSL is not yet supported"); + let interval = args.data_ingestion_interval.ok_or_else(|| { + anyhow::anyhow!("--data-ingestion-interval is required for Elasticsearch DSL mode") + })?; + let config_path = args.input_config.ok_or_else(|| { + anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode") + })?; + let opts = ElasticRuntimeOptions { + streaming_engine: engine, + data_ingestion_interval: interval, + }; + ElasticController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } } diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs new file mode 100644 index 00000000..709f3907 --- /dev/null +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -0,0 +1,188 @@ +use crate::ElasticIndexSchemaBuilder; +use asap_types::enums::{CleanupPolicy, WindowType}; +use elastic_dsl_utilities::ast_parsing::{ + extract_query_info, AggregationType as ElasticAggregationType, +}; +use elastic_dsl_utilities::{range_query_to_time_range, Predicate}; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; + +use crate::config::input::SketchParameterOverrides; +use crate::error::ControllerError; +use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::cleanup::get_sql_cleanup_param; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; +use crate::StreamingEngine; + +use indexmap::IndexSet; + +pub struct ElasticSingleQueryProcessor { + query_string: String, + t_repeat: u64, + #[allow(dead_code)] + data_ingestion_interval: u64, + index_schema: ElasticIndexSchemaBuilder, + #[allow(dead_code)] + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, +} + +impl ElasticSingleQueryProcessor { + #[allow(clippy::too_many_arguments)] + pub fn new( + query_string: String, + t_repeat: u64, + data_ingestion_interval: u64, + index_schema: ElasticIndexSchemaBuilder, + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, + ) -> Self { + Self { + query_string, + t_repeat, + data_ingestion_interval, + index_schema, + streaming_engine, + sketch_parameters, + cleanup_policy, + } + } + + pub fn get_streaming_aggregation_configs( + &self, + ) -> Result<(Vec, Option), ControllerError> { + // Parse and extract query info using utilities + let query_info = extract_query_info(&self.query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + self.query_string + )) + })?; + + // Get aggregation type and statistics + let (treatment_type, statistics) = get_elastic_statistics(&query_info.aggregation)?; + + // Build window config (always tumbling for Elasticsearch queries) + let window_cfg = IntermediateWindowConfig { + window_size: self.t_repeat, + slide_interval: self.t_repeat, + window_type: WindowType::Tumbling, + }; + + // Extract target field and group by information + let target_field = query_info.target_field.clone(); + + // Determine spatial routing from group_by_buckets + let (spatial_output, rollup) = match &query_info.group_by_buckets { + Some(bucket_spec) => { + let group_fields = + get_group_by_fields(bucket_spec).ok_or(ControllerError::ElasticDSLParse( + "Only field-based grouping is supported in Elasticsearch DSL".to_string(), + ))?; + let temp: IndexSet = group_fields.clone().into_iter().collect(); + let rollup = self + .index_schema + .metadata_columns + .difference(&temp) + .cloned() + .collect(); + ( + KeyByLabelNames { + labels: group_fields, + }, + KeyByLabelNames { labels: rollup }, + ) + } + None => (KeyByLabelNames::empty(), KeyByLabelNames::empty()), + }; + + let configs = build_agg_configs_for_statistics( + &statistics, + treatment_type, + &spatial_output, + &rollup, + &window_cfg, + &target_field, + Some(&self.index_schema.index), + Some(&target_field), + "", // Elasticsearch doesn't have spatial filters like SQL + |agg_type: AggregationType, agg_sub_type: &str| { + build_sketch_parameters( + agg_type, + agg_sub_type, + None, + self.sketch_parameters.as_ref(), + ) + }, + ) + .map_err(ControllerError::ElasticDSLParse)?; + + let time_field = self.index_schema.time_field.clone(); + let time_range = query_info + .predicates + .iter() + .find(|p| match p { + Predicate::Range { field, .. } => field == &time_field, + _ => false, + }) + .and_then(|p| range_query_to_time_range(p, 0)); + let t_lookback = match time_range { + Some(tr) => tr.duration_ms().unwrap_or(self.t_repeat), + None => self.t_repeat, // Default to repetition delay if no time range found + }; + + // Calculate cleanup param based on query's time window + let cleanup_param = if self.cleanup_policy == CleanupPolicy::NoCleanup { + None + } else { + Some( + get_sql_cleanup_param(self.cleanup_policy, t_lookback, self.t_repeat) + .map_err(ControllerError::PlannerError)?, + ) + }; + + Ok((configs, cleanup_param)) + } +} + +/// Map Elasticsearch aggregation types to statistics and treatment types +fn get_elastic_statistics( + agg_type: &ElasticAggregationType, +) -> Result<(QueryTreatmentType, Vec), ControllerError> { + match agg_type { + ElasticAggregationType::Avg => { + // AVG requires SUM and COUNT + Ok(( + QueryTreatmentType::Exact, + vec![Statistic::Sum, Statistic::Count], + )) + } + ElasticAggregationType::Sum => Ok((QueryTreatmentType::Approximate, vec![Statistic::Sum])), + ElasticAggregationType::Min => Ok((QueryTreatmentType::Exact, vec![Statistic::Min])), + ElasticAggregationType::Max => Ok((QueryTreatmentType::Exact, vec![Statistic::Max])), + ElasticAggregationType::Percentiles(percents) => { + // For percentiles, we use quantile statistic + // Check that we have valid percentiles + if percents.is_empty() { + return Err(ControllerError::UnsupportedElasticDSLQuery( + "Percentiles aggregation must specify percentile values".to_string(), + )); + } + Ok((QueryTreatmentType::Approximate, vec![Statistic::Quantile])) + } + } +} + +/// Extract field names from group by specification +fn get_group_by_fields( + bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec, +) -> Option> { + use elastic_dsl_utilities::ast_parsing::GroupBySpec; + match bucket_spec { + GroupBySpec::Fields(fields) => Some(fields.clone()), + GroupBySpec::Filters(_) => None, // We don't support filter-based group by in ES DSL for now, so return None to indicate unsupported query pattern. + } +} diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index ce9b172c..41a38081 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -1,5 +1,6 @@ pub mod agg_config; pub mod cleanup; +pub mod elastic_dsl; pub mod labels; pub mod patterns; pub mod promql; diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs new file mode 100644 index 00000000..d46cdf49 --- /dev/null +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -0,0 +1,451 @@ +use asap_planner::{ElasticController, ElasticRuntimeOptions, PlannerOutput, StreamingEngine}; +use asap_types::{QueryLanguage, SchemaConfig}; +use std::io::Write; +use std::path::Path; +use tempfile::NamedTempFile; + +fn indent_block(text: &str, indent: usize) -> String { + let padding = " ".repeat(indent); + text.trim() + .lines() + .map(|line| format!("{}{}", padding, line)) + .collect::>() + .join("\n") +} + +fn elastic_yaml(index: &str, time_field: &str, query: &str, t_repeat: u64) -> String { + format!( + r#" +query_groups: + - id: 1 + index: {index} + time_field: {time_field} + queries: + - | +{query} + repetition_delay: {t_repeat} + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#, + index = index, + time_field = time_field, + query = indent_block(query, 8), + t_repeat = t_repeat, + ) +} + +fn elastic_output(index: &str, time_field: &str, query: &str, t_repeat: u64) -> PlannerOutput { + let yaml = elastic_yaml(index, time_field, query, t_repeat); + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + ElasticController::from_file(Path::new(file.path()), opts) + .unwrap() + .generate() + .unwrap() +} + +fn assert_index_schema( + out: &PlannerOutput, + index: &str, + metric_column: &str, + metadata_columns: &[&str], +) { + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!( + schema.get_time_field(index), + Some(&"@timestamp".to_string()) + ); + + let metric_columns = schema.get_metric_columns(index).unwrap(); + assert!(metric_columns.contains(metric_column)); + + let actual_metadata = schema.get_metadata_columns(index).unwrap(); + for column in metadata_columns { + assert!(actual_metadata.contains(*column)); + } + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} + +#[test] +fn elastic_querydsl_emits_index_schema() { + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + let c = ElasticController::from_file(Path::new("tests/elastic_example.yaml"), opts).unwrap(); + let out = c.generate().unwrap(); + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.is_empty()); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} + +#[test] +fn elastic_sum_produces_basic_plan_and_schema() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "sum_cpu": { + "sum": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("CountMinSketch"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("CountMinSketch"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_avg_produces_three_configs() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleSum"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleSum"), + Some("cpu_usage".to_string()) + ); + + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_min_produces_exact_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "min_cpu": { + "min": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleMinMax")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleMinMax"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleMinMax"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); +} + +#[test] +fn elastic_percentiles_produce_kll_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "latency_percentiles": { + "percentiles": { + "field": "latency_ms", + "percents": [50.0, 95.0] + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", "\"@timestamp\"", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("DatasketchesKLL"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("DatasketchesKLL"), + Some("latency_ms".to_string()) + ); + assert_index_schema(&out, "metrics", "latency_ms", &["service"]); +} + +#[test] +fn elastic_multi_index_schema_inference() { + let yaml = r#" +query_groups: + - id: 1 + index: metrics + time_field: "@timestamp" + queries: + - | + { + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 + - id: 2 + index: other_metrics + time_field: "timestamp" + queries: + - | + { + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "avg_mem": { + "avg": { + "field": "memory_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); + let out = c.generate().unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 4); + assert_eq!(out.inference_query_count(), 2); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.contains("datacenter")); + + assert_eq!( + schema.get_time_field("other_metrics"), + Some(&"timestamp".to_string()) + ); + let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); + assert!(other_metric_columns.contains("memory_usage")); + let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); + assert!(other_metadata_columns.contains("service")); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} diff --git a/asap-planner-rs/tests/elastic_example.yaml b/asap-planner-rs/tests/elastic_example.yaml new file mode 100644 index 00000000..ac311de7 --- /dev/null +++ b/asap-planner-rs/tests/elastic_example.yaml @@ -0,0 +1,35 @@ +query_groups: + - id: 1 + index: metrics + time_field: "@timestamp" + queries: + - | + { + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 9ef4dd53..9772bdbc 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -218,7 +218,7 @@ impl SimpleEngine { warn!("SQL query requested but config has PromQL schema"); return None; } - &SchemaConfig::ElasticQueryDSL => todo!(), + &SchemaConfig::ElasticQueryDSL(_) => todo!(), SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), }; diff --git a/asap-query-engine/src/tests/test_utilities/config_builders.rs b/asap-query-engine/src/tests/test_utilities/config_builders.rs index ec691272..4bd68ade 100644 --- a/asap-query-engine/src/tests/test_utilities/config_builders.rs +++ b/asap-query-engine/src/tests/test_utilities/config_builders.rs @@ -306,7 +306,7 @@ mod tests { assert!(promql_schema.get_labels("cpu_usage").is_some()); } SchemaConfig::SQL(_) => panic!("Expected PromQL schema"), - SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"), + SchemaConfig::ElasticQueryDSL(_) => panic!("Expected PromQL schema"), SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"), }