|
1 | 1 | use indexmap::IndexMap; |
2 | | -use serde_json::Value as JsonValue; |
3 | 2 | use serde_yaml::Value as YamlValue; |
4 | 3 | use std::collections::HashMap; |
5 | 4 |
|
6 | 5 | use asap_types::enums::CleanupPolicy; |
7 | 6 | use asap_types::PromQLSchema; |
8 | | -use promql_utilities::data_model::KeyByLabelNames; |
9 | 7 |
|
10 | 8 | use crate::config::input::ControllerConfig; |
11 | 9 | use crate::error::ControllerError; |
| 10 | +use crate::generator::{ |
| 11 | + build_aggregation_entry, build_queries_yaml, key_by_labels_to_yaml, GeneratorOutput, |
| 12 | + PuntedQuery, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_METRICS, KEY_NAME, KEY_QUERIES, |
| 13 | +}; |
12 | 14 | use crate::planner::agg_config::IntermediateAggConfig; |
13 | 15 | use crate::planner::promql::{BinaryArm, SingleQueryProcessor}; |
14 | 16 | use crate::RuntimeOptions; |
15 | 17 |
|
16 | | -// YAML key constants — shared with sql_generator.rs and lib.rs via pub(crate) |
17 | | -pub(crate) const KEY_AGG_ID: &str = "aggregationId"; |
18 | | -pub(crate) const KEY_AGG_SUB_TYPE: &str = "aggregationSubType"; |
19 | | -pub(crate) const KEY_AGG_TYPE: &str = "aggregationType"; |
20 | | -pub(crate) const KEY_AGGREGATION_ID: &str = "aggregation_id"; |
21 | | -pub(crate) const KEY_AGGREGATIONS: &str = "aggregations"; |
22 | | -pub(crate) const KEY_CLEANUP_POLICY: &str = "cleanup_policy"; |
23 | | -pub(crate) const KEY_LABELS: &str = "labels"; |
24 | | -pub(crate) const KEY_LABELS_AGGREGATED: &str = "aggregated"; |
25 | | -pub(crate) const KEY_LABELS_GROUPING: &str = "grouping"; |
26 | | -pub(crate) const KEY_LABELS_ROLLUP: &str = "rollup"; |
27 | | -pub(crate) const KEY_METADATA_COLUMNS: &str = "metadata_columns"; |
28 | | -pub(crate) const KEY_METRIC: &str = "metric"; |
29 | | -pub(crate) const KEY_METRICS: &str = "metrics"; |
30 | | -pub(crate) const KEY_NAME: &str = "name"; |
31 | | -pub(crate) const KEY_NUM_AGG_TO_RETAIN: &str = "num_aggregates_to_retain"; |
32 | | -pub(crate) const KEY_PARAMETERS: &str = "parameters"; |
33 | | -pub(crate) const KEY_QUERIES: &str = "queries"; |
34 | | -pub(crate) const KEY_QUERY: &str = "query"; |
35 | | -pub(crate) const KEY_READ_COUNT_THRESHOLD: &str = "read_count_threshold"; |
36 | | -pub(crate) const KEY_SLIDE_INTERVAL: &str = "slideInterval"; |
37 | | -pub(crate) const KEY_SPATIAL_FILTER: &str = "spatialFilter"; |
38 | | -pub(crate) const KEY_TABLE_NAME: &str = "table_name"; |
39 | | -pub(crate) const KEY_TABLES: &str = "tables"; |
40 | | -pub(crate) const KEY_TIME_COLUMN: &str = "time_column"; |
41 | | -pub(crate) const KEY_VALUE_COLUMN: &str = "value_column"; |
42 | | -pub(crate) const KEY_VALUE_COLUMNS: &str = "value_columns"; |
43 | | -pub(crate) const KEY_WINDOW_SIZE: &str = "windowSize"; |
44 | | -pub(crate) const KEY_WINDOW_TYPE: &str = "windowType"; |
45 | | - |
46 | 18 | /// `(query_string, Vec<(identifying_key, cleanup_param)>)` pairs produced by binary leaf decomposition. |
47 | 19 | type LeafEntries = Vec<(String, Vec<(String, Option<u64>)>)>; |
48 | 20 |
|
@@ -212,171 +184,6 @@ fn collect_binary_leaf_entries( |
212 | 184 | Ok(Some(all_entries)) |
213 | 185 | } |
214 | 186 |
|
215 | | -pub fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue { |
216 | | - YamlValue::Sequence( |
217 | | - labels |
218 | | - .labels |
219 | | - .iter() |
220 | | - .map(|l| YamlValue::String(l.clone())) |
221 | | - .collect(), |
222 | | - ) |
223 | | -} |
224 | | - |
225 | | -pub fn build_aggregation_entry(id: u32, cfg: &IntermediateAggConfig) -> YamlValue { |
226 | | - let mut map = serde_yaml::Mapping::new(); |
227 | | - map.insert( |
228 | | - YamlValue::String(KEY_AGG_ID.to_string()), |
229 | | - YamlValue::Number(id.into()), |
230 | | - ); |
231 | | - map.insert( |
232 | | - YamlValue::String(KEY_AGG_SUB_TYPE.to_string()), |
233 | | - YamlValue::String(cfg.aggregation_sub_type.clone()), |
234 | | - ); |
235 | | - map.insert( |
236 | | - YamlValue::String(KEY_AGG_TYPE.to_string()), |
237 | | - YamlValue::String(cfg.aggregation_type.to_string()), |
238 | | - ); |
239 | | - |
240 | | - let mut labels_map = serde_yaml::Mapping::new(); |
241 | | - labels_map.insert( |
242 | | - YamlValue::String(KEY_LABELS_AGGREGATED.to_string()), |
243 | | - key_by_labels_to_yaml(&cfg.aggregated_labels), |
244 | | - ); |
245 | | - labels_map.insert( |
246 | | - YamlValue::String(KEY_LABELS_GROUPING.to_string()), |
247 | | - key_by_labels_to_yaml(&cfg.grouping_labels), |
248 | | - ); |
249 | | - labels_map.insert( |
250 | | - YamlValue::String(KEY_LABELS_ROLLUP.to_string()), |
251 | | - key_by_labels_to_yaml(&cfg.rollup_labels), |
252 | | - ); |
253 | | - map.insert( |
254 | | - YamlValue::String(KEY_LABELS.to_string()), |
255 | | - YamlValue::Mapping(labels_map), |
256 | | - ); |
257 | | - |
258 | | - map.insert( |
259 | | - YamlValue::String(KEY_METRIC.to_string()), |
260 | | - YamlValue::String(cfg.metric.clone()), |
261 | | - ); |
262 | | - map.insert( |
263 | | - YamlValue::String(KEY_PARAMETERS.to_string()), |
264 | | - params_to_yaml(&cfg.parameters), |
265 | | - ); |
266 | | - map.insert( |
267 | | - YamlValue::String(KEY_SLIDE_INTERVAL.to_string()), |
268 | | - YamlValue::Number(cfg.slide_interval.into()), |
269 | | - ); |
270 | | - map.insert( |
271 | | - YamlValue::String(KEY_SPATIAL_FILTER.to_string()), |
272 | | - YamlValue::String(cfg.spatial_filter.clone()), |
273 | | - ); |
274 | | - map.insert( |
275 | | - YamlValue::String(KEY_TABLE_NAME.to_string()), |
276 | | - match &cfg.table_name { |
277 | | - Some(t) => YamlValue::String(t.clone()), |
278 | | - None => YamlValue::Null, |
279 | | - }, |
280 | | - ); |
281 | | - map.insert( |
282 | | - YamlValue::String(KEY_VALUE_COLUMN.to_string()), |
283 | | - match &cfg.value_column { |
284 | | - Some(v) => YamlValue::String(v.clone()), |
285 | | - None => YamlValue::Null, |
286 | | - }, |
287 | | - ); |
288 | | - map.insert( |
289 | | - YamlValue::String(KEY_WINDOW_SIZE.to_string()), |
290 | | - YamlValue::Number(cfg.window_size.into()), |
291 | | - ); |
292 | | - map.insert( |
293 | | - YamlValue::String(KEY_WINDOW_TYPE.to_string()), |
294 | | - YamlValue::String(cfg.window_type.to_string()), |
295 | | - ); |
296 | | - |
297 | | - YamlValue::Mapping(map) |
298 | | -} |
299 | | - |
300 | | -pub fn build_queries_yaml( |
301 | | - cleanup_policy: CleanupPolicy, |
302 | | - query_keys_map: &IndexMap<String, Vec<(String, Option<u64>)>>, |
303 | | - id_map: &HashMap<String, u32>, |
304 | | -) -> Vec<YamlValue> { |
305 | | - query_keys_map |
306 | | - .iter() |
307 | | - .map(|(query_str, keys)| { |
308 | | - let aggregations: Vec<YamlValue> = keys |
309 | | - .iter() |
310 | | - .map(|(key, cleanup_param)| { |
311 | | - let agg_id = id_map[key]; |
312 | | - let mut agg_map = serde_yaml::Mapping::new(); |
313 | | - agg_map.insert( |
314 | | - YamlValue::String(KEY_AGGREGATION_ID.to_string()), |
315 | | - YamlValue::Number(agg_id.into()), |
316 | | - ); |
317 | | - if let Some(param) = cleanup_param { |
318 | | - match cleanup_policy { |
319 | | - CleanupPolicy::CircularBuffer => { |
320 | | - agg_map.insert( |
321 | | - YamlValue::String(KEY_NUM_AGG_TO_RETAIN.to_string()), |
322 | | - YamlValue::Number((*param).into()), |
323 | | - ); |
324 | | - } |
325 | | - CleanupPolicy::ReadBased => { |
326 | | - agg_map.insert( |
327 | | - YamlValue::String(KEY_READ_COUNT_THRESHOLD.to_string()), |
328 | | - YamlValue::Number((*param).into()), |
329 | | - ); |
330 | | - } |
331 | | - CleanupPolicy::NoCleanup => {} |
332 | | - } |
333 | | - } |
334 | | - YamlValue::Mapping(agg_map) |
335 | | - }) |
336 | | - .collect(); |
337 | | - |
338 | | - let mut q_map = serde_yaml::Mapping::new(); |
339 | | - q_map.insert( |
340 | | - YamlValue::String(KEY_AGGREGATIONS.to_string()), |
341 | | - YamlValue::Sequence(aggregations), |
342 | | - ); |
343 | | - q_map.insert( |
344 | | - YamlValue::String(KEY_QUERY.to_string()), |
345 | | - YamlValue::String(query_str.clone()), |
346 | | - ); |
347 | | - YamlValue::Mapping(q_map) |
348 | | - }) |
349 | | - .collect() |
350 | | -} |
351 | | - |
352 | | -pub fn params_to_yaml(params: &HashMap<String, JsonValue>) -> YamlValue { |
353 | | - if params.is_empty() { |
354 | | - return YamlValue::Mapping(serde_yaml::Mapping::new()); |
355 | | - } |
356 | | - let mut map = serde_yaml::Mapping::new(); |
357 | | - // Sort for determinism |
358 | | - let mut sorted: Vec<_> = params.iter().collect(); |
359 | | - sorted.sort_by_key(|(k, _)| k.as_str()); |
360 | | - for (k, v) in sorted { |
361 | | - let yaml_val = match v { |
362 | | - JsonValue::Number(n) => { |
363 | | - if let Some(i) = n.as_u64() { |
364 | | - YamlValue::Number(serde_yaml::Number::from(i)) |
365 | | - } else if let Some(f) = n.as_f64() { |
366 | | - YamlValue::Number(serde_yaml::Number::from(f)) |
367 | | - } else { |
368 | | - YamlValue::String(n.to_string()) |
369 | | - } |
370 | | - } |
371 | | - JsonValue::String(s) => YamlValue::String(s.clone()), |
372 | | - JsonValue::Bool(b) => YamlValue::Bool(*b), |
373 | | - other => YamlValue::String(other.to_string()), |
374 | | - }; |
375 | | - map.insert(YamlValue::String(k.clone()), yaml_val); |
376 | | - } |
377 | | - YamlValue::Mapping(map) |
378 | | -} |
379 | | - |
380 | 187 | fn build_streaming_yaml( |
381 | 188 | dedup_map: &IndexMap<String, IntermediateAggConfig>, |
382 | 189 | id_map: &HashMap<String, u32>, |
@@ -448,16 +255,3 @@ fn build_inference_yaml( |
448 | 255 |
|
449 | 256 | Ok(YamlValue::Mapping(root)) |
450 | 257 | } |
451 | | - |
452 | | -#[derive(Debug, Clone)] |
453 | | -pub struct PuntedQuery { |
454 | | - pub query: String, |
455 | | -} |
456 | | - |
457 | | -pub struct GeneratorOutput { |
458 | | - pub punted_queries: Vec<PuntedQuery>, |
459 | | - pub streaming_yaml: YamlValue, |
460 | | - pub inference_yaml: YamlValue, |
461 | | - pub aggregation_count: usize, |
462 | | - pub query_count: usize, |
463 | | -} |
0 commit comments