From 1c7c4d15bdadee371d3d5e13ff80fb58408b9725 Mon Sep 17 00:00:00 2001
From: Milind Srivastava
Date: Thu, 21 May 2026 12:53:31 -0400
Subject: [PATCH 1/2] feat(tools): Add ClickhouseDataLoaderService

---
Reviewer notes (text in this area is ignored by `git am`):
  * Restored from a whitespace-flattened copy; indentation was inferred
    from syntax. Reconstructed line counts match every hunk header.
  * In-place edits to clickhouse_service.py (no lines added or removed):
      - _exec_sql raises when ClickHouse answers with an exception body
        instead of printing a warning followed by "SQL OK".
      - _load_clickbench/_load_custom call curl with -f so an HTTP error
        reply yields a non-zero exit that the returncode check catches.
      - Docstrings no longer name asset files that PATCH 2/2 renames.
    The `index` blob ids predate these edits.

 .../experiment_utils/services/__init__.py     |   3 +-
 .../services/clickbench_init.sql              | 115 ++++++
 .../services/clickhouse_service.py            | 372 +++++++++++++++++-
 .../services/h2o_clickhouse_loader.py         | 107 +++++
 .../experiment_utils/services/h2o_init.sql    |  18 +
 .../experiments/experiment_utils/sync.py      |  36 ++
 6 files changed, 648 insertions(+), 3 deletions(-)
 create mode 100644 asap-tools/experiments/experiment_utils/services/clickbench_init.sql
 create mode 100644 asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py
 create mode 100644 asap-tools/experiments/experiment_utils/services/h2o_init.sql

diff --git a/asap-tools/experiments/experiment_utils/services/__init__.py b/asap-tools/experiments/experiment_utils/services/__init__.py
index 974a6f67..25b09710 100644
--- a/asap-tools/experiments/experiment_utils/services/__init__.py
+++ b/asap-tools/experiments/experiment_utils/services/__init__.py
@@ -39,7 +39,7 @@
     DumbKafkaConsumerService,
 )
 from .grafana import GrafanaService
-from .clickhouse_service import ClickHouseService
+from .clickhouse_service import ClickHouseService, ClickHouseDataLoaderService
 
 
 def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int):
@@ -155,4 +155,5 @@ def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int):
     "GrafanaService",
     "create_prometheus_service",
     "ClickHouseService",
+    "ClickHouseDataLoaderService",
 ]
