Skip to content

Commit e8f97ab

Browse files
feat(query-engine): add backend config for clickhouse and elasticsearch (#323)
1 parent 73dc190 commit e8f97ab

4 files changed

Lines changed: 355 additions & 44 deletions

File tree

asap-query-engine/examples/engine_config.yaml

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@
1212
output_dir: "./output"
1313
log_level: "INFO" # DEBUG | INFO | WARN | ERROR (also respects RUST_LOG)
1414

15-
# Query language used for both ingest and query parsing.
16-
# Values are case-sensitive: use lowercase exactly as shown.
17-
query_language: "promql" # promql | sql | elastic_querydsl | elastic_sql
18-
1915
# Prometheus scrape interval in seconds. Used by the query tracker and planner.
2016
prometheus_scrape_interval: 15
2117

@@ -33,12 +29,37 @@ do_profiling: false
3329
http_server:
3430
port: 8088
3531

36-
# Prometheus server used for query forwarding and planner context.
37-
prometheus_server: "http://localhost:9090"
32+
# ---------------------------------------------------------------------------
33+
# DB backend — determines the query protocol and optional fallback forwarding.
34+
# Choose exactly one type.
35+
# ---------------------------------------------------------------------------
3836

39-
# When true, queries not answerable from sketches are forwarded to prometheus_server.
40-
# The server must be reachable at startup when this is enabled.
41-
forward_unsupported_queries: false
37+
# Prometheus (default) — exposes a PromQL-compatible HTTP API.
38+
backend:
39+
type: "prometheus"
40+
server: "http://localhost:9090" # used for forwarding and planner context
41+
forward_unsupported_queries: false # when true, server must be reachable at startup
42+
43+
# ClickHouse — exposes an SQL-over-HTTP API.
44+
# backend:
45+
# type: "clickhouse"
46+
# url: "http://localhost:8123"
47+
# database: "default"
48+
# forward_unsupported_queries: false
49+
50+
# Elasticsearch (QueryDSL):
51+
# backend:
52+
# type: "elastic_querydsl"
53+
# url: "http://localhost:9200"
54+
# index: "metrics-*" # required
55+
# forward_unsupported_queries: false
56+
57+
# Elasticsearch (SQL):
58+
# backend:
59+
# type: "elastic_sql"
60+
# url: "http://localhost:9200"
61+
# index: "metrics-*" # required
62+
# forward_unsupported_queries: false
4263

4364
# ---------------------------------------------------------------------------
4465
# Store
@@ -98,7 +119,7 @@ precompute_engine:
98119
dump_precomputes: false # dump received precomputes to output_dir for debugging
99120

100121
# ---------------------------------------------------------------------------
101-
# Query tracker / planner (optional)
122+
# Query tracker / planner (optional — Prometheus backend only)
102123
# ---------------------------------------------------------------------------
103124

104125
query_tracker:

asap-query-engine/src/engine_config.rs

Lines changed: 261 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> {
3333
return Err("prometheus_scrape_interval must be greater than 0".into());
3434
}
3535

36+
if config.query_tracker.enabled && !matches!(config.backend, BackendConfig::Prometheus { .. }) {
37+
return Err("query_tracker.enabled=true requires backend.type=prometheus".into());
38+
}
39+
3640
Ok(())
3741
}
3842

