Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 99 additions & 2 deletions asap-tools/experiments/experiment_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import os
import copy
import yaml
from typing import List, Tuple
from typing import Any, Dict, List, Tuple

from omegaconf import DictConfig, OmegaConf
from omegaconf import DictConfig, ListConfig, OmegaConf

import constants

Expand Down Expand Up @@ -544,6 +544,103 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"):
)


def _load_sql_queries(sql_file: str) -> List[str]:
"""Read a SQL file and return individual statements, preserving comment lines."""
with open(sql_file) as f:
content = f.read()
return [stmt.strip() for stmt in content.split(";") if stmt.strip()]


def generate_clickhouse_client_configs(
query_groups: Any,
local_experiment_dir: str,
mode_server_urls: Dict[str, str],
clickhouse_database: str = "default",
clickhouse_user: str = "default",
clickhouse_password: str = "",
) -> List[str]:
"""Generate prometheus-client config YAMLs for ClickHouse experiment modes.

SQL queries are read from the ``sql_file`` paths in each query group and
inlined into the YAML, so no separate SQL file rsync is required.

For each mode in ``mode_server_urls`` a file is written to
``{local_experiment_dir}/controller_client_configs/{mode}.yaml`` — the same
directory that ``rsync_controller_client_configs`` already syncs.

Args:
query_groups: Iterable of query-group dicts (or DictConfig/ListConfig).
Each entry must have ``sql_file`` and may have ``client_options``
(``starting_delay``, ``repetitions``) and ``repetition_delay``.
local_experiment_dir: Local directory under which
``controller_client_configs/`` is created.
mode_server_urls: Mapping of mode name to ClickHouse server URL, e.g.
``{"baseline": "http://localhost:8123"}``. One YAML file is
written per entry.
clickhouse_database: ClickHouse database name (default ``"default"``).
clickhouse_user: ClickHouse user (default ``"default"``).
clickhouse_password: ClickHouse password (default ``""``).

Returns:
List of mode names for which configs were generated.
"""
output_dir = os.path.join(local_experiment_dir, "controller_client_configs")
os.makedirs(output_dir, exist_ok=True)

# Normalise OmegaConf containers to plain Python structures
if isinstance(query_groups, (DictConfig, ListConfig)):
query_groups_list: List[Dict] = OmegaConf.to_container(query_groups, resolve=True) # type: ignore[assignment]
else:
query_groups_list = list(query_groups)

# Build query groups with SQL inlined from files
built_groups = []
for idx, group in enumerate(query_groups_list):
sql_file = group.get("sql_file")
if not sql_file:
name = group.get("name", str(idx))
raise ValueError(f"Query group {idx!r} ({name!r}) missing 'sql_file'")

queries = _load_sql_queries(sql_file)
if not queries:
raise ValueError(f"No SQL statements found in {sql_file!r}")

client_opts = dict(group.get("client_options") or {})
client_opts.setdefault("starting_delay", 0)
client_opts.setdefault("repetitions", 1)

built_groups.append(
{
"id": idx,
"queries": queries,
"repetition_delay": group.get("repetition_delay", 0),
"client_options": client_opts,
"time_window_seconds": group.get("time_window_seconds"),
}
)

modes = list(mode_server_urls.keys())
for mode, url in mode_server_urls.items():
config: Dict[str, Any] = {
"servers": [
{
"name": mode,
"url": url,
"protocol": "clickhouse",
"database": clickhouse_database,
"user": clickhouse_user,
"password": clickhouse_password,
}
],
"query_groups": built_groups,
}
config_path = os.path.join(output_dir, f"{mode}.yaml")
with open(config_path, "w") as f:
yaml.dump(config, f)

return modes


def generate_and_copy_prometheus_config(
num_nodes_in_experiment,
local_experiment_dir,
Expand Down
3 changes: 2 additions & 1 deletion asap-tools/experiments/experiment_utils/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .system_exporters import SystemExportersService
from .prometheus import PrometheusService
from .prometheus_kafka_adapter import PrometheusKafkaAdapterService
from .prometheus_client_service import PrometheusClientService
from .prometheus_client_service import QueryClientService, PrometheusClientService
from .remote_monitor_service import RemoteMonitorService
from .docker_prometheus import DockerPrometheusService
from .docker_victoriametrics import DockerVictoriaMetricsService
Expand Down Expand Up @@ -141,6 +141,7 @@ def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int):
"SystemExportersService",
"PrometheusService",
"PrometheusKafkaAdapterService",
"QueryClientService",
"PrometheusClientService",
"RemoteMonitorService",
"DockerPrometheusService",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from experiment_utils.providers.base import InfrastructureProvider