diff --git a/asap-tools/experiments/experiment_utils/services/clickbench_init.sql b/asap-tools/experiments/experiment_utils/services/clickbench_init.sql
new file mode 100644
index 00000000..87a7ce9b
--- /dev/null
+++ b/asap-tools/experiments/experiment_utils/services/clickbench_init.sql
@@ -0,0 +1,115 @@
+-- ClickHouse schema for ClickBench (hits table, MergeTree only, no Kafka engine).
+-- Matches benchmark/configs/clickbench_hits_init.sql.
+
+CREATE TABLE IF NOT EXISTS hits
+(
+    WatchID Int64,
+    JavaEnable UInt8,
+    Title String,
+    GoodEvent Int16,
+    EventTime DateTime,
+    EventDate Date,
+    CounterID UInt32,
+    ClientIP Int32,
+    RegionID UInt32,
+    UserID Int64,
+    CounterClass Int8,
+    OS UInt8,
+    UserAgent UInt8,
+    URL String,
+    Referer String,
+    IsRefresh UInt8,
+    RefererCategoryID UInt16,
+    RefererRegionID UInt32,
+    URLCategoryID UInt16,
+    URLRegionID UInt32,
+    ResolutionWidth UInt16,
+    ResolutionHeight UInt16,
+    ResolutionDepth UInt8,
+    FlashMajor UInt8,
+    FlashMinor UInt8,
+    FlashMinor2 String,
+    NetMajor UInt8,
+    NetMinor UInt8,
+    UserAgentMajor UInt16,
+    UserAgentMinor String,
+    CookieEnable UInt8,
+    JavascriptEnable UInt8,
+    IsMobile UInt8,
+    MobilePhone UInt8,
+    MobilePhoneModel String,
+    Params String,
+    IPNetworkID UInt32,
+    TraficSourceID Int8,
+    SearchEngineID UInt16,
+    SearchPhrase String,
+    AdvEngineID UInt8,
+    IsArtifical UInt8,
+    WindowClientWidth UInt16,
+    WindowClientHeight UInt16,
+    ClientTimeZone Int16,
+    ClientEventTime DateTime,
+    SilverlightVersion1 UInt8,
+    SilverlightVersion2 UInt8,
+    SilverlightVersion3 UInt32,
+    SilverlightVersion4 UInt16,
+    PageCharset String,
+    CodeVersion UInt32,
+    IsLink UInt8,
+    IsDownload UInt8,
+    IsNotBounce UInt8,
+    FUniqID Int64,
+    OriginalURL String,
+    HID UInt32,
+    IsOldCounter UInt8,
+    IsEvent UInt8,
+    IsParameter UInt8,
+    DontCountHits UInt8,
+    WithHash UInt8,
+    HitColor String,
+    LocalEventTime DateTime,
+    Age UInt8,
+    Sex UInt8,
+    Income UInt8,
+    Interests UInt16,
+    Robotness UInt8,
+    RemoteIP Int32,
+    WindowName Int32,
+    OpenerName Int32,
+    HistoryLength Int16,
+    BrowserLanguage String,
+    BrowserCountry String,
+    SocialNetwork String,
+    SocialAction String,
+    HTTPError UInt16,
+    SendTiming UInt32,
+    DNSTiming UInt32,
+    ConnectTiming UInt32,
+    ResponseStartTiming UInt32,
+    ResponseEndTiming UInt32,
+    FetchTiming UInt32,
+    SocialSourceNetworkID UInt8,
+    SocialSourcePage String,
+    ParamPrice Int64,
+    ParamOrderID String,
+    ParamCurrency String,
+    ParamCurrencyID UInt16,
+    OpenstatServiceName String,
+    OpenstatCampaignID String,
+    OpenstatAdID String,
+    OpenstatSourceID String,
+    UTMSource String,
+    UTMMedium String,
+    UTMCampaign String,
+    UTMContent String,
+    UTMTerm String,
+    FromTag String,
+    HasGCLID UInt8,
+    RefererHash Int64,
+    URLHash Int64,
+    CLID UInt32
+)
+ENGINE = MergeTree
+PARTITION BY toYYYYMM(EventDate)
+ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime, WatchID)
+SETTINGS index_granularity = 8192;
diff --git a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
index 60847224..8a9c7a2a 100644
--- a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
+++ b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
@@ -1,13 +1,14 @@
 """
-ClickHouse Docker service management for SQL experiment infrastructure.
+ClickHouse Docker service management and data loading for SQL experiment infrastructure.
 """
 
 import os
+import shlex
 import subprocess
 from typing import Optional
 from jinja2 import Template
 
-from .base import DockerServiceBase
+from .base import BaseService, DockerServiceBase
 from experiment_utils.providers.base import InfrastructureProvider
 import constants
 import utils
@@ -157,3 +158,370 @@ def stop(self, **kwargs) -> None:
             self.compose_file = None
         else:
             self._force_cleanup_container()
