diff --git a/asap-planner-rs/src/clickhouse_client.rs b/asap-planner-rs/src/clickhouse_client.rs new file mode 100644 index 00000000..a497a682 --- /dev/null +++ b/asap-planner-rs/src/clickhouse_client.rs @@ -0,0 +1,131 @@ +use std::collections::HashSet; +use std::thread; +use std::time::Duration; + +use serde::Deserialize; +use tracing::{debug, warn}; + +use crate::error::ControllerError; + +const MAX_RETRIES: u32 = 15; +const RETRY_DELAY: Duration = Duration::from_secs(2); + +#[derive(Deserialize)] +struct ColumnRow { + name: String, + #[serde(rename = "type")] + column_type: String, +} + +/// Fetch `(name, type)` pairs for all columns in `database.table` via the +/// ClickHouse HTTP API (`system.columns`). +fn fetch_columns_for_table( + clickhouse_url: &str, + database: &str, + table: &str, +) -> Result, ControllerError> { + let base_url = clickhouse_url.trim_end_matches('/'); + let sql = format!( + "SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}'", + database, table + ); + let client = reqwest::blocking::Client::new(); + + for attempt in 1..=MAX_RETRIES { + let response = client + .get(base_url) + .query(&[("query", sql.as_str()), ("default_format", "JSONEachRow")]) + .send() + .map_err(|e| { + ControllerError::ClickHouseClient(format!( + "HTTP request failed for table '{}.{}': {}", + database, table, e + )) + })?; + + let status = response.status(); + + if status == reqwest::StatusCode::SERVICE_UNAVAILABLE { + warn!( + "ClickHouse returned 503 for table '{}.{}' (attempt {}/{}); retrying in {}s", + database, + table, + attempt, + MAX_RETRIES, + RETRY_DELAY.as_secs(), + ); + thread::sleep(RETRY_DELAY); + continue; + } + + if !status.is_success() { + return Err(ControllerError::ClickHouseClient(format!( + "ClickHouse returned HTTP {} for table '{}.{}'", + status, database, table + ))); + } + + let body = response.text().map_err(|e| { + ControllerError::ClickHouseClient(format!( + "Failed to read ClickHouse response for table '{}.{}': {}", + database, table, e + )) + })?; + + let mut columns = Vec::new(); + for line in body.lines() { + let row: ColumnRow = serde_json::from_str(line).map_err(|e| { + ControllerError::ClickHouseClient(format!( + "Failed to parse ClickHouse column row {:?}: {}", + line, e + )) + })?; + columns.push((row.name, row.column_type)); + } + + debug!( + "Fetched {} columns for table '{}.{}'", + columns.len(), + database, + table + ); + return Ok(columns); + } + + Err(ControllerError::ClickHouseClient(format!( + "ClickHouse returned 503 for table '{}.{}' after {} attempts; giving up", + database, table, MAX_RETRIES + ))) +} + +/// Query `system.columns` and return all column names that are not the time +/// column or one of the value columns, sorted alphabetically. +/// +/// These are the metadata (dimension) columns the planner uses for rollup, +/// analogous to PromQL label sets discovered from Prometheus. +pub fn infer_metadata_columns( + clickhouse_url: &str, + database: &str, + table_name: &str, + time_column: &str, + value_columns: &[String], +) -> Result, ControllerError> { + let all_columns = fetch_columns_for_table(clickhouse_url, database, table_name)?; + + let exclude: HashSet<&str> = std::iter::once(time_column) + .chain(value_columns.iter().map(String::as_str)) + .collect(); + + let mut metadata: Vec = all_columns + .into_iter() + .map(|(name, _)| name) + .filter(|name| !exclude.contains(name.as_str())) + .collect(); + metadata.sort(); + + debug!( + "Inferred metadata columns for table '{}': {:?}", + table_name, metadata + ); + Ok(metadata) +} diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index bba96cab..6143493a 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -142,6 +142,7 @@ pub struct TableDefinition { pub name: String, pub time_column: String, pub value_columns: Vec, + #[serde(default)] pub metadata_columns: Vec, } diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs index 873c25c7..f257f41d 100644 --- a/asap-planner-rs/src/error.rs +++ b/asap-planner-rs/src/error.rs @@ -20,6 +20,8 @@ pub enum ControllerError { UnknownTable(String), #[error("Prometheus client error: {0}")] PrometheusClient(String), + #[error("ClickHouse client error: {0}")] + ClickHouseClient(String), #[error("Elasticsearch DSL parse error: {0}")] ElasticDSLParse(String), #[error("Unsupported Elasticsearch DSL query: {0}")] diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 1adbce23..40ca3bf9 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,3 +1,4 @@ +pub mod clickhouse_client; pub mod config; pub mod elastic_dsl; pub mod error; diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 2035dada..ccb96003 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -48,6 +48,14 @@ struct Args { #[arg(long = "data-ingestion-interval", required = false)] data_ingestion_interval: Option, + /// ClickHouse base URL for auto-inferring metadata_columns when not listed + /// in the config file. Example: http://localhost:8123 + #[arg(long = "clickhouse-url", required = false)] + clickhouse_url: Option, + + #[arg(long = "clickhouse-database", required = false)] + clickhouse_database: Option, + #[arg(short, long, action = clap::ArgAction::Count)] verbose: u8, } @@ -126,7 +134,16 @@ fn main() -> anyhow::Result<()> { query_evaluation_time: None, data_ingestion_interval: interval, }; - SQLController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; + let controller = match args.clickhouse_url { + Some(ref url) => SQLController::from_file_with_discovery( + &config_path, + url, + args.clickhouse_database.as_deref().unwrap_or("default"), + opts, + )?, + None => SQLController::from_file(&config_path, opts)?, + }; + controller.generate_to_dir(&args.output_dir)?; } QueryLanguage::elastic_querydsl => { let interval = args.data_ingestion_interval.ok_or_else(|| { diff --git a/asap-planner-rs/src/sql/controller.rs b/asap-planner-rs/src/sql/controller.rs index fb4f5aaa..a1ebeb47 100644 --- a/asap-planner-rs/src/sql/controller.rs +++ b/asap-planner-rs/src/sql/controller.rs @@ -1,6 +1,9 @@ use std::path::Path; +use tracing::debug; + use super::generator; +use crate::clickhouse_client; use crate::config::input::SQLControllerConfig; use crate::error::ControllerError; use crate::planner_output::PlannerOutput; @@ -21,6 +24,47 @@ impl SQLController { Self::from_yaml(&yaml_str, opts) } + /// Build a `SQLController` from a config file, filling in any empty + /// `metadata_columns` via auto-discovery from the ClickHouse HTTP API. + /// + /// Mirrors `promql::Controller::from_file`, which fetches label sets from + /// Prometheus. Tables whose `metadata_columns` is already populated in the + /// config are left untouched; only empty ones are discovered. + pub fn from_file_with_discovery( + path: &Path, + clickhouse_url: &str, + clickhouse_database: &str, + opts: SQLRuntimeOptions, + ) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + let mut config: SQLControllerConfig = serde_yaml::from_str(&yaml_str)?; + for table in &mut config.tables { + if table.metadata_columns.is_empty() { + debug!( + "Table '{}' has no metadata_columns; discovering via ClickHouse system.columns at {}", + table.name, clickhouse_url + ); + table.metadata_columns = clickhouse_client::infer_metadata_columns( + clickhouse_url, + clickhouse_database, + &table.name, + &table.time_column, + &table.value_columns, + )?; + } else { + debug!( + "Table '{}' has {} metadata_columns in config; skipping discovery", + table.name, + table.metadata_columns.len() + ); + } + } + Ok(Self { + config, + options: opts, + }) + } + pub fn from_yaml(yaml: &str, opts: SQLRuntimeOptions) -> Result { let config: SQLControllerConfig = serde_yaml::from_str(yaml)?; Ok(Self { diff --git a/asap-planner-rs/src/sql/generator.rs b/asap-planner-rs/src/sql/generator.rs index 16e13075..9982a6b8 100644 --- a/asap-planner-rs/src/sql/generator.rs +++ b/asap-planner-rs/src/sql/generator.rs @@ -48,6 +48,18 @@ pub fn generate_sql_plan( } } + // Validate that all tables have metadata_columns populated (either from config + // or filled in by from_file_with_discovery before reaching here). + for t in &config.tables { + if t.metadata_columns.is_empty() { + return Err(ControllerError::PlannerError(format!( + "Table '{}' has no metadata_columns. List them in the config file \ + or pass --clickhouse-url for auto-discovery.", + t.name + ))); + } + } + // Check for duplicate queries let mut seen_queries = std::collections::HashSet::new(); for qg in &config.query_groups { diff --git a/asap-planner-rs/tests/clickhouse_client_test.rs b/asap-planner-rs/tests/clickhouse_client_test.rs new file mode 100644 index 00000000..ea95ae6a --- /dev/null +++ b/asap-planner-rs/tests/clickhouse_client_test.rs @@ -0,0 +1,53 @@ +use std::io::{Read, Write}; +use std::net::TcpListener; + +use asap_planner::clickhouse_client::infer_metadata_columns; + +/// Spawn a single-shot HTTP server that returns a hardcoded `system.columns` +/// response, then verify that `infer_metadata_columns` correctly excludes the +/// time column and value columns and returns the rest sorted. +/// +/// Table: hits +/// EventTime DateTime ← excluded (time_column) +/// ResolutionWidth UInt16 ← excluded (value_column) +/// OS UInt8 ← metadata +/// RegionID UInt32 ← metadata +/// +/// Expected result: ["OS", "RegionID"] +#[test] +fn test_infer_metadata_columns_via_mock() { + let body = concat!( + "{\"name\":\"EventTime\",\"type\":\"DateTime\"}\n", + "{\"name\":\"ResolutionWidth\",\"type\":\"UInt16\"}\n", + "{\"name\":\"OS\",\"type\":\"UInt8\"}\n", + "{\"name\":\"RegionID\",\"type\":\"UInt32\"}\n", + ); + + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + + let server = std::thread::spawn(move || { + let (mut stream, _) = listener.accept().unwrap(); + let mut buf = [0u8; 4096]; + let _ = stream.read(&mut buf); + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/x-ndjson\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + stream.write_all(response.as_bytes()).unwrap(); + }); + + let url = format!("http://127.0.0.1:{}", port); + let result = infer_metadata_columns( + &url, + "default", + "hits", + "EventTime", + &["ResolutionWidth".to_string()], + ) + .unwrap(); + + server.join().unwrap(); + assert_eq!(result, vec!["OS".to_string(), "RegionID".to_string()]); +} diff --git a/asap-tools/experiments/ISSUE_350_PLAN.md b/asap-tools/experiments/ISSUE_350_PLAN.md new file mode 100644 index 00000000..ddcecc10 --- /dev/null +++ b/asap-tools/experiments/ISSUE_350_PLAN.md @@ -0,0 +1,280 @@ +# ClickHouse Label Discovery — Implementation Plan (Issue #350) + +## Context for new sessions + +**Objective**: Let `metadata_columns` (the label/dimension columns the planner tracks for +rollup) be inferred from a live ClickHouse instance via its native HTTP API, instead of +requiring the user to list them manually under `dataset.precompute.label_cols` in the +experiment config. This mirrors how the PromQL planner discovers label sets from Prometheus. + +**Read these files first before writing anything**: +- `asap-planner-rs/src/prometheus_client.rs` — the discovery pattern to copy +- `asap-planner-rs/src/promql/controller.rs` — how discovery feeds into plan generation +- `asap-planner-rs/src/config/input.rs` — `TableDefinition` (the struct to extend) +- `asap-planner-rs/src/sql/controller.rs` — where `from_file_with_discovery` goes +- `asap-planner-rs/src/main.rs` — CLI arg wiring +- `experiment_utils/services/misc.py` — `ControllerService` to extend +- `experiment_utils/config.py` — `generate_sql_planner_input` to update +- `experiment_run_clickhouse.py` — the runner that calls `ControllerService` +- `config/experiment_type/clickhouse.yaml` — config knobs affected + +**Current state (what already exists)**: +- `experiment_run_clickhouse.py` fully orchestrates baseline + sketchdb modes +- `ClickHouseService` / `ClickHouseDataLoaderService` exist and work +- `SQLController` already plans from explicit `metadata_columns` in `TableDefinition` +- `asap-planner --query-language sql` already accepts `--data-ingestion-interval` +- `dataset.precompute.label_cols` is currently REQUIRED in the experiment config +- `reqwest` is already a Cargo dependency (used by `prometheus_client.rs`) +- The ClickHouse HTTP API is directly accessible at `:8123` — no adapter needed + +**Why this is not blocked on #336**: +The original plan noted "blocked on #336" because an earlier design routed discovery +through the ASAP ClickHouse adapter. This plan queries ClickHouse directly via its native +HTTP API (`SELECT name, type FROM system.columns WHERE …`), which requires no changes to +the adapter and is independent of #336. + +**Design decisions (do not re-open)**: +- Discovery queries `system.columns` (not `information_schema`): same result, more idiomatic ClickHouse. +- `metadata_columns` = all columns that are NOT the `time_column` and NOT in `value_columns`. + No type-based filtering — the user-provided time/value column names are the only excludes. +- When `--clickhouse-url` is absent, the planner requires all `metadata_columns` to be + explicit (current behavior unchanged). No silent fallback to empty. +- `--clickhouse-database` defaults to `"default"`. +- In the Python runner, `label_cols` becomes optional. When absent (or empty list), the + planner input YAML omits `metadata_columns` and `--clickhouse-url` is passed so the + planner infers them. + +--- + +## What changes in each component + +| Component | Change | +|-----------|--------| +| `asap-planner-rs/src/clickhouse_client.rs` | **Create** — HTTP query to `system.columns` | +| `asap-planner-rs/src/lib.rs` | Export new module | +| `asap-planner-rs/src/config/input.rs` | `metadata_columns` gets `#[serde(default)]` | +| `asap-planner-rs/src/sql/controller.rs` | `from_file_with_discovery()` constructor | +| `asap-planner-rs/src/main.rs` | `--clickhouse-url`, `--clickhouse-database` flags | +| `experiment_utils/services/misc.py` | `ControllerService.start()` + bare-metal/containerized helpers | +| `experiment_utils/config.py` | `generate_sql_planner_input()` — `label_cols` optional | +| `config/experiment_type/clickhouse.yaml` | `label_cols` becomes optional, documented | +| `experiment_run_clickhouse.py` | Pass `clickhouse_url` to controller in sketchdb mode | + +--- + +## Checkpoint 1 — `clickhouse_client.rs`: schema discovery + +**What**: New Rust module that queries the ClickHouse HTTP API to discover table column +names and types, then infers which columns are metadata (dimension) columns. + +**ClickHouse HTTP API**: a simple GET to `:8123/?query=` with +`default_format=JSONEachRow`. Response is newline-delimited JSON. + +Example: +``` +GET http://localhost:8123/?query=SELECT+name%2Ctype+FROM+system.columns+WHERE+database%3D'default'+AND+table%3D'hits'&default_format=JSONEachRow +→ {"name":"WatchID","type":"UInt64"} + {"name":"JavaEnable","type":"UInt8"} + {"name":"EventTime","type":"DateTime"} + ... +``` + +**Files**: +- `asap-planner-rs/src/clickhouse_client.rs` (new) + ```rust + // Fetch (name, type) pairs for all columns in the given table. + pub fn fetch_columns_for_table( + clickhouse_url: &str, + database: &str, + table: &str, + ) -> Result, ControllerError> + + // Return all column names that are not the time column or a value column. + pub fn infer_metadata_columns( + clickhouse_url: &str, + database: &str, + table_name: &str, + time_column: &str, + value_columns: &[String], + ) -> Result, ControllerError> + ``` + - `fetch_columns_for_table`: GET `/?query=SELECT+name,type+FROM+system.columns+WHERE+database=''+AND+table=''&default_format=JSONEachRow` + - Uses `reqwest::blocking::Client` (already a dependency) + - Retry loop (up to 15 attempts, 2 s delay) matching `prometheus_client.rs` pattern + - `infer_metadata_columns`: calls `fetch_columns_for_table`, filters out `time_column` + and all entries in `value_columns`; returns the rest sorted +- `asap-planner-rs/src/lib.rs` — add `pub mod clickhouse_client;` + +**Error variant** (add to `error.rs`): +```rust +#[error("ClickHouse client error: {0}")] +ClickHouseClient(String), +``` + +--- + +## Checkpoint 2 — Make `metadata_columns` optional in `TableDefinition` + +**What**: Change the serde annotation so that a `TableDefinition` with no `metadata_columns` +field is valid YAML. When the planner is given `--clickhouse-url`, it fills in missing +columns via Checkpoint 1. When no URL is given and `metadata_columns` is empty, the planner +errors with a clear message. + +**Files**: +- `asap-planner-rs/src/config/input.rs` + ```rust + #[derive(Debug, Clone, Deserialize)] + pub struct TableDefinition { + pub name: String, + pub time_column: String, + pub value_columns: Vec, + #[serde(default)] // ← add this + pub metadata_columns: Vec, + } + ``` +- `asap-planner-rs/src/sql/controller.rs` + - Keep existing `from_file` / `from_yaml` constructors unchanged + - Add new constructor: + ```rust + pub fn from_file_with_discovery( + path: &Path, + clickhouse_url: &str, + clickhouse_database: &str, + opts: SQLRuntimeOptions, + ) -> Result + ``` + Reads YAML, then for each `TableDefinition` where `metadata_columns.is_empty()`, + calls `clickhouse_client::infer_metadata_columns(url, db, table, time_col, value_cols)`. +- `asap-planner-rs/src/sql/generator.rs` + - At the top of `generate_sql_plan`, validate that every table has non-empty + `metadata_columns` (they must be filled in before this point): + ```rust + for t in &config.tables { + if t.metadata_columns.is_empty() { + return Err(ControllerError::PlannerError(format!( + "Table '{}' has no metadata_columns. List them in the config file \ + or pass --clickhouse-url for auto-discovery.", + t.name + ))); + } + } + ``` + +Existing unit tests in `tests/sql_integration.rs` continue to pass unchanged because they +supply explicit `metadata_columns`. + +--- + +## Checkpoint 3 — Add `--clickhouse-url` / `--clickhouse-database` CLI flags + +**What**: Wire the new constructor into `main.rs`. + +**File**: `asap-planner-rs/src/main.rs` + +```rust +/// ClickHouse base URL for auto-inferring metadata_columns when not specified +/// in the config file. Example: http://localhost:8123 +#[arg(long = "clickhouse-url", required = false)] +clickhouse_url: Option, + +#[arg(long = "clickhouse-database", default_value = "default")] +clickhouse_database: String, +``` + +In the SQL arm of `main()`: +```rust +let controller = match args.clickhouse_url { + Some(url) => SQLController::from_file_with_discovery( + &config_path, &url, &args.clickhouse_database, opts, + )?, + None => SQLController::from_file(&config_path, opts)?, +}; +controller.generate_to_dir(&args.output_dir)?; +``` + +**Test**: `cargo build --release` on the CloudLab node; run with `--clickhouse-url` against +a loaded ClickHouse instance; confirm the generated `streaming_config.yaml` lists the +correct label columns under `rollup`. + +--- + +## Checkpoint 4 — Python infra: `ControllerService` + `generate_sql_planner_input` + +**What**: Plumb the ClickHouse URL through the Python layer. + +**Files**: +- `experiment_utils/services/misc.py` (`ControllerService`): + - `start()`: add `clickhouse_url: Optional[str] = None`, `clickhouse_database: str = "default"` + - `_start_bare_metal()` / `_start_containerized()`: when `query_language == "sql"` and `clickhouse_url` is set: + ```python + if clickhouse_url: + cmd += f" --clickhouse-url {clickhouse_url}" + cmd += f" --clickhouse-database {clickhouse_database}" + ``` + +- `experiment_utils/config.py` (`generate_sql_planner_input`): + - Read `label_cols` from `precompute_cfg`; if missing or empty, emit `metadata_columns: []` + (the planner will error if no `--clickhouse-url` is given, or auto-discover if it is). + - Remove the hard assertion that `label_cols` is non-empty. + +- `config/experiment_type/clickhouse.yaml`: + ```yaml + precompute: + value_col: ResolutionWidth + label_cols: [] # empty = auto-discover from ClickHouse; list explicitly to override + timestamp_col: EventTime + ``` + +--- + +## Checkpoint 5 — Wire into `experiment_run_clickhouse.py` + +**What**: Pass `clickhouse_url` to `controller_service.start()` in the sketchdb mode branch. +ClickHouse is already loaded and running at this point, so it can answer `system.columns` +queries immediately. + +**File**: `experiment_run_clickhouse.py` — in the sketchdb block, update the +`controller_service.start()` call: + +```python +controller_service.start( + controller_input_file=os.path.join(remote_controller_dir, "planner_input.yaml"), + streaming_engine="precompute", + controller_remote_output_dir=remote_controller_dir, + punting=False, + query_language="sql", + data_ingestion_interval=data_ingestion_interval, + clickhouse_url=clickhouse_url, # ← new + clickhouse_database=CLICKHOUSE_DATABASE, # ← new +) +``` + +**Test**: end-to-end run without `label_cols` in the experiment config. Verify: +1. `controller.log` shows the planner output (no error about missing metadata_columns) +2. `streaming_config.yaml` and `inference_config.yaml` are written with correct rollup labels +3. The query engine starts, ingests, and the query client gets results + +--- + +## Dependency Graph + +``` +[1] clickhouse_client.rs — independent +[2] TableDefinition optional — independent +[3] CLI flags — depends on [1][2] +[4] Python infra — depends on [3] +[5] Runner wiring — depends on [4] +``` + +Checkpoints 1 and 2 can be done in parallel (both are Rust, same crate). +Checkpoint 3 needs both. 4 and 5 need 3 to be built. + +--- + +## What stays unchanged + +- `experiment_run_e2e.py` — untouched +- All existing SQL planner unit tests (`metadata_columns` explicit still works) +- `benchmark/` scripts — untouched +- `ClickHouseService` / `ClickHouseDataLoaderService` — untouched +- `ControllerService.start()` existing call sites — new params default to `None`; no breakage diff --git a/asap-tools/experiments/experiment_run_clickhouse.py b/asap-tools/experiments/experiment_run_clickhouse.py index 3077fd52..e49b3fed 100644 --- a/asap-tools/experiments/experiment_run_clickhouse.py +++ b/asap-tools/experiments/experiment_run_clickhouse.py @@ -107,7 +107,7 @@ ClickHouseService, PrometheusClientService, ) -from experiment_utils.services.misc import ControllerService +from experiment_utils.services.misc import ControllerService, DiscoveryBackend from experiment_utils.services.query_engine import QueryEngineRustService CLICKHOUSE_DATABASE = "default" @@ -408,6 +408,11 @@ def main(cfg: DictConfig) -> None: streaming_engine="precompute", controller_remote_output_dir=remote_controller_dir, punting=False, + discovery_backend=DiscoveryBackend( + type="clickhouse", + url=clickhouse_url, + database=CLICKHOUSE_DATABASE, + ), query_language="sql", data_ingestion_interval=data_ingestion_interval, ) diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py index fed14203..1e470073 100644 --- a/asap-tools/experiments/experiment_run_e2e.py +++ b/asap-tools/experiments/experiment_run_e2e.py @@ -32,6 +32,7 @@ DockerVictoriaMetricsService, SystemExportersService, ) +from experiment_utils.services.misc import DiscoveryBackend COMPRESS_JSON = True @@ -347,11 +348,15 @@ def main(cfg: DictConfig): ) controller_service.start( controller_input_file=controller_input_config, - prometheus_scrape_interval=prometheus_scrape_interval, streaming_engine=args.streaming_engine, controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, punting=args.controller_punting, - prometheus_url=prometheus_url, + discovery_backend=DiscoveryBackend( + type="prometheus", + url=prometheus_url, + database=None, + ), + prometheus_scrape_interval=prometheus_scrape_interval, ) sync.rsync_controller_config_remote_to_local( provider, diff --git a/asap-tools/experiments/experiment_utils/services/misc.py b/asap-tools/experiments/experiment_utils/services/misc.py index 9a85e5fd..01d28f2c 100644 --- a/asap-tools/experiments/experiment_utils/services/misc.py +++ b/asap-tools/experiments/experiment_utils/services/misc.py @@ -5,6 +5,7 @@ import os import random import subprocess +from dataclasses import dataclass from typing import Optional import constants @@ -12,6 +13,19 @@ from experiment_utils.providers.base import InfrastructureProvider +@dataclass +class DiscoveryBackend: + """Backend connection for label/column auto-discovery by asap-planner. + + PromQL mode: DiscoveryBackend(type="prometheus", url=, database=None) + SQL mode: DiscoveryBackend(type="clickhouse", url=, database=) + """ + + type: str + url: str + database: Optional[str] + + class DeathstarService(BaseService): """Service for managing DeathStar benchmark.""" @@ -190,8 +204,8 @@ def start( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + discovery_backend: DiscoveryBackend, prometheus_scrape_interval: Optional[int] = None, - prometheus_url: Optional[str] = None, query_language: str = "promql", data_ingestion_interval: Optional[int] = None, **kwargs, @@ -204,8 +218,10 @@ def start( streaming_engine: Type of streaming engine controller_remote_output_dir: Controller output directory punting: Enable query punting based on performance heuristics + discovery_backend: Backend used for label/column auto-discovery. + PromQL mode: DiscoveryBackend(type="prometheus", url=, database=None) + SQL mode: DiscoveryBackend(type="clickhouse", url=, database=) prometheus_scrape_interval: Required for PromQL mode - prometheus_url: Prometheus URL for label discovery (PromQL mode only) query_language: 'promql' (default) or 'sql' data_ingestion_interval: Required for SQL mode (seconds) **kwargs: Additional configuration @@ -216,8 +232,8 @@ def start( streaming_engine, controller_remote_output_dir, punting, + discovery_backend, prometheus_scrape_interval, - prometheus_url, query_language, data_ingestion_interval, ) @@ -227,8 +243,8 @@ def start( streaming_engine, controller_remote_output_dir, punting, + discovery_backend, prometheus_scrape_interval, - prometheus_url, query_language, data_ingestion_interval, ) @@ -239,8 +255,8 @@ def _start_bare_metal( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + discovery_backend: DiscoveryBackend, prometheus_scrape_interval: Optional[int], - prometheus_url: Optional[str], query_language: str, data_ingestion_interval: Optional[int], ) -> None: @@ -254,12 +270,17 @@ def _start_bare_metal( ) if prometheus_scrape_interval is not None: cmd += f" --prometheus_scrape_interval {prometheus_scrape_interval}" - if prometheus_url: - cmd += f" --prometheus-url {prometheus_url}" if data_ingestion_interval is not None: cmd += f" --data-ingestion-interval {data_ingestion_interval}" + if discovery_backend.type == "prometheus": + cmd += f" --prometheus-url {discovery_backend.url}" + elif discovery_backend.type == "clickhouse": + cmd += f" --clickhouse-url {discovery_backend.url}" + if discovery_backend.database: + cmd += f" --clickhouse-database {discovery_backend.database}" if punting: cmd += " --enable-punting" + cmd += " -v" cmd += f" > {controller_log} 2>&1" cmd_dir = os.path.join(self.provider.get_home_dir(), "code", "asap-planner-rs") self.provider.execute_command( @@ -277,8 +298,8 @@ def _start_containerized( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + discovery_backend: DiscoveryBackend, prometheus_scrape_interval: Optional[int], - prometheus_url: Optional[str], query_language: str, data_ingestion_interval: Optional[int], ): @@ -312,10 +333,14 @@ def _start_containerized( generate_cmd += ( f" --prometheus-scrape-interval {prometheus_scrape_interval}" ) - if prometheus_url: - generate_cmd += f" --prometheus-url {prometheus_url}" if data_ingestion_interval is not None: generate_cmd += f" --data-ingestion-interval {data_ingestion_interval}" + if discovery_backend.type == "prometheus": + generate_cmd += f" --prometheus-url {discovery_backend.url}" + elif discovery_backend.type == "clickhouse": + generate_cmd += f" --clickhouse-url {discovery_backend.url}" + if discovery_backend.database: + generate_cmd += f" --clickhouse-database {discovery_backend.database}" if punting: generate_cmd += " --punting"