Skip to content

Commit 4fad4d3

Browse files
Moved some files from asa-query-engine to asap-common (#171)
* Added many crates from asap-common to Cargo workspace * Moved some files from asa-query-engine to asap-common * formatting * removed unnecessary method
1 parent 197c4be commit 4fad4d3

12 files changed

Lines changed: 367 additions & 342 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

asap-common/dependencies/rs/promql_utilities/src/query_logics/logics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub fn map_statistic_to_precompute_operator(
4747
Statistic::Rate | Statistic::Increase => {
4848
Ok(("MultipleIncrease".to_string(), "".to_string()))
4949
}
50+
Statistic::Topk => Ok(("CountMinSketchWithHeap".to_string(), "topk".to_string())),
5051
_ => Err(format!("Statistic {statistic:?} not supported")),
5152
}
5253
}
@@ -157,6 +158,17 @@ mod tests {
157158
));
158159
}
159160

161+
/// `Statistic::Topk` with `QueryTreatmentType::Approximate` must resolve to the
/// `("CountMinSketchWithHeap", "topk")` pair.
#[test]
fn test_topk_maps_to_count_min_sketch_with_heap() {
    let expected = ("CountMinSketchWithHeap".to_string(), "topk".to_string());
    let actual =
        map_statistic_to_precompute_operator(Statistic::Topk, QueryTreatmentType::Approximate)
            .unwrap();
    assert_eq!(actual, expected);
}
171+
160172
#[test]
161173
fn test_get_is_collapsable() {
162174
assert!(get_is_collapsable("sum_over_time", "sum"));

asap-common/dependencies/rs/sketch_db_common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition.workspace = true
55

66
[dependencies]
77
promql_utilities.workspace = true
8+
sql_utilities.workspace = true
89
serde.workspace = true
910
serde_json.workspace = true
1011
serde_yaml.workspace = true
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
#[derive(Debug, Clone, Serialize, Deserialize)]
4+
pub struct AggregationReference {
5+
pub aggregation_id: u64,
6+
/// For circular_buffer policy: keep this many most recent aggregates
7+
#[serde(skip_serializing_if = "Option::is_none")]
8+
pub num_aggregates_to_retain: Option<u64>,
9+
/// For read_based policy: remove aggregate after this many reads
10+
#[serde(skip_serializing_if = "Option::is_none")]
11+
pub read_count_threshold: Option<u64>,
12+
}
13+
14+
impl AggregationReference {
15+
pub fn new(aggregation_id: u64, num_aggregates_to_retain: Option<u64>) -> Self {
16+
Self {
17+
aggregation_id,
18+
num_aggregates_to_retain,
19+
read_count_threshold: None,
20+
}
21+
}
22+
23+
pub fn with_read_count_threshold(
24+
aggregation_id: u64,
25+
read_count_threshold: Option<u64>,
26+
) -> Self {
27+
Self {
28+
aggregation_id,
29+
num_aggregates_to_retain: None,
30+
read_count_threshold,
31+
}
32+
}
33+
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
use anyhow::Result;
2+
use serde_yaml::Value;
3+
use std::collections::HashSet;
4+
use std::fs::File;
5+
use std::io::BufReader;
6+
7+
use crate::aggregation_reference::AggregationReference;
8+
use crate::enums::{CleanupPolicy, QueryLanguage};
9+
use crate::promql_schema::PromQLSchema;
10+
use crate::query_config::QueryConfig;
11+
use promql_utilities::data_model::KeyByLabelNames;
12+
use sql_utilities::sqlhelper::{SQLSchema, Table};
13+
14+
/// Schema configuration that can be either PromQL or SQL format
15+
#[derive(Debug, Clone)]
16+
pub enum SchemaConfig {
17+
PromQL(PromQLSchema),
18+
SQL(SQLSchema),
19+
ElasticQueryDSL,
20+
ElasticSQL,
21+
}
22+
23+
#[derive(Debug, Clone)]
24+
pub struct InferenceConfig {
25+
pub schema: SchemaConfig,
26+
pub query_configs: Vec<QueryConfig>,
27+
pub cleanup_policy: CleanupPolicy,
28+
}
29+
30+
impl InferenceConfig {
31+
pub fn new(query_language: QueryLanguage, cleanup_policy: CleanupPolicy) -> Self {
32+
let schema = match query_language {
33+
QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()),
34+
QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())),
35+
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
36+
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL,
37+
};
38+
Self {
39+
schema,
40+
query_configs: Vec::new(),
41+
cleanup_policy,
42+
}
43+
}
44+
45+
pub fn from_yaml_file(yaml_file: &str, query_language: QueryLanguage) -> Result<Self> {
46+
let file = File::open(yaml_file)?;
47+
let reader = BufReader::new(file);
48+
let data: Value = serde_yaml::from_reader(reader)?;
49+
50+
Self::from_yaml_data(&data, query_language)
51+
}
52+
53+
pub fn from_yaml_data(data: &Value, query_language: QueryLanguage) -> Result<Self> {
54+
let schema = match query_language {
55+
QueryLanguage::promql => {
56+
let promql_schema = Self::parse_promql_schema(data)?;
57+
SchemaConfig::PromQL(promql_schema)
58+
}
59+
QueryLanguage::sql => {
60+
let sql_schema = Self::parse_sql_schema(data)?;
61+
SchemaConfig::SQL(sql_schema)
62+
}
63+
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
64+
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL,
65+
};
66+
67+
let cleanup_policy = Self::parse_cleanup_policy(data)?;
68+
let query_configs = Self::parse_query_configs(data, cleanup_policy)?;
69+
70+
Ok(Self {
71+
schema,
72+
query_configs,
73+
cleanup_policy,
74+
})
75+
}
76+
77+
/// Parse PromQL schema from YAML data (metrics: key)
78+
fn parse_promql_schema(data: &Value) -> Result<PromQLSchema> {
79+
let mut promql_schema = PromQLSchema::new();
80+
if let Some(metrics) = data.get("metrics") {
81+
if let Some(metrics_map) = metrics.as_mapping() {
82+
for (metric_name_val, labels_val) in metrics_map {
83+
if let (Some(metric_name), Some(labels_seq)) =
84+
(metric_name_val.as_str(), labels_val.as_sequence())
85+
{
86+
let labels: Vec<String> = labels_seq
87+
.iter()
88+
.filter_map(|v| v.as_str())
89+
.map(|s| s.to_string())
90+
.collect();
91+
let key_by_label_names = KeyByLabelNames::new(labels);
92+
promql_schema =
93+
promql_schema.add_metric(metric_name.to_string(), key_by_label_names);
94+
}
95+
}
96+
}
97+
}
98+
Ok(promql_schema)
99+
}
100+
101+
/// Parse SQL schema from YAML data (tables: key at top level, matching ArroyoSketch format)
102+
fn parse_sql_schema(data: &Value) -> Result<SQLSchema> {
103+
let tables_data = data
104+
.get("tables")
105+
.and_then(|v| v.as_sequence())
106+
.ok_or_else(|| {
107+
anyhow::anyhow!("Missing or invalid tables field for SQL query language")
108+
})?;
109+
110+
let mut tables = Vec::new();
111+
for table_data in tables_data {
112+
let name = table_data
113+
.get("name")
114+
.and_then(|v| v.as_str())
115+
.ok_or_else(|| anyhow::anyhow!("Missing name field in table"))?
116+
.to_string();
117+
118+
let time_column = table_data
119+
.get("time_column")
120+
.and_then(|v| v.as_str())
121+
.ok_or_else(|| anyhow::anyhow!("Missing time_column field in table {}", name))?
122+
.to_string();
123+
124+
let value_columns: HashSet<String> = table_data
125+
.get("value_columns")
126+
.and_then(|v| v.as_sequence())
127+
.ok_or_else(|| anyhow::anyhow!("Missing value_columns field in table {}", name))?
128+
.iter()
129+
.filter_map(|v| v.as_str())
130+
.map(|s| s.to_string())
131+
.collect();
132+
133+
let metadata_columns: HashSet<String> = table_data
134+
.get("metadata_columns")
135+
.and_then(|v| v.as_sequence())
136+
.ok_or_else(|| anyhow::anyhow!("Missing metadata_columns field in table {}", name))?
137+
.iter()
138+
.filter_map(|v| v.as_str())
139+
.map(|s| s.to_string())
140+
.collect();
141+
142+
tables.push(Table::new(
143+
name,
144+
time_column,
145+
value_columns,
146+
metadata_columns,
147+
));
148+
}
149+
150+
Ok(SQLSchema::new(tables))
151+
}
152+
153+
/// Parse cleanup policy from YAML data. Errors if not specified.
154+
fn parse_cleanup_policy(data: &Value) -> Result<CleanupPolicy> {
155+
let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| {
156+
anyhow::anyhow!(
157+
"Missing cleanup_policy section in inference_config.yaml. \
158+
Must specify cleanup_policy.name as one of: circular_buffer, read_based, no_cleanup"
159+
)
160+
})?;
161+
162+
let name = cleanup_policy_data
163+
.get("name")
164+
.and_then(|v| v.as_str())
165+
.ok_or_else(|| {
166+
anyhow::anyhow!(
167+
"Missing cleanup_policy.name in inference_config.yaml. \
168+
Must be one of: circular_buffer, read_based, no_cleanup"
169+
)
170+
})?;
171+
172+
match name {
173+
"circular_buffer" => Ok(CleanupPolicy::CircularBuffer),
174+
"read_based" => Ok(CleanupPolicy::ReadBased),
175+
"no_cleanup" => Ok(CleanupPolicy::NoCleanup),
176+
_ => Err(anyhow::anyhow!(
177+
"Invalid cleanup policy: '{}'. Valid options: circular_buffer, read_based, no_cleanup",
178+
name
179+
)),
180+
}
181+
}
182+
183+
fn parse_query_configs(
184+
data: &Value,
185+
cleanup_policy: CleanupPolicy,
186+
) -> Result<Vec<QueryConfig>> {
187+
let query_configs = if let Some(queries) = data.get("queries").and_then(|v| v.as_sequence())
188+
{
189+
let mut configs = Vec::new();
190+
for query_data in queries {
191+
let query = query_data
192+
.get("query")
193+
.and_then(|v| v.as_str())
194+
.ok_or_else(|| anyhow::anyhow!("Missing query field"))?
195+
.to_string();
196+
197+
let aggregations = if let Some(aggregations_data) =
198+
query_data.get("aggregations").and_then(|v| v.as_sequence())
199+
{
200+
let mut agg_refs = Vec::new();
201+
for agg_data in aggregations_data {
202+
let aggregation_id = agg_data
203+
.get("aggregation_id")
204+
.and_then(|v| v.as_u64())
205+
.ok_or_else(|| {
206+
anyhow::anyhow!("Missing aggregation_id in aggregation")
207+
})?;
208+
209+
let agg_ref = match cleanup_policy {
210+
CleanupPolicy::CircularBuffer => {
211+
let num_aggregates_to_retain = agg_data
212+
.get("num_aggregates_to_retain")
213+
.and_then(|v| v.as_u64());
214+
AggregationReference::new(aggregation_id, num_aggregates_to_retain)
215+
}
216+
CleanupPolicy::ReadBased => {
217+
let read_count_threshold = agg_data
218+
.get("read_count_threshold")
219+
.and_then(|v| v.as_u64());
220+
AggregationReference::with_read_count_threshold(
221+
aggregation_id,
222+
read_count_threshold,
223+
)
224+
}
225+
CleanupPolicy::NoCleanup => {
226+
AggregationReference::new(aggregation_id, None)
227+
}
228+
};
229+
agg_refs.push(agg_ref);
230+
}
231+
agg_refs
232+
} else {
233+
Vec::new()
234+
};
235+
236+
let config = QueryConfig::new(query).with_aggregations(aggregations);
237+
configs.push(config);
238+
}
239+
configs
240+
} else {
241+
Vec::new()
242+
};
243+
Ok(query_configs)
244+
}
245+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
11
pub mod aggregation_config;
2+
pub mod aggregation_reference;
23
pub mod enums;
4+
pub mod inference_config;
5+
pub mod promql_schema;
6+
pub mod query_config;
37
pub mod traits;
48
pub mod utils;
9+
10+
pub use aggregation_config::*;
11+
pub use aggregation_reference::*;
12+
pub use enums::*;
13+
pub use inference_config::*;
14+
pub use promql_schema::*;
15+
pub use query_config::*;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::collections::HashMap;
3+
4+
use promql_utilities::data_model::KeyByLabelNames;
5+
6+
#[derive(Debug, Clone, Serialize, Deserialize)]
7+
pub struct PromQLSchema {
8+
pub config: HashMap<String, KeyByLabelNames>,
9+
}
10+
11+
impl PromQLSchema {
12+
pub fn new() -> Self {
13+
Self {
14+
config: HashMap::new(),
15+
}
16+
}
17+
18+
pub fn add_metric(mut self, metric: String, labels: KeyByLabelNames) -> Self {
19+
self.config.insert(metric, labels);
20+
self
21+
}
22+
23+
pub fn get_labels(&self, metric: &str) -> Option<&KeyByLabelNames> {
24+
self.config.get(metric)
25+
}
26+
}
27+
28+
impl Default for PromQLSchema {
29+
fn default() -> Self {
30+
Self::new()
31+
}
32+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
use crate::aggregation_reference::AggregationReference;
4+
5+
#[derive(Debug, Clone, Serialize, Deserialize)]
6+
pub struct QueryConfig {
7+
pub query: String,
8+
pub aggregations: Vec<AggregationReference>,
9+
}
10+
11+
impl QueryConfig {
12+
pub fn new(query: String) -> Self {
13+
Self {
14+
query,
15+
aggregations: Vec::new(),
16+
}
17+
}
18+
19+
pub fn add_aggregation(mut self, aggregation: AggregationReference) -> Self {
20+
self.aggregations.push(aggregation);
21+
self
22+
}
23+
24+
pub fn with_aggregations(mut self, aggregations: Vec<AggregationReference>) -> Self {
25+
self.aggregations = aggregations;
26+
self
27+
}
28+
}

0 commit comments

Comments
 (0)