class PrometheusClientService(BaseService):
class QueryClientService(BaseService):
def __init__(
self,
provider: InfrastructureProvider,
Expand All @@ -39,6 +39,7 @@ def start(
profile_query_engine_pid: Optional[int],
profile_prometheus_time: Optional[int],
parallel: bool,
backend_type: str,
**kwargs,
):
if self.use_container:
Expand All @@ -52,6 +53,7 @@ def start(
profile_query_engine_pid,
profile_prometheus_time,
parallel,
backend_type,
)
else:
return self._start_bare_metal(
Expand All @@ -64,6 +66,7 @@ def start(
profile_query_engine_pid,
profile_prometheus_time,
parallel,
backend_type,
)

def _start_containerized(
Expand All @@ -77,6 +80,7 @@ def _start_containerized(
profile_query_engine_pid: Optional[int],
profile_prometheus_time: Optional[int],
parallel: bool,
backend_type: str,
):
prometheus_client_dir = os.path.join(
self.provider.get_home_dir(),
Expand Down Expand Up @@ -116,7 +120,10 @@ def _start_containerized(
if parallel:
gen_compose_cmd += " --parallel"

if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME:
if (
experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME
and backend_type != "clickhouse"
):
assert query_engine_config_file is not None
gen_compose_cmd += f" --align-query-time --server-for-alignment sketchdb --query-engine-config-file {query_engine_config_file}"

Expand Down Expand Up @@ -150,12 +157,16 @@ def _start_bare_metal(
profile_query_engine_pid: Optional[int],
profile_prometheus_time: Optional[int],
parallel: bool,
backend_type: str,
):
cmd = "python3 -u main_prometheus_client.py --config_file {} --output_dir {} --output_file {}{}".format(
config_file, output_dir, output_file, " --parallel" if parallel else ""
)

if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME:
if (
experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME
and backend_type != "clickhouse"
):
assert query_engine_config_file is not None
cmd += " --align_query_time --server_for_alignment sketchdb --query_engine_config_file {}".format(
query_engine_config_file
Expand Down Expand Up @@ -281,3 +292,7 @@ def _is_healthy_containerized(self) -> bool:
return False
except Exception:
return False


# Backward-compatible alias
PrometheusClientService = QueryClientService
1 change: 1 addition & 0 deletions asap-tools/experiments/remote_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def main(args):
profile_query_engine_pid,
args.profile_prometheus_time,
args.prometheus_client_parallel,
backend_type="prometheus",
)

if prometheus_client_service.use_container:
Expand Down
25 changes: 17 additions & 8 deletions asap-tools/queriers/prometheus-client/main_prometheus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from promql_utilities.query_results.serializers import SerializerFactory


class PrometheusDebugRetry(Retry):
class DebugRetry(Retry):
def __init__(self, *args: Any, server_name: str = "", **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.server_name = server_name

def new(self, **kw: Any) -> "PrometheusDebugRetry":
def new(self, **kw: Any) -> "DebugRetry":
"""Override new() to preserve server_name when creating new instances."""
new_retry = super().new(**kw)
new_retry.server_name = self.server_name
Expand All @@ -53,7 +53,7 @@ def increment(
error: Optional[Exception] = None,
_pool: Optional[Any] = None,
_stacktrace: Optional[Any] = None,
) -> "PrometheusDebugRetry":
) -> "DebugRetry":
# Calculate current attempt number
assert self.total is not None
current_retries = self.total - (
Expand All @@ -74,11 +74,11 @@ def increment(
)

result = super().increment(method, url, response, error, _pool, _stacktrace)
assert isinstance(result, PrometheusDebugRetry)
assert isinstance(result, DebugRetry)
return result


class PrometheusDebugHTTPAdapter(HTTPAdapter):
class DebugHTTPAdapter(HTTPAdapter):
def __init__(self, server_name: str, *args: Any, **kwargs: Any) -> None:
self.server_name = server_name
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -609,7 +609,7 @@ def main(args: Any) -> None:

if protocol == "prometheus":
# Create custom retry adapter with debug logging
debug_retry = PrometheusDebugRetry(
debug_retry = DebugRetry(
server_name=server.name,
total=3,
backoff_factor=1,
Expand All @@ -625,7 +625,7 @@ def main(args: Any) -> None:
)

# Mount debug adapter for HTTP request logging
debug_adapter = PrometheusDebugHTTPAdapter(server.name)
debug_adapter = DebugHTTPAdapter(server.name)
client.session.mount("http://", debug_adapter)
client.session.mount("https://", debug_adapter)
else:
Expand All @@ -640,7 +640,7 @@ def main(args: Any) -> None:
)

# Mount debug adapter for HTTP request logging
debug_adapter = PrometheusDebugHTTPAdapter(server.name)
debug_adapter = DebugHTTPAdapter(server.name)
client.session.mount("http://", debug_adapter)
client.session.mount("https://", debug_adapter)

Expand All @@ -654,6 +654,15 @@ def main(args: Any) -> None:
if args.align_query_time and server.name == args.server_for_alignment:
server_url_for_alignment = server.url

# ClickHouse does not expose /api/v1/status/runtimeinfo — skip alignment silently
if args.align_query_time and any(
(s.protocol or "prometheus") == "clickhouse" for s in config.servers
):
logger.info(
"Skipping query-time alignment: ClickHouse protocol has no runtimeinfo endpoint"
)
args.align_query_time = False

query_start_times = None
if args.align_query_time:
assert server_url_for_alignment is not None
Expand Down
Loading