@@ -41,11 +45,11 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> {
4145
pub struct EngineConfig {
4246
pub output_dir: String,
4347
pub log_level: String,
44-
pub query_language: QueryLanguage,
4548
pub prometheus_scrape_interval: u64,
4649
pub streaming_engine: StreamingEngine,
4750
pub do_profiling: bool,
4851
pub http_server: HttpServerSettings,
52+
pub backend: BackendConfig,
4953
pub store: StoreSettings,
5054
pub ingest: IngestConfig,
5155
pub precompute_engine: PrecomputeSettings,
@@ -60,11 +64,11 @@ impl Default for EngineConfig {
6064
Self {
6165
output_dir: "./output".to_string(),
6266
log_level: "INFO".to_string(),
63-
query_language: QueryLanguage::promql,
6467
prometheus_scrape_interval: 15,
6568
streaming_engine: StreamingEngine::Precompute,
6669
do_profiling: false,
6770
http_server: HttpServerSettings::default(),
71+
backend: BackendConfig::default(),
6872
store: StoreSettings::default(),
6973
ingest: IngestConfig::default(),
7074
precompute_engine: PrecomputeSettings::default(),
@@ -80,20 +84,117 @@ impl Default for EngineConfig {
8084
#[serde(default)]
8185
pub struct HttpServerSettings {
8286
pub port: u16,
83-
pub prometheus_server: String,
84-
pub forward_unsupported_queries: bool,
8587
}
8688

8789
impl Default for HttpServerSettings {
8890
fn default() -> Self {
89-
Self {
90-
port: 8088,
91-
prometheus_server: "http://localhost:9090".to_string(),
91+
Self { port: 8088 }
92+
}
93+
}
94+
95+
/// Which DB backend the query server exposes and optionally forwards to.
96+
#[derive(Debug, serde::Deserialize)]
97+
#[serde(tag = "type", rename_all = "snake_case")]
98+
pub enum BackendConfig {
99+
Prometheus {
100+
/// Prometheus server URL used for query forwarding and planner context.
101+
#[serde(default = "default_prometheus_server")]
102+
server: String,
103+
/// When true, queries not answerable from sketches are forwarded to `server`.
104+
/// The server must be reachable at startup.
105+
#[serde(default)]
106+
forward_unsupported_queries: bool,
107+
},
108+
Clickhouse {
109+
/// ClickHouse HTTP interface base URL.
110+
#[serde(default = "default_clickhouse_url")]
111+
url: String,
112+
/// ClickHouse database name.
113+
#[serde(default = "default_clickhouse_database")]
114+
database: String,
115+
/// When true, queries not answerable from sketches are forwarded to `url`.
116+
#[serde(default)]
117+
forward_unsupported_queries: bool,
118+
},
119+
ElasticQuerydsl {
120+
/// Elasticsearch base URL.
121+
#[serde(default = "default_elastic_url")]
122+
url: String,
123+
/// Elasticsearch index pattern to query.
124+
index: String,
125+
/// When true, queries not answerable from sketches are forwarded to `url`.
126+
#[serde(default)]
127+
forward_unsupported_queries: bool,
128+
},
129+
ElasticSql {
130+
/// Elasticsearch base URL.
131+
#[serde(default = "default_elastic_url")]
132+
url: String,
133+
/// Elasticsearch index pattern to query.
134+
index: String,
135+
/// When true, queries not answerable from sketches are forwarded to `url`.
136+
#[serde(default)]
137+
forward_unsupported_queries: bool,
138+
},
139+
}
140+
141+
impl Default for BackendConfig {
142+
fn default() -> Self {
143+
BackendConfig::Prometheus {
144+
server: default_prometheus_server(),
92145
forward_unsupported_queries: false,
93146
}
94147
}
95148
}
96149

150+
impl BackendConfig {
151+
pub fn query_language(&self) -> QueryLanguage {
152+
match self {
153+
BackendConfig::Prometheus { .. } => QueryLanguage::promql,
154+
BackendConfig::Clickhouse { .. } => QueryLanguage::sql,
155+
BackendConfig::ElasticQuerydsl { .. } => QueryLanguage::elastic_querydsl,
156+
BackendConfig::ElasticSql { .. } => QueryLanguage::elastic_sql,
157+
}
158+
}
159+
160+
pub fn forward_unsupported_queries(&self) -> bool {
161+
match self {
162+
BackendConfig::Prometheus {
163+
forward_unsupported_queries,
164+
..
165+
}
166+
| BackendConfig::Clickhouse {
167+
forward_unsupported_queries,
168+
..
169+
}
170+
| BackendConfig::ElasticQuerydsl {
171+
forward_unsupported_queries,
172+
..
173+
}
174+
| BackendConfig::ElasticSql {
175+
forward_unsupported_queries,
176+
..
177+
} => *forward_unsupported_queries,
178+
}
179+
}
180+
}
181+
182+
fn default_prometheus_server() -> String {
183+
"http://localhost:9090".to_string()
184+
}
185+
186+
fn default_clickhouse_url() -> String {
187+
"http://localhost:8123".to_string()
188+
}
189+
190+
fn default_clickhouse_database() -> String {
191+
"default".to_string()
192+
}
193+
194+
fn default_elastic_url() -> String {
195+
"http://localhost:9200".to_string()
196+
}
197+
97198
#[derive(Debug, serde::Deserialize)]
98199
#[serde(default)]
99200
pub struct StoreSettings {
@@ -374,4 +475,157 @@ output_dir: "./output"
374475
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
375476
assert!(check_config(&config).is_err());
376477
}
478+
479+
#[test]
480+
fn backend_defaults_to_prometheus() {
481+
let config: EngineConfig = Figment::new()
482+
.merge(Yaml::string(MINIMAL_YAML))
483+
.extract()
484+
.unwrap();
485+
assert!(matches!(config.backend, BackendConfig::Prometheus { .. }));
486+
assert_eq!(config.backend.query_language(), QueryLanguage::promql);
487+
assert!(!config.backend.forward_unsupported_queries());
488+
}
489+
490+
#[test]
491+
fn backend_clickhouse_parses() {
492+
let yaml = r#"
493+
streaming_engine: "precompute"
494+
ingest:
495+
type: "http_remote_write"
496+
port: 9090
497+
output_dir: "./output"
498+
backend:
499+
type: "clickhouse"
500+
url: "http://clickhouse:8123"
501+
database: "metrics"
502+
"#;
503+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
504+
assert!(matches!(config.backend, BackendConfig::Clickhouse { .. }));
505+
assert_eq!(config.backend.query_language(), QueryLanguage::sql);
506+
assert!(!config.backend.forward_unsupported_queries());
507+
}
508+
509+
#[test]
510+
fn backend_clickhouse_defaults_url_and_database() {
511+
let yaml = r#"
512+
streaming_engine: "precompute"
513+
ingest:
514+
type: "http_remote_write"
515+
port: 9090
516+
output_dir: "./output"
517+
backend:
518+
type: "clickhouse"
519+
"#;
520+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
521+
if let BackendConfig::Clickhouse { url, database, .. } = &config.backend {
522+
assert_eq!(url, "http://localhost:8123");
523+
assert_eq!(database, "default");
524+
} else {
525+
panic!("expected Clickhouse backend");
526+
}
527+
}
528+
529+
#[test]
530+
fn backend_elastic_querydsl_parses() {
531+
let yaml = r#"
532+
streaming_engine: "precompute"
533+
ingest:
534+
type: "http_remote_write"
535+
port: 9090
536+
output_dir: "./output"
537+
backend:
538+
type: "elastic_querydsl"
539+
url: "http://elastic:9200"
540+
index: "metrics-*"
541+
"#;
542+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
543+
assert!(matches!(
544+
config.backend,
545+
BackendConfig::ElasticQuerydsl { .. }
546+
));
547+
assert_eq!(
548+
config.backend.query_language(),
549+
QueryLanguage::elastic_querydsl
550+
);
551+
}
552+
553+
#[test]
554+
fn backend_elastic_sql_parses() {
555+
let yaml = r#"
556+
streaming_engine: "precompute"
557+
ingest:
558+
type: "http_remote_write"
559+
port: 9090
560+
output_dir: "./output"
561+
backend:
562+
type: "elastic_sql"
563+
url: "http://elastic:9200"
564+
index: "metrics-*"
565+
"#;
566+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
567+
assert!(matches!(config.backend, BackendConfig::ElasticSql { .. }));
568+
assert_eq!(config.backend.query_language(), QueryLanguage::elastic_sql);
569+
}
570+
571+
#[test]
572+
fn backend_prometheus_explicit_fields() {
573+
let yaml = r#"
574+
streaming_engine: "precompute"
575+
ingest:
576+
type: "http_remote_write"
577+
port: 9090
578+
output_dir: "./output"
579+
backend:
580+
type: "prometheus"
581+
server: "http://prom:9090"
582+
forward_unsupported_queries: true
583+
"#;
584+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
585+
if let BackendConfig::Prometheus {
586+
server,
587+
forward_unsupported_queries,
588+
} = &config.backend
589+
{
590+
assert_eq!(server, "http://prom:9090");
591+
assert!(forward_unsupported_queries);
592+
} else {
593+
panic!("expected Prometheus backend");
594+
}
595+
assert!(config.backend.forward_unsupported_queries());
596+
}
597+
598+
#[test]
599+
fn check_config_rejects_query_tracker_with_non_prometheus_backend() {
600+
let yaml = r#"
601+
streaming_engine: "precompute"
602+
ingest:
603+
type: "http_remote_write"
604+
port: 9090
605+
output_dir: "./output"
606+
backend:
607+
type: "clickhouse"
608+
query_tracker:
609+
enabled: true
610+
"#;
611+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
612+
assert!(check_config(&config).is_err());
613+
}
614+
615+
#[test]
616+
fn check_config_allows_query_tracker_with_prometheus_backend() {
617+
let yaml = r#"
618+
streaming_engine: "precompute"
619+
ingest:
620+
type: "http_remote_write"
621+
port: 9090
622+
output_dir: "./output"
623+
backend:
624+
type: "prometheus"
625+
query_tracker:
626+
enabled: true
627+
"#;
628+
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
629+
assert!(check_config(&config).is_ok());
630+
}
377631
}

0 commit comments

Comments
 (0)