+
+
+class ClickHouseDataLoaderService(BaseService):
+    """Loads datasets into ClickHouse for SQL experiment infrastructure.
+
+    Supports ClickBench (JSON-lines) and H2O GroupBy (CSV) out of the box, plus
+    custom JSON-lines datasets. Always drops and recreates the target table before
+    loading to guarantee a clean state.
+
+    Built-in DDL and the H2O loader script live alongside this file in
+    ``experiment_utils/services/`` and are rsynced to the remote node at runtime:
+      - ``clickbench_init.sql`` — hits table schema
+      - ``h2o_init.sql`` — h2o_groupby table schema
+      - ``h2o_clickhouse_loader.py`` — standalone H2O CSV → ClickHouse loader
+
+    Typical usage::
+
+        loader = ClickHouseDataLoaderService(provider, num_nodes=1, node_offset=0)
+        loader.prepare(local_data_file="/local/hits.json.gz", remote_dir="/scratch/data")
+        loader.start(dataset_name="clickbench")
+    """
+
+    # Directory containing this file; used to locate sibling DDL / script assets.
+    _ASSETS_DIR = os.path.dirname(os.path.abspath(__file__))
+
+    # Built-in DDL file names (relative to _ASSETS_DIR).
+    BUILTIN_DDL_FILES = {
+        "clickbench": "clickbench_init.sql",
+        "h2o": "h2o_init.sql",
+    }
+
+    # H2O loader script (relative to _ASSETS_DIR).
+    H2O_LOADER_SCRIPT = "h2o_clickhouse_loader.py"
+
+    H2O_BATCH_SIZE = 50_000
+
+    DEFAULT_TABLES = {
+        "clickbench": "hits",
+        "h2o": "h2o_groupby",
+    }
+
+    def __init__(
+        self,
+        provider: InfrastructureProvider,
+        num_nodes: int,
+        node_offset: int,
+        clickhouse_http_port: int = 8123,
+    ):
+        super().__init__(provider)
+        self.num_nodes = num_nodes
+        self.node_offset = node_offset
+        self.clickhouse_http_port = clickhouse_http_port
+        self.remote_data_file: Optional[str] = None
+
+    def prepare(self, local_data_file: str, remote_dir: str) -> str:
+        """Rsync a local data file to the remote node.
+
+        Args:
+            local_data_file: Absolute path to the local data file.
+            remote_dir: Remote directory to place the file in.
+
+        Returns:
+            The full remote path to the rsynced file.
+        """
+        self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=f"mkdir -p {remote_dir}",
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+        )
+        hostname = f"node{self.node_offset}.{self.provider.hostname_suffix}"
+        rsync_cmd = 'rsync -azh --progress -e "ssh {}" {} {}@{}:{}/'.format(
+            constants.SSH_OPTIONS,
+            local_data_file,
+            self.provider.username,
+            hostname,
+            remote_dir,
+        )
+        utils.run_cmd_with_retry(rsync_cmd, popen=False, ignore_errors=False)
+        self.remote_data_file = os.path.join(
+            remote_dir, os.path.basename(local_data_file)
+        )
+        return self.remote_data_file
+
+    def start(
+        self,
+        dataset_name: str,
+        remote_data_file: Optional[str] = None,
+        table: Optional[str] = None,
+        batch_size: int = H2O_BATCH_SIZE,
+        init_sql_file: Optional[str] = None,
+        max_rows: int = 0,
+        **kwargs,
+    ) -> None:
+        """Drop, recreate, and load data into ClickHouse.
+
+        Args:
+            dataset_name: One of ``'clickbench'``, ``'h2o'``, or ``'custom'``.
+            remote_data_file: Path on the remote node. Defaults to the path
+                stored by the most recent :meth:`prepare` call.
+            table: Target table name. Defaults to the dataset's standard table
+                (``hits`` for clickbench, ``h2o_groupby`` for h2o).
+            batch_size: INSERT batch size for H2O loading (default 50 000).
+            init_sql_file: Path to a DDL SQL file *already on the remote node*.
+                When ``None``, the dataset's built-in DDL file is rsynced and used.
+            max_rows: Maximum rows to load (0 = all).
+        """
+        if remote_data_file is None:
+            remote_data_file = self.remote_data_file
+        if remote_data_file is None:
+            raise ValueError(
+                "remote_data_file not set; call prepare() first or pass remote_data_file"
+            )
+
+        table = table or self.DEFAULT_TABLES.get(dataset_name)
+        if table is None:
+            raise ValueError(
+                f"table must be specified for dataset_name={dataset_name!r}"
+            )
+
+        url = f"http://localhost:{self.clickhouse_http_port}/"
+
+        print(f"Dropping table {table!r}...")
+        self._exec_sql(f"DROP TABLE IF EXISTS {table}", url)
+
+        if init_sql_file is not None:
+            print(f"Running init SQL from {init_sql_file!r}...")
+            self._exec_sql_file(init_sql_file, url)
+        elif dataset_name in self.BUILTIN_DDL_FILES:
+            local_ddl = os.path.join(
+                self._ASSETS_DIR, self.BUILTIN_DDL_FILES[dataset_name]
+            )
+            print(f"Initializing schema for {dataset_name!r} from {local_ddl!r}...")
+            remote_ddl = f"/tmp/{dataset_name}_init_{os.getpid()}.sql"
+            self._rsync_to_remote(local_ddl, remote_ddl)
+            try:
+                self._exec_sql_file(remote_ddl, url)
+            finally:
+                self._remote_rm(remote_ddl)
+        elif dataset_name != "custom":
+            raise ValueError(
+                f"No built-in DDL for dataset_name={dataset_name!r}; pass init_sql_file"
+            )
+
+        if dataset_name == "clickbench":
+            self._load_clickbench(remote_data_file, url, table, max_rows)
+        elif dataset_name == "h2o":
+            self._load_h2o(remote_data_file, url, batch_size, max_rows)
+        elif dataset_name == "custom":
+            self._load_custom(remote_data_file, table, url, max_rows)
+        else:
+            raise ValueError(
+                f"Unsupported dataset_name={dataset_name!r}; "
+                "expected 'clickbench', 'h2o', or 'custom'"
+            )
+
+        count = self._check_row_count(table, url)
+        print(f"Loaded {count:,} rows into ClickHouse ({table!r})")
+
+    def stop(self, **kwargs) -> None:
+        pass
+
+    # ------------------------------------------------------------------ #
+    # Internal helpers                                                   #
+    # ------------------------------------------------------------------ #
+
+    def _rsync_to_remote(self, local_path: str, remote_path: str) -> None:
+        """Rsync a single local file to an exact remote path."""
+        hostname = f"node{self.node_offset}.{self.provider.hostname_suffix}"
+        cmd = 'rsync -azh -e "ssh {}" {} {}@{}:{}'.format(
+            constants.SSH_OPTIONS,
+            local_path,
+            self.provider.username,
+            hostname,
+            remote_path,
+        )
+        utils.run_cmd_with_retry(cmd, popen=False, ignore_errors=False)
+
+    def _remote_rm(self, remote_path: str) -> None:
+        """Remove a file on the remote node, ignoring errors."""
+        self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=f"rm -f {shlex.quote(remote_path)}",
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+            ignore_errors=True,
+        )
+
+    def _exec_sql(self, sql: str, url: str) -> None:
+        """Run one SQL statement via curl on the remote node; raise on any error."""
+        cmd = "curl -sS {} --data {}".format(shlex.quote(url), shlex.quote(sql))
+        result = self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=cmd,
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+        )
+        if isinstance(result, subprocess.CompletedProcess):
+            if result.returncode != 0:
+                raise RuntimeError(
+                    f"SQL execution failed (exit {result.returncode}): "
+                    f"{result.stderr.strip()[:200]}"
+                )
+            if result.stdout and "Code:" in result.stdout:
+                raise RuntimeError(f"ClickHouse: {result.stdout.strip()[:200]}")
+        short = sql.strip()[:80].replace("\n", " ")
+        print(f" SQL OK: {short}")
+
+    def _exec_sql_file(self, remote_sql_file: str, url: str) -> None:
+        """Read a SQL file on the remote node and execute each semicolon-delimited statement."""
+        result = self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd="cat {}".format(shlex.quote(remote_sql_file)),
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+        )
+        assert isinstance(result, subprocess.CompletedProcess)
+        for stmt in (s.strip() for s in result.stdout.split(";") if s.strip()):
+            self._exec_sql(stmt, url)
+
+    def _check_row_count(self, table: str, url: str) -> int:
+        """Return the row count for a table on the remote node, or 0 on error."""
+        cmd = "curl -sS {} --data {}".format(
+            shlex.quote(url),
+            shlex.quote(f"SELECT count() FROM {table}"),
+        )
+        result = self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=cmd,
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+            ignore_errors=True,
+        )
+        if isinstance(result, subprocess.CompletedProcess) and result.returncode == 0:
+            try:
+                return int(result.stdout.strip())
+            except (ValueError, TypeError):
+                pass
+        return 0
+
+    def _load_clickbench(
+        self, remote_data_file: str, url: str, table: str, max_rows: int
+    ) -> None:
+        """Stream a JSON-lines file (optionally gzipped) into ClickHouse."""
+        print(f"Loading ClickBench data from {remote_data_file!r}...")
+        insert_url = "{}?query=INSERT+INTO+{}+FORMAT+JSONEachRow".format(
+            url.rstrip("/") + "/", table
+        )
+        file_lower = remote_data_file.lower()
+        is_gz = file_lower.endswith(".json.gz") or file_lower.endswith(".jsonl.gz")
+
+        if is_gz:
+            if max_rows > 0:
+                cmd = "zcat {} | head -n {} | curl -sSf {} --data-binary @-".format(
+                    shlex.quote(remote_data_file), max_rows, shlex.quote(insert_url)
+                )
+            else:
+                cmd = "zcat {} | curl -sSf {} --data-binary @-".format(
+                    shlex.quote(remote_data_file), shlex.quote(insert_url)
+                )
+        else:
+            if max_rows > 0:
+                cmd = "head -n {} {} | curl -sSf {} --data-binary @-".format(
+                    max_rows, shlex.quote(remote_data_file), shlex.quote(insert_url)
+                )
+            else:
+                cmd = "curl -sSf {} --data-binary @{}".format(
+                    shlex.quote(insert_url), shlex.quote(remote_data_file)
+                )
+
+        result = self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=cmd,
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+        )
+        if isinstance(result, subprocess.CompletedProcess) and result.returncode != 0:
+            raise RuntimeError(
+                f"ClickBench data load failed: {result.stderr.strip()[:200]}"
+            )
+
+    def _load_h2o(
+        self, remote_data_file: str, url: str, batch_size: int, max_rows: int
+    ) -> None:
+        """Rsync the H2O loader script to the remote node and execute it."""
+        print(f"Loading H2O data from {remote_data_file!r}...")
+        local_script = os.path.join(self._ASSETS_DIR, self.H2O_LOADER_SCRIPT)
+        remote_script = f"/tmp/h2o_loader_{os.getpid()}.py"
+        self._rsync_to_remote(local_script, remote_script)
+        try:
+            cmd = "python3 {} --data-file {} --url {} --batch-size {} --max-rows {}".format(
+                shlex.quote(remote_script),
+                shlex.quote(remote_data_file),
+                shlex.quote(url),
+                batch_size,
+                max_rows,
+            )
+            result = self.provider.execute_command(
+                node_idx=self.node_offset,
+                cmd=cmd,
+                cmd_dir=None,
+                nohup=False,
+                popen=False,
+            )
+            if (
+                isinstance(result, subprocess.CompletedProcess)
+                and result.returncode != 0
+            ):
+                raise RuntimeError(
+                    f"H2O data load failed: {result.stderr.strip()[:200]}"
+                )
+        finally:
+            self._remote_rm(remote_script)
+
+    def _load_custom(
+        self, remote_data_file: str, table: str, url: str, max_rows: int
+    ) -> None:
+        """Stream a custom JSON-lines file (plain or gzipped) into ClickHouse."""
+        print(f"Loading custom data from {remote_data_file!r} into {table!r}...")
+        insert_url = "{}?query=INSERT+INTO+{}+FORMAT+JSONEachRow".format(
+            url.rstrip("/") + "/", table
+        )
+        file_lower = remote_data_file.lower()
+        is_gz = file_lower.endswith(".json.gz") or file_lower.endswith(".jsonl.gz")
+        is_json = file_lower.endswith(".json") or file_lower.endswith(".jsonl")
+
+        if is_gz:
+            if max_rows > 0:
+                cmd = "zcat {} | head -n {} | curl -sSf {} --data-binary @-".format(
+                    shlex.quote(remote_data_file), max_rows, shlex.quote(insert_url)
+                )
+            else:
+                cmd = "zcat {} | curl -sSf {} --data-binary @-".format(
+                    shlex.quote(remote_data_file), shlex.quote(insert_url)
+                )
+        elif is_json:
+            if max_rows > 0:
+                cmd = "head -n {} {} | curl -sSf {} --data-binary @-".format(
+                    max_rows, shlex.quote(remote_data_file), shlex.quote(insert_url)
+                )
+            else:
+                cmd = "curl -sSf {} --data-binary @{}".format(
+                    shlex.quote(insert_url), shlex.quote(remote_data_file)
+                )
+        else:
+            raise ValueError(
+                f"Unsupported file format for {remote_data_file!r}. "
+                "Use dataset_name='h2o' for CSV files."
+            )
+
+        result = self.provider.execute_command(
+            node_idx=self.node_offset,
+            cmd=cmd,
+            cmd_dir=None,
+            nohup=False,
+            popen=False,
+        )
+        if isinstance(result, subprocess.CompletedProcess) and result.returncode != 0:
+            raise RuntimeError(
+                f"Custom data load failed: {result.stderr.strip()[:200]}"
+            )
diff --git a/asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py b/asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py
new file mode 100644
index 00000000..84414040
--- /dev/null
+++ b/asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""
+Load an H2O GroupBy CSV file into ClickHouse with synthetic timestamps.
+
+Timestamps are assigned at ROWS_PER_SECOND rows/sec starting from BASE_EPOCH
+(2024-01-01T00:00:00Z), matching the logic in benchmark/export_to_database.py.
+
+Uses only Python stdlib (urllib) so it runs on any Python 3 installation
+without extra packages.
+
+Usage:
+    python3 h2o_clickhouse_loader.py \\
+        --data-file /path/to/G1_1e7_1e2_0_0.csv \\
+        --url http://localhost:8123/ \\
+        --batch-size 50000 \\
+        --max-rows 0
+"""
+
+import argparse
+import urllib.request
+from datetime import datetime, timezone
+
+BASE_EPOCH = 1_704_067_200  # 2024-01-01T00:00:00Z
+ROWS_PER_SECOND = 1_000
+
+
+def flush_batch(url: str, rows: list) -> None:
+    sql = "INSERT INTO h2o_groupby VALUES " + ",".join(rows)
+    req = urllib.request.Request(url, data=sql.encode("utf-8"))
+    with urllib.request.urlopen(req) as resp:
+        body = resp.read()
+        if resp.status != 200:
+            raise RuntimeError(
+                "ClickHouse insert failed: "
+                + body[:200].decode("utf-8", errors="replace")
+            )
+
+
+def load(data_file: str, url: str, batch_size: int, max_rows: int) -> None:
+    batch = []
+    total = 0
+    with open(data_file, "r", encoding="utf-8") as f:
+        f.readline()  # skip CSV header
+        for i, line in enumerate(f):
+            if max_rows > 0 and i >= max_rows:
+                break
+            parts = line.rstrip("\n").split(",")
+            abs_sec = BASE_EPOCH + i // ROWS_PER_SECOND
+            ts = datetime.fromtimestamp(abs_sec, tz=timezone.utc).strftime(
+                "%Y-%m-%d %H:%M:%S"
+            )
+            row = (
+                "('"
+                + ts
+                + "','"
+                + parts[0]
+                + "','"
+                + parts[1]
+                + "','"
+                + parts[2]
+                + "',"
+                + parts[3].strip()
+                + ","
+                + parts[4].strip()
+                + ","
+                + parts[5].strip()
+                + ","
+                + parts[6].strip()
+                + ","
+                + parts[7].strip()
+                + ","
+                + parts[8].strip()
+                + ")"
+            )
+            batch.append(row)
+            if len(batch) >= batch_size:
+                flush_batch(url, batch)
+                total += len(batch)
+                batch = []
+                if total % 500_000 == 0:
+                    print(f" Inserted {total:,} rows...")
+
+    if batch:
+        flush_batch(url, batch)
+        total += len(batch)
+
+    print(f"H2O load complete: {total:,} rows")
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--data-file", required=True, help="Path to H2O CSV file")
+    parser.add_argument(
+        "--url", default="http://localhost:8123/", help="ClickHouse HTTP URL"
+    )
+    parser.add_argument(
+        "--batch-size", type=int, default=50_000, help="INSERT batch size"
+    )
+    parser.add_argument(
+        "--max-rows", type=int, default=0, help="Max rows to load (0 = all)"
+    )
+    args = parser.parse_args()
+    load(args.data_file, args.url, args.batch_size, args.max_rows)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/asap-tools/experiments/experiment_utils/services/h2o_init.sql b/asap-tools/experiments/experiment_utils/services/h2o_init.sql
new file mode 100644
index 00000000..0b2ecff6
--- /dev/null
+++ b/asap-tools/experiments/experiment_utils/services/h2o_init.sql
@@ -0,0 +1,18 @@
+-- ClickHouse schema for H2O GroupBy benchmark.
+-- Matches benchmark/configs/h2o_init.sql.
+-- DROP is handled by ClickHouseDataLoaderService.start() before calling this file.
+
+CREATE TABLE IF NOT EXISTS h2o_groupby
+(
+    timestamp DateTime,
+    id1 String,
+    id2 String,
+    id3 String,
+    id4 Int32,
+    id5 Int32,
+    id6 Int32,
+    v1 Int32,
+    v2 Int32,
+    v3 Float64
+) ENGINE = MergeTree()
+ORDER BY (id1, id2);
diff --git a/asap-tools/experiments/experiment_utils/sync.py b/asap-tools/experiments/experiment_utils/sync.py
index f35c5968..3daf92bd 100644
--- a/asap-tools/experiments/experiment_utils/sync.py
+++ b/asap-tools/experiments/experiment_utils/sync.py
@@ -117,6 +117,42 @@ def rsync_controller_config_remote_to_local(
     utils.run_cmd_with_retry(cmd, popen=False, ignore_errors=False)
 
 
+def rsync_dataset_file(
+    provider: InfrastructureProvider,
+    local_data_file: str,
+    remote_dir: str,
+    node_offset: int,
+) -> str:
+    """Rsync a local dataset file to a remote node.
+
+    Args:
+        provider: Infrastructure provider.
+        local_data_file: Absolute path to the local data file.
+        remote_dir: Remote directory to place the file in.
+        node_offset: Index of the target node.
+
+    Returns:
+        The full remote path to the rsynced file.
+    """
+    hostname = f"node{node_offset}.{provider.hostname_suffix}"
+    provider.execute_command(
+        node_idx=node_offset,
+        cmd=f"mkdir -p {remote_dir}",
+        cmd_dir=None,
+        nohup=False,
+        popen=False,
+    )
+    cmd = 'rsync -azh --progress -e "ssh {}" {} {}@{}:{}/'.format(
+        constants.SSH_OPTIONS,
+        local_data_file,
+        provider.username,
+        hostname,
+        remote_dir,
+    )
+    utils.run_cmd_with_retry(cmd, popen=False, ignore_errors=False)
+    return os.path.join(remote_dir, os.path.basename(local_data_file))
+
+
 def copy_experiment_config(experiment_params, local_experiment_dir: str):
     """Save the experiment config to local directory for reference."""
     os.makedirs(os.path.join(local_experiment_dir, "experiment_config"), exist_ok=True)

From fa5bb36661d72337fc3c71ab68966fd122aeb38f Mon Sep 17 00:00:00 2001
From: Milind Srivastava
Date: Thu, 21 May 2026 13:01:59 -0400
Subject: [PATCH 2/2] moved some files

---
 .../services/{clickbench_init.sql => clickbench/init.sql}   | 0
 .../experiment_utils/services/clickhouse_service.py         | 6 +++---
 .../services/{h2o_init.sql => h2o/init.sql}                 | 0
 .../services/{h2o_clickhouse_loader.py => h2o/loader.py}    | 0
 4 files changed, 3 insertions(+), 3 deletions(-)
 rename asap-tools/experiments/experiment_utils/services/{clickbench_init.sql => clickbench/init.sql} (100%)
 rename asap-tools/experiments/experiment_utils/services/{h2o_init.sql => h2o/init.sql} (100%)
 rename asap-tools/experiments/experiment_utils/services/{h2o_clickhouse_loader.py => h2o/loader.py} (100%)

diff --git a/asap-tools/experiments/experiment_utils/services/clickbench_init.sql b/asap-tools/experiments/experiment_utils/services/clickbench/init.sql
similarity index 100%
rename from asap-tools/experiments/experiment_utils/services/clickbench_init.sql
rename to asap-tools/experiments/experiment_utils/services/clickbench/init.sql
diff --git a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
index 8a9c7a2a..07dc627c 100644
--- a/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
+++ b/asap-tools/experiments/experiment_utils/services/clickhouse_service.py
@@ -185,12 +185,12 @@ class ClickHouseDataLoaderService(BaseService):
 
     # Built-in DDL file names (relative to _ASSETS_DIR).
     BUILTIN_DDL_FILES = {
-        "clickbench": "clickbench_init.sql",
-        "h2o": "h2o_init.sql",
+        "clickbench": "clickbench/init.sql",
+        "h2o": "h2o/init.sql",
     }
 
     # H2O loader script (relative to _ASSETS_DIR).
-    H2O_LOADER_SCRIPT = "h2o_clickhouse_loader.py"
+    H2O_LOADER_SCRIPT = "h2o/loader.py"
 
     H2O_BATCH_SIZE = 50_000
 
diff --git a/asap-tools/experiments/experiment_utils/services/h2o_init.sql b/asap-tools/experiments/experiment_utils/services/h2o/init.sql
similarity index 100%
rename from asap-tools/experiments/experiment_utils/services/h2o_init.sql
rename to asap-tools/experiments/experiment_utils/services/h2o/init.sql
diff --git a/asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py b/asap-tools/experiments/experiment_utils/services/h2o/loader.py
similarity index 100%
rename from asap-tools/experiments/experiment_utils/services/h2o_clickhouse_loader.py
rename to asap-tools/experiments/experiment_utils/services/h2o/loader.py