From 58d0ef6586b6aa95d56bf5983c49154d83ff990b Mon Sep 17 00:00:00 2001 From: Kevin Boyd Date: Tue, 26 May 2026 14:08:24 -0400 Subject: [PATCH 1/5] Bench: --rdkit_max_seconds budget + throughput-normalized reporting - Add --rdkit_max_seconds to etkdg/ff_optimize/substruct benches. When set, the RDKit comparison stops once wall-clock elapsed exceeds the cap; the reported timing is over molecules/pairs actually processed. - Replace 'speedup = baseline_ms / avg_ms' with throughput-based ratios (mols * confs / elapsed) so methods that processed different amounts of work are comparable. - CSV gains mols_processed/pairs_processed, rdkit_max_seconds, confs_per_second/pairs_per_second, vs_rdkit_throughput_ratio columns. - substruct: --rdkit_match_mode and --rdkit_threads become nargs='+'; main runs the cartesian product and picks the best-throughput variant as the validation/throughput baseline. --- benchmarks/etkdg_bench.py | 85 +++++-- benchmarks/ff_optimize_bench.py | 86 +++++-- benchmarks/substruct_bench.py | 401 ++++++++++++++++++++++---------- 3 files changed, 423 insertions(+), 149 deletions(-) diff --git a/benchmarks/etkdg_bench.py b/benchmarks/etkdg_bench.py index d2b99319..88940f27 100644 --- a/benchmarks/etkdg_bench.py +++ b/benchmarks/etkdg_bench.py @@ -132,23 +132,40 @@ def bench_rdkit( confs_per_mol: int, runs: int, warmup: bool, -) -> tuple[TimingResult, list[Chem.Mol]]: - """Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, last_run_mols)``.""" + max_seconds: float = 0.0, +) -> tuple[TimingResult, list[Chem.Mol], int]: + """Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, processed_mols, processed_count)``. + + When ``max_seconds > 0``, the inner loop stops processing molecules once + wall-clock elapsed exceeds the cap. The reported timing is over the + molecules actually processed; throughput is items / elapsed at the call + site. Cloned molecules that were never processed are omitted from the + returned list so downstream energy validation only sees comparable inputs. + """ + import time as _time + last_run_mols: list[list[Chem.Mol]] = [[]] + processed_count = [0] @nvtx.annotate("etkdg_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) + deadline = _time.perf_counter() + max_seconds if max_seconds > 0 else None + n_done = 0 for mol in cloned: rdDistGeom.EmbedMultipleConfs(mol, numConfs=confs_per_mol, params=params) - last_run_mols[0] = cloned + n_done += 1 + if deadline is not None and _time.perf_counter() >= deadline: + break + last_run_mols[0] = cloned[:n_done] + processed_count[0] = n_done if warmup: warmup_mol = Chem.RWMol(mols[0]) rdDistGeom.EmbedMultipleConfs(warmup_mol, numConfs=1, params=params) timing = time_it(run, runs=runs, warmups=0, gpu_sync=False) - return timing, last_run_mols[0] + return timing, last_run_mols[0], processed_count[0] def _build_etkdg_params(max_iterations: int, num_threads: int, seed: int) -> rdDistGeom.EmbedParameters: @@ -176,10 +193,11 @@ def _build_hardware_options( CSV_HEADER = ( - "method,input_file,input_type,num_mols,confs_per_mol,max_iterations," + "method,input_file,input_type,num_mols,mols_processed,confs_per_mol,max_iterations," "batch_size,batches_per_gpu,prep_threads,num_gpus,nvmolkit_config_source," - "rdkit_threads,time_ms,std_ms,conformers_generated,mean_energy_diff,median_energy_diff," - "energy_diff_pairs" + "rdkit_threads,rdkit_max_seconds,time_ms,std_ms,conformers_generated," + "confs_per_second,vs_rdkit_throughput_ratio," + "mean_energy_diff,median_energy_diff,energy_diff_pairs" ) @@ -219,6 +237,16 @@ def main() -> None: default=1, help="Threads passed to RDKit ETKDG via params.numThreads (default: 1)", ) + parser.add_argument( + "--rdkit_max_seconds", + type=float, + default=0.0, + help=( + "Stop the RDKit comparison after this many wall-clock seconds and " + "report throughput on the molecules actually processed. 0 disables " + "the cap and runs the full workload (default: 0)." + ), + ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument( @@ -455,10 +483,16 @@ def main() -> None: except ImportError as exc: print(f" nvmolkit: SKIPPED (import error: {exc})") + rdkit_processed_count = len(mols) if not args.no_rdkit: print("\nRunning RDKit ETKDG benchmark...") - rd_timing, rd_mols = bench_rdkit(mols, params, args.confs_per_mol, args.runs, args.warmup) - print(f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)") + rd_timing, rd_mols, rdkit_processed_count = bench_rdkit( + mols, params, args.confs_per_mol, args.runs, args.warmup, max_seconds=args.rdkit_max_seconds + ) + print( + f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)" + f" [processed {rdkit_processed_count}/{len(mols)} mols]" + ) results["rdkit"] = (rd_timing, rd_mols) if not results: @@ -467,11 +501,16 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - baseline_ms = results["rdkit"][0].mean_ms if "rdkit" in results else None - for name, (timing, _) in results.items(): + rdkit_throughput_per_s = None + if "rdkit" in results: + rd_timing = results["rdkit"][0] + if rd_timing.mean_ms > 0: + rdkit_throughput_per_s = (rdkit_processed_count * args.confs_per_mol) / (rd_timing.mean_ms / 1000.0) + for name, (timing, run_mols) in results.items(): speedup = "" - if baseline_ms is not None and name != "rdkit" and timing.mean_ms > 0: - speedup = f", {baseline_ms / timing.mean_ms:.1f}x vs RDKit" + if rdkit_throughput_per_s is not None and name != "rdkit" and timing.mean_ms > 0: + method_throughput = (len(mols) * args.confs_per_mol) / (timing.mean_ms / 1000.0) + speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {timing.mean_ms:10.2f} ms (+/- {timing.std_ms:.2f} ms){speedup}") energy_mean = float("nan") @@ -493,21 +532,33 @@ def main() -> None: csv_rows: list[str] = [] for name, (timing, run_mols) in results.items(): is_nv = name == "nvmolkit" + is_rdkit = name == "rdkit" batch_size = applied_batch_size if is_nv else "N/A" batches_per_gpu = applied_batches_per_gpu if is_nv else "N/A" prep_threads = applied_prep_threads if is_nv else "N/A" num_gpus = applied_num_gpus if is_nv else "N/A" nvmolkit_config_source = config_source if is_nv else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" + rdkit_threads = args.rdkit_threads if is_rdkit else "N/A" + rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" + mols_processed = rdkit_processed_count if is_rdkit else len(mols) confs_generated = _conformer_count(run_mols) + confs_per_second = ( + (mols_processed * args.confs_per_mol) / (timing.mean_ms / 1000.0) if timing.mean_ms > 0 else float("nan") + ) + if rdkit_throughput_per_s is not None and not is_rdkit and timing.mean_ms > 0: + vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" + else: + vs_rdkit_throughput_ratio = "N/A" mean_diff = energy_mean if (diff_computed and is_nv) else "N/A" median_diff = energy_median if (diff_computed and is_nv) else "N/A" pairs = energy_pairs if (diff_computed and is_nv) else "N/A" csv_rows.append( - f"{name},{input_file},{input_type},{len(mols)},{args.confs_per_mol}," + f"{name},{input_file},{input_type},{len(mols)},{mols_processed},{args.confs_per_mol}," f"{args.max_iterations},{batch_size},{batches_per_gpu},{prep_threads},{num_gpus}," - f"{nvmolkit_config_source},{rdkit_threads},{timing.mean_ms:.2f},{timing.std_ms:.2f}," - f"{confs_generated},{mean_diff},{median_diff},{pairs}" + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_max_seconds}," + f"{timing.mean_ms:.2f},{timing.std_ms:.2f}," + f"{confs_generated},{confs_per_second:.2f},{vs_rdkit_throughput_ratio}," + f"{mean_diff},{median_diff},{pairs}" ) print("\n\nCSV Results:") diff --git a/benchmarks/ff_optimize_bench.py b/benchmarks/ff_optimize_bench.py index 7324c17c..61b6a3f7 100644 --- a/benchmarks/ff_optimize_bench.py +++ b/benchmarks/ff_optimize_bench.py @@ -166,8 +166,17 @@ def bench_rdkit( runs: int, warmup: bool, num_threads: int, -) -> tuple[float, float, list[float]]: - """Benchmark RDKit MMFF/UFF optimization; return ``(mean_ms, std_ms, energies)``.""" + max_seconds: float = 0.0, +) -> tuple[float, float, list[float], int]: + """Benchmark RDKit MMFF/UFF optimization; return ``(mean_ms, std_ms, energies, processed_mols)``. + + When ``max_seconds > 0`` the per-molecule loop stops once wall-clock + elapsed exceeds the cap. Throughput at the call site should be measured + as items / elapsed; the returned timing is over the molecules actually + processed. + """ + import time as _time + if ff == "mmff": rdkit_optimize = lambda mol: AllChem.MMFFOptimizeMoleculeConfs( # noqa: E731 mol, numThreads=num_threads, maxIters=max_iters @@ -180,11 +189,21 @@ def bench_rdkit( raise ValueError(f"Unknown ff: {ff!r}") last_results: list[list[list[tuple[int, float]]]] = [[]] + processed_count = [0] @nvtx.annotate("ff_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) - last_results[0] = [rdkit_optimize(mol) for mol in cloned] + deadline = _time.perf_counter() + max_seconds if max_seconds > 0 else None + out: list[list[tuple[int, float]]] = [] + n_done = 0 + for mol in cloned: + out.append(rdkit_optimize(mol)) + n_done += 1 + if deadline is not None and _time.perf_counter() >= deadline: + break + last_results[0] = out + processed_count[0] = n_done if warmup: warmup_mols = clone_mols_with_conformers(mols[: min(4, len(mols))]) @@ -195,7 +214,7 @@ def run() -> None: energies, not_converged = _flatten_rdkit_energies(last_results[0]) if not_converged > 0: print(f" RDKit: {not_converged} conformer(s) reported non-zero status (not converged)") - return result.mean_ms, result.std_ms, energies + return result.mean_ms, result.std_ms, energies, processed_count[0] def _build_hardware_options( @@ -213,9 +232,11 @@ def _build_hardware_options( CSV_HEADER = ( - "method,ff,input_file,input_type,num_mols,confs_per_mol,max_iters," + "method,ff,input_file,input_type,num_mols,mols_processed,confs_per_mol,max_iters," "batch_size,batches_per_gpu,prep_threads,num_gpus,nvmolkit_config_source," - "rdkit_threads,time_ms,std_ms,energies_compared,mean_abs_energy_diff,max_abs_energy_diff" + "rdkit_threads,rdkit_max_seconds,time_ms,std_ms," + "confs_per_second,vs_rdkit_throughput_ratio," + "energies_compared,mean_abs_energy_diff,max_abs_energy_diff" ) @@ -257,6 +278,16 @@ def main() -> None: default=1, help="Threads passed to RDKit FF optimizer via numThreads (default: 1)", ) + parser.add_argument( + "--rdkit_max_seconds", + type=float, + default=0.0, + help=( + "Stop the RDKit comparison after this many wall-clock seconds and " + "report throughput on the molecules actually processed. 0 disables " + "the cap and runs the full workload (default: 0)." + ), + ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument( @@ -482,12 +513,22 @@ def main() -> None: results["nvmolkit"] = (nv_avg, nv_std, nv_energies) torch.cuda.cudart().cudaProfilerStop() + rdkit_processed_count = len(mols) if not args.no_rdkit: print(f"\nRunning RDKit {args.ff.upper()} optimize benchmark...") - rd_avg, rd_std, rd_energies = bench_rdkit( - mols, args.ff, args.max_iters, args.runs, args.warmup, args.rdkit_threads + rd_avg, rd_std, rd_energies, rdkit_processed_count = bench_rdkit( + mols, + args.ff, + args.max_iters, + args.runs, + args.warmup, + args.rdkit_threads, + max_seconds=args.rdkit_max_seconds, + ) + print( + f" RDKit: {rd_avg:10.2f} ms (+/- {rd_std:.2f} ms)" + f" [processed {rdkit_processed_count}/{len(mols)} mols]" ) - print(f" RDKit: {rd_avg:10.2f} ms (+/- {rd_std:.2f} ms)") results["rdkit"] = (rd_avg, rd_std, rd_energies) if not results: @@ -496,11 +537,16 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - baseline_ms = results.get("rdkit", (None, None, None))[0] + rdkit_throughput_per_s = None + if "rdkit" in results: + rd_avg = results["rdkit"][0] + if rd_avg > 0: + rdkit_throughput_per_s = (rdkit_processed_count * args.confs_per_mol) / (rd_avg / 1000.0) for name, (avg_ms, std_ms, _) in results.items(): speedup = "" - if baseline_ms is not None and name != "rdkit" and avg_ms > 0: - speedup = f", {baseline_ms / avg_ms:.1f}x vs RDKit" + if rdkit_throughput_per_s is not None and name != "rdkit" and avg_ms > 0: + method_throughput = (len(mols) * args.confs_per_mol) / (avg_ms / 1000.0) + speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {avg_ms:10.2f} ms (+/- {std_ms:.2f} ms){speedup}") energy_mean = float("nan") @@ -531,19 +577,29 @@ def main() -> None: csv_rows: list[str] = [] for name, (avg_ms, std_ms, energies) in results.items(): is_nv = name == "nvmolkit" + is_rdkit = name == "rdkit" batch_size = applied_batch_size if is_nv else "N/A" batches_per_gpu = applied_batches_per_gpu if is_nv else "N/A" prep_threads = applied_prep_threads if is_nv else "N/A" num_gpus = applied_num_gpus if is_nv else "N/A" nvmolkit_config_source = config_source if is_nv else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" + rdkit_threads = args.rdkit_threads if is_rdkit else "N/A" + rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" + mols_processed = rdkit_processed_count if is_rdkit else len(mols) + confs_per_second = (mols_processed * args.confs_per_mol) / (avg_ms / 1000.0) if avg_ms > 0 else float("nan") + if rdkit_throughput_per_s is not None and not is_rdkit and avg_ms > 0: + vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" + else: + vs_rdkit_throughput_ratio = "N/A" mean_diff = energy_mean if (args.validate and is_nv) else "N/A" max_diff = energy_max if (args.validate and is_nv) else "N/A" pairs = energy_pairs if (args.validate and is_nv) else "N/A" csv_rows.append( - f"{name},{args.ff},{input_file},{input_type},{len(mols)},{args.confs_per_mol}," + f"{name},{args.ff},{input_file},{input_type},{len(mols)},{mols_processed},{args.confs_per_mol}," f"{args.max_iters},{batch_size},{batches_per_gpu},{prep_threads},{num_gpus}," - f"{nvmolkit_config_source},{rdkit_threads},{avg_ms:.2f},{std_ms:.2f}," + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_max_seconds}," + f"{avg_ms:.2f},{std_ms:.2f}," + f"{confs_per_second:.2f},{vs_rdkit_throughput_ratio}," f"{pairs},{mean_diff},{max_diff}" ) diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index c5df1638..dd7988a0 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -47,6 +47,10 @@ # Use SubstructLibrary with native threading: python substruct_bench.py --smiles --smarts --rdkit_match_mode substructlib --rdkit_threads 8 + # Sweep multiple RDKit match modes and thread counts in one run: + python substruct_bench.py --smiles --smarts \ + --rdkit_match_mode raw substructlib --rdkit_threads 1 4 16 + # Run multiple configurations from a dataframe (smarts, batch_size, workers, prep_threads, mode, num_gpus): python substruct_bench.py --smiles --config @@ -57,8 +61,10 @@ import pickle import random import sys +import time from functools import partial from multiprocessing import Pool +from statistics import mean, pstdev from typing import Callable, Iterator import nvtx @@ -143,8 +149,6 @@ def load_smiles(filepath: str, max_count: int = 0, sanitize: bool = True, seed: streaming pass (with a 10% buffer to absorb parse failures) so the file isn't fully loaded into memory and only the sampled SMILES are parsed. """ - # Use a 10% buffer to account for potential parse failures - # "On parse failures continue down the file. Load 10% more molecules than needed" read_limit = int(max_count * 1.1) if max_count > 0 else 0 if read_limit > 0: @@ -176,7 +180,6 @@ def load_smiles(filepath: str, max_count: int = 0, sanitize: bool = True, seed: if parse_failures > 0: print(f" ({parse_failures} parse failures)") - # Trim to exactly max_count if we have more than requested if max_count > 0 and len(mols) > max_count: mols = mols[:max_count] @@ -211,6 +214,49 @@ def load_smarts(filepath: str, max_count: int = 0) -> tuple[list[Chem.Mol], list return queries, smarts_list +def _time_runs_bounded( + run: Callable[[], None], + runs: int, + max_seconds: float, + pairs_done_getter: Callable[[], int], + pairs_total: int, +) -> tuple[float, float, int]: + """Repeat ``run`` up to ``runs`` times, stopping early once ``max_seconds`` elapses. + + The supplied ``run`` closure is responsible for honouring ``max_seconds`` + inside a single invocation (e.g. by checking a per-run deadline). After + each run we read ``pairs_done_getter()`` to learn how many target/query + pairs were actually evaluated; if that's below ``pairs_total`` we treat + the run as partial and stop iterating. + + @return tuple of (avg_ms, std_ms, last_pairs_processed). avg/std are + computed only over runs that completed end-to-end (or the single + partial run if no full run was reached). + """ + deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None + completed_times_ms: list[float] = [] + partial_time_ms: float | None = None + last_pairs = 0 + for _run_index in range(runs): + if deadline is not None and time.perf_counter() > deadline: + break + start = time.perf_counter() + run() + elapsed_ms = (time.perf_counter() - start) * 1000.0 + last_pairs = pairs_done_getter() + if last_pairs < pairs_total: + partial_time_ms = elapsed_ms + break + completed_times_ms.append(elapsed_ms) + if completed_times_ms: + avg_ms = mean(completed_times_ms) + std_ms = pstdev(completed_times_ms) if len(completed_times_ms) > 1 else 0.0 + return avg_ms, std_ms, last_pairs + if partial_time_ms is not None: + return partial_time_ms, 0.0, last_pairs + return 0.0, 0.0, last_pairs + + _worker_queries = None _worker_params = None @@ -245,15 +291,32 @@ def _rdkit_worker_count(mol_binary: bytes) -> list[int]: @nvtx.annotate("bench_rdkit_substruct", color="green") def bench_rdkit_substruct( - mols: list[Chem.Mol], queries: list[Chem.Mol], runs: int, mode: str, max_matches: int, threads: int = 1 -) -> tuple[float, float, list]: - """Benchmark RDKit SubstructMatch API.""" + mols: list[Chem.Mol], + queries: list[Chem.Mol], + runs: int, + mode: str, + max_matches: int, + threads: int = 1, + max_seconds: float = 0.0, +) -> tuple[float, float, list, int]: + """Benchmark RDKit SubstructMatch API. + + @param max_seconds When > 0, abort additional runs (and the per-molecule + loop in single-threaded mode) once the elapsed time + exceeds this budget. The threaded path can only be + bounded between runs since `pool.map` is monolithic. + @return tuple of (avg_ms, std_ms, results_data, pairs_processed_per_run). + """ + num_mols = len(mols) + num_queries = len(queries) + pairs_total = num_mols * num_queries params = Chem.SubstructMatchParameters() params.uniquify = False if max_matches > 0: params.maxMatches = max_matches - results_data = [] + results_data: list = [] + pairs_done_this_run = 0 if threads > 1: mol_binaries = [mol.ToBinary() for mol in mols] @@ -267,47 +330,60 @@ def bench_rdkit_substruct( chunksize = max(1, len(mol_binaries) // (threads * 4)) @nvtx.annotate("substruct_run_mp", color="yellow") - def run(): - nonlocal results_data + def run() -> None: + nonlocal results_data, pairs_done_this_run with Pool(threads, initializer=_rdkit_worker_init, initargs=(query_binaries, max_matches)) as pool: results_data = pool.map(worker_func, mol_binaries, chunksize=chunksize) + pairs_done_this_run = pairs_total else: @nvtx.annotate("substruct_run", color="yellow") - def run(): - nonlocal results_data + def run() -> None: + nonlocal results_data, pairs_done_this_run results_data = [] - if mode == "hasSubstructMatch": - for mol in mols: - mol_results = [] + pairs_done_this_run = 0 + run_deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None + for mol in mols: + if run_deadline is not None and time.perf_counter() > run_deadline: + break + mol_results: list = [] + if mode == "hasSubstructMatch": for query in queries: mol_results.append(mol.HasSubstructMatch(query, params)) - results_data.append(mol_results) - elif mode == "countSubstructMatches": - for mol in mols: - mol_results = [] + elif mode == "countSubstructMatches": for query in queries: mol_results.append(len(mol.GetSubstructMatches(query, params))) - results_data.append(mol_results) - else: - for mol in mols: - mol_results = [] + else: for query in queries: - matches = mol.GetSubstructMatches(query, params) - mol_results.append(matches) - results_data.append(mol_results) + mol_results.append(mol.GetSubstructMatches(query, params)) + results_data.append(mol_results) + pairs_done_this_run += num_queries - avg_ms, std_ms = time_it(run, runs) - return avg_ms, std_ms, results_data + avg_ms, std_ms, last_pairs = _time_runs_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + return avg_ms, std_ms, results_data, last_pairs @nvtx.annotate("bench_rdkit_substructlib", color="green") def bench_rdkit_substructlib( - mols: list[Chem.Mol], queries: list[Chem.Mol], runs: int, mode: str, max_matches: int, threads: int = 1 -) -> tuple[float, float, list]: - """Benchmark RDKit SubstructLibrary API with native multithreading.""" + mols: list[Chem.Mol], + queries: list[Chem.Mol], + runs: int, + mode: str, + max_matches: int, + threads: int = 1, + max_seconds: float = 0.0, +) -> tuple[float, float, list, int]: + """Benchmark RDKit SubstructLibrary API with native multithreading. + + @param max_seconds When > 0, abort the per-query loop once the elapsed + time exceeds this budget. The library build itself + still runs to completion since `lib.GetMatches` is + the only point where partial results are well-defined. + @return tuple of (avg_ms, std_ms, results_data, pairs_processed_per_run). + """ num_mols = len(mols) num_queries = len(queries) + pairs_total = num_mols * num_queries params = Chem.SubstructMatchParameters() params.uniquify = False @@ -315,10 +391,11 @@ def bench_rdkit_substructlib( params.maxMatches = max_matches results_data = [[None] * num_queries for _ in range(num_mols)] + pairs_done_this_run = 0 @nvtx.annotate("substructlib_run", color="yellow") - def run(): - nonlocal results_data + def run() -> None: + nonlocal results_data, pairs_done_this_run mol_holder = rdSubstructLibrary.CachedMolHolder() fp_holder = rdSubstructLibrary.PatternHolder() @@ -327,34 +404,33 @@ def run(): lib.AddMol(mol) results_data = [[None] * num_queries for _ in range(num_mols)] + pairs_done_this_run = 0 + run_deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None - if mode == "hasSubstructMatch": - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) + for q_idx, query in enumerate(queries): + if run_deadline is not None and time.perf_counter() > run_deadline: + break + matching_indices = lib.GetMatches(query, numThreads=threads) + matching_set = set(matching_indices) + if mode == "hasSubstructMatch": for m_idx in range(num_mols): results_data[m_idx][q_idx] = m_idx in matching_set - elif mode == "countSubstructMatches": - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) + elif mode == "countSubstructMatches": for m_idx in range(num_mols): if m_idx in matching_set: results_data[m_idx][q_idx] = len(mols[m_idx].GetSubstructMatches(query, params)) else: results_data[m_idx][q_idx] = 0 - else: - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) + else: for m_idx in range(num_mols): if m_idx in matching_set: results_data[m_idx][q_idx] = mols[m_idx].GetSubstructMatches(query, params) else: results_data[m_idx][q_idx] = () + pairs_done_this_run += num_mols - avg_ms, std_ms = time_it(run, runs) - return avg_ms, std_ms, results_data + avg_ms, std_ms, last_pairs = _time_runs_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + return avg_ms, std_ms, results_data, last_pairs @nvtx.annotate("bench_nvmolkit", color="red") @@ -422,14 +498,34 @@ def main(): parser.add_argument( "--rdkit_match_mode", choices=["raw", "substructlib"], - default="raw", - help="RDKit matching mode: raw (direct API) or substructlib (SubstructLibrary) (default: raw)", + nargs="+", + default=["raw"], + help=( + "RDKit matching mode(s) to benchmark. Pass one or more of 'raw' / 'substructlib'; " + "every mode is combined with every value of --rdkit_threads. (default: raw)" + ), ) parser.add_argument( "--rdkit_threads", type=int, - default=1, - help="RDKit threads (multiprocessing for raw, native for substructlib) (default: 1)", + nargs="+", + default=[1], + help=( + "RDKit thread count(s) to benchmark (multiprocessing for raw, native for substructlib). " + "Pass multiple values to sweep; the cartesian product with --rdkit_match_mode is run. " + "(default: 1)" + ), + ) + parser.add_argument( + "--rdkit_max_seconds", + type=float, + default=0.0, + help=( + "When > 0, cap the RDKit benchmark wall time per run to this many seconds. " + "RDKit aborts between queries (substructlib) or molecules (raw, single-thread); " + "the threaded raw path can only be bounded between runs. Reported throughput " + "uses the actual pairs processed. Default 0 = no limit." + ), ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument("--workers", type=int, default=-1, help="nvmolkit GPU worker threads per GPU (-1 = auto)") @@ -553,8 +649,8 @@ def main(): print(f" Run nvmolkit: {not args.no_nvmolkit}") print(f" Run RDKit: {not args.no_rdkit}") if not args.no_rdkit: - print(f" RDKit match mode: {args.rdkit_match_mode}") - print(f" RDKit threads: {args.rdkit_threads}") + print(f" RDKit match modes: {args.rdkit_match_mode}") + print(f" RDKit thread counts: {args.rdkit_threads}") if args.config: print(f" Config dataframe: {args.config}") else: @@ -722,25 +818,48 @@ def main(): print("Running nvmolkit GPU benchmark...") nvmolkit_avg, nvmolkit_std, nvmolkit_results = bench_nvmolkit(mols, queries, args.runs, mode, config) print(f" nvmolkit: {nvmolkit_avg:10.2f} ms (± {nvmolkit_std:.2f} ms)") - results["nvmolkit"] = (nvmolkit_avg, nvmolkit_std, nvmolkit_results) + results["nvmolkit"] = (nvmolkit_avg, nvmolkit_std, nvmolkit_results, len(mols) * num_patterns) torch.cuda.cudart().cudaProfilerStop() except ImportError as e: print(f" nvmolkit: SKIPPED (import error: {e})") + rdkit_variants: list[tuple[str, str, int]] = [] if not args.no_rdkit: - if args.rdkit_match_mode == "substructlib": - print("\nRunning RDKit SubstructLibrary benchmark...") - rdkit_avg, rdkit_std, rdkit_results = bench_rdkit_substructlib( - mols, queries, args.runs, mode, args.max_matches, args.rdkit_threads - ) - else: - print("\nRunning RDKit SubstructMatch benchmark...") - rdkit_avg, rdkit_std, rdkit_results = bench_rdkit_substruct( - mols, queries, args.runs, mode, args.max_matches, args.rdkit_threads - ) - print(f" RDKit: {rdkit_avg:10.2f} ms (± {rdkit_std:.2f} ms)") - results["rdkit"] = (rdkit_avg, rdkit_std, rdkit_results) + pairs_total = len(mols) * num_patterns + for rdkit_mode in args.rdkit_match_mode: + for rdkit_threads in args.rdkit_threads: + variant_key = f"rdkit_{rdkit_mode}_t{rdkit_threads}" + if rdkit_mode == "substructlib": + print(f"\nRunning RDKit SubstructLibrary benchmark (threads={rdkit_threads})...") + rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs = bench_rdkit_substructlib( + mols, + queries, + args.runs, + mode, + args.max_matches, + rdkit_threads, + args.rdkit_max_seconds, + ) + else: + print(f"\nRunning RDKit SubstructMatch benchmark (threads={rdkit_threads})...") + rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs = bench_rdkit_substruct( + mols, + queries, + args.runs, + mode, + args.max_matches, + rdkit_threads, + args.rdkit_max_seconds, + ) + if rdkit_pairs < pairs_total: + print( + f" RDKit hit max_seconds budget: processed {rdkit_pairs}/{pairs_total} pairs " + f"({100.0 * rdkit_pairs / pairs_total:.1f}%) in {rdkit_avg:.2f} ms" + ) + print(f" {variant_key:24s}: {rdkit_avg:10.2f} ms (± {rdkit_std:.2f} ms)") + results[variant_key] = (rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs) + rdkit_variants.append((variant_key, rdkit_mode, rdkit_threads)) print("\n" + "=" * 70) print("Summary:") @@ -750,57 +869,85 @@ def main(): sys.exit(1) baseline = None - if "rdkit" in results: - baseline = ("RDKit", results["rdkit"][0]) - - for name, (avg_ms, std_ms, _) in results.items(): + baseline_key = None + best_rdkit_throughput = 0.0 + for variant_key, _mode, _threads in rdkit_variants: + rdkit_avg_ms = results[variant_key][0] + rdkit_pairs_done = results[variant_key][3] + throughput = (rdkit_pairs_done * 1000.0 / rdkit_avg_ms) if rdkit_avg_ms > 0 else 0.0 + if throughput > best_rdkit_throughput: + best_rdkit_throughput = throughput + baseline = (variant_key, rdkit_avg_ms, throughput) + baseline_key = variant_key + + for name, (avg_ms, std_ms, _, pairs_done) in results.items(): speedup_str = "" - if baseline and name != "rdkit": - speedup = baseline[1] / avg_ms if avg_ms > 0 else 0 - speedup_str = f", {speedup:.1f}x vs {baseline[0]}" - print(f" {name:20s}: {avg_ms:10.2f} ms (± {std_ms:.2f} ms){speedup_str}") + throughput = (pairs_done * 1000.0 / avg_ms) if avg_ms > 0 else 0.0 + if baseline and name != baseline_key and not name.startswith("rdkit_"): + speedup = throughput / baseline[2] if baseline[2] > 0 else 0 + speedup_str = f", {speedup:.1f}x vs {baseline[0]} (throughput-normalised)" + print( + f" {name:24s}: {avg_ms:10.2f} ms (± {std_ms:.2f} ms), " + f"{pairs_done:,} pairs, {throughput:,.0f} pairs/s{speedup_str}" + ) - if args.validate and "nvmolkit" in results and "rdkit" in results: - print("\nValidation:") - nvmolkit_data = results["nvmolkit"][2] - rdkit_data = results["rdkit"][2] + validation_key = None + if rdkit_variants: + pairs_total = len(mols) * len(queries) + for variant_key, _mode, _threads in rdkit_variants: + if results[variant_key][3] >= pairs_total: + validation_key = variant_key + break - if mode == "hasSubstructMatch": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_match = bool(nvmolkit_data[t][q]) - rd_match = rdkit_data[t][q] - if nv_match == rd_match: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Boolean match agreement: {matches}/{total} ({pct:.1f}%)") - elif mode == "countSubstructMatches": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_count = int(nvmolkit_data[t][q]) - rd_count = int(rdkit_data[t][q]) - if nv_count == rd_count: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Count agreement: {matches}/{total} ({pct:.1f}%)") + if args.validate and "nvmolkit" in results and rdkit_variants: + print("\nValidation:") + pairs_total = len(mols) * len(queries) + if validation_key is None: + print( + f" Skipping validation: every RDKit variant hit max_seconds budget before " + f"{pairs_total} pairs and the partial-result indices differ between " + "substructlib (per-query) and raw (per-mol)." + ) else: - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) - rd_matches = set(rdkit_data[t][q]) - if nv_matches == rd_matches: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Full match agreement: {matches}/{total} ({pct:.1f}%)") + print(f" Validating against {validation_key}") + nvmolkit_data = results["nvmolkit"][2] + rdkit_data = results[validation_key][2] + if mode == "hasSubstructMatch": + matches = 0 + total = 0 + for t in range(len(mols)): + for q in range(len(queries)): + nv_match = bool(nvmolkit_data[t][q]) + rd_match = rdkit_data[t][q] + if nv_match == rd_match: + matches += 1 + total += 1 + pct = 100.0 * matches / total if total > 0 else 0 + print(f" Boolean match agreement: {matches}/{total} ({pct:.1f}%)") + elif mode == "countSubstructMatches": + matches = 0 + total = 0 + for t in range(len(mols)): + for q in range(len(queries)): + nv_count = int(nvmolkit_data[t][q]) + rd_count = int(rdkit_data[t][q]) + if nv_count == rd_count: + matches += 1 + total += 1 + pct = 100.0 * matches / total if total > 0 else 0 + print(f" Count agreement: {matches}/{total} ({pct:.1f}%)") + else: + matches = 0 + total = 0 + for t in range(len(mols)): + for q in range(len(queries)): + nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) + rd_matches = set(rdkit_data[t][q]) + if nv_matches == rd_matches: + matches += 1 + total += 1 + pct = 100.0 * matches / total if total > 0 else 0 + print(f" Full match agreement: {matches}/{total} ({pct:.1f}%)") if ran_nvmolkit: applied_batch_size = int(config.batchSize) @@ -820,14 +967,23 @@ def main(): else: config_source = "cli" - for name, (avg_ms, std_ms, _) in results.items(): + rdkit_variant_meta = {key: (mode, threads) for key, mode, threads in rdkit_variants} + baseline_throughput = baseline[2] if baseline else 0.0 + + for name, (avg_ms, std_ms, _, pairs_done) in results.items(): + is_rdkit = name in rdkit_variant_meta batch_size = applied_batch_size if name == "nvmolkit" else "N/A" workers = applied_workers if name == "nvmolkit" else "N/A" prep_threads = applied_prep_threads if name == "nvmolkit" else "N/A" num_gpus = applied_num_gpus if name == "nvmolkit" else config_row["num_gpus"] nvmolkit_config_source = config_source if name == "nvmolkit" else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" - rdkit_match_mode = args.rdkit_match_mode if name == "rdkit" else "N/A" + if is_rdkit: + rdkit_match_mode, rdkit_threads = rdkit_variant_meta[name] + else: + rdkit_match_mode = "N/A" + rdkit_threads = "N/A" + throughput = (pairs_done * 1000.0 / avg_ms) if avg_ms > 0 else 0.0 + vs_rdkit = (throughput / baseline_throughput) if (not is_rdkit and baseline_throughput > 0) else "N/A" csv_rows.append( ( name, @@ -848,6 +1004,10 @@ def main(): rdkit_match_mode, avg_ms, std_ms, + pairs_done, + args.rdkit_max_seconds, + throughput, + vs_rdkit, ) ) @@ -861,7 +1021,8 @@ def main(): print( "method,mode,smarts,input_file,input_type,sanitize,num_mols,num_patterns," "max_matches,batch_size,num_gpus,workers,prep_threads,nvmolkit_config_source," - "rdkit_threads,rdkit_match_mode,time_ms,std_ms" + "rdkit_threads,rdkit_match_mode,time_ms,std_ms," + "pairs_processed,rdkit_max_seconds,pairs_per_second,vs_rdkit_throughput_ratio" ) for row in csv_rows: ( @@ -883,11 +1044,17 @@ def main(): rdkit_match_mode, avg_ms, std_ms, + pairs_done, + rdkit_max_seconds, + throughput, + vs_rdkit, ) = row + vs_rdkit_str = f"{vs_rdkit:.4f}" if isinstance(vs_rdkit, float) else str(vs_rdkit) print( f"{name},{mode},{smarts_path},{input_file},{input_type},{sanitize}," f"{num_mols},{num_patterns},{max_matches},{batch_size},{num_gpus},{workers},{prep_threads}," - f"{nvmolkit_config_source},{rdkit_threads},{rdkit_match_mode},{avg_ms:.2f},{std_ms:.2f}" + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_match_mode},{avg_ms:.2f},{std_ms:.2f}," + f"{pairs_done},{rdkit_max_seconds:g},{throughput:.2f},{vs_rdkit_str}" ) From b4f07c8c77c3d98984e95fe59fcc4acf1d6cb061 Mon Sep 17 00:00:00 2001 From: Kevin Boyd Date: Tue, 26 May 2026 15:34:36 -0400 Subject: [PATCH 2/5] Consolidate testing --- benchmarks/bench_utils/__init__.py | 13 +++- benchmarks/bench_utils/timing.py | 91 ++++++++++++++++++++++ benchmarks/etkdg_bench.py | 37 ++++----- benchmarks/ff_optimize_bench.py | 33 ++++---- benchmarks/substruct_bench.py | 75 ++++--------------- benchmarks/tests/conftest.py | 17 +++++ benchmarks/tests/test_timing.py | 116 +++++++++++++++++++++++++++++ 7 files changed, 280 insertions(+), 102 deletions(-) create mode 100644 benchmarks/tests/conftest.py create mode 100644 benchmarks/tests/test_timing.py diff --git a/benchmarks/bench_utils/__init__.py b/benchmarks/bench_utils/__init__.py index 13990f1f..60c98b4f 100644 --- a/benchmarks/bench_utils/__init__.py +++ b/benchmarks/bench_utils/__init__.py @@ -21,15 +21,26 @@ from bench_utils.loaders import load_pickle, load_sdf, load_smarts, load_smiles from bench_utils.molprep import clone_mols_with_conformers, prep_mols -from bench_utils.timing import TimingResult, time_it +from bench_utils.timing import ( + Deadline, + TimingResult, + add_rdkit_max_seconds_arg, + throughput_per_s, + time_it, + time_it_bounded, +) __all__ = [ + "Deadline", "TimingResult", + "add_rdkit_max_seconds_arg", "clone_mols_with_conformers", "load_pickle", "load_sdf", "load_smarts", "load_smiles", "prep_mols", + "throughput_per_s", "time_it", + "time_it_bounded", ] diff --git a/benchmarks/bench_utils/timing.py b/benchmarks/bench_utils/timing.py index 29c6738b..b70268cb 100644 --- a/benchmarks/bench_utils/timing.py +++ b/benchmarks/bench_utils/timing.py @@ -15,6 +15,7 @@ """Shared timing utilities for nvMolKit benchmarks.""" +import argparse import statistics import time from dataclasses import dataclass, field @@ -89,3 +90,93 @@ def sync() -> None: times_ms.append((t1 - t0) * 1000.0) return TimingResult(times_ms=times_ms) + + +class Deadline: + """Wall-clock budget that benchmark loops can poll for early termination. + + A ``max_seconds`` of ``0`` (or negative) disables the budget, in which + case :meth:`expired` always returns ``False``. Construction starts the + clock; pass the same instance to nested loops to share one deadline. + """ + + def __init__(self, max_seconds: float) -> None: + self._end: float | None = time.perf_counter() + max_seconds if max_seconds > 0 else None + + def expired(self) -> bool: + return self._end is not None and time.perf_counter() >= self._end + + @property + def active(self) -> bool: + """``True`` when a real budget is being enforced.""" + return self._end is not None + + +def throughput_per_s(items: float, elapsed_ms: float) -> float: + """Items per second from a millisecond count; ``NaN`` if ``elapsed_ms <= 0``.""" + if elapsed_ms <= 0: + return float("nan") + return items / (elapsed_ms / 1000.0) + + +def time_it_bounded( + run: Callable[[], None], + runs: int, + max_seconds: float, + progress_getter: Callable[[], int], + progress_target: int, +) -> tuple[float, float, int]: + """Repeat ``run`` up to ``runs`` times, stopping early on budget exhaustion. + + The supplied ``run`` closure is responsible for honouring the per-run + deadline internally (e.g. by checking a :class:`Deadline`). After each + invocation, ``progress_getter()`` reports how much of the workload was + actually completed; a value below ``progress_target`` is treated as a + partial run and further iterations are skipped. + + Returns ``(avg_ms, std_ms, last_progress)``. ``avg`` and ``std`` are + computed only over runs that completed end-to-end; if no full run + finished, the single partial timing is returned with ``std=0``. + """ + deadline = Deadline(max_seconds) + completed_times_ms: list[float] = [] + partial_time_ms: float | None = None + last_progress = 0 + for _ in range(runs): + if deadline.expired(): + break + start = time.perf_counter() + run() + elapsed_ms = (time.perf_counter() - start) * 1000.0 + last_progress = progress_getter() + if last_progress < progress_target: + partial_time_ms = elapsed_ms + break + completed_times_ms.append(elapsed_ms) + if completed_times_ms: + avg_ms = statistics.mean(completed_times_ms) + std_ms = statistics.pstdev(completed_times_ms) if len(completed_times_ms) > 1 else 0.0 + return avg_ms, std_ms, last_progress + if partial_time_ms is not None: + return partial_time_ms, 0.0, last_progress + return 0.0, 0.0, last_progress + + +def add_rdkit_max_seconds_arg(parser: argparse.ArgumentParser, *, extra_help: str = "") -> None: + """Register the shared ``--rdkit_max_seconds`` CLI flag. + + ``extra_help`` is appended to the standard help string so individual + benchmarks can describe how partial-run semantics apply to their RDKit + code path (e.g. per-molecule vs. per-query truncation). + """ + base_help = ( + "Stop the RDKit comparison after this many wall-clock seconds and " + "report throughput on the work actually completed. 0 disables the " + "cap and runs the full workload (default: 0)." + ) + parser.add_argument( + "--rdkit_max_seconds", + type=float, + default=0.0, + help=f"{base_help} {extra_help}".rstrip(), + ) diff --git a/benchmarks/etkdg_bench.py b/benchmarks/etkdg_bench.py index 88940f27..0c8402e8 100644 --- a/benchmarks/etkdg_bench.py +++ b/benchmarks/etkdg_bench.py @@ -33,12 +33,15 @@ import nvtx from bench_utils import ( + Deadline, TimingResult, + add_rdkit_max_seconds_arg, clone_mols_with_conformers, load_pickle, load_sdf, load_smiles, prep_mols, + throughput_per_s, time_it, ) from nvmolkit import autotune as nv_autotune @@ -142,20 +145,18 @@ def bench_rdkit( site. Cloned molecules that were never processed are omitted from the returned list so downstream energy validation only sees comparable inputs. """ - import time as _time - last_run_mols: list[list[Chem.Mol]] = [[]] processed_count = [0] @nvtx.annotate("etkdg_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) - deadline = _time.perf_counter() + max_seconds if max_seconds > 0 else None + deadline = Deadline(max_seconds) n_done = 0 for mol in cloned: rdDistGeom.EmbedMultipleConfs(mol, numConfs=confs_per_mol, params=params) n_done += 1 - if deadline is not None and _time.perf_counter() >= deadline: + if deadline.expired(): break last_run_mols[0] = cloned[:n_done] processed_count[0] = n_done @@ -237,15 +238,9 @@ def main() -> None: default=1, help="Threads passed to RDKit ETKDG via params.numThreads (default: 1)", ) - parser.add_argument( - "--rdkit_max_seconds", - type=float, - default=0.0, - help=( - "Stop the RDKit comparison after this many wall-clock seconds and " - "report throughput on the molecules actually processed. 0 disables " - "the cap and runs the full workload (default: 0)." - ), + add_rdkit_max_seconds_arg( + parser, + extra_help="The RDKit ETKDG loop stops at the next molecule boundary once the budget is hit.", ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") @@ -501,15 +496,15 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - rdkit_throughput_per_s = None - if "rdkit" in results: - rd_timing = results["rdkit"][0] - if rd_timing.mean_ms > 0: - rdkit_throughput_per_s = (rdkit_processed_count * args.confs_per_mol) / (rd_timing.mean_ms / 1000.0) + rdkit_throughput_per_s: float | None = None + if "rdkit" in results and results["rdkit"][0].mean_ms > 0: + rdkit_throughput_per_s = throughput_per_s( + rdkit_processed_count * args.confs_per_mol, results["rdkit"][0].mean_ms + ) for name, (timing, run_mols) in results.items(): speedup = "" if rdkit_throughput_per_s is not None and name != "rdkit" and timing.mean_ms > 0: - method_throughput = (len(mols) * args.confs_per_mol) / (timing.mean_ms / 1000.0) + method_throughput = throughput_per_s(len(mols) * args.confs_per_mol, timing.mean_ms) speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {timing.mean_ms:10.2f} ms (+/- {timing.std_ms:.2f} ms){speedup}") @@ -542,9 +537,7 @@ def main() -> None: rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" mols_processed = rdkit_processed_count if is_rdkit else len(mols) confs_generated = _conformer_count(run_mols) - confs_per_second = ( - (mols_processed * args.confs_per_mol) / (timing.mean_ms / 1000.0) if timing.mean_ms > 0 else float("nan") - ) + confs_per_second = throughput_per_s(mols_processed * args.confs_per_mol, timing.mean_ms) if rdkit_throughput_per_s is not None and not is_rdkit and timing.mean_ms > 0: vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" else: diff --git a/benchmarks/ff_optimize_bench.py b/benchmarks/ff_optimize_bench.py index 61b6a3f7..f5d500e2 100644 --- a/benchmarks/ff_optimize_bench.py +++ b/benchmarks/ff_optimize_bench.py @@ -38,11 +38,14 @@ import nvtx import torch from bench_utils import ( + Deadline, + add_rdkit_max_seconds_arg, clone_mols_with_conformers, load_pickle, load_sdf, load_smiles, prep_mols, + throughput_per_s, time_it, ) from nvmolkit import autotune as nv_autotune @@ -175,8 +178,6 @@ def bench_rdkit( as items / elapsed; the returned timing is over the molecules actually processed. """ - import time as _time - if ff == "mmff": rdkit_optimize = lambda mol: AllChem.MMFFOptimizeMoleculeConfs( # noqa: E731 mol, numThreads=num_threads, maxIters=max_iters @@ -194,13 +195,13 @@ def bench_rdkit( @nvtx.annotate("ff_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) - deadline = _time.perf_counter() + max_seconds if max_seconds > 0 else None + deadline = Deadline(max_seconds) out: list[list[tuple[int, float]]] = [] n_done = 0 for mol in cloned: out.append(rdkit_optimize(mol)) n_done += 1 - if deadline is not None and _time.perf_counter() >= deadline: + if deadline.expired(): break last_results[0] = out processed_count[0] = n_done @@ -278,15 +279,9 @@ def main() -> None: default=1, help="Threads passed to RDKit FF optimizer via numThreads (default: 1)", ) - parser.add_argument( - "--rdkit_max_seconds", - type=float, - default=0.0, - help=( - "Stop the RDKit comparison after this many wall-clock seconds and " - "report throughput on the molecules actually processed. 0 disables " - "the cap and runs the full workload (default: 0)." - ), + add_rdkit_max_seconds_arg( + parser, + extra_help="The RDKit FF optimizer loop stops at the next molecule boundary once the budget is hit.", ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") @@ -537,15 +532,13 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - rdkit_throughput_per_s = None - if "rdkit" in results: - rd_avg = results["rdkit"][0] - if rd_avg > 0: - rdkit_throughput_per_s = (rdkit_processed_count * args.confs_per_mol) / (rd_avg / 1000.0) + rdkit_throughput_per_s: float | None = None + if "rdkit" in results and results["rdkit"][0] > 0: + rdkit_throughput_per_s = throughput_per_s(rdkit_processed_count * args.confs_per_mol, results["rdkit"][0]) for name, (avg_ms, std_ms, _) in results.items(): speedup = "" if rdkit_throughput_per_s is not None and name != "rdkit" and avg_ms > 0: - method_throughput = (len(mols) * args.confs_per_mol) / (avg_ms / 1000.0) + method_throughput = throughput_per_s(len(mols) * args.confs_per_mol, avg_ms) speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {avg_ms:10.2f} ms (+/- {std_ms:.2f} ms){speedup}") @@ -586,7 +579,7 @@ def main() -> None: rdkit_threads = args.rdkit_threads if is_rdkit else "N/A" rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" mols_processed = rdkit_processed_count if is_rdkit else len(mols) - confs_per_second = (mols_processed * args.confs_per_mol) / (avg_ms / 1000.0) if avg_ms > 0 else float("nan") + confs_per_second = throughput_per_s(mols_processed * args.confs_per_mol, avg_ms) if rdkit_throughput_per_s is not None and not is_rdkit and avg_ms > 0: vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" else: diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index dd7988a0..8d4abf20 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -61,14 +61,18 @@ import pickle import random import sys -import time from functools import partial from multiprocessing import Pool -from statistics import mean, pstdev from typing import Callable, Iterator import nvtx import pandas as pd +from bench_utils import ( + Deadline, + add_rdkit_max_seconds_arg, + throughput_per_s, + time_it_bounded, +) from benchmark_timing import time_it as _time_it from nvmolkit import autotune as nv_autotune from nvmolkit.substructure import ( @@ -214,49 +218,6 @@ def load_smarts(filepath: str, max_count: int = 0) -> tuple[list[Chem.Mol], list return queries, smarts_list -def _time_runs_bounded( - run: Callable[[], None], - runs: int, - max_seconds: float, - pairs_done_getter: Callable[[], int], - pairs_total: int, -) -> tuple[float, float, int]: - """Repeat ``run`` up to ``runs`` times, stopping early once ``max_seconds`` elapses. - - The supplied ``run`` closure is responsible for honouring ``max_seconds`` - inside a single invocation (e.g. by checking a per-run deadline). After - each run we read ``pairs_done_getter()`` to learn how many target/query - pairs were actually evaluated; if that's below ``pairs_total`` we treat - the run as partial and stop iterating. - - @return tuple of (avg_ms, std_ms, last_pairs_processed). avg/std are - computed only over runs that completed end-to-end (or the single - partial run if no full run was reached). - """ - deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None - completed_times_ms: list[float] = [] - partial_time_ms: float | None = None - last_pairs = 0 - for _run_index in range(runs): - if deadline is not None and time.perf_counter() > deadline: - break - start = time.perf_counter() - run() - elapsed_ms = (time.perf_counter() - start) * 1000.0 - last_pairs = pairs_done_getter() - if last_pairs < pairs_total: - partial_time_ms = elapsed_ms - break - completed_times_ms.append(elapsed_ms) - if completed_times_ms: - avg_ms = mean(completed_times_ms) - std_ms = pstdev(completed_times_ms) if len(completed_times_ms) > 1 else 0.0 - return avg_ms, std_ms, last_pairs - if partial_time_ms is not None: - return partial_time_ms, 0.0, last_pairs - return 0.0, 0.0, last_pairs - - _worker_queries = None _worker_params = None @@ -342,9 +303,9 @@ def run() -> None: nonlocal results_data, pairs_done_this_run results_data = [] pairs_done_this_run = 0 - run_deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None + run_deadline = Deadline(max_seconds) for mol in mols: - if run_deadline is not None and time.perf_counter() > run_deadline: + if run_deadline.expired(): break mol_results: list = [] if mode == "hasSubstructMatch": @@ -359,7 +320,7 @@ def run() -> None: results_data.append(mol_results) pairs_done_this_run += num_queries - avg_ms, std_ms, last_pairs = _time_runs_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) return avg_ms, std_ms, results_data, last_pairs @@ -405,10 +366,10 @@ def run() -> None: results_data = [[None] * num_queries for _ in range(num_mols)] pairs_done_this_run = 0 - run_deadline = time.perf_counter() + max_seconds if max_seconds > 0 else None + run_deadline = Deadline(max_seconds) for q_idx, query in enumerate(queries): - if run_deadline is not None and time.perf_counter() > run_deadline: + if run_deadline.expired(): break matching_indices = lib.GetMatches(query, numThreads=threads) matching_set = set(matching_indices) @@ -429,7 +390,7 @@ def run() -> None: results_data[m_idx][q_idx] = () pairs_done_this_run += num_mols - avg_ms, std_ms, last_pairs = _time_runs_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) return avg_ms, std_ms, results_data, last_pairs @@ -516,15 +477,11 @@ def main(): "(default: 1)" ), ) - parser.add_argument( - "--rdkit_max_seconds", - type=float, - default=0.0, - help=( - "When > 0, cap the RDKit benchmark wall time per run to this many seconds. " + add_rdkit_max_seconds_arg( + parser, + extra_help=( "RDKit aborts between queries (substructlib) or molecules (raw, single-thread); " - "the threaded raw path can only be bounded between runs. Reported throughput " - "uses the actual pairs processed. Default 0 = no limit." + "the threaded raw path can only be bounded between runs." ), ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") diff --git a/benchmarks/tests/conftest.py b/benchmarks/tests/conftest.py new file mode 100644 index 00000000..ffff4e32 --- /dev/null +++ b/benchmarks/tests/conftest.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Pytest setup for benchmark unit tests. + +Bench scripts import the sibling ``bench_utils`` package without any +``sys.path`` manipulation because they run with ``benchmarks/`` as the +working directory. The same trick is needed for these tests, which live +one level deeper. +""" + +import sys +from pathlib import Path + +_BENCHMARKS_DIR = Path(__file__).resolve().parent.parent +if str(_BENCHMARKS_DIR) not in sys.path: + sys.path.insert(0, str(_BENCHMARKS_DIR)) diff --git a/benchmarks/tests/test_timing.py b/benchmarks/tests/test_timing.py new file mode 100644 index 00000000..2413e9f1 --- /dev/null +++ b/benchmarks/tests/test_timing.py @@ -0,0 +1,116 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the shared bench_utils timing helpers.""" + +import math +import time + +import pytest +from bench_utils.timing import Deadline, throughput_per_s, time_it_bounded + + +@pytest.mark.parametrize("max_seconds", [0.0, -1.0]) +def test_deadline_non_positive_is_disabled(max_seconds): + deadline = Deadline(max_seconds) + assert not deadline.active + assert not deadline.expired() + + +def test_deadline_expires_after_budget(): + deadline = Deadline(0.01) + assert deadline.active + assert not deadline.expired() + time.sleep(0.05) + assert deadline.expired() + + +def test_deadline_independent_instances(): + long = Deadline(10.0) + short = Deadline(0.01) + time.sleep(0.05) + assert short.expired() + assert not long.expired() + + +def test_throughput_per_s_simple_conversion(): + # 100 items in 500ms -> 200 items/s + assert throughput_per_s(100, 500.0) == pytest.approx(200.0) + + +@pytest.mark.parametrize("elapsed_ms", [0.0, -5.0]) +def test_throughput_per_s_non_positive_elapsed_returns_nan(elapsed_ms): + assert math.isnan(throughput_per_s(100, elapsed_ms)) + + +def test_time_it_bounded_runs_to_completion_when_progress_full(): + call_count = [0] + + def run(): + call_count[0] += 1 + time.sleep(0.001) + + avg_ms, std_ms, progress = time_it_bounded( + run, runs=3, max_seconds=0.0, progress_getter=lambda: 10, progress_target=10 + ) + + assert call_count[0] == 3 + assert avg_ms > 0 + assert std_ms >= 0 + assert progress == 10 + + +def test_time_it_bounded_stops_after_partial_run(): + call_count = [0] + progress = [10] + + def run(): + call_count[0] += 1 + progress[0] = 3 + + avg_ms, std_ms, last_progress = time_it_bounded( + run, runs=5, max_seconds=0.0, progress_getter=lambda: progress[0], progress_target=10 + ) + + assert call_count[0] == 1 + assert last_progress == 3 + assert std_ms == 0.0 + assert avg_ms > 0 + + +def test_time_it_bounded_stops_when_budget_exhausted_between_runs(): + call_count = [0] + + def run(): + call_count[0] += 1 + time.sleep(0.05) + + # 5 runs * 50ms = 250ms total, but budget is only 60ms. + # First run completes; before run 2 the deadline check kicks in. + avg_ms, _std_ms, _progress = time_it_bounded( + run, runs=5, max_seconds=0.06, progress_getter=lambda: 1, progress_target=1 + ) + + assert 1 <= call_count[0] < 5 + assert avg_ms > 0 + + +def test_time_it_bounded_zero_runs_returns_zero(): + avg_ms, std_ms, progress = time_it_bounded( + lambda: None, runs=0, max_seconds=0.0, progress_getter=lambda: 0, progress_target=1 + ) + assert avg_ms == 0.0 + assert std_ms == 0.0 + assert progress == 0 + + +def test_time_it_bounded_stddev_positive_for_multiple_completed_runs(): + delays = iter([0.001, 0.02, 0.001]) + + def run(): + time.sleep(next(delays)) + + _avg_ms, std_ms, _progress = time_it_bounded( + run, runs=3, max_seconds=0.0, progress_getter=lambda: 1, progress_target=1 + ) + assert std_ms > 0.0 From 5553ea12d827bc1b43ed3232eb48f85e7806ae30 Mon Sep 17 00:00:00 2001 From: Kevin Boyd Date: Tue, 26 May 2026 15:44:12 -0400 Subject: [PATCH 3/5] Refactor substruct to reduce churn --- benchmarks/substruct_bench.py | 134 ++++++++++++++++------------------ 1 file changed, 64 insertions(+), 70 deletions(-) diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index 8d4abf20..c2003b63 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -276,7 +276,7 @@ def bench_rdkit_substruct( if max_matches > 0: params.maxMatches = max_matches - results_data: list = [] + results_data = [] pairs_done_this_run = 0 if threads > 1: @@ -291,15 +291,21 @@ def bench_rdkit_substruct( chunksize = max(1, len(mol_binaries) // (threads * 4)) @nvtx.annotate("substruct_run_mp", color="yellow") - def run() -> None: + def run(): nonlocal results_data, pairs_done_this_run with Pool(threads, initializer=_rdkit_worker_init, initargs=(query_binaries, max_matches)) as pool: results_data = pool.map(worker_func, mol_binaries, chunksize=chunksize) pairs_done_this_run = pairs_total else: + if mode == "hasSubstructMatch": + match_fn = lambda mol, query: mol.HasSubstructMatch(query, params) # noqa: E731 + elif mode == "countSubstructMatches": + match_fn = lambda mol, query: len(mol.GetSubstructMatches(query, params)) # noqa: E731 + else: + match_fn = lambda mol, query: mol.GetSubstructMatches(query, params) # noqa: E731 @nvtx.annotate("substruct_run", color="yellow") - def run() -> None: + def run(): nonlocal results_data, pairs_done_this_run results_data = [] pairs_done_this_run = 0 @@ -307,17 +313,7 @@ def run() -> None: for mol in mols: if run_deadline.expired(): break - mol_results: list = [] - if mode == "hasSubstructMatch": - for query in queries: - mol_results.append(mol.HasSubstructMatch(query, params)) - elif mode == "countSubstructMatches": - for query in queries: - mol_results.append(len(mol.GetSubstructMatches(query, params))) - else: - for query in queries: - mol_results.append(mol.GetSubstructMatches(query, params)) - results_data.append(mol_results) + results_data.append([match_fn(mol, query) for query in queries]) pairs_done_this_run += num_queries avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) @@ -354,8 +350,30 @@ def bench_rdkit_substructlib( results_data = [[None] * num_queries for _ in range(num_mols)] pairs_done_this_run = 0 + if mode == "hasSubstructMatch": + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + results_data[m_idx][q_idx] = m_idx in matching_set + elif mode == "countSubstructMatches": + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + if m_idx in matching_set: + results_data[m_idx][q_idx] = len(mols[m_idx].GetSubstructMatches(query, params)) + else: + results_data[m_idx][q_idx] = 0 + else: + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + if m_idx in matching_set: + results_data[m_idx][q_idx] = mols[m_idx].GetSubstructMatches(query, params) + else: + results_data[m_idx][q_idx] = () + @nvtx.annotate("substructlib_run", color="yellow") - def run() -> None: + def run(): nonlocal results_data, pairs_done_this_run mol_holder = rdSubstructLibrary.CachedMolHolder() @@ -371,23 +389,8 @@ def run() -> None: for q_idx, query in enumerate(queries): if run_deadline.expired(): break - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) - if mode == "hasSubstructMatch": - for m_idx in range(num_mols): - results_data[m_idx][q_idx] = m_idx in matching_set - elif mode == "countSubstructMatches": - for m_idx in range(num_mols): - if m_idx in matching_set: - results_data[m_idx][q_idx] = len(mols[m_idx].GetSubstructMatches(query, params)) - else: - results_data[m_idx][q_idx] = 0 - else: - for m_idx in range(num_mols): - if m_idx in matching_set: - results_data[m_idx][q_idx] = mols[m_idx].GetSubstructMatches(query, params) - else: - results_data[m_idx][q_idx] = () + matching_set = set(lib.GetMatches(query, numThreads=threads)) + fill_column(q_idx, query, matching_set) pairs_done_this_run += num_mols avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) @@ -419,6 +422,34 @@ def _load_config_dataframe(config_path: str) -> list[dict]: return pd.read_csv(config_path).to_dict("records") +def _validate_matches(mode: str, nvmolkit_data, rdkit_data, num_mols: int, num_queries: int) -> None: + """Print per-cell agreement between nvmolkit and RDKit results for ``mode``.""" + matches = 0 + total = num_mols * num_queries + if mode == "hasSubstructMatch": + for t in range(num_mols): + for q in range(num_queries): + if bool(nvmolkit_data[t][q]) == rdkit_data[t][q]: + matches += 1 + label = "Boolean match agreement" + elif mode == "countSubstructMatches": + for t in range(num_mols): + for q in range(num_queries): + if int(nvmolkit_data[t][q]) == int(rdkit_data[t][q]): + matches += 1 + label = "Count agreement" + else: + for t in range(num_mols): + for q in range(num_queries): + nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) + rd_matches = set(rdkit_data[t][q]) + if nv_matches == rd_matches: + matches += 1 + label = "Full match agreement" + pct = 100.0 * matches / total if total > 0 else 0 + print(f" {label}: {matches}/{total} ({pct:.1f}%)") + + def main(): parser = argparse.ArgumentParser(description="Substructure search benchmark: nvmolkit vs RDKit SubstructMatch") parser.add_argument("--smiles", "-s", help="Path to SMILES file with molecules to search") @@ -867,44 +898,7 @@ def main(): ) else: print(f" Validating against {validation_key}") - nvmolkit_data = results["nvmolkit"][2] - rdkit_data = results[validation_key][2] - if mode == "hasSubstructMatch": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_match = bool(nvmolkit_data[t][q]) - rd_match = rdkit_data[t][q] - if nv_match == rd_match: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Boolean match agreement: {matches}/{total} ({pct:.1f}%)") - elif mode == "countSubstructMatches": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_count = int(nvmolkit_data[t][q]) - rd_count = int(rdkit_data[t][q]) - if nv_count == rd_count: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Count agreement: {matches}/{total} ({pct:.1f}%)") - else: - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) - rd_matches = set(rdkit_data[t][q]) - if nv_matches == rd_matches: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Full match agreement: {matches}/{total} ({pct:.1f}%)") + _validate_matches(mode, results["nvmolkit"][2], results[validation_key][2], len(mols), len(queries)) if ran_nvmolkit: applied_batch_size = int(config.batchSize) From 5d243aa13290cbd80a71393614bc1515c47d2d97 Mon Sep 17 00:00:00 2001 From: Kevin Boyd Date: Wed, 27 May 2026 08:39:16 -0400 Subject: [PATCH 4/5] Address greptile comments --- benchmarks/substruct_bench.py | 9 +++++++-- benchmarks/tests/test_timing.py | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index c2003b63..09a5f989 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -930,9 +930,11 @@ def main(): nvmolkit_config_source = config_source if name == "nvmolkit" else "N/A" if is_rdkit: rdkit_match_mode, rdkit_threads = rdkit_variant_meta[name] + rdkit_max_seconds = args.rdkit_max_seconds else: rdkit_match_mode = "N/A" rdkit_threads = "N/A" + rdkit_max_seconds = "N/A" throughput = (pairs_done * 1000.0 / avg_ms) if avg_ms > 0 else 0.0 vs_rdkit = (throughput / baseline_throughput) if (not is_rdkit and baseline_throughput > 0) else "N/A" csv_rows.append( @@ -956,7 +958,7 @@ def main(): avg_ms, std_ms, pairs_done, - args.rdkit_max_seconds, + rdkit_max_seconds, throughput, vs_rdkit, ) @@ -1001,11 +1003,14 @@ def main(): vs_rdkit, ) = row vs_rdkit_str = f"{vs_rdkit:.4f}" if isinstance(vs_rdkit, float) else str(vs_rdkit) + rdkit_max_seconds_str = ( + f"{rdkit_max_seconds:g}" if isinstance(rdkit_max_seconds, float) else str(rdkit_max_seconds) + ) print( f"{name},{mode},{smarts_path},{input_file},{input_type},{sanitize}," f"{num_mols},{num_patterns},{max_matches},{batch_size},{num_gpus},{workers},{prep_threads}," f"{nvmolkit_config_source},{rdkit_threads},{rdkit_match_mode},{avg_ms:.2f},{std_ms:.2f}," - f"{pairs_done},{rdkit_max_seconds:g},{throughput:.2f},{vs_rdkit_str}" + f"{pairs_done},{rdkit_max_seconds_str},{throughput:.2f},{vs_rdkit_str}" ) diff --git a/benchmarks/tests/test_timing.py b/benchmarks/tests/test_timing.py index 2413e9f1..c46247c7 100644 --- a/benchmarks/tests/test_timing.py +++ b/benchmarks/tests/test_timing.py @@ -86,7 +86,8 @@ def run(): time.sleep(0.05) # 5 runs * 50ms = 250ms total, but budget is only 60ms. - # First run completes; before run 2 the deadline check kicks in. + # Run 1 completes at ~50ms (deadline check before run 2 still passes), run 2 + # completes at ~100ms, and the deadline check before run 3 stops the loop. avg_ms, _std_ms, _progress = time_it_bounded( run, runs=5, max_seconds=0.06, progress_getter=lambda: 1, progress_target=1 ) From dd75bfe6525a0e7153e7c3d1ccce03f2e76f4ec9 Mon Sep 17 00:00:00 2001 From: Kevin Boyd Date: Thu, 28 May 2026 18:19:42 -0400 Subject: [PATCH 5/5] Address review feedback --- benchmarks/bench_utils/timing.py | 15 ++++++------ benchmarks/substruct_bench.py | 13 ++++------ benchmarks/tests/test_timing.py | 41 ++++++++++++++++++++++++++++---- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/benchmarks/bench_utils/timing.py b/benchmarks/bench_utils/timing.py index b70268cb..f04be567 100644 --- a/benchmarks/bench_utils/timing.py +++ b/benchmarks/bench_utils/timing.py @@ -120,7 +120,7 @@ def throughput_per_s(items: float, elapsed_ms: float) -> float: def time_it_bounded( - run: Callable[[], None], + run: Callable[[Deadline], None], runs: int, max_seconds: float, progress_getter: Callable[[], int], @@ -128,11 +128,12 @@ def time_it_bounded( ) -> tuple[float, float, int]: """Repeat ``run`` up to ``runs`` times, stopping early on budget exhaustion. - The supplied ``run`` closure is responsible for honouring the per-run - deadline internally (e.g. by checking a :class:`Deadline`). After each - invocation, ``progress_getter()`` reports how much of the workload was - actually completed; a value below ``progress_target`` is treated as a - partial run and further iterations are skipped. + A single :class:`Deadline` covering the whole call is constructed from + ``max_seconds`` and passed to ``run`` on every invocation; the closure + must poll it inside its inner work loop to honour the budget mid-run. + After each invocation, ``progress_getter()`` reports how much of the + workload was actually completed; a value below ``progress_target`` is + treated as a partial run and further iterations are skipped. Returns ``(avg_ms, std_ms, last_progress)``. ``avg`` and ``std`` are computed only over runs that completed end-to-end; if no full run @@ -146,7 +147,7 @@ def time_it_bounded( if deadline.expired(): break start = time.perf_counter() - run() + run(deadline) elapsed_ms = (time.perf_counter() - start) * 1000.0 last_progress = progress_getter() if last_progress < progress_target: diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index 09a5f989..c36e69f7 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -68,7 +68,6 @@ import nvtx import pandas as pd from bench_utils import ( - Deadline, add_rdkit_max_seconds_arg, throughput_per_s, time_it_bounded, @@ -291,7 +290,7 @@ def bench_rdkit_substruct( chunksize = max(1, len(mol_binaries) // (threads * 4)) @nvtx.annotate("substruct_run_mp", color="yellow") - def run(): + def run(_deadline): nonlocal results_data, pairs_done_this_run with Pool(threads, initializer=_rdkit_worker_init, initargs=(query_binaries, max_matches)) as pool: results_data = pool.map(worker_func, mol_binaries, chunksize=chunksize) @@ -305,13 +304,12 @@ def run(): match_fn = lambda mol, query: mol.GetSubstructMatches(query, params) # noqa: E731 @nvtx.annotate("substruct_run", color="yellow") - def run(): + def run(deadline): nonlocal results_data, pairs_done_this_run results_data = [] pairs_done_this_run = 0 - run_deadline = Deadline(max_seconds) for mol in mols: - if run_deadline.expired(): + if deadline.expired(): break results_data.append([match_fn(mol, query) for query in queries]) pairs_done_this_run += num_queries @@ -373,7 +371,7 @@ def fill_column(q_idx, query, matching_set): results_data[m_idx][q_idx] = () @nvtx.annotate("substructlib_run", color="yellow") - def run(): + def run(deadline): nonlocal results_data, pairs_done_this_run mol_holder = rdSubstructLibrary.CachedMolHolder() @@ -384,10 +382,9 @@ def run(): results_data = [[None] * num_queries for _ in range(num_mols)] pairs_done_this_run = 0 - run_deadline = Deadline(max_seconds) for q_idx, query in enumerate(queries): - if run_deadline.expired(): + if deadline.expired(): break matching_set = set(lib.GetMatches(query, numThreads=threads)) fill_column(q_idx, query, matching_set) diff --git a/benchmarks/tests/test_timing.py b/benchmarks/tests/test_timing.py index c46247c7..852040dc 100644 --- a/benchmarks/tests/test_timing.py +++ b/benchmarks/tests/test_timing.py @@ -46,7 +46,7 @@ def test_throughput_per_s_non_positive_elapsed_returns_nan(elapsed_ms): def test_time_it_bounded_runs_to_completion_when_progress_full(): call_count = [0] - def run(): + def run(_deadline): call_count[0] += 1 time.sleep(0.001) @@ -64,7 +64,7 @@ def test_time_it_bounded_stops_after_partial_run(): call_count = [0] progress = [10] - def run(): + def run(_deadline): call_count[0] += 1 progress[0] = 3 @@ -81,7 +81,7 @@ def run(): def test_time_it_bounded_stops_when_budget_exhausted_between_runs(): call_count = [0] - def run(): + def run(_deadline): call_count[0] += 1 time.sleep(0.05) @@ -98,7 +98,7 @@ def run(): def test_time_it_bounded_zero_runs_returns_zero(): avg_ms, std_ms, progress = time_it_bounded( - lambda: None, runs=0, max_seconds=0.0, progress_getter=lambda: 0, progress_target=1 + lambda _deadline: None, runs=0, max_seconds=0.0, progress_getter=lambda: 0, progress_target=1 ) assert avg_ms == 0.0 assert std_ms == 0.0 @@ -108,10 +108,41 @@ def test_time_it_bounded_zero_runs_returns_zero(): def test_time_it_bounded_stddev_positive_for_multiple_completed_runs(): delays = iter([0.001, 0.02, 0.001]) - def run(): + def run(_deadline): time.sleep(next(delays)) _avg_ms, std_ms, _progress = time_it_bounded( run, runs=3, max_seconds=0.0, progress_getter=lambda: 1, progress_target=1 ) assert std_ms > 0.0 + + +def test_time_it_bounded_shared_deadline_caps_inner_loop(): + """Verify ``time_it_bounded`` exposes a single shared :class:`Deadline`. + + The ``run`` callback honours the budget mid-iteration, and a second call + receives the same (already-elapsed) deadline rather than a fresh one, + which is the property that makes ``max_seconds`` a true total cap. + """ + iterations_per_run: list[int] = [] + progress = [0] + + def run(deadline): + n_done = 0 + for _ in range(1000): + if deadline.expired(): + break + time.sleep(0.005) + n_done += 1 + iterations_per_run.append(n_done) + progress[0] = n_done + + _avg_ms, _std_ms, _progress = time_it_bounded( + run, runs=5, max_seconds=0.05, progress_getter=lambda: progress[0], progress_target=1000 + ) + + assert iterations_per_run, "run should have been invoked at least once" + # The first run consumes essentially the whole budget; any subsequent call + # must see an already-expired deadline and exit immediately. + for later_count in iterations_per_run[1:]: + assert later_count == 0