diff --git a/benchmarks/bench_utils/__init__.py b/benchmarks/bench_utils/__init__.py index 1960f93..4cabcc2 100644 --- a/benchmarks/bench_utils/__init__.py +++ b/benchmarks/bench_utils/__init__.py @@ -21,10 +21,19 @@ from bench_utils.loaders import load_pickle, load_sdf, load_smarts, load_smiles from bench_utils.molprep import clone_mols_with_conformers, embed_and_jitter, perturb_conformer, prep_mols -from bench_utils.timing import TimingResult, time_it +from bench_utils.timing import ( + Deadline, + TimingResult, + add_rdkit_max_seconds_arg, + throughput_per_s, + time_it, + time_it_bounded, +) __all__ = [ + "Deadline", "TimingResult", + "add_rdkit_max_seconds_arg", "clone_mols_with_conformers", "embed_and_jitter", "load_pickle", @@ -33,5 +42,7 @@ "load_smiles", "perturb_conformer", "prep_mols", + "throughput_per_s", "time_it", + "time_it_bounded", ] diff --git a/benchmarks/bench_utils/timing.py b/benchmarks/bench_utils/timing.py index 29c6738..f04be56 100644 --- a/benchmarks/bench_utils/timing.py +++ b/benchmarks/bench_utils/timing.py @@ -15,6 +15,7 @@ """Shared timing utilities for nvMolKit benchmarks.""" +import argparse import statistics import time from dataclasses import dataclass, field @@ -89,3 +90,94 @@ def sync() -> None: times_ms.append((t1 - t0) * 1000.0) return TimingResult(times_ms=times_ms) + + +class Deadline: + """Wall-clock budget that benchmark loops can poll for early termination. + + A ``max_seconds`` of ``0`` (or negative) disables the budget, in which + case :meth:`expired` always returns ``False``. Construction starts the + clock; pass the same instance to nested loops to share one deadline. + """ + + def __init__(self, max_seconds: float) -> None: + self._end: float | None = time.perf_counter() + max_seconds if max_seconds > 0 else None + + def expired(self) -> bool: + return self._end is not None and time.perf_counter() >= self._end + + @property + def active(self) -> bool: + """``True`` when a real budget is being enforced.""" + return self._end is not None + + +def throughput_per_s(items: float, elapsed_ms: float) -> float: + """Items per second from a millisecond count; ``NaN`` if ``elapsed_ms <= 0``.""" + if elapsed_ms <= 0: + return float("nan") + return items / (elapsed_ms / 1000.0) + + +def time_it_bounded( + run: Callable[[Deadline], None], + runs: int, + max_seconds: float, + progress_getter: Callable[[], int], + progress_target: int, +) -> tuple[float, float, int]: + """Repeat ``run`` up to ``runs`` times, stopping early on budget exhaustion. + + A single :class:`Deadline` covering the whole call is constructed from + ``max_seconds`` and passed to ``run`` on every invocation; the closure + must poll it inside its inner work loop to honour the budget mid-run. + After each invocation, ``progress_getter()`` reports how much of the + workload was actually completed; a value below ``progress_target`` is + treated as a partial run and further iterations are skipped. + + Returns ``(avg_ms, std_ms, last_progress)``. ``avg`` and ``std`` are + computed only over runs that completed end-to-end; if no full run + finished, the single partial timing is returned with ``std=0``. + """ + deadline = Deadline(max_seconds) + completed_times_ms: list[float] = [] + partial_time_ms: float | None = None + last_progress = 0 + for _ in range(runs): + if deadline.expired(): + break + start = time.perf_counter() + run(deadline) + elapsed_ms = (time.perf_counter() - start) * 1000.0 + last_progress = progress_getter() + if last_progress < progress_target: + partial_time_ms = elapsed_ms + break + completed_times_ms.append(elapsed_ms) + if completed_times_ms: + avg_ms = statistics.mean(completed_times_ms) + std_ms = statistics.pstdev(completed_times_ms) if len(completed_times_ms) > 1 else 0.0 + return avg_ms, std_ms, last_progress + if partial_time_ms is not None: + return partial_time_ms, 0.0, last_progress + return 0.0, 0.0, last_progress + + +def add_rdkit_max_seconds_arg(parser: argparse.ArgumentParser, *, extra_help: str = "") -> None: + """Register the shared ``--rdkit_max_seconds`` CLI flag. + + ``extra_help`` is appended to the standard help string so individual + benchmarks can describe how partial-run semantics apply to their RDKit + code path (e.g. per-molecule vs. per-query truncation). + """ + base_help = ( + "Stop the RDKit comparison after this many wall-clock seconds and " + "report throughput on the work actually completed. 0 disables the " + "cap and runs the full workload (default: 0)." + ) + parser.add_argument( + "--rdkit_max_seconds", + type=float, + default=0.0, + help=f"{base_help} {extra_help}".rstrip(), + ) diff --git a/benchmarks/etkdg_bench.py b/benchmarks/etkdg_bench.py index d2b9931..0c8402e 100644 --- a/benchmarks/etkdg_bench.py +++ b/benchmarks/etkdg_bench.py @@ -33,12 +33,15 @@ import nvtx from bench_utils import ( + Deadline, TimingResult, + add_rdkit_max_seconds_arg, clone_mols_with_conformers, load_pickle, load_sdf, load_smiles, prep_mols, + throughput_per_s, time_it, ) from nvmolkit import autotune as nv_autotune @@ -132,23 +135,38 @@ def bench_rdkit( confs_per_mol: int, runs: int, warmup: bool, -) -> tuple[TimingResult, list[Chem.Mol]]: - """Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, last_run_mols)``.""" + max_seconds: float = 0.0, +) -> tuple[TimingResult, list[Chem.Mol], int]: + """Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, processed_mols, processed_count)``. + + When ``max_seconds > 0``, the inner loop stops processing molecules once + wall-clock elapsed exceeds the cap. The reported timing is over the + molecules actually processed; throughput is items / elapsed at the call + site. Cloned molecules that were never processed are omitted from the + returned list so downstream energy validation only sees comparable inputs. + """ last_run_mols: list[list[Chem.Mol]] = [[]] + processed_count = [0] @nvtx.annotate("etkdg_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) + deadline = Deadline(max_seconds) + n_done = 0 for mol in cloned: rdDistGeom.EmbedMultipleConfs(mol, numConfs=confs_per_mol, params=params) - last_run_mols[0] = cloned + n_done += 1 + if deadline.expired(): + break + last_run_mols[0] = cloned[:n_done] + processed_count[0] = n_done if warmup: warmup_mol = Chem.RWMol(mols[0]) rdDistGeom.EmbedMultipleConfs(warmup_mol, numConfs=1, params=params) timing = time_it(run, runs=runs, warmups=0, gpu_sync=False) - return timing, last_run_mols[0] + return timing, last_run_mols[0], processed_count[0] def _build_etkdg_params(max_iterations: int, num_threads: int, seed: int) -> rdDistGeom.EmbedParameters: @@ -176,10 +194,11 @@ def _build_hardware_options( CSV_HEADER = ( - "method,input_file,input_type,num_mols,confs_per_mol,max_iterations," + "method,input_file,input_type,num_mols,mols_processed,confs_per_mol,max_iterations," "batch_size,batches_per_gpu,prep_threads,num_gpus,nvmolkit_config_source," - "rdkit_threads,time_ms,std_ms,conformers_generated,mean_energy_diff,median_energy_diff," - "energy_diff_pairs" + "rdkit_threads,rdkit_max_seconds,time_ms,std_ms,conformers_generated," + "confs_per_second,vs_rdkit_throughput_ratio," + "mean_energy_diff,median_energy_diff,energy_diff_pairs" ) @@ -219,6 +238,10 @@ def main() -> None: default=1, help="Threads passed to RDKit ETKDG via params.numThreads (default: 1)", ) + add_rdkit_max_seconds_arg( + parser, + extra_help="The RDKit ETKDG loop stops at the next molecule boundary once the budget is hit.", + ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument( @@ -455,10 +478,16 @@ def main() -> None: except ImportError as exc: print(f" nvmolkit: SKIPPED (import error: {exc})") + rdkit_processed_count = len(mols) if not args.no_rdkit: print("\nRunning RDKit ETKDG benchmark...") - rd_timing, rd_mols = bench_rdkit(mols, params, args.confs_per_mol, args.runs, args.warmup) - print(f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)") + rd_timing, rd_mols, rdkit_processed_count = bench_rdkit( + mols, params, args.confs_per_mol, args.runs, args.warmup, max_seconds=args.rdkit_max_seconds + ) + print( + f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)" + f" [processed {rdkit_processed_count}/{len(mols)} mols]" + ) results["rdkit"] = (rd_timing, rd_mols) if not results: @@ -467,11 +496,16 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - baseline_ms = results["rdkit"][0].mean_ms if "rdkit" in results else None - for name, (timing, _) in results.items(): + rdkit_throughput_per_s: float | None = None + if "rdkit" in results and results["rdkit"][0].mean_ms > 0: + rdkit_throughput_per_s = throughput_per_s( + rdkit_processed_count * args.confs_per_mol, results["rdkit"][0].mean_ms + ) + for name, (timing, run_mols) in results.items(): speedup = "" - if baseline_ms is not None and name != "rdkit" and timing.mean_ms > 0: - speedup = f", {baseline_ms / timing.mean_ms:.1f}x vs RDKit" + if rdkit_throughput_per_s is not None and name != "rdkit" and timing.mean_ms > 0: + method_throughput = throughput_per_s(len(mols) * args.confs_per_mol, timing.mean_ms) + speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {timing.mean_ms:10.2f} ms (+/- {timing.std_ms:.2f} ms){speedup}") energy_mean = float("nan") @@ -493,21 +527,31 @@ def main() -> None: csv_rows: list[str] = [] for name, (timing, run_mols) in results.items(): is_nv = name == "nvmolkit" + is_rdkit = name == "rdkit" batch_size = applied_batch_size if is_nv else "N/A" batches_per_gpu = applied_batches_per_gpu if is_nv else "N/A" prep_threads = applied_prep_threads if is_nv else "N/A" num_gpus = applied_num_gpus if is_nv else "N/A" nvmolkit_config_source = config_source if is_nv else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" + rdkit_threads = args.rdkit_threads if is_rdkit else "N/A" + rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" + mols_processed = rdkit_processed_count if is_rdkit else len(mols) confs_generated = _conformer_count(run_mols) + confs_per_second = throughput_per_s(mols_processed * args.confs_per_mol, timing.mean_ms) + if rdkit_throughput_per_s is not None and not is_rdkit and timing.mean_ms > 0: + vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" + else: + vs_rdkit_throughput_ratio = "N/A" mean_diff = energy_mean if (diff_computed and is_nv) else "N/A" median_diff = energy_median if (diff_computed and is_nv) else "N/A" pairs = energy_pairs if (diff_computed and is_nv) else "N/A" csv_rows.append( - f"{name},{input_file},{input_type},{len(mols)},{args.confs_per_mol}," + f"{name},{input_file},{input_type},{len(mols)},{mols_processed},{args.confs_per_mol}," f"{args.max_iterations},{batch_size},{batches_per_gpu},{prep_threads},{num_gpus}," - f"{nvmolkit_config_source},{rdkit_threads},{timing.mean_ms:.2f},{timing.std_ms:.2f}," - f"{confs_generated},{mean_diff},{median_diff},{pairs}" + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_max_seconds}," + f"{timing.mean_ms:.2f},{timing.std_ms:.2f}," + f"{confs_generated},{confs_per_second:.2f},{vs_rdkit_throughput_ratio}," + f"{mean_diff},{median_diff},{pairs}" ) print("\n\nCSV Results:") diff --git a/benchmarks/ff_optimize_bench.py b/benchmarks/ff_optimize_bench.py index 0292f6a..d12372c 100644 --- a/benchmarks/ff_optimize_bench.py +++ b/benchmarks/ff_optimize_bench.py @@ -38,12 +38,15 @@ import nvtx import torch from bench_utils import ( + Deadline, + add_rdkit_max_seconds_arg, clone_mols_with_conformers, embed_and_jitter, load_pickle, load_sdf, load_smiles, prep_mols, + throughput_per_s, time_it, ) from nvmolkit import autotune as nv_autotune @@ -141,8 +144,15 @@ def bench_rdkit( runs: int, warmup: bool, num_threads: int, -) -> tuple[float, float, list[float]]: - """Benchmark RDKit MMFF/UFF optimization; return ``(mean_ms, std_ms, energies)``.""" + max_seconds: float = 0.0, +) -> tuple[float, float, list[float], int]: + """Benchmark RDKit MMFF/UFF optimization; return ``(mean_ms, std_ms, energies, processed_mols)``. + + When ``max_seconds > 0`` the per-molecule loop stops once wall-clock + elapsed exceeds the cap. Throughput at the call site should be measured + as items / elapsed; the returned timing is over the molecules actually + processed. + """ if ff == "mmff": rdkit_optimize = lambda mol: AllChem.MMFFOptimizeMoleculeConfs( # noqa: E731 mol, numThreads=num_threads, maxIters=max_iters @@ -155,11 +165,21 @@ def bench_rdkit( raise ValueError(f"Unknown ff: {ff!r}") last_results: list[list[list[tuple[int, float]]]] = [[]] + processed_count = [0] @nvtx.annotate("ff_rdkit_run", color="yellow") def run() -> None: cloned = clone_mols_with_conformers(mols) - last_results[0] = [rdkit_optimize(mol) for mol in cloned] + deadline = Deadline(max_seconds) + out: list[list[tuple[int, float]]] = [] + n_done = 0 + for mol in cloned: + out.append(rdkit_optimize(mol)) + n_done += 1 + if deadline.expired(): + break + last_results[0] = out + processed_count[0] = n_done if warmup: warmup_mols = clone_mols_with_conformers(mols[: min(4, len(mols))]) @@ -170,7 +190,7 @@ def run() -> None: energies, not_converged = _flatten_rdkit_energies(last_results[0]) if not_converged > 0: print(f" RDKit: {not_converged} conformer(s) reported non-zero status (not converged)") - return result.mean_ms, result.std_ms, energies + return result.mean_ms, result.std_ms, energies, processed_count[0] def _build_hardware_options( @@ -188,9 +208,11 @@ def _build_hardware_options( CSV_HEADER = ( - "method,ff,input_file,input_type,num_mols,confs_per_mol,max_iters," + "method,ff,input_file,input_type,num_mols,mols_processed,confs_per_mol,max_iters," "batch_size,batches_per_gpu,prep_threads,num_gpus,nvmolkit_config_source," - "rdkit_threads,time_ms,std_ms,energies_compared,mean_abs_energy_diff,max_abs_energy_diff" + "rdkit_threads,rdkit_max_seconds,time_ms,std_ms," + "confs_per_second,vs_rdkit_throughput_ratio," + "energies_compared,mean_abs_energy_diff,max_abs_energy_diff" ) @@ -232,6 +254,10 @@ def main() -> None: default=1, help="Threads passed to RDKit FF optimizer via numThreads (default: 1)", ) + add_rdkit_max_seconds_arg( + parser, + extra_help="The RDKit FF optimizer loop stops at the next molecule boundary once the budget is hit.", + ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument( @@ -457,12 +483,22 @@ def main() -> None: results["nvmolkit"] = (nv_avg, nv_std, nv_energies) torch.cuda.cudart().cudaProfilerStop() + rdkit_processed_count = len(mols) if not args.no_rdkit: print(f"\nRunning RDKit {args.ff.upper()} optimize benchmark...") - rd_avg, rd_std, rd_energies = bench_rdkit( - mols, args.ff, args.max_iters, args.runs, args.warmup, args.rdkit_threads + rd_avg, rd_std, rd_energies, rdkit_processed_count = bench_rdkit( + mols, + args.ff, + args.max_iters, + args.runs, + args.warmup, + args.rdkit_threads, + max_seconds=args.rdkit_max_seconds, + ) + print( + f" RDKit: {rd_avg:10.2f} ms (+/- {rd_std:.2f} ms)" + f" [processed {rdkit_processed_count}/{len(mols)} mols]" ) - print(f" RDKit: {rd_avg:10.2f} ms (+/- {rd_std:.2f} ms)") results["rdkit"] = (rd_avg, rd_std, rd_energies) if not results: @@ -471,11 +507,14 @@ def main() -> None: print("\n" + "=" * 70) print("Summary:") - baseline_ms = results.get("rdkit", (None, None, None))[0] + rdkit_throughput_per_s: float | None = None + if "rdkit" in results and results["rdkit"][0] > 0: + rdkit_throughput_per_s = throughput_per_s(rdkit_processed_count * args.confs_per_mol, results["rdkit"][0]) for name, (avg_ms, std_ms, _) in results.items(): speedup = "" - if baseline_ms is not None and name != "rdkit" and avg_ms > 0: - speedup = f", {baseline_ms / avg_ms:.1f}x vs RDKit" + if rdkit_throughput_per_s is not None and name != "rdkit" and avg_ms > 0: + method_throughput = throughput_per_s(len(mols) * args.confs_per_mol, avg_ms) + speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)" print(f" {name:20s}: {avg_ms:10.2f} ms (+/- {std_ms:.2f} ms){speedup}") energy_mean = float("nan") @@ -506,19 +545,29 @@ def main() -> None: csv_rows: list[str] = [] for name, (avg_ms, std_ms, energies) in results.items(): is_nv = name == "nvmolkit" + is_rdkit = name == "rdkit" batch_size = applied_batch_size if is_nv else "N/A" batches_per_gpu = applied_batches_per_gpu if is_nv else "N/A" prep_threads = applied_prep_threads if is_nv else "N/A" num_gpus = applied_num_gpus if is_nv else "N/A" nvmolkit_config_source = config_source if is_nv else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" + rdkit_threads = args.rdkit_threads if is_rdkit else "N/A" + rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A" + mols_processed = rdkit_processed_count if is_rdkit else len(mols) + confs_per_second = throughput_per_s(mols_processed * args.confs_per_mol, avg_ms) + if rdkit_throughput_per_s is not None and not is_rdkit and avg_ms > 0: + vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}" + else: + vs_rdkit_throughput_ratio = "N/A" mean_diff = energy_mean if (args.validate and is_nv) else "N/A" max_diff = energy_max if (args.validate and is_nv) else "N/A" pairs = energy_pairs if (args.validate and is_nv) else "N/A" csv_rows.append( - f"{name},{args.ff},{input_file},{input_type},{len(mols)},{args.confs_per_mol}," + f"{name},{args.ff},{input_file},{input_type},{len(mols)},{mols_processed},{args.confs_per_mol}," f"{args.max_iters},{batch_size},{batches_per_gpu},{prep_threads},{num_gpus}," - f"{nvmolkit_config_source},{rdkit_threads},{avg_ms:.2f},{std_ms:.2f}," + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_max_seconds}," + f"{avg_ms:.2f},{std_ms:.2f}," + f"{confs_per_second:.2f},{vs_rdkit_throughput_ratio}," f"{pairs},{mean_diff},{max_diff}" ) diff --git a/benchmarks/substruct_bench.py b/benchmarks/substruct_bench.py index 61e6915..a342431 100644 --- a/benchmarks/substruct_bench.py +++ b/benchmarks/substruct_bench.py @@ -47,6 +47,10 @@ # Use SubstructLibrary with native threading: python substruct_bench.py --smiles --smarts --rdkit_match_mode substructlib --rdkit_threads 8 + # Sweep multiple RDKit match modes and thread counts in one run: + python substruct_bench.py --smiles --smarts \ + --rdkit_match_mode raw substructlib --rdkit_threads 1 4 16 + # Run multiple configurations from a dataframe (smarts, batch_size, workers, prep_threads, mode, num_gpus): python substruct_bench.py --smiles --config @@ -61,7 +65,7 @@ import nvtx import pandas as pd -from bench_utils import load_pickle, load_smarts, load_smiles +from bench_utils import add_rdkit_max_seconds_arg, load_pickle, load_smarts, load_smiles, time_it_bounded from benchmark_timing import time_it as _time_it from nvmolkit import autotune as nv_autotune from nvmolkit.substructure import ( @@ -116,15 +120,32 @@ def _rdkit_worker_count(mol_binary: bytes) -> list[int]: @nvtx.annotate("bench_rdkit_substruct", color="green") def bench_rdkit_substruct( - mols: list[Chem.Mol], queries: list[Chem.Mol], runs: int, mode: str, max_matches: int, threads: int = 1 -) -> tuple[float, float, list]: - """Benchmark RDKit SubstructMatch API.""" + mols: list[Chem.Mol], + queries: list[Chem.Mol], + runs: int, + mode: str, + max_matches: int, + threads: int = 1, + max_seconds: float = 0.0, +) -> tuple[float, float, list, int]: + """Benchmark RDKit SubstructMatch API. + + @param max_seconds When > 0, abort additional runs (and the per-molecule + loop in single-threaded mode) once the elapsed time + exceeds this budget. The threaded path can only be + bounded between runs since `pool.map` is monolithic. + @return tuple of (avg_ms, std_ms, results_data, pairs_processed_per_run). + """ + num_mols = len(mols) + num_queries = len(queries) + pairs_total = num_mols * num_queries params = Chem.SubstructMatchParameters() params.uniquify = False if max_matches > 0: params.maxMatches = max_matches results_data = [] + pairs_done_this_run = 0 if threads > 1: mol_binaries = [mol.ToBinary() for mol in mols] @@ -138,47 +159,55 @@ def bench_rdkit_substruct( chunksize = max(1, len(mol_binaries) // (threads * 4)) @nvtx.annotate("substruct_run_mp", color="yellow") - def run(): - nonlocal results_data + def run(_deadline): + nonlocal results_data, pairs_done_this_run with Pool(threads, initializer=_rdkit_worker_init, initargs=(query_binaries, max_matches)) as pool: results_data = pool.map(worker_func, mol_binaries, chunksize=chunksize) + pairs_done_this_run = pairs_total else: + if mode == "hasSubstructMatch": + match_fn = lambda mol, query: mol.HasSubstructMatch(query, params) # noqa: E731 + elif mode == "countSubstructMatches": + match_fn = lambda mol, query: len(mol.GetSubstructMatches(query, params)) # noqa: E731 + else: + match_fn = lambda mol, query: mol.GetSubstructMatches(query, params) # noqa: E731 @nvtx.annotate("substruct_run", color="yellow") - def run(): - nonlocal results_data + def run(deadline): + nonlocal results_data, pairs_done_this_run results_data = [] - if mode == "hasSubstructMatch": - for mol in mols: - mol_results = [] - for query in queries: - mol_results.append(mol.HasSubstructMatch(query, params)) - results_data.append(mol_results) - elif mode == "countSubstructMatches": - for mol in mols: - mol_results = [] - for query in queries: - mol_results.append(len(mol.GetSubstructMatches(query, params))) - results_data.append(mol_results) - else: - for mol in mols: - mol_results = [] - for query in queries: - matches = mol.GetSubstructMatches(query, params) - mol_results.append(matches) - results_data.append(mol_results) - - avg_ms, std_ms = time_it(run, runs) - return avg_ms, std_ms, results_data + pairs_done_this_run = 0 + for mol in mols: + if deadline.expired(): + break + results_data.append([match_fn(mol, query) for query in queries]) + pairs_done_this_run += num_queries + + avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + return avg_ms, std_ms, results_data, last_pairs @nvtx.annotate("bench_rdkit_substructlib", color="green") def bench_rdkit_substructlib( - mols: list[Chem.Mol], queries: list[Chem.Mol], runs: int, mode: str, max_matches: int, threads: int = 1 -) -> tuple[float, float, list]: - """Benchmark RDKit SubstructLibrary API with native multithreading.""" + mols: list[Chem.Mol], + queries: list[Chem.Mol], + runs: int, + mode: str, + max_matches: int, + threads: int = 1, + max_seconds: float = 0.0, +) -> tuple[float, float, list, int]: + """Benchmark RDKit SubstructLibrary API with native multithreading. + + @param max_seconds When > 0, abort the per-query loop once the elapsed + time exceeds this budget. The library build itself + still runs to completion since `lib.GetMatches` is + the only point where partial results are well-defined. + @return tuple of (avg_ms, std_ms, results_data, pairs_processed_per_run). + """ num_mols = len(mols) num_queries = len(queries) + pairs_total = num_mols * num_queries params = Chem.SubstructMatchParameters() params.uniquify = False @@ -186,10 +215,33 @@ def bench_rdkit_substructlib( params.maxMatches = max_matches results_data = [[None] * num_queries for _ in range(num_mols)] + pairs_done_this_run = 0 + + if mode == "hasSubstructMatch": + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + results_data[m_idx][q_idx] = m_idx in matching_set + elif mode == "countSubstructMatches": + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + if m_idx in matching_set: + results_data[m_idx][q_idx] = len(mols[m_idx].GetSubstructMatches(query, params)) + else: + results_data[m_idx][q_idx] = 0 + else: + + def fill_column(q_idx, query, matching_set): + for m_idx in range(num_mols): + if m_idx in matching_set: + results_data[m_idx][q_idx] = mols[m_idx].GetSubstructMatches(query, params) + else: + results_data[m_idx][q_idx] = () @nvtx.annotate("substructlib_run", color="yellow") - def run(): - nonlocal results_data + def run(deadline): + nonlocal results_data, pairs_done_this_run mol_holder = rdSubstructLibrary.CachedMolHolder() fp_holder = rdSubstructLibrary.PatternHolder() @@ -198,34 +250,17 @@ def run(): lib.AddMol(mol) results_data = [[None] * num_queries for _ in range(num_mols)] + pairs_done_this_run = 0 - if mode == "hasSubstructMatch": - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) - for m_idx in range(num_mols): - results_data[m_idx][q_idx] = m_idx in matching_set - elif mode == "countSubstructMatches": - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) - for m_idx in range(num_mols): - if m_idx in matching_set: - results_data[m_idx][q_idx] = len(mols[m_idx].GetSubstructMatches(query, params)) - else: - results_data[m_idx][q_idx] = 0 - else: - for q_idx, query in enumerate(queries): - matching_indices = lib.GetMatches(query, numThreads=threads) - matching_set = set(matching_indices) - for m_idx in range(num_mols): - if m_idx in matching_set: - results_data[m_idx][q_idx] = mols[m_idx].GetSubstructMatches(query, params) - else: - results_data[m_idx][q_idx] = () + for q_idx, query in enumerate(queries): + if deadline.expired(): + break + matching_set = set(lib.GetMatches(query, numThreads=threads)) + fill_column(q_idx, query, matching_set) + pairs_done_this_run += num_mols - avg_ms, std_ms = time_it(run, runs) - return avg_ms, std_ms, results_data + avg_ms, std_ms, last_pairs = time_it_bounded(run, runs, max_seconds, lambda: pairs_done_this_run, pairs_total) + return avg_ms, std_ms, results_data, last_pairs @nvtx.annotate("bench_nvmolkit", color="red") @@ -253,6 +288,34 @@ def _load_config_dataframe(config_path: str) -> list[dict]: return pd.read_csv(config_path).to_dict("records") +def _validate_matches(mode: str, nvmolkit_data, rdkit_data, num_mols: int, num_queries: int) -> None: + """Print per-cell agreement between nvmolkit and RDKit results for ``mode``.""" + matches = 0 + total = num_mols * num_queries + if mode == "hasSubstructMatch": + for t in range(num_mols): + for q in range(num_queries): + if bool(nvmolkit_data[t][q]) == rdkit_data[t][q]: + matches += 1 + label = "Boolean match agreement" + elif mode == "countSubstructMatches": + for t in range(num_mols): + for q in range(num_queries): + if int(nvmolkit_data[t][q]) == int(rdkit_data[t][q]): + matches += 1 + label = "Count agreement" + else: + for t in range(num_mols): + for q in range(num_queries): + nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) + rd_matches = set(rdkit_data[t][q]) + if nv_matches == rd_matches: + matches += 1 + label = "Full match agreement" + pct = 100.0 * matches / total if total > 0 else 0 + print(f" {label}: {matches}/{total} ({pct:.1f}%)") + + def main(): parser = argparse.ArgumentParser(description="Substructure search benchmark: nvmolkit vs RDKit SubstructMatch") parser.add_argument("--smiles", "-s", help="Path to SMILES file with molecules to search") @@ -293,14 +356,30 @@ def main(): parser.add_argument( "--rdkit_match_mode", choices=["raw", "substructlib"], - default="raw", - help="RDKit matching mode: raw (direct API) or substructlib (SubstructLibrary) (default: raw)", + nargs="+", + default=["raw"], + help=( + "RDKit matching mode(s) to benchmark. Pass one or more of 'raw' / 'substructlib'; " + "every mode is combined with every value of --rdkit_threads. (default: raw)" + ), ) parser.add_argument( "--rdkit_threads", type=int, - default=1, - help="RDKit threads (multiprocessing for raw, native for substructlib) (default: 1)", + nargs="+", + default=[1], + help=( + "RDKit thread count(s) to benchmark (multiprocessing for raw, native for substructlib). " + "Pass multiple values to sweep; the cartesian product with --rdkit_match_mode is run. " + "(default: 1)" + ), + ) + add_rdkit_max_seconds_arg( + parser, + extra_help=( + "RDKit aborts between queries (substructlib) or molecules (raw, single-thread); " + "the threaded raw path can only be bounded between runs." + ), ) parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)") parser.add_argument("--workers", type=int, default=-1, help="nvmolkit GPU worker threads per GPU (-1 = auto)") @@ -424,8 +503,8 @@ def main(): print(f" Run nvmolkit: {not args.no_nvmolkit}") print(f" Run RDKit: {not args.no_rdkit}") if not args.no_rdkit: - print(f" RDKit match mode: {args.rdkit_match_mode}") - print(f" RDKit threads: {args.rdkit_threads}") + print(f" RDKit match modes: {args.rdkit_match_mode}") + print(f" RDKit thread counts: {args.rdkit_threads}") if args.config: print(f" Config dataframe: {args.config}") else: @@ -593,25 +672,48 @@ def main(): print("Running nvmolkit GPU benchmark...") nvmolkit_avg, nvmolkit_std, nvmolkit_results = bench_nvmolkit(mols, queries, args.runs, mode, config) print(f" nvmolkit: {nvmolkit_avg:10.2f} ms (± {nvmolkit_std:.2f} ms)") - results["nvmolkit"] = (nvmolkit_avg, nvmolkit_std, nvmolkit_results) + results["nvmolkit"] = (nvmolkit_avg, nvmolkit_std, nvmolkit_results, len(mols) * num_patterns) torch.cuda.cudart().cudaProfilerStop() except ImportError as e: print(f" nvmolkit: SKIPPED (import error: {e})") + rdkit_variants: list[tuple[str, str, int]] = [] if not args.no_rdkit: - if args.rdkit_match_mode == "substructlib": - print("\nRunning RDKit SubstructLibrary benchmark...") - rdkit_avg, rdkit_std, rdkit_results = bench_rdkit_substructlib( - mols, queries, args.runs, mode, args.max_matches, args.rdkit_threads - ) - else: - print("\nRunning RDKit SubstructMatch benchmark...") - rdkit_avg, rdkit_std, rdkit_results = bench_rdkit_substruct( - mols, queries, args.runs, mode, args.max_matches, args.rdkit_threads - ) - print(f" RDKit: {rdkit_avg:10.2f} ms (± {rdkit_std:.2f} ms)") - results["rdkit"] = (rdkit_avg, rdkit_std, rdkit_results) + pairs_total = len(mols) * num_patterns + for rdkit_mode in args.rdkit_match_mode: + for rdkit_threads in args.rdkit_threads: + variant_key = f"rdkit_{rdkit_mode}_t{rdkit_threads}" + if rdkit_mode == "substructlib": + print(f"\nRunning RDKit SubstructLibrary benchmark (threads={rdkit_threads})...") + rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs = bench_rdkit_substructlib( + mols, + queries, + args.runs, + mode, + args.max_matches, + rdkit_threads, + args.rdkit_max_seconds, + ) + else: + print(f"\nRunning RDKit SubstructMatch benchmark (threads={rdkit_threads})...") + rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs = bench_rdkit_substruct( + mols, + queries, + args.runs, + mode, + args.max_matches, + rdkit_threads, + args.rdkit_max_seconds, + ) + if rdkit_pairs < pairs_total: + print( + f" RDKit hit max_seconds budget: processed {rdkit_pairs}/{pairs_total} pairs " + f"({100.0 * rdkit_pairs / pairs_total:.1f}%) in {rdkit_avg:.2f} ms" + ) + print(f" {variant_key:24s}: {rdkit_avg:10.2f} ms (± {rdkit_std:.2f} ms)") + results[variant_key] = (rdkit_avg, rdkit_std, rdkit_results, rdkit_pairs) + rdkit_variants.append((variant_key, rdkit_mode, rdkit_threads)) print("\n" + "=" * 70) print("Summary:") @@ -621,57 +723,48 @@ def main(): sys.exit(1) baseline = None - if "rdkit" in results: - baseline = ("RDKit", results["rdkit"][0]) - - for name, (avg_ms, std_ms, _) in results.items(): + baseline_key = None + best_rdkit_throughput = 0.0 + for variant_key, _mode, _threads in rdkit_variants: + rdkit_avg_ms = results[variant_key][0] + rdkit_pairs_done = results[variant_key][3] + throughput = (rdkit_pairs_done * 1000.0 / rdkit_avg_ms) if rdkit_avg_ms > 0 else 0.0 + if throughput > best_rdkit_throughput: + best_rdkit_throughput = throughput + baseline = (variant_key, rdkit_avg_ms, throughput) + baseline_key = variant_key + + for name, (avg_ms, std_ms, _, pairs_done) in results.items(): speedup_str = "" - if baseline and name != "rdkit": - speedup = baseline[1] / avg_ms if avg_ms > 0 else 0 - speedup_str = f", {speedup:.1f}x vs {baseline[0]}" - print(f" {name:20s}: {avg_ms:10.2f} ms (± {std_ms:.2f} ms){speedup_str}") + throughput = (pairs_done * 1000.0 / avg_ms) if avg_ms > 0 else 0.0 + if baseline and name != baseline_key and not name.startswith("rdkit_"): + speedup = throughput / baseline[2] if baseline[2] > 0 else 0 + speedup_str = f", {speedup:.1f}x vs {baseline[0]} (throughput-normalised)" + print( + f" {name:24s}: {avg_ms:10.2f} ms (± {std_ms:.2f} ms), " + f"{pairs_done:,} pairs, {throughput:,.0f} pairs/s{speedup_str}" + ) + + validation_key = None + if rdkit_variants: + pairs_total = len(mols) * len(queries) + for variant_key, _mode, _threads in rdkit_variants: + if results[variant_key][3] >= pairs_total: + validation_key = variant_key + break - if args.validate and "nvmolkit" in results and "rdkit" in results: + if args.validate and "nvmolkit" in results and rdkit_variants: print("\nValidation:") - nvmolkit_data = results["nvmolkit"][2] - rdkit_data = results["rdkit"][2] - - if mode == "hasSubstructMatch": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_match = bool(nvmolkit_data[t][q]) - rd_match = rdkit_data[t][q] - if nv_match == rd_match: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Boolean match agreement: {matches}/{total} ({pct:.1f}%)") - elif mode == "countSubstructMatches": - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_count = int(nvmolkit_data[t][q]) - rd_count = int(rdkit_data[t][q]) - if nv_count == rd_count: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Count agreement: {matches}/{total} ({pct:.1f}%)") + pairs_total = len(mols) * len(queries) + if validation_key is None: + print( + f" Skipping validation: every RDKit variant hit max_seconds budget before " + f"{pairs_total} pairs and the partial-result indices differ between " + "substructlib (per-query) and raw (per-mol)." + ) else: - matches = 0 - total = 0 - for t in range(len(mols)): - for q in range(len(queries)): - nv_matches = set(tuple(m) for m in nvmolkit_data[t][q]) - rd_matches = set(rdkit_data[t][q]) - if nv_matches == rd_matches: - matches += 1 - total += 1 - pct = 100.0 * matches / total if total > 0 else 0 - print(f" Full match agreement: {matches}/{total} ({pct:.1f}%)") + print(f" Validating against {validation_key}") + _validate_matches(mode, results["nvmolkit"][2], results[validation_key][2], len(mols), len(queries)) if ran_nvmolkit: applied_batch_size = int(config.batchSize) @@ -691,14 +784,25 @@ def main(): else: config_source = "cli" - for name, (avg_ms, std_ms, _) in results.items(): + rdkit_variant_meta = {key: (mode, threads) for key, mode, threads in rdkit_variants} + baseline_throughput = baseline[2] if baseline else 0.0 + + for name, (avg_ms, std_ms, _, pairs_done) in results.items(): + is_rdkit = name in rdkit_variant_meta batch_size = applied_batch_size if name == "nvmolkit" else "N/A" workers = applied_workers if name == "nvmolkit" else "N/A" prep_threads = applied_prep_threads if name == "nvmolkit" else "N/A" num_gpus = applied_num_gpus if name == "nvmolkit" else config_row["num_gpus"] nvmolkit_config_source = config_source if name == "nvmolkit" else "N/A" - rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A" - rdkit_match_mode = args.rdkit_match_mode if name == "rdkit" else "N/A" + if is_rdkit: + rdkit_match_mode, rdkit_threads = rdkit_variant_meta[name] + rdkit_max_seconds = args.rdkit_max_seconds + else: + rdkit_match_mode = "N/A" + rdkit_threads = "N/A" + rdkit_max_seconds = "N/A" + throughput = (pairs_done * 1000.0 / avg_ms) if avg_ms > 0 else 0.0 + vs_rdkit = (throughput / baseline_throughput) if (not is_rdkit and baseline_throughput > 0) else "N/A" csv_rows.append( ( name, @@ -719,6 +823,10 @@ def main(): rdkit_match_mode, avg_ms, std_ms, + pairs_done, + rdkit_max_seconds, + throughput, + vs_rdkit, ) ) @@ -732,7 +840,8 @@ def main(): print( "method,mode,smarts,input_file,input_type,sanitize,num_mols,num_patterns," "max_matches,batch_size,num_gpus,workers,prep_threads,nvmolkit_config_source," - "rdkit_threads,rdkit_match_mode,time_ms,std_ms" + "rdkit_threads,rdkit_match_mode,time_ms,std_ms," + "pairs_processed,rdkit_max_seconds,pairs_per_second,vs_rdkit_throughput_ratio" ) for row in csv_rows: ( @@ -754,11 +863,20 @@ def main(): rdkit_match_mode, avg_ms, std_ms, + pairs_done, + rdkit_max_seconds, + throughput, + vs_rdkit, ) = row + vs_rdkit_str = f"{vs_rdkit:.4f}" if isinstance(vs_rdkit, float) else str(vs_rdkit) + rdkit_max_seconds_str = ( + f"{rdkit_max_seconds:g}" if isinstance(rdkit_max_seconds, float) else str(rdkit_max_seconds) + ) print( f"{name},{mode},{smarts_path},{input_file},{input_type},{sanitize}," f"{num_mols},{num_patterns},{max_matches},{batch_size},{num_gpus},{workers},{prep_threads}," - f"{nvmolkit_config_source},{rdkit_threads},{rdkit_match_mode},{avg_ms:.2f},{std_ms:.2f}" + f"{nvmolkit_config_source},{rdkit_threads},{rdkit_match_mode},{avg_ms:.2f},{std_ms:.2f}," + f"{pairs_done},{rdkit_max_seconds_str},{throughput:.2f},{vs_rdkit_str}" ) diff --git a/benchmarks/tests/conftest.py b/benchmarks/tests/conftest.py new file mode 100644 index 0000000..ffff4e3 --- /dev/null +++ b/benchmarks/tests/conftest.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Pytest setup for benchmark unit tests. + +Bench scripts import the sibling ``bench_utils`` package without any +``sys.path`` manipulation because they run with ``benchmarks/`` as the +working directory. The same trick is needed for these tests, which live +one level deeper. +""" + +import sys +from pathlib import Path + +_BENCHMARKS_DIR = Path(__file__).resolve().parent.parent +if str(_BENCHMARKS_DIR) not in sys.path: + sys.path.insert(0, str(_BENCHMARKS_DIR)) diff --git a/benchmarks/tests/test_timing.py b/benchmarks/tests/test_timing.py new file mode 100644 index 0000000..852040d --- /dev/null +++ b/benchmarks/tests/test_timing.py @@ -0,0 +1,148 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the shared bench_utils timing helpers.""" + +import math +import time + +import pytest +from bench_utils.timing import Deadline, throughput_per_s, time_it_bounded + + +@pytest.mark.parametrize("max_seconds", [0.0, -1.0]) +def test_deadline_non_positive_is_disabled(max_seconds): + deadline = Deadline(max_seconds) + assert not deadline.active + assert not deadline.expired() + + +def test_deadline_expires_after_budget(): + deadline = Deadline(0.01) + assert deadline.active + assert not deadline.expired() + time.sleep(0.05) + assert deadline.expired() + + +def test_deadline_independent_instances(): + long = Deadline(10.0) + short = Deadline(0.01) + time.sleep(0.05) + assert short.expired() + assert not long.expired() + + +def test_throughput_per_s_simple_conversion(): + # 100 items in 500ms -> 200 items/s + assert throughput_per_s(100, 500.0) == pytest.approx(200.0) + + +@pytest.mark.parametrize("elapsed_ms", [0.0, -5.0]) +def test_throughput_per_s_non_positive_elapsed_returns_nan(elapsed_ms): + assert math.isnan(throughput_per_s(100, elapsed_ms)) + + +def test_time_it_bounded_runs_to_completion_when_progress_full(): + call_count = [0] + + def run(_deadline): + call_count[0] += 1 + time.sleep(0.001) + + avg_ms, std_ms, progress = time_it_bounded( + run, runs=3, max_seconds=0.0, progress_getter=lambda: 10, progress_target=10 + ) + + assert call_count[0] == 3 + assert avg_ms > 0 + assert std_ms >= 0 + assert progress == 10 + + +def test_time_it_bounded_stops_after_partial_run(): + call_count = [0] + progress = [10] + + def run(_deadline): + call_count[0] += 1 + progress[0] = 3 + + avg_ms, std_ms, last_progress = time_it_bounded( + run, runs=5, max_seconds=0.0, progress_getter=lambda: progress[0], progress_target=10 + ) + + assert call_count[0] == 1 + assert last_progress == 3 + assert std_ms == 0.0 + assert avg_ms > 0 + + +def test_time_it_bounded_stops_when_budget_exhausted_between_runs(): + call_count = [0] + + def run(_deadline): + call_count[0] += 1 + time.sleep(0.05) + + # 5 runs * 50ms = 250ms total, but budget is only 60ms. + # Run 1 completes at ~50ms (deadline check before run 2 still passes), run 2 + # completes at ~100ms, and the deadline check before run 3 stops the loop. + avg_ms, _std_ms, _progress = time_it_bounded( + run, runs=5, max_seconds=0.06, progress_getter=lambda: 1, progress_target=1 + ) + + assert 1 <= call_count[0] < 5 + assert avg_ms > 0 + + +def test_time_it_bounded_zero_runs_returns_zero(): + avg_ms, std_ms, progress = time_it_bounded( + lambda _deadline: None, runs=0, max_seconds=0.0, progress_getter=lambda: 0, progress_target=1 + ) + assert avg_ms == 0.0 + assert std_ms == 0.0 + assert progress == 0 + + +def test_time_it_bounded_stddev_positive_for_multiple_completed_runs(): + delays = iter([0.001, 0.02, 0.001]) + + def run(_deadline): + time.sleep(next(delays)) + + _avg_ms, std_ms, _progress = time_it_bounded( + run, runs=3, max_seconds=0.0, progress_getter=lambda: 1, progress_target=1 + ) + assert std_ms > 0.0 + + +def test_time_it_bounded_shared_deadline_caps_inner_loop(): + """Verify ``time_it_bounded`` exposes a single shared :class:`Deadline`. + + The ``run`` callback honours the budget mid-iteration, and a second call + receives the same (already-elapsed) deadline rather than a fresh one, + which is the property that makes ``max_seconds`` a true total cap. + """ + iterations_per_run: list[int] = [] + progress = [0] + + def run(deadline): + n_done = 0 + for _ in range(1000): + if deadline.expired(): + break + time.sleep(0.005) + n_done += 1 + iterations_per_run.append(n_done) + progress[0] = n_done + + _avg_ms, _std_ms, _progress = time_it_bounded( + run, runs=5, max_seconds=0.05, progress_getter=lambda: progress[0], progress_target=1000 + ) + + assert iterations_per_run, "run should have been invoked at least once" + # The first run consumes essentially the whole budget; any subsequent call + # must see an already-expired deadline and exit immediately. + for later_count in iterations_per_run[1:]: + assert later_count == 0