11"""
2- Experiment runner for ClickHouse/SQL experiments — baseline mode .
2+ Experiment runner for ClickHouse/SQL experiments.
33
4- Flow:
4+ Supports two modes, selected automatically based on config:
5+
6+ baseline — queries go directly to ClickHouse (:8123).
7+ sketchdb — queries go to the ASAP precompute engine (:8088), which serves
8+ approximate results from KLL sketches and forwards unsupported
9+ queries to ClickHouse as a fallback.
10+
11+ ────────────────────────────────────────────────────────────
12+ Baseline-only flow
13+ ────────────────────────────────────────────────────────────
514 rsync dataset file → node
615 ClickHouseService.start()
7- ClickHouseDataLoaderService.start() (once, before mode loop; DROP + reload)
16+ ClickHouseDataLoaderService.start() (DROP + reload)
817
918 for experiment_mode in ["baseline"]:
10- run prometheus-client in ClickHouse SQL mode (blocking)
19+ run prometheus-client → ClickHouse :8123
1120 rsync results back
12- teardown if not no_teardown
1321
14- Usage:
22+ ────────────────────────────────────────────────────────────
23+ Sketchdb flow (enabled when sketchdb_query_groups is set in config)
24+ ────────────────────────────────────────────────────────────
25+ (same rsync + ClickHouse start + data load as above)
26+
27+ for experiment_mode in ["baseline", "sketchdb"]:
28+ if sketchdb:
29+ rsync streaming_config.yaml → node
30+ query_engine_rust starts (precompute engine + JSON ingest + CH fallback)
31+ wait until process is up
32+ sleep flow.steady_state_wait (ingest completes during this window)
33+ run prometheus-client → ASAP :8088
34+ rsync results back
35+ stop query_engine_rust
36+
37+ ────────────────────────────────────────────────────────────
38+ Pre-run steps for sketchdb mode
39+ ────────────────────────────────────────────────────────────
40+ 1. Generate the SQL query file (once per dataset/window configuration):
41+
42+ python asap-tools/execution-utilities/benchmark/generate_queries.py \\
43+ --table-name hits \\
44+ --ts-column EventTime \\
45+ --value-column ResolutionWidth \\
46+ --group-by-columns OS,RegionID,TraficSourceID,UserAgent \\
47+ --window-size 10 \\
48+ --output-prefix /path/to/output/clickbench \\
49+ --auto-detect-timestamps \\
50+ --data-file /path/to/hits.json \\
51+ --data-file-format jsonl
52+
53+ The same SQL file is used for both baseline (load tester) and sketchdb
54+ (load tester + planner input).
55+
56+ 2. Ensure the release binaries are built on the CloudLab node:
57+ cargo build --release (in ~/code/asap-query-engine)
58+ cargo build --release (in ~/code/asap-planner-rs)
59+
60+ asap-planner runs automatically in SQL mode during the experiment to
61+ generate streaming_config.yaml and inference_config.yaml — no manual
62+ config file authoring needed.
63+
64+ ────────────────────────────────────────────────────────────
65+ Usage
66+ ────────────────────────────────────────────────────────────
67+ Baseline only:
68+ python experiment_run_clickhouse.py \\
69+ experiment_type=clickhouse \\
70+ experiment.name=my_bench \\
71+ cloudlab.num_nodes=1 \\
72+ cloudlab.username=myuser \\
73+ cloudlab.hostname_suffix=myexp.cloudlab.us \\
74+ experiment_params.dataset.local_data_file=/path/to/hits.json \\
75+ 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql'
76+
77+ Baseline + sketchdb:
1578 python experiment_run_clickhouse.py \\
1679 experiment_type=clickhouse \\
1780 experiment.name=my_bench \\
1881 cloudlab.num_nodes=1 \\
1982 cloudlab.username=myuser \\
2083 cloudlab.hostname_suffix=myexp.cloudlab.us \\
21- experiment_params.dataset.name=clickbench \\
2284 experiment_params.dataset.local_data_file=/path/to/hits.json \\
23- 'experiment_params.query_groups[0].sql_file=/path/to/queries.sql'
85+ 'experiment_params.query_groups[0].sql_file=/path/to/clickbench.sql' \\
86+ 'experiment_params.sketchdb_query_groups[0].sql_file=/path/to/clickbench_asap.sql' \\
87+ experiment_params.streaming_config_file=/path/to/clickbench_streaming.yaml
2488"""
2589
2690import json
2791import os
92+ import time
2893from urllib .parse import urlparse
2994
3095import hydra
3398import constants
3499from experiment_utils import config , sync
35100from experiment_utils .providers .factory import create_provider
36- from experiment_utils .services import ClickHouseDataLoaderService , ClickHouseService
101+ from experiment_utils .services import (
102+ ClickHouseDataLoaderService ,
103+ ClickHouseService ,
104+ PrometheusClientService ,
105+ )
106+ from experiment_utils .services .misc import ControllerService
107+ from experiment_utils .services .query_engine import QueryEngineRustService
37108
38109# Register resolvers used by config.yaml interpolation.
39110OmegaConf .register_new_resolver (
@@ -178,6 +249,11 @@ def main(cfg: DictConfig) -> None:
178249 clickhouse_database = str (cfg .clickhouse .database )
179250 clickhouse_http_port = urlparse (clickhouse_url ).port or 8123
180251
252+ # --- sketchdb config (optional) ---
253+ sketchdb_query_groups = ep .get ("sketchdb_query_groups" ) or None
254+ data_ingestion_interval = int (ep .get ("data_ingestion_interval" , 15 ))
255+ asap_http_port = 8088
256+
181257 # --- generate prometheus-client config YAMLs for each experiment mode ---
182258 if not skip_querying :
183259 mode_server_urls = {constants .BASELINE_EXPERIMENT_NAME : clickhouse_url }
@@ -186,6 +262,8 @@ def main(cfg: DictConfig) -> None:
186262 local_experiment_dir = local_experiment_root_dir ,
187263 mode_server_urls = mode_server_urls ,
188264 clickhouse_database = clickhouse_database ,
265+ sketchdb_query_groups = sketchdb_query_groups ,
266+ sketchdb_server_url = f"http://localhost:{ asap_http_port } /clickhouse/query" ,
189267 )
190268 sync .rsync_controller_client_configs (
191269 provider ,
@@ -198,6 +276,8 @@ def main(cfg: DictConfig) -> None:
198276 print ("skip_querying=True: no SQL queries will be executed" )
199277 print ("-" * 40 )
200278 experiment_modes = [constants .BASELINE_EXPERIMENT_NAME ]
279+ if sketchdb_query_groups :
280+ experiment_modes .append (constants .SKETCHDB_EXPERIMENT_NAME )
201281
202282 # --- rsync dataset file to node ---
203283 remote_data_dir = os .path .join (experiment_root_output_dir , "data" )
@@ -232,9 +312,18 @@ def main(cfg: DictConfig) -> None:
232312 )
233313
234314 # --- mode loop ---
315+ prometheus_client_service = PrometheusClientService (
316+ provider , use_container = use_container , node_offset = node_offset
317+ )
318+
235319 for experiment_mode in experiment_modes :
236320 print (f"Running experiment mode: { experiment_mode } " )
237321
322+ # Clean up any leftover prometheus-client container from the previous mode,
323+ # mirroring the prometheus_client_service.stop() call at the top of the
324+ # e2e mode loop.
325+ prometheus_client_service .stop ()
326+
238327 experiment_output_dir = os .path .join (
239328 experiment_root_output_dir , experiment_mode
240329 )
@@ -248,27 +337,145 @@ def main(cfg: DictConfig) -> None:
248337 )
249338 os .makedirs (local_experiment_dir , exist_ok = True )
250339
251- if not skip_querying :
252- controller_client_config = os .path .join (
253- experiment_root_output_dir ,
254- "controller_client_configs" ,
255- f"{ experiment_mode } .yaml" ,
340+ if experiment_mode == constants .SKETCHDB_EXPERIMENT_NAME :
341+ # --- sketchdb mode: precompute engine + JSON ingest + ClickHouse fallback ---
342+ # Mirrors experiment_run_e2e.py: planner runs first and generates
343+ # streaming_config.yaml + inference_config.yaml into controller_output_dir,
344+ # then the query engine starts reading from that same directory.
345+
346+ local_controller_dir = os .path .join (
347+ local_experiment_root_dir , "controller_output"
256348 )
257- _run_query_client (
349+ remote_controller_dir = os .path .join (
350+ experiment_root_output_dir , "controller_output"
351+ )
352+ os .makedirs (local_controller_dir , exist_ok = True )
353+
354+ # Generate and rsync the planner input config to the node
355+ planner_input_yaml = config .generate_sql_planner_input (
356+ sketchdb_query_groups , dataset_cfg
357+ )
358+ local_planner_input = os .path .join (
359+ local_controller_dir , "planner_input.yaml"
360+ )
361+ with open (local_planner_input , "w" ) as _f :
362+ _f .write (planner_input_yaml )
363+ sync .rsync_streaming_configs (
364+ provider , local_controller_dir , remote_controller_dir , node_offset
365+ )
366+
367+ # Run asap-planner (SQL mode) — writes streaming_config.yaml + inference_config.yaml
368+ controller_service = ControllerService (
369+ provider = provider , use_container = False , node_offset = node_offset
370+ )
371+ controller_service .start (
372+ controller_input_file = os .path .join (
373+ remote_controller_dir , "planner_input.yaml"
374+ ),
375+ streaming_engine = "precompute" ,
376+ controller_remote_output_dir = remote_controller_dir ,
377+ punting = False ,
378+ query_language = "sql" ,
379+ data_ingestion_interval = data_ingestion_interval ,
380+ )
381+ sync .rsync_controller_config_remote_to_local (
382+ provider , remote_controller_dir , local_controller_dir , node_offset
383+ )
384+
385+ # Start query engine (precompute + JSON ingest + ClickHouse fallback)
386+ query_engine_service = QueryEngineRustService (
258387 provider = provider ,
388+ use_container = False ,
259389 node_offset = node_offset ,
260- config_file = controller_client_config ,
261- output_dir = experiment_output_dir ,
262- use_container = use_container ,
263- parallel = parallel ,
390+ )
391+ dataset_precompute_cfg = dataset_cfg .precompute
392+ query_engine_service .start (
393+ experiment_output_dir = experiment_output_dir ,
394+ local_experiment_dir = local_experiment_dir ,
395+ flink_output_format = "json" ,
396+ prometheus_scrape_interval = 15 ,
397+ log_level = "INFO" ,
398+ profile_query_engine = False ,
399+ manual = False ,
400+ streaming_engine = "precompute" ,
401+ controller_remote_output_dir = remote_controller_dir ,
402+ compress_json = False ,
403+ dump_precomputes = False ,
404+ lock_strategy = "per-key" ,
405+ backend_config = {
406+ "type" : "clickhouse" ,
407+ "url" : clickhouse_url ,
408+ "database" : clickhouse_database ,
409+ "forward_unsupported_queries" : True ,
410+ },
411+ http_port = asap_http_port ,
412+ ingest_json_config = {
413+ "path" : remote_data_file ,
414+ "metric_name" : str (dataset_cfg .metric_name ),
415+ "value_col" : str (dataset_precompute_cfg .value_col ),
416+ "label_cols" : list (dataset_precompute_cfg .label_cols ),
417+ "timestamp_col" : str (dataset_precompute_cfg .timestamp_col ),
418+ "timestamp_unit" : "seconds" ,
419+ "batch_size" : 1000 ,
420+ },
264421 )
265422
266- sync .rsync_experiment_data (
267- provider ,
268- experiment_output_dir ,
269- local_experiment_dir ,
270- node_offset = node_offset ,
271- )
423+ query_engine_service .wait_until_ready ()
424+
425+ if not skip_querying :
426+ steady_state_wait = int (cfg .flow .steady_state_wait )
427+ print (
428+ f"Waiting { steady_state_wait } s for precompute ingest to complete..."
429+ )
430+ time .sleep (steady_state_wait )
431+
432+ controller_client_config = os .path .join (
433+ experiment_root_output_dir ,
434+ "controller_client_configs" ,
435+ f"{ experiment_mode } .yaml" ,
436+ )
437+ _run_query_client (
438+ provider = provider ,
439+ node_offset = node_offset ,
440+ config_file = controller_client_config ,
441+ output_dir = experiment_output_dir ,
442+ use_container = use_container ,
443+ parallel = parallel ,
444+ )
445+
446+ sync .rsync_experiment_data (
447+ provider ,
448+ experiment_output_dir ,
449+ local_experiment_dir ,
450+ node_offset = node_offset ,
451+ )
452+
453+ if not no_teardown :
454+ query_engine_service .stop ()
455+
456+ else :
457+ # --- baseline mode ---
458+ if not skip_querying :
459+ controller_client_config = os .path .join (
460+ experiment_root_output_dir ,
461+ "controller_client_configs" ,
462+ f"{ experiment_mode } .yaml" ,
463+ )
464+ _run_query_client (
465+ provider = provider ,
466+ node_offset = node_offset ,
467+ config_file = controller_client_config ,
468+ output_dir = experiment_output_dir ,
469+ use_container = use_container ,
470+ parallel = parallel ,
471+ )
472+
473+ sync .rsync_experiment_data (
474+ provider ,
475+ experiment_output_dir ,
476+ local_experiment_dir ,
477+ node_offset = node_offset ,
478+ )
272479
273480 # --- teardown ---
274481 if not no_teardown :
0 commit comments