Skip to content

Commit 522233b

Browse files
benjamib112benjamib112
authored andcommitted
added cleanup between runs in pipeline, visualization per benchmark
1 parent ea50243 commit 522233b

2 files changed

Lines changed: 97 additions & 21 deletions

File tree

asap-tools/execution-utilities/asap_benchmark_pipeline/run_benchmark.py

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,26 @@
2525
# Query extraction
2626
# ---------------------------------------------------------------------------
2727

28+
2829
def extract_queries_from_sql(sql_file: Path) -> List[Tuple[str, str]]:
2930
"""Extract (query_id, sql) pairs from an annotated SQL file."""
3031
with open(sql_file) as f:
3132
content = f.read()
3233
pattern = r"-- ([A-Za-z0-9_]+):[^\n]*\n(SELECT[^;]+;)"
33-
return [(qid, sql.strip()) for qid, sql in re.findall(pattern, content, re.DOTALL | re.IGNORECASE)]
34+
return [
35+
(qid, sql.strip())
36+
for qid, sql in re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
37+
]
3438

3539

3640
# ---------------------------------------------------------------------------
3741
# Data loading — ClickHouse direct
3842
# ---------------------------------------------------------------------------
3943

40-
def load_h2o_data_clickhouse(clickhouse_url: str, skip_table_init: bool = False, max_rows: int = 0):
44+
45+
def load_h2o_data_clickhouse(
46+
clickhouse_url: str, skip_table_init: bool = False, max_rows: int = 0
47+
):
4148
"""Load H2O CSV into ClickHouse MergeTree (baseline path)."""
4249

4350
if not skip_table_init:
@@ -110,6 +117,7 @@ def _flush_batch(clickhouse_url: str, rows: list):
110117
# Data loading — Kafka (for Arroyo sketch pipeline)
111118
# ---------------------------------------------------------------------------
112119

120+
113121
def produce_h2o_to_kafka(topic: str = "h2o_groupby", max_rows: int = 0):
114122
"""Stream H2O CSV rows into Kafka with fixed 2024-01-01 message timestamps."""
115123
csv_path = _download_h2o_csv()
@@ -171,6 +179,7 @@ def _download_h2o_csv() -> str:
171179
# Pipeline latency measurement
172180
# ---------------------------------------------------------------------------
173181

