From 81830be5eb91e646b65645cbbbdcf6bd893c0a00 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 21 May 2026 15:26:42 -0400 Subject: [PATCH 1/2] feat(tools): add client for SQL querying for experiment infra --- .../experiments/experiment_utils/config.py | 101 +++++++++++++++++- .../services/prometheus_client_service.py | 9 +- asap-tools/experiments/remote_monitor.py | 1 + .../main_prometheus_client.py | 25 +++-- 4 files changed, 124 insertions(+), 12 deletions(-) diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py index eda2df43..51af0461 100644 --- a/asap-tools/experiments/experiment_utils/config.py +++ b/asap-tools/experiments/experiment_utils/config.py @@ -6,9 +6,9 @@ import os import copy import yaml -from typing import List, Tuple +from typing import Any, Dict, List, Tuple -from omegaconf import DictConfig, OmegaConf +from omegaconf import DictConfig, ListConfig, OmegaConf import constants @@ -544,6 +544,103 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"): ) +def _load_sql_queries(sql_file: str) -> List[str]: + """Read a SQL file and return individual statements, preserving comment lines.""" + with open(sql_file) as f: + content = f.read() + return [stmt.strip() for stmt in content.split(";") if stmt.strip()] + + +def generate_clickhouse_client_configs( + query_groups: Any, + local_experiment_dir: str, + mode_server_urls: Dict[str, str], + clickhouse_database: str = "default", + clickhouse_user: str = "default", + clickhouse_password: str = "", +) -> List[str]: + """Generate prometheus-client config YAMLs for ClickHouse experiment modes. + + SQL queries are read from the ``sql_file`` paths in each query group and + inlined into the YAML, so no separate SQL file rsync is required. + + For each mode in ``mode_server_urls`` a file is written to + ``{local_experiment_dir}/controller_client_configs/{mode}.yaml`` — the same + directory that ``rsync_controller_client_configs`` already syncs. + + Args: + query_groups: Iterable of query-group dicts (or DictConfig/ListConfig). + Each entry must have ``sql_file`` and may have ``client_options`` + (``starting_delay``, ``repetitions``) and ``repetition_delay``. + local_experiment_dir: Local directory under which + ``controller_client_configs/`` is created. + mode_server_urls: Mapping of mode name to ClickHouse server URL, e.g. + ``{"baseline": "http://localhost:8123"}``. One YAML file is + written per entry. + clickhouse_database: ClickHouse database name (default ``"default"``). + clickhouse_user: ClickHouse user (default ``"default"``). + clickhouse_password: ClickHouse password (default ``""``). + + Returns: + List of mode names for which configs were generated. + """ + output_dir = os.path.join(local_experiment_dir, "controller_client_configs") + os.makedirs(output_dir, exist_ok=True) + + # Normalise OmegaConf containers to plain Python structures + if isinstance(query_groups, (DictConfig, ListConfig)): + query_groups_list: List[Dict] = OmegaConf.to_container(query_groups, resolve=True) # type: ignore[assignment] + else: + query_groups_list = list(query_groups) + + # Build query groups with SQL inlined from files + built_groups = [] + for idx, group in enumerate(query_groups_list): + sql_file = group.get("sql_file") + if not sql_file: + name = group.get("name", str(idx)) + raise ValueError(f"Query group {idx!r} ({name!r}) missing 'sql_file'") + + queries = _load_sql_queries(sql_file) + if not queries: + raise ValueError(f"No SQL statements found in {sql_file!r}") + + client_opts = dict(group.get("client_options") or {}) + client_opts.setdefault("starting_delay", 0) + client_opts.setdefault("repetitions", 1) + + built_groups.append( + { + "id": idx, + "queries": queries, + "repetition_delay": group.get("repetition_delay", 0), + "client_options": client_opts, + "time_window_seconds": group.get("time_window_seconds"), + } + ) + + modes = list(mode_server_urls.keys()) + for mode, url in mode_server_urls.items(): + config: Dict[str, Any] = { + "servers": [ + { + "name": mode, + "url": url, + "protocol": "clickhouse", + "database": clickhouse_database, + "user": clickhouse_user, + "password": clickhouse_password, + } + ], + "query_groups": built_groups, + } + config_path = os.path.join(output_dir, f"{mode}.yaml") + with open(config_path, "w") as f: + yaml.dump(config, f) + + return modes + + def generate_and_copy_prometheus_config( num_nodes_in_experiment, local_experiment_dir, diff --git a/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py b/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py index ac0512b4..d22bff64 100644 --- a/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py +++ b/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py @@ -39,6 +39,7 @@ def start( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, + sql_mode: bool, **kwargs, ): if self.use_container: @@ -52,6 +53,7 @@ def start( profile_query_engine_pid, profile_prometheus_time, parallel, + sql_mode, ) else: return self._start_bare_metal( @@ -64,6 +66,7 @@ def start( profile_query_engine_pid, profile_prometheus_time, parallel, + sql_mode, ) def _start_containerized( @@ -77,6 +80,7 @@ def _start_containerized( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, + sql_mode: bool, ): prometheus_client_dir = os.path.join( self.provider.get_home_dir(), @@ -116,7 +120,7 @@ def _start_containerized( if parallel: gen_compose_cmd += " --parallel" - if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME and not sql_mode: assert query_engine_config_file is not None gen_compose_cmd += f" --align-query-time --server-for-alignment sketchdb --query-engine-config-file {query_engine_config_file}" @@ -150,12 +154,13 @@ def _start_bare_metal( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, + sql_mode: bool, ): cmd = "python3 -u main_prometheus_client.py --config_file {} --output_dir {} --output_file {}{}".format( config_file, output_dir, output_file, " --parallel" if parallel else "" ) - if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME and not sql_mode: assert query_engine_config_file is not None cmd += " --align_query_time --server_for_alignment sketchdb --query_engine_config_file {}".format( query_engine_config_file diff --git a/asap-tools/experiments/remote_monitor.py b/asap-tools/experiments/remote_monitor.py index ef9582c4..d46ac519 100644 --- a/asap-tools/experiments/remote_monitor.py +++ b/asap-tools/experiments/remote_monitor.py @@ -373,6 +373,7 @@ def main(args): profile_query_engine_pid, args.profile_prometheus_time, args.prometheus_client_parallel, + sql_mode=False, ) if prometheus_client_service.use_container: diff --git a/asap-tools/queriers/prometheus-client/main_prometheus_client.py b/asap-tools/queriers/prometheus-client/main_prometheus_client.py index 08a9bf53..121f834d 100644 --- a/asap-tools/queriers/prometheus-client/main_prometheus_client.py +++ b/asap-tools/queriers/prometheus-client/main_prometheus_client.py @@ -34,12 +34,12 @@ from promql_utilities.query_results.serializers import SerializerFactory -class PrometheusDebugRetry(Retry): +class DebugRetry(Retry): def __init__(self, *args: Any, server_name: str = "", **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.server_name = server_name - def new(self, **kw: Any) -> "PrometheusDebugRetry": + def new(self, **kw: Any) -> "DebugRetry": """Override new() to preserve server_name when creating new instances.""" new_retry = super().new(**kw) new_retry.server_name = self.server_name @@ -53,7 +53,7 @@ def increment( error: Optional[Exception] = None, _pool: Optional[Any] = None, _stacktrace: Optional[Any] = None, - ) -> "PrometheusDebugRetry": + ) -> "DebugRetry": # Calculate current attempt number assert self.total is not None current_retries = self.total - ( @@ -74,11 +74,11 @@ def increment( ) result = super().increment(method, url, response, error, _pool, _stacktrace) - assert isinstance(result, PrometheusDebugRetry) + assert isinstance(result, DebugRetry) return result -class PrometheusDebugHTTPAdapter(HTTPAdapter): +class DebugHTTPAdapter(HTTPAdapter): def __init__(self, server_name: str, *args: Any, **kwargs: Any) -> None: self.server_name = server_name super().__init__(*args, **kwargs) @@ -609,7 +609,7 @@ def main(args: Any) -> None: if protocol == "prometheus": # Create custom retry adapter with debug logging - debug_retry = PrometheusDebugRetry( + debug_retry = DebugRetry( server_name=server.name, total=3, backoff_factor=1, @@ -625,7 +625,7 @@ def main(args: Any) -> None: ) # Mount debug adapter for HTTP request logging - debug_adapter = PrometheusDebugHTTPAdapter(server.name) + debug_adapter = DebugHTTPAdapter(server.name) client.session.mount("http://", debug_adapter) client.session.mount("https://", debug_adapter) else: @@ -640,7 +640,7 @@ def main(args: Any) -> None: ) # Mount debug adapter for HTTP request logging - debug_adapter = PrometheusDebugHTTPAdapter(server.name) + debug_adapter = DebugHTTPAdapter(server.name) client.session.mount("http://", debug_adapter) client.session.mount("https://", debug_adapter) @@ -654,6 +654,15 @@ def main(args: Any) -> None: if args.align_query_time and server.name == args.server_for_alignment: server_url_for_alignment = server.url + # ClickHouse does not expose /api/v1/status/runtimeinfo — skip alignment silently + if args.align_query_time and any( + (s.protocol or "prometheus") == "clickhouse" for s in config.servers + ): + logger.info( + "Skipping query-time alignment: ClickHouse protocol has no runtimeinfo endpoint" + ) + args.align_query_time = False + query_start_times = None if args.align_query_time: assert server_url_for_alignment is not None From d15b95aac450aba47e24a5a6810856127c8ecb62 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 21 May 2026 15:34:07 -0400 Subject: [PATCH 2/2] renamed service, changed sql_mode to backend --- .../experiment_utils/services/__init__.py | 3 ++- .../services/prometheus_client_service.py | 26 +++++++++++++------ asap-tools/experiments/remote_monitor.py | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/asap-tools/experiments/experiment_utils/services/__init__.py b/asap-tools/experiments/experiment_utils/services/__init__.py index 25b09710..bc399b60 100644 --- a/asap-tools/experiments/experiment_utils/services/__init__.py +++ b/asap-tools/experiments/experiment_utils/services/__init__.py @@ -25,7 +25,7 @@ from .system_exporters import SystemExportersService from .prometheus import PrometheusService from .prometheus_kafka_adapter import PrometheusKafkaAdapterService -from .prometheus_client_service import PrometheusClientService +from .prometheus_client_service import QueryClientService, PrometheusClientService from .remote_monitor_service import RemoteMonitorService from .docker_prometheus import DockerPrometheusService from .docker_victoriametrics import DockerVictoriaMetricsService @@ -141,6 +141,7 @@ def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int): "SystemExportersService", "PrometheusService", "PrometheusKafkaAdapterService", + "QueryClientService", "PrometheusClientService", "RemoteMonitorService", "DockerPrometheusService", diff --git a/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py b/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py index d22bff64..518ed09e 100644 --- a/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py +++ b/asap-tools/experiments/experiment_utils/services/prometheus_client_service.py @@ -12,7 +12,7 @@ from experiment_utils.providers.base import InfrastructureProvider -class PrometheusClientService(BaseService): +class QueryClientService(BaseService): def __init__( self, provider: InfrastructureProvider, @@ -39,7 +39,7 @@ def start( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, - sql_mode: bool, + backend_type: str, **kwargs, ): if self.use_container: @@ -53,7 +53,7 @@ def start( profile_query_engine_pid, profile_prometheus_time, parallel, - sql_mode, + backend_type, ) else: return self._start_bare_metal( @@ -66,7 +66,7 @@ def start( profile_query_engine_pid, profile_prometheus_time, parallel, - sql_mode, + backend_type, ) def _start_containerized( @@ -80,7 +80,7 @@ def _start_containerized( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, - sql_mode: bool, + backend_type: str, ): prometheus_client_dir = os.path.join( self.provider.get_home_dir(), @@ -120,7 +120,10 @@ def _start_containerized( if parallel: gen_compose_cmd += " --parallel" - if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME and not sql_mode: + if ( + experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME + and backend_type != "clickhouse" + ): assert query_engine_config_file is not None gen_compose_cmd += f" --align-query-time --server-for-alignment sketchdb --query-engine-config-file {query_engine_config_file}" @@ -154,13 +157,16 @@ def _start_bare_metal( profile_query_engine_pid: Optional[int], profile_prometheus_time: Optional[int], parallel: bool, - sql_mode: bool, + backend_type: str, ): cmd = "python3 -u main_prometheus_client.py --config_file {} --output_dir {} --output_file {}{}".format( config_file, output_dir, output_file, " --parallel" if parallel else "" ) - if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME and not sql_mode: + if ( + experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME + and backend_type != "clickhouse" + ): assert query_engine_config_file is not None cmd += " --align_query_time --server_for_alignment sketchdb --query_engine_config_file {}".format( query_engine_config_file @@ -286,3 +292,7 @@ def _is_healthy_containerized(self) -> bool: return False except Exception: return False + + +# Backward-compatible alias +PrometheusClientService = QueryClientService diff --git a/asap-tools/experiments/remote_monitor.py b/asap-tools/experiments/remote_monitor.py index d46ac519..577d6f07 100644 --- a/asap-tools/experiments/remote_monitor.py +++ b/asap-tools/experiments/remote_monitor.py @@ -373,7 +373,7 @@ def main(args): profile_query_engine_pid, args.profile_prometheus_time, args.prometheus_client_parallel, - sql_mode=False, + backend_type="prometheus", ) if prometheus_client_service.use_container: