From ba151896d59ba7cc8403aeff0f83e60c8bffde30 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Mon, 25 May 2026 22:32:34 -0400 Subject: [PATCH 1/5] added json source --- asap-query-engine/src/engine_config.rs | 42 +++- asap-query-engine/src/main.rs | 30 ++- .../src/precompute_engine/json_ingest.rs | 192 ++++++++++++++++++ .../src/precompute_engine/mod.rs | 2 + 4 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 asap-query-engine/src/precompute_engine/json_ingest.rs diff --git a/asap-query-engine/src/engine_config.rs b/asap-query-engine/src/engine_config.rs index 80bd2a21..04e72587 100644 --- a/asap-query-engine/src/engine_config.rs +++ b/asap-query-engine/src/engine_config.rs @@ -5,7 +5,9 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> { match (&config.ingest, &config.streaming_engine) { (IngestConfig::Kafka { .. }, StreamingEngine::Arroyo) => {} ( - IngestConfig::HttpRemoteWrite { .. } | IngestConfig::Csv { .. }, + IngestConfig::HttpRemoteWrite { .. } + | IngestConfig::Csv { .. } + | IngestConfig::Json { .. }, StreamingEngine::Precompute, ) => {} (IngestConfig::Otlp { .. }, StreamingEngine::Arroyo) => {} @@ -241,6 +243,18 @@ pub enum IngestConfig { #[serde(default = "default_otlp_http_port")] http_port: u16, }, + Json { + path: String, + metric_name: String, + value_col: String, + #[serde(default)] + label_cols: Vec, + timestamp_col: String, + #[serde(default = "default_timestamp_unit")] + timestamp_unit: String, + #[serde(default = "default_json_batch_size")] + batch_size: usize, + }, } impl Default for IngestConfig { @@ -271,6 +285,14 @@ fn default_otlp_http_port() -> u16 { 4318 } +fn default_timestamp_unit() -> String { + "seconds".to_string() +} + +fn default_json_batch_size() -> usize { + 1000 +} + #[derive(Debug, serde::Deserialize)] #[serde(default)] pub struct PrecomputeSettings { @@ -435,6 +457,24 @@ output_dir: "./output" assert!(check_config(&config).is_ok()); } + #[test] + fn check_config_valid_json_precompute() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "json" + path: "hits.json" + metric_name: "hits" + value_col: "ResolutionWidth" + label_cols: ["OS", "RegionID"] + timestamp_col: "EventTime" + timestamp_unit: "seconds" +output_dir: "./output" +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(check_config(&config).is_ok()); + } + #[test] fn check_config_valid_otlp_arroyo() { let yaml = r#" diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index c29ca2a8..412a329d 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -16,6 +16,9 @@ use query_engine_rust::data_model::enums::{CleanupPolicy, StreamingEngine}; use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::precompute_engine::config::LateDataPolicy; use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource}; +use query_engine_rust::precompute_engine::json_ingest::{ + JsonFileIngestConfig, JsonFileIngestSource, TimestampUnit, +}; use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::InferenceConfig; @@ -238,13 +241,38 @@ async fn main() -> Result<()> { batch_size: *batch_size, }))] } + IngestConfig::Json { + path, + metric_name, + value_col, + label_cols, + timestamp_col, + timestamp_unit, + batch_size, + } => { + let unit = timestamp_unit + .parse::() + .map_err(|e| format!("Invalid timestamp_unit: {e}"))?; + info!("JSON file ingest mode: {}", path); + vec![Box::new(JsonFileIngestSource::new(JsonFileIngestConfig { + path: path.clone(), + metric_name: metric_name.clone(), + value_col: value_col.clone(), + label_cols: label_cols.clone(), + timestamp_col: timestamp_col.clone(), + timestamp_unit: unit, + batch_size: *batch_size, + }))] + } IngestConfig::HttpRemoteWrite { port } => { info!("Starting precompute engine on port {}", port); vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port: *port, }))] } - _ => unreachable!("check_config enforces precompute requires http_remote_write or csv"), + _ => unreachable!( + "check_config enforces precompute requires http_remote_write, csv, or json" + ), }; let pe = PrecomputeEngine::new( precompute_config, diff --git a/asap-query-engine/src/precompute_engine/json_ingest.rs b/asap-query-engine/src/precompute_engine/json_ingest.rs new file mode 100644 index 00000000..f5d7dc27 --- /dev/null +++ b/asap-query-engine/src/precompute_engine/json_ingest.rs @@ -0,0 +1,192 @@ +use crate::drivers::ingest::prometheus_remote_write::DecodedSample; +use crate::precompute_engine::ingest_source::{route_decoded_samples, IngestContext, IngestSource}; +use std::time::Instant; +use tracing::info; + +pub enum TimestampUnit { + Seconds, + Millis, +} + +impl std::str::FromStr for TimestampUnit { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "seconds" => Ok(TimestampUnit::Seconds), + "millis" => Ok(TimestampUnit::Millis), + other => Err(format!( + "unknown timestamp_unit '{}': expected 'seconds' or 'millis'", + other + )), + } + } +} + +impl TimestampUnit { + pub fn to_ms(&self, raw: i64) -> i64 { + match self { + TimestampUnit::Seconds => raw * 1000, + TimestampUnit::Millis => raw, + } + } +} + +pub struct JsonFileIngestConfig { + pub path: String, + pub metric_name: String, + pub value_col: String, + /// Label columns. Will be sorted alphabetically in the labels string. + pub label_cols: Vec, + pub timestamp_col: String, + pub timestamp_unit: TimestampUnit, + pub batch_size: usize, +} + +pub struct JsonFileIngestSource { + config: JsonFileIngestConfig, +} + +impl JsonFileIngestSource { + pub fn new(config: JsonFileIngestConfig) -> Self { + Self { config } + } +} + +#[async_trait::async_trait] +impl IngestSource for JsonFileIngestSource { + async fn run( + self: Box, + ctx: IngestContext, + ) -> Result<(), Box> { + let config = self.config; + let (tx, mut rx) = tokio::sync::mpsc::channel::>(8); + + let reader_handle = tokio::task::spawn_blocking( + move || -> Result> { + let file = std::fs::File::open(&config.path)?; + let reader = std::io::BufReader::new(file); + + let mut sorted_label_cols = config.label_cols.clone(); + sorted_label_cols.sort(); + + let mut batch: Vec = Vec::with_capacity(config.batch_size); + let mut row_count: u64 = 0; + + use std::io::BufRead; + for line_result in reader.lines() { + let line = line_result?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let obj: serde_json::Value = serde_json::from_str(trimmed).map_err( + |e| -> Box { + std::io::Error::other(format!( + "failed to parse JSON line {}: {}", + row_count + 1, + e + )) + .into() + }, + )?; + + let value: f64 = obj + .get(&config.value_col) + .ok_or_else(|| -> Box { + std::io::Error::other(format!( + "value column '{}' not found in JSON object", + config.value_col + )) + .into() + })? + .as_f64() + .ok_or_else(|| -> Box { + std::io::Error::other(format!( + "value column '{}' is not a number", + config.value_col + )) + .into() + })?; + + let raw_ts: i64 = obj + .get(&config.timestamp_col) + .ok_or_else(|| -> Box { + std::io::Error::other(format!( + "timestamp column '{}' not found in JSON object", + config.timestamp_col + )) + .into() + })? + .as_i64() + .ok_or_else(|| -> Box { + std::io::Error::other(format!( + "timestamp column '{}' is not an integer", + config.timestamp_col + )) + .into() + })?; + + let timestamp_ms = config.timestamp_unit.to_ms(raw_ts); + + let labels = if sorted_label_cols.is_empty() { + config.metric_name.clone() + } else { + let mut s = String::with_capacity(64); + s.push_str(&config.metric_name); + s.push('{'); + for (i, col) in sorted_label_cols.iter().enumerate() { + if i > 0 { + s.push(','); + } + let val = obj.get(col).and_then(|v| v.as_str()).unwrap_or(""); + s.push_str(col); + s.push_str("=\""); + s.push_str(val); + s.push('"'); + } + s.push('}'); + s + }; + + batch.push(DecodedSample { + labels, + timestamp_ms, + value, + }); + row_count += 1; + + if batch.len() >= config.batch_size { + let send_batch = + std::mem::replace(&mut batch, Vec::with_capacity(config.batch_size)); + if tx.blocking_send(send_batch).is_err() { + break; + } + } + } + + if !batch.is_empty() { + let _ = tx.blocking_send(batch); + } + + Ok(row_count) + }, + ); + + let mut total_samples: u64 = 0; + while let Some(batch) = rx.recv().await { + total_samples += batch.len() as u64; + route_decoded_samples(&ctx, batch, Instant::now()).await?; + } + + let rows = reader_handle.await??; + info!( + "JSON ingest complete: {} rows ingested, {} samples routed", + rows, total_samples + ); + + ctx.router.broadcast_shutdown().await?; + Ok(()) + } +} diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index d5c10665..702ed8e7 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -4,6 +4,7 @@ pub mod csv_ingest; mod engine; mod ingest_handler; pub mod ingest_source; +pub mod json_ingest; pub mod output_sink; pub mod series_buffer; pub mod series_router; @@ -14,3 +15,4 @@ pub use csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource}; pub use engine::{PrecomputeEngine, PrecomputeEngineHandle, PrecomputeWorkerDiagnostics}; pub use ingest_handler::{HttpIngestConfig, HttpIngestSource}; pub use ingest_source::{IngestContext, IngestSource}; +pub use json_ingest::{JsonFileIngestConfig, JsonFileIngestSource, TimestampUnit}; From e0023c2520359e7e8dff8fdd384709f229430554 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Mon, 25 May 2026 22:34:19 -0400 Subject: [PATCH 2/5] updated planner --- asap-planner-rs/src/planner/cleanup.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/asap-planner-rs/src/planner/cleanup.rs b/asap-planner-rs/src/planner/cleanup.rs index 838e084a..23730967 100644 --- a/asap-planner-rs/src/planner/cleanup.rs +++ b/asap-planner-rs/src/planner/cleanup.rs @@ -76,6 +76,13 @@ pub fn get_sql_cleanup_param( ) -> Result { match cleanup_policy { CleanupPolicy::CircularBuffer | CleanupPolicy::ReadBased => { + if t_repeat == 0 { + return Err( + "repetition_delay must be > 0 for cleanup param calculation; \ + set a non-zero repetition_delay in your query group config" + .to_string(), + ); + } Ok(t_lookback.div_ceil(t_repeat)) } CleanupPolicy::NoCleanup => { From bc263213048bc8bf2d7b0b3c8eca886c257be1fd Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Mon, 25 May 2026 22:35:56 -0400 Subject: [PATCH 3/5] updated experiment infra --- .../experiments/experiment_run_clickhouse.py | 257 ++++++++++++++++-- .../experiments/experiment_utils/config.py | 140 ++++++++++ .../services/clickhouse_service.py | 52 ++-- .../experiment_utils/services/misc.py | 66 +++-- .../experiment_utils/services/query_engine.py | 20 +- .../experiments/experiment_utils/sync.py | 35 +++ 6 files changed, 491 insertions(+), 79 deletions(-) diff --git a/asap-tools/experiments/experiment_run_clickhouse.py b/asap-tools/experiments/experiment_run_clickhouse.py index a6286cfa..68c13050 100644 --- a/asap-tools/experiments/experiment_run_clickhouse.py +++ b/asap-tools/experiments/experiment_run_clickhouse.py @@ -1,30 +1,95 @@ """ -Experiment runner for ClickHouse/SQL experiments — baseline mode. +Experiment runner for ClickHouse/SQL experiments. -Flow: +Supports two modes, selected automatically based on config: + + baseline — queries go directly to ClickHouse (:8123). + sketchdb — queries go to the ASAP precompute engine (:8088), which serves + approximate results from KLL sketches and forwards unsupported + queries to ClickHouse as a fallback. + +──────────────────────────────────────────────────────────── +Baseline-only flow +──────────────────────────────────────────────────────────── rsync dataset file → node ClickHouseService.start() - ClickHouseDataLoaderService.start() (once, before mode loop; DROP + reload) + ClickHouseDataLoaderService.start() (DROP + reload) for experiment_mode in ["baseline"]: - run prometheus-client in ClickHouse SQL mode (blocking) + run prometheus-client → ClickHouse :8123 rsync results back - teardown if not no_teardown -Usage: +──────────────────────────────────────────────────────────── +Sketchdb flow (enabled when sketchdb_query_groups is set in config) +──────────────────────────────────────────────────────────── + (same rsync + ClickHouse start + data load as above) + + for experiment_mode in ["baseline", "sketchdb"]: + if sketchdb: + rsync streaming_config.yaml → node + query_engine_rust starts (precompute engine + JSON ingest + CH fallback) + wait until process is up + sleep flow.steady_state_wait (ingest completes during this window) + run prometheus-client → ASAP :8088 + rsync results back + stop query_engine_rust + +──────────────────────────────────────────────────────────── +Pre-run steps for sketchdb mode +──────────────────────────────────────────────────────────── +1. Generate the SQL query file (once per dataset/window configuration): + + python asap-tools/execution-utilities/benchmark/generate_queries.py \\ + --table-name hits \\ + --ts-column EventTime \\ + --value-column ResolutionWidth \\ + --group-by-columns OS,RegionID,TraficSourceID,UserAgent \\ + --window-size 10 \\ + --output-prefix /path/to/output/clickbench \\ + --auto-detect-timestamps \\ + --data-file /path/to/hits.json \\ + --data-file-format jsonl + + The same SQL file is used for both baseline (load tester) and sketchdb + (load tester + planner input). + +2. Ensure the release binaries are built on the CloudLab node: + cargo build --release (in ~/code/asap-query-engine) + cargo build --release (in ~/code/asap-planner-rs) + + asap-planner runs automatically in SQL mode during the experiment to + generate streaming_config.yaml and inference_config.yaml — no manual + config file authoring needed. + +──────────────────────────────────────────────────────────── +Usage +──────────────────────────────────────────────────────────── +Baseline only: + python experiment_run_clickhouse.py \\ + experiment_type=clickhouse \\ + experiment.name=my_bench \\ + cloudlab.num_nodes=1 \\ + cloudlab.username=myuser \\ + cloudlab.hostname_suffix=myexp.cloudlab.us \\ + experiment_params.dataset.local_data_file=/path/to/hits.json \\ + 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql' + +Baseline + sketchdb: python experiment_run_clickhouse.py \\ experiment_type=clickhouse \\ experiment.name=my_bench \\ cloudlab.num_nodes=1 \\ cloudlab.username=myuser \\ cloudlab.hostname_suffix=myexp.cloudlab.us \\ - experiment_params.dataset.name=clickbench \\ experiment_params.dataset.local_data_file=/path/to/hits.json \\ - 'experiment_params.query_groups[0].sql_file=/path/to/queries.sql' + 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql' \\ + 'experiment_params.sketchdb_query_groups[0].sql_file=/path/to/clickbench_asap.sql' \\ + experiment_params.streaming_config_file=/path/to/clickbench_streaming.yaml """ import json import os +import time from urllib.parse import urlparse import hydra @@ -33,7 +98,13 @@ import constants from experiment_utils import config, sync from experiment_utils.providers.factory import create_provider -from experiment_utils.services import ClickHouseDataLoaderService, ClickHouseService +from experiment_utils.services import ( + ClickHouseDataLoaderService, + ClickHouseService, + PrometheusClientService, +) +from experiment_utils.services.misc import ControllerService +from experiment_utils.services.query_engine import QueryEngineRustService # Register resolvers used by config.yaml interpolation. OmegaConf.register_new_resolver( @@ -178,6 +249,11 @@ def main(cfg: DictConfig) -> None: clickhouse_database = str(cfg.clickhouse.database) clickhouse_http_port = urlparse(clickhouse_url).port or 8123 + # --- sketchdb config (optional) --- + sketchdb_query_groups = ep.get("sketchdb_query_groups") or None + data_ingestion_interval = int(ep.get("data_ingestion_interval", 15)) + asap_http_port = 8088 + # --- generate prometheus-client config YAMLs for each experiment mode --- if not skip_querying: mode_server_urls = {constants.BASELINE_EXPERIMENT_NAME: clickhouse_url} @@ -186,6 +262,8 @@ def main(cfg: DictConfig) -> None: local_experiment_dir=local_experiment_root_dir, mode_server_urls=mode_server_urls, clickhouse_database=clickhouse_database, + sketchdb_query_groups=sketchdb_query_groups, + sketchdb_server_url=f"http://localhost:{asap_http_port}/clickhouse/query", ) sync.rsync_controller_client_configs( provider, @@ -198,6 +276,8 @@ def main(cfg: DictConfig) -> None: print("skip_querying=True: no SQL queries will be executed") print("-" * 40) experiment_modes = [constants.BASELINE_EXPERIMENT_NAME] + if sketchdb_query_groups: + experiment_modes.append(constants.SKETCHDB_EXPERIMENT_NAME) # --- rsync dataset file to node --- remote_data_dir = os.path.join(experiment_root_output_dir, "data") @@ -232,9 +312,18 @@ def main(cfg: DictConfig) -> None: ) # --- mode loop --- + prometheus_client_service = PrometheusClientService( + provider, use_container=use_container, node_offset=node_offset + ) + for experiment_mode in experiment_modes: print(f"Running experiment mode: {experiment_mode}") + # Clean up any leftover prometheus-client container from the previous mode, + # mirroring the prometheus_client_service.stop() call at the top of the + # e2e mode loop. + prometheus_client_service.stop() + experiment_output_dir = os.path.join( experiment_root_output_dir, experiment_mode ) @@ -248,27 +337,145 @@ def main(cfg: DictConfig) -> None: ) os.makedirs(local_experiment_dir, exist_ok=True) - if not skip_querying: - controller_client_config = os.path.join( - experiment_root_output_dir, - "controller_client_configs", - f"{experiment_mode}.yaml", + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + # --- sketchdb mode: precompute engine + JSON ingest + ClickHouse fallback --- + # Mirrors experiment_run_e2e.py: planner runs first and generates + # streaming_config.yaml + inference_config.yaml into controller_output_dir, + # then the query engine starts reading from that same directory. + + local_controller_dir = os.path.join( + local_experiment_root_dir, "controller_output" ) - _run_query_client( + remote_controller_dir = os.path.join( + experiment_root_output_dir, "controller_output" + ) + os.makedirs(local_controller_dir, exist_ok=True) + + # Generate and rsync the planner input config to the node + planner_input_yaml = config.generate_sql_planner_input( + sketchdb_query_groups, dataset_cfg + ) + local_planner_input = os.path.join( + local_controller_dir, "planner_input.yaml" + ) + with open(local_planner_input, "w") as _f: + _f.write(planner_input_yaml) + sync.rsync_streaming_configs( + provider, local_controller_dir, remote_controller_dir, node_offset + ) + + # Run asap-planner (SQL mode) — writes streaming_config.yaml + inference_config.yaml + controller_service = ControllerService( + provider=provider, use_container=False, node_offset=node_offset + ) + controller_service.start( + controller_input_file=os.path.join( + remote_controller_dir, "planner_input.yaml" + ), + streaming_engine="precompute", + controller_remote_output_dir=remote_controller_dir, + punting=False, + query_language="sql", + data_ingestion_interval=data_ingestion_interval, + ) + sync.rsync_controller_config_remote_to_local( + provider, remote_controller_dir, local_controller_dir, node_offset + ) + + # Start query engine (precompute + JSON ingest + ClickHouse fallback) + query_engine_service = QueryEngineRustService( provider=provider, + use_container=False, node_offset=node_offset, - config_file=controller_client_config, - output_dir=experiment_output_dir, - use_container=use_container, - parallel=parallel, + ) + dataset_precompute_cfg = dataset_cfg.precompute + query_engine_service.start( + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + flink_output_format="json", + prometheus_scrape_interval=15, + log_level="INFO", + profile_query_engine=False, + manual=False, + streaming_engine="precompute", + controller_remote_output_dir=remote_controller_dir, + compress_json=False, + dump_precomputes=False, + lock_strategy="per-key", + backend_config={ + "type": "clickhouse", + "url": clickhouse_url, + "database": clickhouse_database, + "forward_unsupported_queries": True, + }, + http_port=asap_http_port, + ingest_json_config={ + "path": remote_data_file, + "metric_name": str(dataset_cfg.metric_name), + "value_col": str(dataset_precompute_cfg.value_col), + "label_cols": list(dataset_precompute_cfg.label_cols), + "timestamp_col": str(dataset_precompute_cfg.timestamp_col), + "timestamp_unit": "seconds", + "batch_size": 1000, + }, ) - sync.rsync_experiment_data( - provider, - experiment_output_dir, - local_experiment_dir, - node_offset=node_offset, - ) + query_engine_service.wait_until_ready() + + if not skip_querying: + steady_state_wait = int(cfg.flow.steady_state_wait) + print( + f"Waiting {steady_state_wait}s for precompute ingest to complete..." + ) + time.sleep(steady_state_wait) + + controller_client_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}.yaml", + ) + _run_query_client( + provider=provider, + node_offset=node_offset, + config_file=controller_client_config, + output_dir=experiment_output_dir, + use_container=use_container, + parallel=parallel, + ) + + sync.rsync_experiment_data( + provider, + experiment_output_dir, + local_experiment_dir, + node_offset=node_offset, + ) + + if not no_teardown: + query_engine_service.stop() + + else: + # --- baseline mode --- + if not skip_querying: + controller_client_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}.yaml", + ) + _run_query_client( + provider=provider, + node_offset=node_offset, + config_file=controller_client_config, + output_dir=experiment_output_dir, + use_container=use_container, + parallel=parallel, + ) + + sync.rsync_experiment_data( + provider, + experiment_output_dir, + local_experiment_dir, + node_offset=node_offset, + ) # --- teardown --- if not no_teardown: diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py index a0520ee8..ecfaf20c 100644 --- a/asap-tools/experiments/experiment_utils/config.py +++ b/asap-tools/experiments/experiment_utils/config.py @@ -122,6 +122,21 @@ def _validate_clickhouse_experiment_config(experiment_params: DictConfig) -> Non print(" skip_querying=True means no queries will be executed") print("-" * 60) + # Validate sketchdb_query_groups sql files if present + sketchdb_groups = experiment_params.get("sketchdb_query_groups") or None + if sketchdb_groups and not skip_querying: + for i, group in enumerate(sketchdb_groups): + sql_file = group.get("sql_file") + if not sql_file or sql_file == "???": + raise ValueError( + f"sketchdb_query_groups[{i}] missing 'sql_file'. " + "Generate ASAP SQL files with benchmark/generate_queries.py first." + ) + if not os.path.exists(sql_file): + raise ValueError( + f"sketchdb_query_groups[{i}] sql_file={sql_file!r} does not exist." + ) + def validate_experiment_config( experiment_params: DictConfig, require_queries: bool = True @@ -663,6 +678,8 @@ def generate_clickhouse_client_configs( clickhouse_database: str = "default", clickhouse_user: str = "default", clickhouse_password: str = "", + sketchdb_query_groups: Any = None, + sketchdb_server_url: str = "http://localhost:8088/clickhouse/query", ) -> List[str]: """Generate prometheus-client config YAMLs for ClickHouse experiment modes. @@ -743,9 +760,132 @@ def generate_clickhouse_client_configs( with open(config_path, "w") as f: yaml.dump(config, f) + if sketchdb_query_groups is not None: + if isinstance(sketchdb_query_groups, (DictConfig, ListConfig)): + sketchdb_groups_list: List[Dict] = OmegaConf.to_container(sketchdb_query_groups, resolve=True) # type: ignore[assignment] + else: + sketchdb_groups_list = list(sketchdb_query_groups) + + built_sketchdb_groups = [] + for idx, group in enumerate(sketchdb_groups_list): + sql_file = group.get("sql_file") + if not sql_file: + name = group.get("name", str(idx)) + raise ValueError( + f"sketchdb query group {idx!r} ({name!r}) missing 'sql_file'" + ) + + queries = _load_sql_queries(sql_file) + if not queries: + raise ValueError( + f"No SQL statements found in sketchdb sql_file {sql_file!r}" + ) + + client_opts = dict(group.get("client_options") or {}) + client_opts.setdefault("starting_delay", 0) + client_opts.setdefault("repetitions", 1) + + built_sketchdb_groups.append( + { + "id": idx, + "queries": queries, + "repetition_delay": group.get("repetition_delay", 0), + "client_options": client_opts, + "time_window_seconds": group.get("time_window_seconds"), + } + ) + + sketchdb_config: Dict[str, Any] = { + "servers": [ + { + "name": constants.SKETCHDB_EXPERIMENT_NAME, + "url": sketchdb_server_url, + "protocol": "clickhouse", + "database": clickhouse_database, + "user": clickhouse_user, + "password": clickhouse_password, + } + ], + "query_groups": built_sketchdb_groups, + } + sketchdb_path = os.path.join( + output_dir, f"{constants.SKETCHDB_EXPERIMENT_NAME}.yaml" + ) + with open(sketchdb_path, "w") as f: + yaml.dump(sketchdb_config, f) + modes.append(constants.SKETCHDB_EXPERIMENT_NAME) + return modes +def generate_sql_planner_input(sketchdb_query_groups: Any, dataset_cfg: Any) -> str: + """Generate the YAML input file for asap-planner in SQL mode. + + The planner (``asap-planner --query-language sql``) reads a + ``SQLControllerConfig`` YAML that contains: + - ``tables``: schema of the tables being queried + - ``query_groups``: SQL queries with controller options + + This function builds that YAML from the experiment config so the runner + does not need a hand-authored planner input file. + + Args: + sketchdb_query_groups: ListConfig of sketchdb query group dicts. + Each entry must have ``sql_file``, ``repetition_delay``, and + ``controller_options`` (``accuracy_sla``, ``latency_sla``). + dataset_cfg: DictConfig with ``table``/``name``, and ``precompute`` + sub-config (``timestamp_col``, ``value_col``, ``label_cols``). + + Returns: + YAML string ready to write to disk and pass to asap-planner. + """ + precompute_cfg = dataset_cfg.precompute + table_name = str(dataset_cfg.get("table") or dataset_cfg.name) + value_col = str(precompute_cfg.value_col) + tables = [ + { + "name": table_name, + "time_column": str(precompute_cfg.timestamp_col), + "value_columns": [value_col], + "metadata_columns": list(precompute_cfg.label_cols), + } + ] + + if isinstance(sketchdb_query_groups, (DictConfig, ListConfig)): + groups_list = OmegaConf.to_container(sketchdb_query_groups, resolve=True) + else: + groups_list = list(sketchdb_query_groups) + + query_groups = [] + for idx, group in enumerate(groups_list): + sql_file = group.get("sql_file") + if not sql_file: + raise ValueError(f"sketchdb_query_groups[{idx}] missing 'sql_file'") + queries = _load_sql_queries(sql_file) + if not queries: + raise ValueError(f"No SQL statements found in {sql_file!r}") + + ctrl_opts = dict(group.get("controller_options") or {}) + query_groups.append( + { + "id": idx + 1, + "repetition_delay": int(group.get("repetition_delay", 0)), + "queries": queries, + "controller_options": { + "accuracy_sla": float(ctrl_opts.get("accuracy_sla", 0.95)), + "latency_sla": float(ctrl_opts.get("latency_sla", 100.0)), + }, + } + ) + + planner_input = { + "tables": tables, + "query_groups": query_groups, + "aggregate_cleanup": {"policy": "read_based"}, + } + return yaml.dump(planner_input, default_flow_style=False, allow_unicode=True) + + def generate_and_copy_prometheus_config( num_nodes_in_experiment, local_experiment_dir, diff --git a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py index 07dc627c..36112eb0 100644 --- a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py +++ b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py @@ -408,30 +408,28 @@ def _load_clickbench( ) -> None: """Stream a JSON-lines file (optionally gzipped) into ClickHouse.""" print(f"Loading ClickBench data from {remote_data_file!r}...") - insert_url = "{}?query=INSERT+INTO+{}+FORMAT+JSONEachRow".format( - url.rstrip("/") + "/", table - ) file_lower = remote_data_file.lower() is_gz = file_lower.endswith(".json.gz") or file_lower.endswith(".jsonl.gz") + insert_sql = shlex.quote(f"INSERT INTO {table} FORMAT JSONEachRow") if is_gz: if max_rows > 0: - cmd = "zcat {} | head -n {} | curl -sS {} --data-binary @-".format( - shlex.quote(remote_data_file), max_rows, shlex.quote(insert_url) + reader = "zcat {} | head -n {}".format( + shlex.quote(remote_data_file), max_rows ) else: - cmd = "zcat {} | curl -sS {} --data-binary @-".format( - shlex.quote(remote_data_file), shlex.quote(insert_url) - ) + reader = "zcat {}".format(shlex.quote(remote_data_file)) else: if max_rows > 0: - cmd = "head -n {} {} | curl -sS {} --data-binary @-".format( - max_rows, shlex.quote(remote_data_file), shlex.quote(insert_url) - ) + reader = "head -n {} {}".format(max_rows, shlex.quote(remote_data_file)) else: - cmd = "curl -sS {} --data-binary @{}".format( - shlex.quote(insert_url), shlex.quote(remote_data_file) - ) + reader = "cat {}".format(shlex.quote(remote_data_file)) + + cmd = ( + "{} | docker exec -i clickhouse-server clickhouse-client --query {}".format( + reader, insert_sql + ) + ) result = self.provider.execute_command( node_idx=self.node_offset, @@ -483,37 +481,35 @@ def _load_custom( ) -> None: """Stream a custom JSON-lines file (plain or gzipped) into ClickHouse.""" print(f"Loading custom data from {remote_data_file!r} into {table!r}...") - insert_url = "{}?query=INSERT+INTO+{}+FORMAT+JSONEachRow".format( - url.rstrip("/") + "/", table - ) file_lower = remote_data_file.lower() is_gz = file_lower.endswith(".json.gz") or file_lower.endswith(".jsonl.gz") is_json = file_lower.endswith(".json") or file_lower.endswith(".jsonl") + insert_sql = shlex.quote(f"INSERT INTO {table} FORMAT JSONEachRow") if is_gz: if max_rows > 0: - cmd = "zcat {} | head -n {} | curl -sS {} --data-binary @-".format( - shlex.quote(remote_data_file), max_rows, shlex.quote(insert_url) + reader = "zcat {} | head -n {}".format( + shlex.quote(remote_data_file), max_rows ) else: - cmd = "zcat {} | curl -sS {} --data-binary @-".format( - shlex.quote(remote_data_file), shlex.quote(insert_url) - ) + reader = "zcat {}".format(shlex.quote(remote_data_file)) elif is_json: if max_rows > 0: - cmd = "head -n {} {} | curl -sS {} --data-binary @-".format( - max_rows, shlex.quote(remote_data_file), shlex.quote(insert_url) - ) + reader = "head -n {} {}".format(max_rows, shlex.quote(remote_data_file)) else: - cmd = "curl -sS {} --data-binary @{}".format( - shlex.quote(insert_url), shlex.quote(remote_data_file) - ) + reader = "cat {}".format(shlex.quote(remote_data_file)) else: raise ValueError( f"Unsupported file format for {remote_data_file!r}. " "Use dataset_name='h2o' for CSV files." ) + cmd = ( + "{} | docker exec -i clickhouse-server clickhouse-client --query {}".format( + reader, insert_sql + ) + ) + result = self.provider.execute_command( node_idx=self.node_offset, cmd=cmd, diff --git a/asap-tools/experiments/experiment_utils/services/misc.py b/asap-tools/experiments/experiment_utils/services/misc.py index 3c37da99..40e28ed0 100644 --- a/asap-tools/experiments/experiment_utils/services/misc.py +++ b/asap-tools/experiments/experiment_utils/services/misc.py @@ -5,6 +5,7 @@ import os import random import subprocess +from typing import Optional import constants from .base import BaseService @@ -194,65 +195,77 @@ def __init__( def start( self, controller_input_file: str, - prometheus_scrape_interval: int, streaming_engine: str, controller_remote_output_dir: str, punting: bool, - prometheus_url: str, + prometheus_scrape_interval: Optional[int] = None, + prometheus_url: Optional[str] = None, + query_language: str = "promql", + data_ingestion_interval: Optional[int] = None, **kwargs, ) -> None: """ - Start the controller. + Start the controller (asap-planner). Args: controller_input_file: Path to controller input configuration - prometheus_scrape_interval: Prometheus scraping interval streaming_engine: Type of streaming engine controller_remote_output_dir: Controller output directory punting: Enable query punting based on performance heuristics - prometheus_url: Base URL of the Prometheus instance for metric label inference + prometheus_scrape_interval: Required for PromQL mode + prometheus_url: Prometheus URL for label discovery (PromQL mode only) + query_language: 'promql' (default) or 'sql' + data_ingestion_interval: Required for SQL mode (seconds) **kwargs: Additional configuration """ if self.use_container: return self._start_containerized( controller_input_file, - prometheus_scrape_interval, streaming_engine, controller_remote_output_dir, punting, + prometheus_scrape_interval, prometheus_url, + query_language, + data_ingestion_interval, ) else: return self._start_bare_metal( controller_input_file, - prometheus_scrape_interval, streaming_engine, controller_remote_output_dir, punting, + prometheus_scrape_interval, prometheus_url, + query_language, + data_ingestion_interval, ) def _start_bare_metal( self, controller_input_file: str, - prometheus_scrape_interval: int, streaming_engine: str, controller_remote_output_dir: str, punting: bool, - prometheus_url: str, + prometheus_scrape_interval: Optional[int], + prometheus_url: Optional[str], + query_language: str, + data_ingestion_interval: Optional[int], ) -> None: controller_log = os.path.join(controller_remote_output_dir, "controller.log") cmd = ( - "./target/release/asap-planner" - " --input_config {} --prometheus_scrape_interval {} --output_dir {}" - " --streaming_engine {} --prometheus-url {}" - ).format( - controller_input_file, - prometheus_scrape_interval, - controller_remote_output_dir, - streaming_engine, - prometheus_url, + f"../target/release/asap-planner" + f" --input_config {controller_input_file}" + f" --output_dir {controller_remote_output_dir}" + f" --streaming_engine {streaming_engine}" + f" --query-language {query_language}" ) + if prometheus_scrape_interval is not None: + cmd += f" --prometheus_scrape_interval {prometheus_scrape_interval}" + if prometheus_url: + cmd += f" --prometheus-url {prometheus_url}" + if data_ingestion_interval is not None: + cmd += f" --data-ingestion-interval {data_ingestion_interval}" if punting: cmd += " --enable-punting" cmd += f" > {controller_log} 2>&1" @@ -269,11 +282,13 @@ def _start_bare_metal( def _start_containerized( self, controller_input_file: str, - prometheus_scrape_interval: int, streaming_engine: str, controller_remote_output_dir: str, punting: bool, - prometheus_url: str, + prometheus_scrape_interval: Optional[int], + prometheus_url: Optional[str], + query_language: str, + data_ingestion_interval: Optional[int], ): controller_dir = os.path.join( self.provider.get_home_dir(), "code", "asap-planner-rs" @@ -299,9 +314,16 @@ def _start_containerized( generate_cmd += f" --container-name {self.container_name}" generate_cmd += f" --input-config-path {controller_input_file}" generate_cmd += f" --controller-output-dir {controller_remote_output_dir}" - generate_cmd += f" --prometheus-scrape-interval {prometheus_scrape_interval}" generate_cmd += f" --streaming-engine {streaming_engine}" - generate_cmd += f" --prometheus-url {prometheus_url}" + generate_cmd += f" --query-language {query_language}" + if prometheus_scrape_interval is not None: + generate_cmd += ( + f" --prometheus-scrape-interval {prometheus_scrape_interval}" + ) + if prometheus_url: + generate_cmd += f" --prometheus-url {prometheus_url}" + if data_ingestion_interval is not None: + generate_cmd += f" --data-ingestion-interval {data_ingestion_interval}" if punting: generate_cmd += " --punting" diff --git a/asap-tools/experiments/experiment_utils/services/query_engine.py b/asap-tools/experiments/experiment_utils/services/query_engine.py index 9e97ad02..57210369 100644 --- a/asap-tools/experiments/experiment_utils/services/query_engine.py +++ b/asap-tools/experiments/experiment_utils/services/query_engine.py @@ -4,6 +4,7 @@ import os import subprocess +from typing import Optional import yaml @@ -84,6 +85,7 @@ def _build_engine_config( lock_strategy: str, profile_query_engine: bool, kafka_broker: str, + ingest_json_config: Optional[dict], ) -> dict: """ Build an EngineConfig dict matching asap-query-engine's engine_config.rs schema. @@ -122,10 +124,13 @@ def _build_engine_config( "decompress_json": compress_json, } elif streaming_engine == "precompute": - ingest = { - "type": "http_remote_write", - "port": remote_write_port, - } + if ingest_json_config is not None: + ingest = {"type": "json", **ingest_json_config} + else: + ingest = { + "type": "http_remote_write", + "port": remote_write_port, + } else: raise ValueError( f"streaming_engine='{streaming_engine}' is not supported by the Rust query engine. " @@ -207,6 +212,7 @@ def start( backend_config: dict, http_port: int, remote_write_port: int = 8080, + ingest_json_config: Optional[dict] = None, ) -> None: """ Start the Rust query engine. @@ -251,6 +257,7 @@ def start( remote_write_port, dump_precomputes, lock_strategy, + ingest_json_config, ) else: self._start_bare_metal( @@ -269,6 +276,7 @@ def start( remote_write_port, dump_precomputes, lock_strategy, + ingest_json_config, ) def _start_bare_metal( @@ -288,6 +296,7 @@ def _start_bare_metal( remote_write_port: int, dump_precomputes: bool, lock_strategy: str, + ingest_json_config: Optional[dict], ) -> None: """Start Rust QueryEngine using bare metal deployment.""" output_dir = os.path.join(experiment_output_dir, "query_engine_output") @@ -308,6 +317,7 @@ def _start_bare_metal( lock_strategy=lock_strategy, profile_query_engine=profile_query_engine, kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092", + ingest_json_config=ingest_json_config, ) self._write_engine_config_to_remote( config_dict=config, @@ -350,6 +360,7 @@ def _start_containerized( remote_write_port: int, dump_precomputes: bool, lock_strategy: str, + ingest_json_config: Optional[dict], ) -> None: """Start Rust QueryEngine using containerized deployment with Jinja template.""" output_dir = os.path.join(experiment_output_dir, "query_engine_output") @@ -376,6 +387,7 @@ def _start_containerized( lock_strategy=lock_strategy, profile_query_engine=profile_query_engine, kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092", + ingest_json_config=ingest_json_config, ) # Write the config to the host path that is volume-mounted as /app/outputs, # so the container finds it at /app/outputs/engine_config.yaml. diff --git a/asap-tools/experiments/experiment_utils/sync.py b/asap-tools/experiments/experiment_utils/sync.py index 3daf92bd..3a44a35b 100644 --- a/asap-tools/experiments/experiment_utils/sync.py +++ b/asap-tools/experiments/experiment_utils/sync.py @@ -153,6 +153,41 @@ def rsync_dataset_file( return os.path.join(remote_dir, os.path.basename(local_data_file)) +def rsync_streaming_configs( + provider: InfrastructureProvider, + local_dir: str, + remote_dir: str, + node_offset: int, +) -> None: + """Rsync a local directory of streaming config files (streaming_config.yaml, + inference_config.yaml) to a remote node. + + Follows the same pattern as rsync_controller_client_configs. + + Args: + provider: Infrastructure provider. + local_dir: Local directory containing the config files. + remote_dir: Absolute remote directory path (created if absent). + node_offset: Index of the target node. + """ + hostname = f"node{node_offset}.{provider.hostname_suffix}" + provider.execute_command( + node_idx=node_offset, + cmd=f"mkdir -p {remote_dir}", + cmd_dir=None, + nohup=False, + popen=False, + ) + cmd = 'rsync -azh -e "ssh {}" {}/ {}@{}:{}/'.format( + constants.SSH_OPTIONS, + local_dir, + provider.username, + hostname, + remote_dir, + ) + utils.run_cmd_with_retry(cmd, popen=False, ignore_errors=False) + + def copy_experiment_config(experiment_params, local_experiment_dir: str): """Save the experiment config to local directory for reference.""" os.makedirs(os.path.join(local_experiment_dir, "experiment_config"), exist_ok=True) From 37f973291ebc6765278cf9afc9af90b5806079d5 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Tue, 26 May 2026 17:14:22 -0400 Subject: [PATCH 4/5] bug fixes --- .../src/drivers/query/servers/http.rs | 23 ++++++-- .../src/precompute_engine/ingest_source.rs | 36 ++++++++++++ .../src/precompute_engine/json_ingest.rs | 55 ++++++++++++++----- .../benchmark/generate_queries.py | 3 - .../experiments/experiment_run_clickhouse.py | 48 +++++++++++++++- 5 files changed, 142 insertions(+), 23 deletions(-) diff --git a/asap-query-engine/src/drivers/query/servers/http.rs b/asap-query-engine/src/drivers/query/servers/http.rs index a3fb49a3..bd144d2d 100644 --- a/asap-query-engine/src/drivers/query/servers/http.rs +++ b/asap-query-engine/src/drivers/query/servers/http.rs @@ -324,7 +324,7 @@ async fn handle_instant_query_post( .and_then(|v| v.to_str().ok()) .unwrap_or(""); - debug!("Content-Type: {}", content_type); + debug!("POST content-type: '{}'", content_type); let parsed_request = if content_type.contains("application/json") { // Handle JSON POST (Elasticsearch) @@ -369,11 +369,24 @@ async fn handle_instant_query_post( } }; - // Parse form parameters - let params: HashMap = form_urlencoded::parse(body_str.as_bytes()) + debug!( + "POST body prefix (first 200 chars): '{}'", + &body_str.chars().take(200).collect::() + ); + + // Parse form parameters, falling back to treating the raw body as the + // SQL query if no "query" key is found (ClickHouse native HTTP style). + let mut params: HashMap = form_urlencoded::parse(body_str.as_bytes()) .into_owned() .collect(); - debug!("Form params extracted: {:?}", params); + if !params.contains_key("query") && !body_str.is_empty() { + params.clear(); + params.insert("query".to_string(), body_str.clone()); + } + debug!( + "POST form params keys: {:?}", + params.keys().collect::>() + ); // Use adapter to parse POST request (handles form-encoded parameters) match state.adapter.parse_post_request(Form(params)).await { @@ -385,7 +398,7 @@ async fn handle_instant_query_post( req } Err(parse_error) => { - debug!("Failed to parse POST request: {:?}", parse_error); + debug!("POST parse failed: {}", parse_error); return match state.adapter.format_error_response(&parse_error).await { Ok(json) => json.into_response(), Err(status) => status.into_response(), diff --git a/asap-query-engine/src/precompute_engine/ingest_source.rs b/asap-query-engine/src/precompute_engine/ingest_source.rs index 8e155489..384b6403 100644 --- a/asap-query-engine/src/precompute_engine/ingest_source.rs +++ b/asap-query-engine/src/precompute_engine/ingest_source.rs @@ -4,8 +4,10 @@ use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_se use arc_swap::ArcSwap; use asap_types::aggregation_config::AggregationConfig; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Instant; +use tracing::{debug, warn}; /// Everything a source needs to push decoded samples into the worker pool. #[derive(Clone)] @@ -89,15 +91,42 @@ pub(crate) async fn route_decoded_samples( // Load agg_configs once per request (lock-free ArcSwap read). let agg_configs = ctx.agg_configs.load(); + + // On first batch: log config metrics vs sample metric to diagnose mismatches. + static FIRST_BATCH_LOGGED: AtomicBool = AtomicBool::new(false); + if !FIRST_BATCH_LOGGED.swap(true, Ordering::Relaxed) { + if let Some(first) = samples.first() { + let sample_metric = extract_metric_name(&first.labels); + warn!( + sample_metric, + sample_labels = %first.labels, + num_agg_configs = agg_configs.len(), + "routing: first batch diagnostic" + ); + for cfg in agg_configs.iter() { + warn!( + agg_id = cfg.aggregation_id, + config_metric = %cfg.metric, + config_spatial_filter = %cfg.spatial_filter, + table_name = ?cfg.table_name, + "routing: agg config metric" + ); + } + } + } + + let mut matched_samples: usize = 0; for s in &samples { let metric_name = extract_metric_name(&s.labels); for config in agg_configs.iter() { if config.metric != metric_name && config.spatial_filter_normalized != metric_name && config.spatial_filter != metric_name + && config.table_name.as_deref() != Some(metric_name) { continue; } + matched_samples += 1; let group_key = extract_group_key(&s.labels, config); by_group .entry((config.aggregation_id, group_key)) @@ -106,6 +135,13 @@ pub(crate) async fn route_decoded_samples( } } + debug!( + total_samples = samples.len(), + matched_samples, + groups_formed = by_group.len(), + "routing: batch match summary" + ); + let messages: Vec = by_group .into_iter() .map( diff --git a/asap-query-engine/src/precompute_engine/json_ingest.rs b/asap-query-engine/src/precompute_engine/json_ingest.rs index f5d7dc27..598596ac 100644 --- a/asap-query-engine/src/precompute_engine/json_ingest.rs +++ b/asap-query-engine/src/precompute_engine/json_ingest.rs @@ -1,5 +1,6 @@ use crate::drivers::ingest::prometheus_remote_write::DecodedSample; use crate::precompute_engine::ingest_source::{route_decoded_samples, IngestContext, IngestSource}; +use chrono::NaiveDateTime; use std::time::Instant; use tracing::info; @@ -110,23 +111,38 @@ impl IngestSource for JsonFileIngestSource { .into() })?; - let raw_ts: i64 = obj - .get(&config.timestamp_col) - .ok_or_else(|| -> Box { + let ts_val = obj.get(&config.timestamp_col).ok_or_else( + || -> Box { std::io::Error::other(format!( "timestamp column '{}' not found in JSON object", config.timestamp_col )) .into() - })? - .as_i64() - .ok_or_else(|| -> Box { - std::io::Error::other(format!( - "timestamp column '{}' is not an integer", - config.timestamp_col - )) - .into() - })?; + }, + )?; + + // Accept integer (Unix epoch) or string datetime "YYYY-MM-DD HH:MM:SS". + let raw_ts: i64 = if let Some(i) = ts_val.as_i64() { + i + } else if let Some(s) = ts_val.as_str() { + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") + .map_err(|e| -> Box { + std::io::Error::other(format!( + "timestamp column '{}' value {:?} is not an integer or \ + a parseable datetime string: {}", + config.timestamp_col, s, e + )) + .into() + })? + .and_utc() + .timestamp() + } else { + return Err(std::io::Error::other(format!( + "timestamp column '{}' is not an integer or string", + config.timestamp_col + )) + .into()); + }; let timestamp_ms = config.timestamp_unit.to_ms(raw_ts); @@ -140,7 +156,20 @@ impl IngestSource for JsonFileIngestSource { if i > 0 { s.push(','); } - let val = obj.get(col).and_then(|v| v.as_str()).unwrap_or(""); + let val_owned; + let val = if let Some(s) = obj.get(col).and_then(|v| v.as_str()) { + s + } else if let Some(v) = obj.get(col) { + val_owned = v.to_string(); + val_owned.as_str() + } else { + return Err(std::io::Error::other(format!( + "label column '{}' not found in JSON object (row {})", + col, + row_count + 1 + )) + .into()); + }; s.push_str(col); s.push_str("=\""); s.push_str(val); diff --git a/asap-tools/execution-utilities/benchmark/generate_queries.py b/asap-tools/execution-utilities/benchmark/generate_queries.py index eb1b5d4e..5923338d 100644 --- a/asap-tools/execution-utilities/benchmark/generate_queries.py +++ b/asap-tools/execution-utilities/benchmark/generate_queries.py @@ -240,15 +240,12 @@ def generate_sql_file( start_str = (end_ts - timedelta(seconds=window_size)).strftime( "%Y-%m-%d %H:%M:%S" ) - label = f"T{i:03d}" - if window_form == "dateadd": where = f"{ts_column} BETWEEN DATEADD(s, -{window_size}, '{end_str}') AND '{end_str}'" else: where = f"{ts_column} BETWEEN '{start_str}' AND '{end_str}'" lines.append( - f"-- {label}: quantile window ending at {end_str}\n" f"SELECT quantile({quantile})({value_column}) FROM {table_name} " f"WHERE {where} GROUP BY {group_by_clause};" ) diff --git a/asap-tools/experiments/experiment_run_clickhouse.py b/asap-tools/experiments/experiment_run_clickhouse.py index 68c13050..1a59898a 100644 --- a/asap-tools/experiments/experiment_run_clickhouse.py +++ b/asap-tools/experiments/experiment_run_clickhouse.py @@ -93,6 +93,7 @@ from urllib.parse import urlparse import hydra +import yaml from omegaconf import DictConfig, OmegaConf import constants @@ -106,6 +107,39 @@ from experiment_utils.services.misc import ControllerService from experiment_utils.services.query_engine import QueryEngineRustService + +def _inline_sql_queries_in_experiment_config(local_experiment_root_dir: str) -> None: + """Enrich the saved experiment_params.yaml by inlining SQL from sql_file references. + + Downstream analysis scripts expect query_groups[i]["queries"] to be a list + of query strings. The clickhouse config stores sql_file paths instead, so + we read each file and add the queries in-place before the scripts run. + """ + config_path = os.path.join( + local_experiment_root_dir, "experiment_config", "experiment_params.yaml" + ) + if not os.path.exists(config_path): + return + with open(config_path) as f: + data = yaml.safe_load(f) + + def _expand_groups(groups): + if not groups: + return + for group in groups: + sql_file = group.get("sql_file") + if sql_file and "queries" not in group: + with open(sql_file) as fq: + content = fq.read() + group["queries"] = [s.strip() for s in content.split(";") if s.strip()] + + _expand_groups(data.get("query_groups")) + _expand_groups(data.get("sketchdb_query_groups")) + + with open(config_path, "w") as f: + yaml.dump(data, f, allow_unicode=True) + + # Register resolvers used by config.yaml interpolation. OmegaConf.register_new_resolver( "local_experiment_dir", lambda: constants.LOCAL_EXPERIMENT_DIR @@ -234,6 +268,7 @@ def main(cfg: DictConfig) -> None: ) sync.copy_experiment_config(cfg.experiment_params, local_experiment_root_dir) + _inline_sql_queries_in_experiment_config(local_experiment_root_dir) # --- dataset config --- ep = cfg.experiment_params @@ -339,6 +374,11 @@ def main(cfg: DictConfig) -> None: if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: # --- sketchdb mode: precompute engine + JSON ingest + ClickHouse fallback --- + # Kill any leftover query_engine_rust from a previous run (mirrors + # query_engine_service.stop() at the top of the e2e mode loop). + QueryEngineRustService( + provider=provider, use_container=False, node_offset=node_offset + ).stop() # Mirrors experiment_run_e2e.py: planner runs first and generates # streaming_config.yaml + inference_config.yaml into controller_output_dir, # then the query engine starts reading from that same directory. @@ -438,7 +478,9 @@ def main(cfg: DictConfig) -> None: provider=provider, node_offset=node_offset, config_file=controller_client_config, - output_dir=experiment_output_dir, + output_dir=os.path.join( + experiment_output_dir, "prometheus_client_output" + ), use_container=use_container, parallel=parallel, ) @@ -465,7 +507,9 @@ def main(cfg: DictConfig) -> None: provider=provider, node_offset=node_offset, config_file=controller_client_config, - output_dir=experiment_output_dir, + output_dir=os.path.join( + experiment_output_dir, "prometheus_client_output" + ), use_container=use_container, parallel=parallel, ) From 91ea886c3d8bc5b97c53821a7cc323427ae1cf92 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Tue, 26 May 2026 22:11:01 -0400 Subject: [PATCH 5/5] Updated experiment config format --- .../clickhouse_quantile_queries.sql | 530 ------------------ asap-tools/experiments/config/config.yaml | 5 - .../experiments/experiment_run_clickhouse.py | 161 +++--- .../experiments/experiment_utils/config.py | 128 +---- 4 files changed, 96 insertions(+), 728 deletions(-) delete mode 100644 asap-tools/execution-utilities/asap_query_latency/clickhouse_quantile_queries.sql diff --git a/asap-tools/execution-utilities/asap_query_latency/clickhouse_quantile_queries.sql b/asap-tools/execution-utilities/asap_query_latency/clickhouse_quantile_queries.sql deleted file mode 100644 index 7c7ced4f..00000000 --- a/asap-tools/execution-utilities/asap_query_latency/clickhouse_quantile_queries.sql +++ /dev/null @@ -1,530 +0,0 @@ --- T000: quantile window ending at 2013-07-01 20:00:00 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 19:50:00' AND '2013-07-01 20:00:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T001: quantile window ending at 2013-07-01 20:13:16 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 20:03:16' AND '2013-07-01 20:13:16' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T002: quantile window ending at 2013-07-01 20:26:32 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 20:16:32' AND '2013-07-01 20:26:32' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T003: quantile window ending at 2013-07-01 20:39:48 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 20:29:48' AND '2013-07-01 20:39:48' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T004: quantile window ending at 2013-07-01 20:53:04 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 20:43:04' AND '2013-07-01 20:53:04' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T005: quantile window ending at 2013-07-01 21:06:20 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 20:56:20' AND '2013-07-01 21:06:20' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T006: quantile window ending at 2013-07-01 21:19:36 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 21:09:36' AND '2013-07-01 21:19:36' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T007: quantile window ending at 2013-07-01 21:32:52 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 21:22:52' AND '2013-07-01 21:32:52' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T008: quantile window ending at 2013-07-01 21:46:08 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 21:36:08' AND '2013-07-01 21:46:08' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T009: quantile window ending at 2013-07-01 21:59:24 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 21:49:24' AND '2013-07-01 21:59:24' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T010: quantile window ending at 2013-07-01 22:12:40 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 22:02:40' AND '2013-07-01 22:12:40' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T011: quantile window ending at 2013-07-01 22:25:56 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 22:15:56' AND '2013-07-01 22:25:56' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T012: quantile window ending at 2013-07-01 22:39:12 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 22:29:12' AND '2013-07-01 22:39:12' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T013: quantile window ending at 2013-07-01 22:52:28 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 22:42:28' AND '2013-07-01 22:52:28' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T014: quantile window ending at 2013-07-01 23:05:44 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 22:55:44' AND '2013-07-01 23:05:44' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T015: quantile window ending at 2013-07-01 23:19:00 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 23:09:00' AND '2013-07-01 23:19:00' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T016: quantile window ending at 2013-07-01 23:32:16 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 23:22:16' AND '2013-07-01 23:32:16' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T017: quantile window ending at 2013-07-01 23:45:32 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 23:35:32' AND '2013-07-01 23:45:32' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T018: quantile window ending at 2013-07-01 23:58:48 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-01 23:48:48' AND '2013-07-01 23:58:48' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T019: quantile window ending at 2013-07-02 00:12:04 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 00:02:04' AND '2013-07-02 00:12:04' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T020: quantile window ending at 2013-07-02 00:25:20 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 00:15:20' AND '2013-07-02 00:25:20' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T021: quantile window ending at 2013-07-02 00:38:36 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 00:28:36' AND '2013-07-02 00:38:36' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T022: quantile window ending at 2013-07-02 00:51:52 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 00:41:52' AND '2013-07-02 00:51:52' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T023: quantile window ending at 2013-07-02 01:05:08 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 00:55:08' AND '2013-07-02 01:05:08' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T024: quantile window ending at 2013-07-02 01:18:24 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 01:08:24' AND '2013-07-02 01:18:24' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T025: quantile window ending at 2013-07-02 01:31:40 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 01:21:40' AND '2013-07-02 01:31:40' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T026: quantile window ending at 2013-07-02 01:44:56 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 01:34:56' AND '2013-07-02 01:44:56' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T027: quantile window ending at 2013-07-02 01:58:12 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 01:48:12' AND '2013-07-02 01:58:12' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T028: quantile window ending at 2013-07-02 02:11:28 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 02:01:28' AND '2013-07-02 02:11:28' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T029: quantile window ending at 2013-07-02 02:24:45 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 02:14:45' AND '2013-07-02 02:24:45' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T030: quantile window ending at 2013-07-02 02:38:01 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 02:28:01' AND '2013-07-02 02:38:01' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T031: quantile window ending at 2013-07-02 02:51:17 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 02:41:17' AND '2013-07-02 02:51:17' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T032: quantile window ending at 2013-07-02 03:04:33 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 02:54:33' AND '2013-07-02 03:04:33' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T033: quantile window ending at 2013-07-02 03:17:49 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 03:07:49' AND '2013-07-02 03:17:49' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T034: quantile window ending at 2013-07-02 03:31:05 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 03:21:05' AND '2013-07-02 03:31:05' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T035: quantile window ending at 2013-07-02 03:44:21 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 03:34:21' AND '2013-07-02 03:44:21' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T036: quantile window ending at 2013-07-02 03:57:37 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 03:47:37' AND '2013-07-02 03:57:37' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T037: quantile window ending at 2013-07-02 04:10:53 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 04:00:53' AND '2013-07-02 04:10:53' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T038: quantile window ending at 2013-07-02 04:24:09 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 04:14:09' AND '2013-07-02 04:24:09' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T039: quantile window ending at 2013-07-02 04:37:25 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 04:27:25' AND '2013-07-02 04:37:25' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T040: quantile window ending at 2013-07-02 04:50:41 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 04:40:41' AND '2013-07-02 04:50:41' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T041: quantile window ending at 2013-07-02 05:03:57 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 04:53:57' AND '2013-07-02 05:03:57' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T042: quantile window ending at 2013-07-02 05:17:13 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 05:07:13' AND '2013-07-02 05:17:13' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T043: quantile window ending at 2013-07-02 05:30:29 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 05:20:29' AND '2013-07-02 05:30:29' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T044: quantile window ending at 2013-07-02 05:43:45 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 05:33:45' AND '2013-07-02 05:43:45' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T045: quantile window ending at 2013-07-02 05:57:01 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 05:47:01' AND '2013-07-02 05:57:01' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T046: quantile window ending at 2013-07-02 06:10:17 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 06:00:17' AND '2013-07-02 06:10:17' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T047: quantile window ending at 2013-07-02 06:23:33 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 06:13:33' AND '2013-07-02 06:23:33' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T048: quantile window ending at 2013-07-02 06:36:49 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 06:26:49' AND '2013-07-02 06:36:49' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T049: quantile window ending at 2013-07-02 06:50:05 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 06:40:05' AND '2013-07-02 06:50:05' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T050: quantile window ending at 2013-07-02 07:03:21 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 06:53:21' AND '2013-07-02 07:03:21' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T051: quantile window ending at 2013-07-02 07:16:37 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 07:06:37' AND '2013-07-02 07:16:37' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T052: quantile window ending at 2013-07-02 07:29:53 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 07:19:53' AND '2013-07-02 07:29:53' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T053: quantile window ending at 2013-07-02 07:43:09 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 07:33:09' AND '2013-07-02 07:43:09' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T054: quantile window ending at 2013-07-02 07:56:25 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 07:46:25' AND '2013-07-02 07:56:25' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T055: quantile window ending at 2013-07-02 08:09:41 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 07:59:41' AND '2013-07-02 08:09:41' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T056: quantile window ending at 2013-07-02 08:22:57 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 08:12:57' AND '2013-07-02 08:22:57' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T057: quantile window ending at 2013-07-02 08:36:13 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 08:26:13' AND '2013-07-02 08:36:13' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T058: quantile window ending at 2013-07-02 08:49:30 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 08:39:30' AND '2013-07-02 08:49:30' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T059: quantile window ending at 2013-07-02 09:02:46 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 08:52:46' AND '2013-07-02 09:02:46' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T060: quantile window ending at 2013-07-02 09:16:02 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 09:06:02' AND '2013-07-02 09:16:02' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T061: quantile window ending at 2013-07-02 09:29:18 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 09:19:18' AND '2013-07-02 09:29:18' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T062: quantile window ending at 2013-07-02 09:42:34 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 09:32:34' AND '2013-07-02 09:42:34' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T063: quantile window ending at 2013-07-02 09:55:50 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 09:45:50' AND '2013-07-02 09:55:50' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T064: quantile window ending at 2013-07-02 10:09:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 09:59:06' AND '2013-07-02 10:09:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T065: quantile window ending at 2013-07-02 10:22:22 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 10:12:22' AND '2013-07-02 10:22:22' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T066: quantile window ending at 2013-07-02 10:35:38 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 10:25:38' AND '2013-07-02 10:35:38' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T067: quantile window ending at 2013-07-02 10:48:54 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 10:38:54' AND '2013-07-02 10:48:54' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T068: quantile window ending at 2013-07-02 11:02:10 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 10:52:10' AND '2013-07-02 11:02:10' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T069: quantile window ending at 2013-07-02 11:15:26 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 11:05:26' AND '2013-07-02 11:15:26' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T070: quantile window ending at 2013-07-02 11:28:42 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 11:18:42' AND '2013-07-02 11:28:42' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T071: quantile window ending at 2013-07-02 11:41:58 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 11:31:58' AND '2013-07-02 11:41:58' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T072: quantile window ending at 2013-07-02 11:55:14 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 11:45:14' AND '2013-07-02 11:55:14' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T073: quantile window ending at 2013-07-02 12:08:30 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 11:58:30' AND '2013-07-02 12:08:30' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T074: quantile window ending at 2013-07-02 12:21:46 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 12:11:46' AND '2013-07-02 12:21:46' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T075: quantile window ending at 2013-07-02 12:35:02 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 12:25:02' AND '2013-07-02 12:35:02' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T076: quantile window ending at 2013-07-02 12:48:18 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 12:38:18' AND '2013-07-02 12:48:18' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T077: quantile window ending at 2013-07-02 13:01:34 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 12:51:34' AND '2013-07-02 13:01:34' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T078: quantile window ending at 2013-07-02 13:14:50 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 13:04:50' AND '2013-07-02 13:14:50' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T079: quantile window ending at 2013-07-02 13:28:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 13:18:06' AND '2013-07-02 13:28:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T080: quantile window ending at 2013-07-02 13:41:22 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 13:31:22' AND '2013-07-02 13:41:22' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T081: quantile window ending at 2013-07-02 13:54:38 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 13:44:38' AND '2013-07-02 13:54:38' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T082: quantile window ending at 2013-07-02 14:07:54 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 13:57:54' AND '2013-07-02 14:07:54' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T083: quantile window ending at 2013-07-02 14:21:10 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 14:11:10' AND '2013-07-02 14:21:10' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T084: quantile window ending at 2013-07-02 14:34:26 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 14:24:26' AND '2013-07-02 14:34:26' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T085: quantile window ending at 2013-07-02 14:47:42 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 14:37:42' AND '2013-07-02 14:47:42' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T086: quantile window ending at 2013-07-02 15:00:59 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 14:50:59' AND '2013-07-02 15:00:59' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T087: quantile window ending at 2013-07-02 15:14:15 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 15:04:15' AND '2013-07-02 15:14:15' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T088: quantile window ending at 2013-07-02 15:27:31 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 15:17:31' AND '2013-07-02 15:27:31' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T089: quantile window ending at 2013-07-02 15:40:47 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 15:30:47' AND '2013-07-02 15:40:47' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T090: quantile window ending at 2013-07-02 15:54:03 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 15:44:03' AND '2013-07-02 15:54:03' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T091: quantile window ending at 2013-07-02 16:07:19 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 15:57:19' AND '2013-07-02 16:07:19' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T092: quantile window ending at 2013-07-02 16:20:35 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 16:10:35' AND '2013-07-02 16:20:35' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T093: quantile window ending at 2013-07-02 16:33:51 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 16:23:51' AND '2013-07-02 16:33:51' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T094: quantile window ending at 2013-07-02 16:47:07 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 16:37:07' AND '2013-07-02 16:47:07' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T095: quantile window ending at 2013-07-02 17:00:23 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 16:50:23' AND '2013-07-02 17:00:23' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T096: quantile window ending at 2013-07-02 17:13:39 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 17:03:39' AND '2013-07-02 17:13:39' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T097: quantile window ending at 2013-07-02 17:26:55 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 17:16:55' AND '2013-07-02 17:26:55' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T098: quantile window ending at 2013-07-02 17:40:11 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 17:30:11' AND '2013-07-02 17:40:11' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T099: quantile window ending at 2013-07-02 17:53:27 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 17:43:27' AND '2013-07-02 17:53:27' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T100: quantile window ending at 2013-07-02 18:06:43 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 17:56:43' AND '2013-07-02 18:06:43' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T101: quantile window ending at 2013-07-02 18:19:59 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 18:09:59' AND '2013-07-02 18:19:59' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T102: quantile window ending at 2013-07-02 18:33:15 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 18:23:15' AND '2013-07-02 18:33:15' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T103: quantile window ending at 2013-07-02 18:46:31 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 18:36:31' AND '2013-07-02 18:46:31' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T104: quantile window ending at 2013-07-02 18:59:47 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 18:49:47' AND '2013-07-02 18:59:47' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T105: quantile window ending at 2013-07-02 19:13:03 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 19:03:03' AND '2013-07-02 19:13:03' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T106: quantile window ending at 2013-07-02 19:26:19 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 19:16:19' AND '2013-07-02 19:26:19' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T107: quantile window ending at 2013-07-02 19:39:35 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 19:29:35' AND '2013-07-02 19:39:35' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T108: quantile window ending at 2013-07-02 19:52:51 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 19:42:51' AND '2013-07-02 19:52:51' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T109: quantile window ending at 2013-07-02 20:06:07 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 19:56:07' AND '2013-07-02 20:06:07' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T110: quantile window ending at 2013-07-02 20:19:23 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 20:09:23' AND '2013-07-02 20:19:23' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T111: quantile window ending at 2013-07-02 20:32:39 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 20:22:39' AND '2013-07-02 20:32:39' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T112: quantile window ending at 2013-07-02 20:45:55 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 20:35:55' AND '2013-07-02 20:45:55' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T113: quantile window ending at 2013-07-02 20:59:11 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 20:49:11' AND '2013-07-02 20:59:11' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T114: quantile window ending at 2013-07-02 21:12:27 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:02:27' AND '2013-07-02 21:12:27' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T115: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T116: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T117: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T118: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T119: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T120: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T121: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T122: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T123: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T124: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T125: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T126: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T127: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T128: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T129: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T130: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T131: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T132: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T133: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T134: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T135: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T136: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T137: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T138: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T139: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T140: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T141: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T142: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T143: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T144: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T145: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T146: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T147: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T148: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T149: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T150: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T151: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T152: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T153: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T154: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T155: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T156: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T157: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T158: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T159: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T160: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T161: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T162: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T163: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T164: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T165: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T166: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T167: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T168: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T169: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T170: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T171: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T172: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T173: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T174: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T175: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T176: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T177: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T178: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T179: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T180: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T181: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T182: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T183: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T184: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T185: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T186: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T187: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T188: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T189: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T190: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T191: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T192: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T193: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T194: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T195: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T196: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T197: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T198: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T199: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T200: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T201: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T202: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T203: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T204: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T205: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T206: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T207: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T208: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T209: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T210: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T211: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T212: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T213: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T214: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T215: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T216: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T217: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T218: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T219: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T220: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T221: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T222: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T223: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T224: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T225: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T226: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T227: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T228: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T229: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T230: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T231: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T232: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T233: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T234: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T235: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T236: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T237: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T238: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T239: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T240: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T241: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T242: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T243: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T244: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T245: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T246: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T247: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T248: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T249: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T250: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T251: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T252: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T253: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T254: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T255: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T256: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T257: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T258: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T259: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T260: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T261: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T262: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T263: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; --- T264: quantile window ending at 2013-07-02 21:19:06 -SELECT quantile(0.95)(ResolutionWidth) FROM hits WHERE EventTime BETWEEN '2013-07-02 21:09:06' AND '2013-07-02 21:19:06' GROUP BY RegionID, OS, UserAgent, TraficSourceID; diff --git a/asap-tools/experiments/config/config.yaml b/asap-tools/experiments/config/config.yaml index 90bee69a..a62495a6 100644 --- a/asap-tools/experiments/config/config.yaml +++ b/asap-tools/experiments/config/config.yaml @@ -78,11 +78,6 @@ fake_exporter_language: "rust" # choices: ["python", "rust"] # Cluster data exporter configuration cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data -# ClickHouse connection defaults (overridden per experiment_type config or CLI) -clickhouse: - url: "http://localhost:8123" - database: "default" - # Backend configuration for the query engine (aligned with BackendConfig in asap-query-engine/src/engine_config.rs) backend: type: "prometheus" # choices: ["prometheus", "clickhouse", "elastic_querydsl", "elastic_sql"] diff --git a/asap-tools/experiments/experiment_run_clickhouse.py b/asap-tools/experiments/experiment_run_clickhouse.py index 1a59898a..3077fd52 100644 --- a/asap-tools/experiments/experiment_run_clickhouse.py +++ b/asap-tools/experiments/experiment_run_clickhouse.py @@ -20,7 +20,7 @@ rsync results back ──────────────────────────────────────────────────────────── -Sketchdb flow (enabled when sketchdb_query_groups is set in config) +Sketchdb flow (enabled when experiment list contains mode: sketchdb) ──────────────────────────────────────────────────────────── (same rsync + ClickHouse start + data load as above) @@ -43,9 +43,10 @@ --table-name hits \\ --ts-column EventTime \\ --value-column ResolutionWidth \\ - --group-by-columns OS,RegionID,TraficSourceID,UserAgent \\ - --window-size 10 \\ - --output-prefix /path/to/output/clickbench \\ + --group-by-columns RegionID,OS,UserAgent,TraficSourceID \\ + --window-size 600 \\ + --stride-seconds 600 \\ + --output-prefix /path/to/output/clickhouse_quantile_queries \\ --auto-detect-timestamps \\ --data-file /path/to/hits.json \\ --data-file-format jsonl @@ -82,9 +83,11 @@ cloudlab.username=myuser \\ cloudlab.hostname_suffix=myexp.cloudlab.us \\ experiment_params.dataset.local_data_file=/path/to/hits.json \\ - 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql' \\ - 'experiment_params.sketchdb_query_groups[0].sql_file=/path/to/clickbench_asap.sql' \\ - experiment_params.streaming_config_file=/path/to/clickbench_streaming.yaml + 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql' + + Sketchdb mode runs when experiment_params.experiment contains an entry + with mode: sketchdb (mirrors the Prometheus experiment config structure). + Remove that entry to run baseline only. """ import json @@ -107,6 +110,8 @@ from experiment_utils.services.misc import ControllerService from experiment_utils.services.query_engine import QueryEngineRustService +CLICKHOUSE_DATABASE = "default" + def _inline_sql_queries_in_experiment_config(local_experiment_root_dir: str) -> None: """Enrich the saved experiment_params.yaml by inlining SQL from sql_file references. @@ -134,7 +139,6 @@ def _expand_groups(groups): group["queries"] = [s.strip() for s in content.split(";") if s.strip()] _expand_groups(data.get("query_groups")) - _expand_groups(data.get("sketchdb_query_groups")) with open(config_path, "w") as f: yaml.dump(data, f, allow_unicode=True) @@ -242,7 +246,6 @@ def main(cfg: DictConfig) -> None: experiment_name = cfg.experiment.name node_offset = cfg.cloudlab.node_offset no_teardown = cfg.flow.no_teardown - skip_querying = cfg.experiment_params.get("skip_querying", False) use_container = cfg.use_container.prometheus_client parallel = cfg.prometheus_client.parallel @@ -275,44 +278,33 @@ def main(cfg: DictConfig) -> None: dataset_cfg = ep.dataset dataset_name = str(dataset_cfg.name) local_data_file = str(dataset_cfg.local_data_file) - table = dataset_cfg.get("table") or None - init_sql_file = dataset_cfg.get("init_sql_file") or None - max_rows = int(dataset_cfg.get("max_rows", 0)) - - # --- ClickHouse connection --- - clickhouse_url = str(cfg.clickhouse.url) - clickhouse_database = str(cfg.clickhouse.database) - clickhouse_http_port = urlparse(clickhouse_url).port or 8123 - - # --- sketchdb config (optional) --- - sketchdb_query_groups = ep.get("sketchdb_query_groups") or None - data_ingestion_interval = int(ep.get("data_ingestion_interval", 15)) - asap_http_port = 8088 + table = dataset_cfg.table + init_sql_file = dataset_cfg.init_sql_file + max_rows = int(dataset_cfg.max_rows) + + # --- experiment modes and server URLs from config (mirrors Prometheus structure) --- + experiment_cfg = OmegaConf.to_container(ep.experiment, resolve=True) + servers_by_name = { + s["name"]: s["url"] for s in OmegaConf.to_container(ep.servers, resolve=True) + } + mode_server_urls = {m["mode"]: servers_by_name[m["server"]] for m in experiment_cfg} + clickhouse_url = servers_by_name["clickhouse"] + clickhouse_http_port = urlparse(clickhouse_url).port + data_ingestion_interval = int(ep.data_ingestion_interval) # --- generate prometheus-client config YAMLs for each experiment mode --- - if not skip_querying: - mode_server_urls = {constants.BASELINE_EXPERIMENT_NAME: clickhouse_url} - experiment_modes = config.generate_clickhouse_client_configs( - query_groups=ep.query_groups, - local_experiment_dir=local_experiment_root_dir, - mode_server_urls=mode_server_urls, - clickhouse_database=clickhouse_database, - sketchdb_query_groups=sketchdb_query_groups, - sketchdb_server_url=f"http://localhost:{asap_http_port}/clickhouse/query", - ) - sync.rsync_controller_client_configs( - provider, - experiment_root_output_dir, - local_experiment_root_dir, - node_offset=node_offset, - ) - else: - print("-" * 40) - print("skip_querying=True: no SQL queries will be executed") - print("-" * 40) - experiment_modes = [constants.BASELINE_EXPERIMENT_NAME] - if sketchdb_query_groups: - experiment_modes.append(constants.SKETCHDB_EXPERIMENT_NAME) + experiment_modes = config.generate_clickhouse_client_configs( + query_groups=ep.query_groups, + local_experiment_dir=local_experiment_root_dir, + mode_server_urls=mode_server_urls, + clickhouse_database=CLICKHOUSE_DATABASE, + ) + sync.rsync_controller_client_configs( + provider, + experiment_root_output_dir, + local_experiment_root_dir, + node_offset=node_offset, + ) # --- rsync dataset file to node --- remote_data_dir = os.path.join(experiment_root_output_dir, "data") @@ -328,7 +320,7 @@ def main(cfg: DictConfig) -> None: experiment_output_dir=experiment_root_output_dir, local_experiment_dir=local_experiment_root_dir, http_port=clickhouse_http_port, - database=clickhouse_database, + database=CLICKHOUSE_DATABASE, ) # --- load data once before the mode loop (DROP + reload) --- @@ -374,6 +366,7 @@ def main(cfg: DictConfig) -> None: if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: # --- sketchdb mode: precompute engine + JSON ingest + ClickHouse fallback --- + asap_http_port = urlparse(mode_server_urls[experiment_mode]).port # Kill any leftover query_engine_rust from a previous run (mirrors # query_engine_service.stop() at the top of the e2e mode loop). QueryEngineRustService( @@ -393,7 +386,7 @@ def main(cfg: DictConfig) -> None: # Generate and rsync the planner input config to the node planner_input_yaml = config.generate_sql_planner_input( - sketchdb_query_groups, dataset_cfg + ep.query_groups, dataset_cfg ) local_planner_input = os.path.join( local_controller_dir, "planner_input.yaml" @@ -445,7 +438,7 @@ def main(cfg: DictConfig) -> None: backend_config={ "type": "clickhouse", "url": clickhouse_url, - "database": clickhouse_database, + "database": CLICKHOUSE_DATABASE, "forward_unsupported_queries": True, }, http_port=asap_http_port, @@ -462,28 +455,25 @@ def main(cfg: DictConfig) -> None: query_engine_service.wait_until_ready() - if not skip_querying: - steady_state_wait = int(cfg.flow.steady_state_wait) - print( - f"Waiting {steady_state_wait}s for precompute ingest to complete..." - ) - time.sleep(steady_state_wait) - - controller_client_config = os.path.join( - experiment_root_output_dir, - "controller_client_configs", - f"{experiment_mode}.yaml", - ) - _run_query_client( - provider=provider, - node_offset=node_offset, - config_file=controller_client_config, - output_dir=os.path.join( - experiment_output_dir, "prometheus_client_output" - ), - use_container=use_container, - parallel=parallel, - ) + steady_state_wait = int(cfg.flow.steady_state_wait) + print(f"Waiting {steady_state_wait}s for precompute ingest to complete...") + time.sleep(steady_state_wait) + + controller_client_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}.yaml", + ) + _run_query_client( + provider=provider, + node_offset=node_offset, + config_file=controller_client_config, + output_dir=os.path.join( + experiment_output_dir, "prometheus_client_output" + ), + use_container=use_container, + parallel=parallel, + ) sync.rsync_experiment_data( provider, @@ -497,22 +487,21 @@ def main(cfg: DictConfig) -> None: else: # --- baseline mode --- - if not skip_querying: - controller_client_config = os.path.join( - experiment_root_output_dir, - "controller_client_configs", - f"{experiment_mode}.yaml", - ) - _run_query_client( - provider=provider, - node_offset=node_offset, - config_file=controller_client_config, - output_dir=os.path.join( - experiment_output_dir, "prometheus_client_output" - ), - use_container=use_container, - parallel=parallel, - ) + controller_client_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}.yaml", + ) + _run_query_client( + provider=provider, + node_offset=node_offset, + config_file=controller_client_config, + output_dir=os.path.join( + experiment_output_dir, "prometheus_client_output" + ), + use_container=use_container, + parallel=parallel, + ) sync.rsync_experiment_data( provider, diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py index ecfaf20c..cf0a8347 100644 --- a/asap-tools/experiments/experiment_utils/config.py +++ b/asap-tools/experiments/experiment_utils/config.py @@ -62,8 +62,6 @@ def _is_clickhouse_experiment(experiment_params: DictConfig) -> bool: def _validate_clickhouse_experiment_config(experiment_params: DictConfig) -> None: """Validate experiment_params for a ClickHouse experiment.""" - skip_querying = experiment_params.get("skip_querying", False) - # Validate dataset section if "dataset" not in experiment_params: raise ValueError( @@ -95,47 +93,20 @@ def _validate_clickhouse_experiment_config(experiment_params: DictConfig) -> Non "Run benchmark/prepare_data.py first to produce the JSON-lines file." ) - # Validate query_groups (required unless skip_querying) - if not skip_querying: - if ( - "query_groups" not in experiment_params - or not experiment_params.query_groups - ): + # Validate query_groups + if "query_groups" not in experiment_params or not experiment_params.query_groups: + raise ValueError( + "At least one query group must be defined in experiment config." + ) + for i, group in enumerate(experiment_params.query_groups): + sql_file = group.get("sql_file") + if not sql_file or sql_file == "???": raise ValueError( - "At least one query group must be defined in experiment config " - "when skip_querying=False" + f"Query group {i} missing 'sql_file'. " + "Generate SQL files with benchmark/generate_queries.py first." ) - for i, group in enumerate(experiment_params.query_groups): - sql_file = group.get("sql_file") - if not sql_file or sql_file == "???": - raise ValueError( - f"Query group {i} missing 'sql_file'. " - "Generate SQL files with benchmark/generate_queries.py first." - ) - if not os.path.exists(sql_file): - raise ValueError( - f"Query group {i} sql_file={sql_file!r} does not exist." - ) - elif "query_groups" in experiment_params and experiment_params.query_groups: - print("-" * 60) - print("WARNING: query_groups is present but will be IGNORED") - print(" skip_querying=True means no queries will be executed") - print("-" * 60) - - # Validate sketchdb_query_groups sql files if present - sketchdb_groups = experiment_params.get("sketchdb_query_groups") or None - if sketchdb_groups and not skip_querying: - for i, group in enumerate(sketchdb_groups): - sql_file = group.get("sql_file") - if not sql_file or sql_file == "???": - raise ValueError( - f"sketchdb_query_groups[{i}] missing 'sql_file'. " - "Generate ASAP SQL files with benchmark/generate_queries.py first." - ) - if not os.path.exists(sql_file): - raise ValueError( - f"sketchdb_query_groups[{i}] sql_file={sql_file!r} does not exist." - ) + if not os.path.exists(sql_file): + raise ValueError(f"Query group {i} sql_file={sql_file!r} does not exist.") def validate_experiment_config( @@ -678,8 +649,6 @@ def generate_clickhouse_client_configs( clickhouse_database: str = "default", clickhouse_user: str = "default", clickhouse_password: str = "", - sketchdb_query_groups: Any = None, - sketchdb_server_url: str = "http://localhost:8088/clickhouse/query", ) -> List[str]: """Generate prometheus-client config YAMLs for ClickHouse experiment modes. @@ -760,65 +729,10 @@ def generate_clickhouse_client_configs( with open(config_path, "w") as f: yaml.dump(config, f) - if sketchdb_query_groups is not None: - if isinstance(sketchdb_query_groups, (DictConfig, ListConfig)): - sketchdb_groups_list: List[Dict] = OmegaConf.to_container(sketchdb_query_groups, resolve=True) # type: ignore[assignment] - else: - sketchdb_groups_list = list(sketchdb_query_groups) - - built_sketchdb_groups = [] - for idx, group in enumerate(sketchdb_groups_list): - sql_file = group.get("sql_file") - if not sql_file: - name = group.get("name", str(idx)) - raise ValueError( - f"sketchdb query group {idx!r} ({name!r}) missing 'sql_file'" - ) - - queries = _load_sql_queries(sql_file) - if not queries: - raise ValueError( - f"No SQL statements found in sketchdb sql_file {sql_file!r}" - ) - - client_opts = dict(group.get("client_options") or {}) - client_opts.setdefault("starting_delay", 0) - client_opts.setdefault("repetitions", 1) - - built_sketchdb_groups.append( - { - "id": idx, - "queries": queries, - "repetition_delay": group.get("repetition_delay", 0), - "client_options": client_opts, - "time_window_seconds": group.get("time_window_seconds"), - } - ) - - sketchdb_config: Dict[str, Any] = { - "servers": [ - { - "name": constants.SKETCHDB_EXPERIMENT_NAME, - "url": sketchdb_server_url, - "protocol": "clickhouse", - "database": clickhouse_database, - "user": clickhouse_user, - "password": clickhouse_password, - } - ], - "query_groups": built_sketchdb_groups, - } - sketchdb_path = os.path.join( - output_dir, f"{constants.SKETCHDB_EXPERIMENT_NAME}.yaml" - ) - with open(sketchdb_path, "w") as f: - yaml.dump(sketchdb_config, f) - modes.append(constants.SKETCHDB_EXPERIMENT_NAME) - return modes -def generate_sql_planner_input(sketchdb_query_groups: Any, dataset_cfg: Any) -> str: +def generate_sql_planner_input(query_groups: Any, dataset_cfg: Any) -> str: """Generate the YAML input file for asap-planner in SQL mode. The planner (``asap-planner --query-language sql``) reads a @@ -830,7 +744,7 @@ def generate_sql_planner_input(sketchdb_query_groups: Any, dataset_cfg: Any) -> does not need a hand-authored planner input file. Args: - sketchdb_query_groups: ListConfig of sketchdb query group dicts. + query_groups: ListConfig of query group dicts. Each entry must have ``sql_file``, ``repetition_delay``, and ``controller_options`` (``accuracy_sla``, ``latency_sla``). dataset_cfg: DictConfig with ``table``/``name``, and ``precompute`` @@ -851,22 +765,22 @@ def generate_sql_planner_input(sketchdb_query_groups: Any, dataset_cfg: Any) -> } ] - if isinstance(sketchdb_query_groups, (DictConfig, ListConfig)): - groups_list = OmegaConf.to_container(sketchdb_query_groups, resolve=True) + if isinstance(query_groups, (DictConfig, ListConfig)): + groups_list = OmegaConf.to_container(query_groups, resolve=True) else: - groups_list = list(sketchdb_query_groups) + groups_list = list(query_groups) - query_groups = [] + planner_query_groups = [] for idx, group in enumerate(groups_list): sql_file = group.get("sql_file") if not sql_file: - raise ValueError(f"sketchdb_query_groups[{idx}] missing 'sql_file'") + raise ValueError(f"query_groups[{idx}] missing 'sql_file'") queries = _load_sql_queries(sql_file) if not queries: raise ValueError(f"No SQL statements found in {sql_file!r}") ctrl_opts = dict(group.get("controller_options") or {}) - query_groups.append( + planner_query_groups.append( { "id": idx + 1, "repetition_delay": int(group.get("repetition_delay", 0)), @@ -880,7 +794,7 @@ def generate_sql_planner_input(sketchdb_query_groups: Any, dataset_cfg: Any) -> planner_input = { "tables": tables, - "query_groups": query_groups, + "query_groups": planner_query_groups, "aggregate_cleanup": {"policy": "read_based"}, } return yaml.dump(planner_input, default_flow_style=False, allow_unicode=True)