182+
174183
def measure_pipeline_latency(
175184
kafka_topic: str = "h2o_groupby",
176185
asap_url: str = "http://localhost:8088/clickhouse/query",
@@ -258,14 +267,17 @@ def measure_pipeline_latency(
258267

259268
latencies.sort()
260269
median = latencies[len(latencies) // 2]
261-
print(f"\nPipeline latency (data→query): median={median:.2f}s across {len(latencies)} trials")
270+
print(
271+
f"\nPipeline latency (data→query): median={median:.2f}s across {len(latencies)} trials"
272+
)
262273
return median * 1000 # return ms
263274

264275

265276
# ---------------------------------------------------------------------------
266277
# Benchmark runner
267278
# ---------------------------------------------------------------------------
268279

280+
269281
def run_query(
270282
query: str, endpoint_url: str, session: requests.Session, timeout: int = 30
271283
) -> Tuple[float, Optional[str], Optional[str]]:
@@ -314,15 +326,28 @@ def run_benchmark(
314326

315327
with open(output_csv, "w", newline="") as csvfile:
316328
writer = csv.writer(csvfile)
317-
writer.writerow(["query_id", "latency_ms", "serving_ms", "pipeline_ms", "result_rows", "result_preview", "error", "mode"])
329+
writer.writerow(
330+
[
331+
"query_id",
332+
"latency_ms",
333+
"serving_ms",
334+
"pipeline_ms",
335+
"result_rows",
336+
"result_preview",
337+
"error",
338+
"mode",
339+
]
340+
)
318341

319342
for query_id, sql in queries:
320343
print(f"Running {query_id}...", end=" ", flush=True)
321344
serving_ms, result, error = run_query(sql, endpoint_url, session)
322345

323346
if error:
324347
print(f"ERROR {error}")
325-
writer.writerow([query_id, serving_ms, serving_ms, 0, 0, "", error, mode])
348+
writer.writerow(
349+
[query_id, serving_ms, serving_ms, 0, 0, "", error, mode]
350+
)
326351
plot_latencies.append(0.0)
327352
else:
328353
total_ms = serving_ms + pipeline_overhead_ms
@@ -333,10 +358,23 @@ def run_benchmark(
333358
total_latencies.append(total_ms)
334359
plot_latencies.append(total_ms)
335360
if pipeline_overhead_ms > 0:
336-
print(f"{total_ms:.2f}ms (serving={serving_ms:.2f}ms + pipeline={pipeline_overhead_ms:.2f}ms, {num_rows} rows)")
361+
print(
362+
f"{total_ms:.2f}ms (serving={serving_ms:.2f}ms + pipeline={pipeline_overhead_ms:.2f}ms, {num_rows} rows)"
363+
)
337364
else:
338365
print(f"{total_ms:.2f}ms ({num_rows} rows)")
339-
writer.writerow([query_id, f"{total_ms:.2f}", f"{serving_ms:.2f}", f"{pipeline_overhead_ms:.2f}", num_rows, preview, "", mode])
366+
writer.writerow(
367+
[
368+
query_id,
369+
f"{total_ms:.2f}",
370+
f"{serving_ms:.2f}",
371+
f"{pipeline_overhead_ms:.2f}",
372+
num_rows,
373+
preview,
374+
"",
375+
mode,
376+
]
377+
)
340378

341379
time.sleep(0.1)
342380

@@ -346,29 +384,42 @@ def run_benchmark(
346384
total_latencies.sort()
347385
serving_latencies.sort()
348386
n = len(total_latencies)
387+
349388
def stats(arr):
350-
return arr[0], sum(arr)/len(arr), arr[int(len(arr)*0.5)], arr[int(len(arr)*0.95)], arr[-1]
389+
return (
390+
arr[0],
391+
sum(arr) / len(arr),
392+
arr[int(len(arr) * 0.5)],
393+
arr[int(len(arr) * 0.95)],
394+
arr[-1],
395+
)
351396

352397
t_min, t_avg, t_p50, t_p95, t_max = stats(total_latencies)
353398
print(f"\nTotal latency summary ({n} successful queries):")
354-
print(f" min={t_min:.2f}ms avg={t_avg:.2f}ms p50={t_p50:.2f}ms p95={t_p95:.2f}ms max={t_max:.2f}ms")
399+
print(
400+
f" min={t_min:.2f}ms avg={t_avg:.2f}ms p50={t_p50:.2f}ms p95={t_p95:.2f}ms max={t_max:.2f}ms"
401+
)
355402
if pipeline_overhead_ms > 0:
356403
s_min, s_avg, s_p50, s_p95, s_max = stats(serving_latencies)
357-
print(f" (serving only: min={s_min:.2f}ms avg={s_avg:.2f}ms p50={s_p50:.2f}ms)")
404+
print(
405+
f" (serving only: min={s_min:.2f}ms avg={s_avg:.2f}ms p50={s_p50:.2f}ms)"
406+
)
358407
print(f" (pipeline overhead: {pipeline_overhead_ms:.2f}ms per query)")
359408

360409
if plot_latencies:
361410
plt.figure(figsize=(10, 6))
362-
bar_color = '#1f77b4' if mode == 'baseline' else '#ff7f0e'
411+
bar_color = "#1f77b4" if mode == "baseline" else "#ff7f0e"
363412
execution_order = list(range(1, len(plot_latencies) + 1))
364-
plt.bar(execution_order, plot_latencies, color=bar_color, edgecolor='black')
365-
plt.xlabel("Query Execution Order", fontsize=12, fontweight='bold')
366-
plt.ylabel("Latency (ms)", fontsize=12, fontweight='bold')
413+
plt.bar(execution_order, plot_latencies, color=bar_color, edgecolor="black")
414+
plt.xlabel("Query Execution Order", fontsize=12, fontweight="bold")
415+
plt.ylabel("Latency (ms)", fontsize=12, fontweight="bold")
367416
max_order = len(execution_order)
368417
tick_step = max(1, max_order // 20) * 5
369418
plt.xticks(range(0, max_order + 1, tick_step))
370-
plt.title(f"Query Latency - {mode.upper()} Mode", fontsize=14, fontweight='bold')
371-
plt.grid(axis='y', linestyle='--', alpha=0.7)
419+
plt.title(
420+
f"Query Latency - {mode.upper()} Mode", fontsize=14, fontweight="bold"
421+
)
422+
plt.grid(axis="y", linestyle="--", alpha=0.7)
372423
plt.tight_layout()
373424
plot_output = output_csv.with_suffix(".png")
374425
plt.savefig(plot_output)
@@ -380,8 +431,11 @@ def stats(arr):
380431
# Main
381432
# ---------------------------------------------------------------------------
382433

434+
383435
def main():
384-
parser = argparse.ArgumentParser(description="Benchmark ASAP vs ClickHouse on H2O groupby data")
436+
parser = argparse.ArgumentParser(
437+
description="Benchmark ASAP vs ClickHouse on H2O groupby data"
438+
)
385439
parser.add_argument("--mode", choices=["baseline", "asap"], default="asap")
386440
parser.add_argument("--load-data", action="store_true", help="Load H2O data")
387441
parser.add_argument("--clickhouse-url", default="http://localhost:8123")
@@ -391,13 +445,23 @@ def main():
391445
parser.add_argument("--filter", default=None, help="Comma-separated query IDs")
392446
parser.add_argument("--no-benchmark", action="store_true", help="Load data only")
393447
parser.add_argument("--skip-table-init", action="store_true")
394-
parser.add_argument("--load-kafka", action="store_true", help="Stream data to Kafka (for Arroyo sketch pipeline)")
395-
parser.add_argument("--max-rows", type=int, default=0, help="Max rows to load (0 = all)")
448+
parser.add_argument(
449+
"--load-kafka",
450+
action="store_true",
451+
help="Stream data to Kafka (for Arroyo sketch pipeline)",
452+
)
453+
parser.add_argument(
454+
"--max-rows", type=int, default=0, help="Max rows to load (0 = all)"
455+
)
396456

397457
args = parser.parse_args()
398458

399459
if args.load_data:
400-
if not load_h2o_data_clickhouse(args.clickhouse_url, skip_table_init=args.skip_table_init, max_rows=args.max_rows):
460+
if not load_h2o_data_clickhouse(
461+
args.clickhouse_url,
462+
skip_table_init=args.skip_table_init,
463+
max_rows=args.max_rows,
464+
):
401465
print("Failed to load data")
402466
return 1
403467

@@ -422,7 +486,14 @@ def main():
422486
print("\nMeasuring pipeline latency (data → Kafka → Arroyo → QE → query)...")
423487
pipeline_overhead_ms = measure_pipeline_latency(asap_url=args.asap_url)
424488

425-
run_benchmark(sql_file, endpoint, Path(args.output), args.mode, query_filter, pipeline_overhead_ms)
489+
run_benchmark(
490+
sql_file,
491+
endpoint,
492+
Path(args.output),
493+
args.mode,
494+
query_filter,
495+
pipeline_overhead_ms,
496+
)
426497
return 0
427498

428499

asap-tools/execution-utilities/asap_benchmark_pipeline/run_pipeline.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,11 @@ case "$MODE" in
347347
;;
348348
both)
349349
run_baseline "baseline_results.csv"
350+
echo ""
351+
echo "========================================"
352+
echo "Cleaning up between runs..."
353+
echo "========================================"
354+
"$SCRIPT_DIR/cleanup.sh" --no-sudo
350355
run_asap "asap_results.csv"
351356
echo ""
352357
echo "========================================"

0 commit comments

Comments
 (0)