diff --git a/examples/configs/beam_search_diverse.yaml b/examples/configs/beam_search_diverse.yaml new file mode 100644 index 00000000..1eaed666 --- /dev/null +++ b/examples/configs/beam_search_diverse.yaml @@ -0,0 +1,68 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Diverse beam search — multi-LLM, multi-sample expansion with PTX dedup. +# +# SPREAD variant — broad beam expansion. +# +# Layout per round: +# 5 expanding parents (top half of beam) +# × 3 bottlenecks +# × 3 LLMs (Claude / GPT / Gemini) +# × 2 samples per prompt +# = 90 worker processes, each producing one candidate kernel. +# +# Pairs with examples/configs/beam_search_diverse_concentrated.yaml +# (top-2 parents × 5 samples each, same 90-worker budget) for A/B +# comparison of "spread vs. concentrated" expansion strategies. +# +# After workers return: +# 1. Candidates (and the existing 10-kernel beam) are deduplicated by +# normalized-PTX fingerprint — kernels that compile to identical PTX +# collapse to the fastest representative. +# 2. The surviving pool is sorted by runtime. +# 3. The top 10 become the next round's beam. +# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir examples/optimize_01_matvec \ +# --config examples/configs/beam_search_diverse.yaml + +strategy: beam_search +num_workers: 90 +strategy_config: + num_top_kernels: 10 # beam width (candidate pool size) + num_expanding_parents: 5 # spread expansion across top-5 + num_bottlenecks: 3 # 3 ranked bottlenecks per parent (post-plumbing-fix) + samples_per_prompt: 2 # two LLM draws per (parent, bottleneck, model) + # Three models routed via the Relay provider. Names not present in + # utils/providers/available_models.py are auto-routed to Relay by + # get_model_provider (see models.py:70–79), so the plugboard server + # resolves them. + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +# Default LLM (used when no per-candidate override is set; here it's +# overridden per-candidate via the `models` list above). +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +# Worker configuration +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_concentrated.yaml b/examples/configs/beam_search_diverse_concentrated.yaml new file mode 100644 index 00000000..c10f5311 --- /dev/null +++ b/examples/configs/beam_search_diverse_concentrated.yaml @@ -0,0 +1,64 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# CONCENTRATED variant — push the leaders harder. +# +# Layout per round: +# 2 expanding parents (top of beam) +# × 3 bottlenecks +# × 3 LLMs (Claude / GPT / Gemini) +# × 5 samples per prompt +# = 90 worker processes, each producing one candidate kernel. +# +# Same 90-worker budget as beam_search_diverse.yaml (the spread variant) +# but reallocated toward more LLM draws per (parent, bottleneck, model) +# triple instead of more parents. Use this when you trust the leader- +# kernel pair and want to squeeze them rather than explore broadly. +# +# Motivation: in our smoke A/B, Design A (P=1, C=10) outperformed +# Design B (P=2, C=1) on the same problem and budget. This config is +# the production-scale version of that observation: keep enough beam- +# diversity to dedup against (P=2, beam=10) but spend most of the budget +# concentrating attempts on the current leaders. +# +# After workers return: +# 1. Candidates (and the existing 10-kernel beam) are deduplicated by +# normalized-PTX fingerprint — kernels that compile to identical PTX +# collapse to the fastest representative. +# 2. The surviving pool is sorted by runtime. +# 3. The top 10 become the next round's beam. +# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir examples/optimize_01_matvec \ +# --strategy beam_search_diverse_concentrated + +strategy: beam_search +num_workers: 90 +strategy_config: + num_top_kernels: 10 # beam width (candidate pool size) + num_expanding_parents: 2 # concentrate on top-2 leaders + num_bottlenecks: 3 # 3 ranked bottlenecks per parent + samples_per_prompt: 5 # five LLM draws per (parent, bottleneck, model) + models: + # Three models routed via the Relay provider. + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +# Default LLM (used when no per-candidate override is set; here it's +# overridden per-candidate via the `models` list above). +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +# Worker configuration +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_smoke.yaml b/examples/configs/beam_search_diverse_smoke.yaml new file mode 100644 index 00000000..eaba2c01 --- /dev/null +++ b/examples/configs/beam_search_diverse_smoke.yaml @@ -0,0 +1,35 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Smoke-test variant of beam_search_diverse.yaml. +# +# Per-round fanout: 1 parent × 3 bottlenecks × 3 models × 1 sample = 9 workers. +# Exercises the bottleneck-plumbing fix — each (model, bottleneck_id) trio +# should produce 3 distinct optimization directions, not 3 copies of the +# top-ranked bottleneck. + +strategy: beam_search +num_workers: 9 +strategy_config: + num_top_kernels: 4 + num_expanding_parents: 1 + num_bottlenecks: 3 # ← drives BottleneckAnalyzer to request 3 ranked options + samples_per_prompt: 1 + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/triton_kernel_agent/opt_manager.py b/triton_kernel_agent/opt_manager.py index 495b550d..2ccd51f6 100644 --- a/triton_kernel_agent/opt_manager.py +++ b/triton_kernel_agent/opt_manager.py @@ -61,6 +61,48 @@ _MANAGER_LEVEL_KEYS = {"verifier", "benchmarker", "worker_runner"} +def _detect_gpus() -> list[int]: + """Return physical GPU ids visible to this process. + + Crucially this MUST NOT initialize the CUDA context in the manager + process — workers are spawned via ``mp.Process`` (fork on Linux), and + if the manager has already touched ``torch.cuda`` the children inherit + that locked-in device list and ignore any post-fork + ``CUDA_VISIBLE_DEVICES`` override. We use ``nvidia-smi`` subprocess + detection instead. + + Order of resolution: + 1. ``CUDA_VISIBLE_DEVICES`` env (respect user restriction). + 2. ``nvidia-smi --query-gpu=index --format=csv,noheader``. + 3. Fallback to ``[0]`` (single GPU). + """ + import os + import subprocess + + cvd = os.environ.get("CUDA_VISIBLE_DEVICES") + if cvd: + try: + ids = [int(x.strip()) for x in cvd.split(",") if x.strip()] + if ids: + return ids + except ValueError: + pass + try: + out = subprocess.run( + ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"], + capture_output=True, + text=True, + timeout=5, + ) + if out.returncode == 0: + ids = [int(x.strip()) for x in out.stdout.splitlines() if x.strip()] + if ids: + return ids + except Exception: + pass + return [0] + + @config_injectable class OptimizationManager: """Manages parallel kernel optimization with pluggable strategies. @@ -88,6 +130,7 @@ def __init__( high_reasoning_effort: bool = True, bottleneck_override: str | None = None, platform: dict[str, str] | str | None = None, + gpu_ids: list[int] | None = None, **worker_kwargs: Any, ): """Initialize the optimization manager. @@ -138,6 +181,20 @@ def __init__( strategy, strategy_config or {}, num_workers ) + # Forward the strategy's bottleneck-fanout knob to workers so the + # ``BottleneckAnalyzer`` actually requests that many ranked + # bottlenecks from the LLM. Without this, the strategy spawns N + # workers per parent (with bottleneck_id ∈ {1..N}) but the analyzer + # always returns 1, and workers with id>1 silently fall back to id=1. + if ( + strategy_config + and "num_bottlenecks" in strategy_config + and "num_bottlenecks_to_request" not in self.worker_kwargs + ): + self.worker_kwargs["num_bottlenecks_to_request"] = strategy_config[ + "num_bottlenecks" + ] + # Validate worker count if num_workers != self.strategy.num_workers_needed: raise ValueError( @@ -146,10 +203,24 @@ def __init__( ) self.num_workers = num_workers - self.benchmark_lock = mp.Lock() - # Semaphore to serialize NCU profiling - NCU requires exclusive GPU access - # and has high memory overhead, so only one worker should profile at a time - self.profiling_semaphore = mp.Semaphore(1) + + # Per-GPU lock pool — one lock per GPU does double duty for both + # benchmarking and NCU profiling (the two GPU-serialized + # operations). Workers running on different GPUs proceed in + # parallel; workers on the same GPU serialize. This is the + # "collapsed" multi-GPU design: a single shared object per GPU + # acts as both ``benchmark_lock`` and ``profiling_semaphore``. + self.gpu_ids: list[int] = list(gpu_ids) if gpu_ids else _detect_gpus() + self.gpu_locks: dict[int, Any] = {g: mp.Lock() for g in self.gpu_ids} + + # Manager-level GPU work (initial-kernel verify, PyTorch baselines, + # baseline NCU cache) runs in this process — it pins to the first + # GPU and uses that GPU's lock as both benchmark_lock and + # profiling_semaphore for back-compat with components that take + # those names. + _first_gpu = self.gpu_ids[0] + self.benchmark_lock = self.gpu_locks[_first_gpu] + self.profiling_semaphore = self.gpu_locks[_first_gpu] # Shared history across beam search iterations self.shared_history: list[ @@ -158,11 +229,17 @@ def __init__( self.shared_reflexions: list[dict] = [] # List of serialized Reflexion dicts self.history_size: int = 10 # Max history entries to pass to workers + # Per-parent baseline NCU/roofline cache, keyed by program_id. + # Populated in _run_workers; shared across all sibling workers of the + # same round (and surviving across rounds for beam members that stick). + self._baseline_profile_cache: dict[str, dict[str, Any] | None] = {} + # ── Platform components (resolved from registry) ───────── self._resolve_platform(platform) self.logger.info( - f"OptimizationManager initialized: strategy={strategy}, workers={num_workers}" + f"OptimizationManager initialized: strategy={strategy}, " + f"workers={num_workers}, gpus={self.gpu_ids}" ) # ------------------------------------------------------------------ @@ -177,6 +254,11 @@ def _resolve_platform(self, platform: dict[str, str] | str | None) -> None: Worker-level component names are forwarded to worker processes via ``self.worker_kwargs["platform_config"]`` so each worker can resolve its own instances from the registry. + + Additionally, the manager resolves its *own* ``profiler`` and + ``roofline_analyzer`` instances (without removing them from the + worker config) so it can profile baseline kernels once per round + and share the result across sibling workers. """ from triton_kernel_agent.platform.registry import registry @@ -205,11 +287,44 @@ def _resolve_platform(self, platform: dict[str, str] | str | None) -> None: high_reasoning_effort=self.high_reasoning_effort, bottleneck_override=self.bottleneck_override, worker_kwargs=self.worker_kwargs, + gpu_ids=self.gpu_ids, + gpu_locks=self.gpu_locks, ) self.verifier = components["verifier"] self.benchmarker = components["benchmarker"] self.worker_runner = components["worker_runner"] + # Resolve a manager-owned profiler + roofline_analyzer for the + # baseline-caching step. These coexist with the worker-level + # instances (workers still build their own from ``worker_config``). + self._mgr_profiler: Any | None = None + self._mgr_roofline: Any | None = None + for key, setter_attr in ( + ("profiler", "_mgr_profiler"), + ("roofline_analyzer", "_mgr_roofline"), + ): + impl_name = config.get(key) + if impl_name and registry.has(key, impl_name): + try: + setattr( + self, + setter_attr, + registry.create( + key, + impl_name, + logger=self.logger, + log_dir=self.log_dir, + artifacts_dir=self.log_dir / "baseline_profiles", + profiling_semaphore=self.profiling_semaphore, + ), + ) + except Exception as e: + self.logger.warning( + f"Failed to create manager-level {key}: {e}. " + f"Baseline NCU caching disabled; workers will profile " + f"their baselines individually." + ) + # Propagate worker-level config (string names) to worker # processes — each worker resolves its own instances via the # registry so there are no pickling issues. @@ -262,6 +377,9 @@ def _create_strategy( num_bottlenecks=config.get("num_bottlenecks", 2), database=self.database, logger=self.logger, + models=config.get("models"), + samples_per_prompt=config.get("samples_per_prompt", 1), + num_expanding_parents=config.get("num_expanding_parents"), ) elif name == "greedy": return GreedyStrategy( @@ -455,6 +573,14 @@ def _run_workers( pytorch_baseline: float, ) -> list[dict[str, Any]]: """Spawn workers for each candidate and collect results.""" + # Profile each distinct parent kernel once and share the NCU/roofline + # result across sibling workers. This avoids repeating an expensive, + # semaphore-serialized NCU run for every (bottleneck, model) fanout. + self._populate_baseline_cache(candidates, problem_file, round_num) + for cand in candidates: + parent_id = cand["parent"].program_id + cand["baseline_metrics"] = self._baseline_profile_cache.get(parent_id) + results = self.worker_runner.run_workers( candidates=candidates, round_num=round_num, @@ -488,3 +614,78 @@ def _run_workers( self.logger.debug(f"Traceback:\n{r.get('traceback')}") return results + + def _populate_baseline_cache( + self, + candidates: list[dict[str, Any]], + problem_file: Path, + round_num: int, + ) -> None: + """Profile each distinct parent kernel once and cache the result. + + The cache is keyed by ``parent.program_id`` and persists across + rounds, so a beam member that survives multiple rounds is profiled + at most once. If the manager-level profiler or roofline analyzer + is unavailable, this is a no-op and workers fall back to profiling + their own baselines. + """ + if self._mgr_profiler is None or self._mgr_roofline is None: + return + + from triton_kernel_agent.opt_worker_component.orchestrator.optimization_orchestrator import ( + _get_triton_kernel_metrics, + ) + + baseline_dir = self.log_dir / "baseline_profiles" + baseline_dir.mkdir(parents=True, exist_ok=True) + + # Collect (program_id, kernel_code) for parents we haven't cached yet. + # De-dup by program_id since many candidates share the same parent. + unseen: dict[str, str] = {} + for cand in candidates: + parent = cand["parent"] + pid = parent.program_id + if pid not in self._baseline_profile_cache and pid not in unseen: + unseen[pid] = parent.kernel_code + + for pid, kernel_code in unseen.items(): + try: + kernel_file = baseline_dir / f"{pid}.py" + kernel_file.write_text(kernel_code) + + profiler_results = self._mgr_profiler.profile_kernel( + kernel_file, problem_file, round_num + ) + if profiler_results is None or not getattr( + profiler_results, "metrics", None + ): + self._baseline_profile_cache[pid] = None + self.logger.warning( + f"Baseline profile failed for parent {pid}; " + f"workers will profile their own baselines." + ) + continue + + ncu_metrics = profiler_results.metrics + flat_metrics = _get_triton_kernel_metrics(ncu_metrics) + roofline_result = self._mgr_roofline.analyze(ncu_metrics=flat_metrics) + + self._baseline_profile_cache[pid] = { + "efficiency_pct": roofline_result.efficiency_pct, + "compute_sol_pct": roofline_result.compute_sol_pct, + "memory_sol_pct": roofline_result.memory_sol_pct, + "bottleneck": roofline_result.bottleneck, + "roofline_result": roofline_result, + "ncu_metrics": ncu_metrics, + } + self.logger.info( + f"Baseline profiled for parent {pid}: " + f"{roofline_result.bottleneck}-bound, " + f"{roofline_result.efficiency_pct:.1f}% SOL" + ) + except Exception as e: + self._baseline_profile_cache[pid] = None + self.logger.warning( + f"Baseline profile errored for parent {pid}: {e}; " + f"workers will profile their own baselines." + ) diff --git a/triton_kernel_agent/opt_worker.py b/triton_kernel_agent/opt_worker.py index 7703efc2..7c70a5a3 100644 --- a/triton_kernel_agent/opt_worker.py +++ b/triton_kernel_agent/opt_worker.py @@ -88,6 +88,12 @@ def __init__( # BeamSearch parameters (passed by opt_manager) bottleneck_id: int | None = None, bottleneck_override: str | None = None, + # How many bottlenecks the analyzer should request from the LLM. + # Strategies that fan out across multiple bottleneck ranks (e.g. beam + # search with num_bottlenecks > 1) need the analyzer to actually + # produce that many ranked options — otherwise sibling workers all + # silently fall back to the first bottleneck. + num_bottlenecks_to_request: int = 1, # Shared history from beam search manager prior_history: list[dict] | None = None, prior_reflexions: list[dict] | None = None, @@ -156,6 +162,7 @@ def __init__( # BeamSearch parameters self.bottleneck_id = bottleneck_id self.bottleneck_override = bottleneck_override + self.num_bottlenecks_to_request = max(1, int(num_bottlenecks_to_request)) # Shared history from beam search manager self.prior_history = prior_history or [] @@ -246,6 +253,7 @@ def _resolve_platform_config(self) -> None: openai_model=self.openai_model, gpu_name=self.gpu_name, roofline_config=self.roofline_config, + num_bottlenecks=self.num_bottlenecks_to_request, ) for k, v in resolved.items(): if k not in self._platform: @@ -308,6 +316,7 @@ def _init_components(self) -> None: gpu_specs=self.gpu_specs, logs_dir=self.log_dir, logger=self.logger, + num_bottlenecks=self.num_bottlenecks_to_request, ) # Verification worker (for correctness checks) @@ -361,6 +370,7 @@ def optimize_kernel( test_code: str | list[str], known_kernel_time: float | None = None, max_opt_rounds: int | None = None, + baseline_metrics: dict[str, Any] | None = None, ) -> tuple[bool, str, dict[str, Any]]: """ Run hardware-guided optimization on a kernel. @@ -373,6 +383,9 @@ def optimize_kernel( are additional tests. known_kernel_time: Known baseline time in ms (skip initial benchmark) max_opt_rounds: Maximum optimization rounds (defaults to self.max_rounds) + baseline_metrics: Optional pre-computed NCU profile + roofline for + ``kernel_code``. Forwarded to the orchestrator so it can skip + its own NCU run on the baseline. Returns: Tuple of (success, best_kernel_code, performance_metrics) @@ -424,4 +437,5 @@ def optimize_kernel( test_code=test_code, known_kernel_time=known_kernel_time, max_opt_rounds=max_opt_rounds, + baseline_metrics=baseline_metrics, ) diff --git a/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py b/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py index 9f8314ac..0b9faadc 100644 --- a/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py +++ b/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py @@ -20,14 +20,21 @@ import json import logging +import os +import shutil import subprocess import sys +import tempfile import traceback from pathlib import Path from typing import Any, Optional import torch +from triton_kernel_agent.opt_worker_component.searching.ptx_fingerprint import ( + ptx_hash_from_cache, +) + from triton_kernel_agent.opt_worker_component.benchmarking.timing import ( compute_timing_stats, prepare_pytorch_model, @@ -124,6 +131,7 @@ def benchmark_kernel( - time_ms: Mean time in ms - speedup: Speedup vs baseline """ + ptx_cache_dir = Path(tempfile.mkdtemp(prefix="triton_cache_bench_")) try: with self.lock_manager: results_json = self.artifacts_dir / "benchmark_results.json" @@ -148,11 +156,17 @@ def benchmark_kernel( if baseline_file: cmd.extend(["--baseline"]) + # Isolate this benchmark's Triton compilation cache so we can + # capture its PTX for fingerprint-based dedup without being + # contaminated by sibling workers' artifacts. + env = {**os.environ, "TRITON_CACHE_DIR": str(ptx_cache_dir)} + result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, + env=env, ) if result.returncode != 0: @@ -162,7 +176,7 @@ def benchmark_kernel( or "Unknown error" ) self.logger.error(f"Kernel benchmark failed: {error_msg}") - return {"time_ms": float("inf"), "speedup": 0.0} + return {"time_ms": float("inf"), "speedup": 0.0, "ptx_hash": None} with open(results_json, "r") as f: results = json.load(f) @@ -170,14 +184,21 @@ def benchmark_kernel( kernel_name = kernel_file.stem kernel_results = results.get("kernels", {}).get(kernel_name, {}) + # Capture the PTX fingerprint from the isolated cache dir. + # A None result is graceful — dedup treats it as a singleton. + ptx_hash = ptx_hash_from_cache(ptx_cache_dir) + return { "time_ms": kernel_results.get("time_ms", float("inf")), "speedup": kernel_results.get("speedup", 1.0), + "ptx_hash": ptx_hash, } except Exception as e: self.logger.error(f"Kernel benchmark failed: {e}") - return {"time_ms": float("inf"), "speedup": 0.0} + return {"time_ms": float("inf"), "speedup": 0.0, "ptx_hash": None} + finally: + shutil.rmtree(ptx_cache_dir, ignore_errors=True) def benchmark_pytorch( self, diff --git a/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py b/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py index 219b60c2..5479e46b 100644 --- a/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py +++ b/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py @@ -315,6 +315,7 @@ def optimize_kernel( test_code: list[str], known_kernel_time: float | None = None, max_opt_rounds: int = 5, + baseline_metrics: dict[str, Any] | None = None, ) -> tuple[bool, str, dict[str, Any]]: """ Main optimization loop. @@ -325,6 +326,13 @@ def optimize_kernel( test_code: List of test code strings (primary + additional tests) known_kernel_time: Known performance of kernel_code in ms max_opt_rounds: Maximum optimization rounds + baseline_metrics: Optional pre-computed NCU profile + roofline result + for ``kernel_code``. When supplied, the orchestrator skips its + own NCU runs on the baseline (used when the manager shares a + single profile across sibling workers in the same round). + Expected keys: ``efficiency_pct``, ``compute_sol_pct``, + ``memory_sol_pct``, ``bottleneck``, ``roofline_result``, + ``ncu_metrics``. Returns: Tuple of (success, best_kernel_code, performance_metrics) @@ -342,6 +350,10 @@ def optimize_kernel( early_stop_reason = "" any_verified = False + # Cached baseline NCU — consumed at most once by _profile_and_analyze + # when it runs on the identical baseline kernel (round 1 only). + self._pending_baseline_metrics: dict[str, Any] | None = baseline_metrics + # Reset roofline history for new optimization run self.roofline_analyzer.reset_history() @@ -354,7 +366,9 @@ def optimize_kernel( # Benchmark baseline and PyTorch (now includes baseline SOL profiling) best_time, baseline_results, pytorch_baseline_time, baseline_sol = ( - self._benchmark_baseline(kernel_code, problem_file, known_kernel_time) + self._benchmark_baseline( + kernel_code, problem_file, known_kernel_time, baseline_metrics + ) ) # Two-kernel tracking: track best-by-runtime and best-by-SOL independently @@ -362,6 +376,7 @@ def optimize_kernel( best_runtime_kernel = kernel_code best_runtime_time = best_time best_runtime_sol = baseline_sol + best_runtime_ptx_hash: str | None = None best_sol_kernel = kernel_code best_sol_time = best_time @@ -504,6 +519,7 @@ def optimize_kernel( kernel_file_round, problem_file ) new_time = bench_results["time_ms"] + new_ptx_hash = bench_results.get("ptx_hash") # Profile the NEW kernel to get its SOL metrics new_kernel_metrics = self._profile_kernel_for_sol( @@ -578,12 +594,19 @@ def optimize_kernel( round_num, ) - # Track metadata when new best runtime is found - if new_time < best_runtime_time or new_sol > best_sol_sol: + # Track metadata when new best runtime is found. Note: + # ``best_runtime_time`` has just been updated by + # ``_update_kernels``, so we compare via the post-update + # ``best_runtime_kernel`` identity instead of revisiting + # ``new_time < best_runtime_time`` (which would always be false + # at this point). + if best_runtime_kernel == optimized_kernel or new_sol > best_sol_sol: best_round_num = round_num best_bottleneck_category = primary.category if new_kernel_metrics: best_ncu_metrics = new_kernel_metrics.get("ncu_metrics") + if best_runtime_kernel == optimized_kernel: + best_runtime_ptx_hash = new_ptx_hash # Roofline check for early termination # Use best_runtime kernel's SOL for early termination check @@ -653,13 +676,23 @@ def optimize_kernel( best_round_num, early_stop_reason, any_verified, + best_runtime_ptx_hash=best_runtime_ptx_hash, ) def _benchmark_baseline( - self, kernel_code: str, problem_file: Path, known_kernel_time: float | None + self, + kernel_code: str, + problem_file: Path, + known_kernel_time: float | None, + cached_baseline_metrics: dict[str, Any] | None = None, ) -> tuple[float, dict[str, float], float | None, float]: """Benchmark baseline kernel and PyTorch, and profile baseline SOL. + When ``cached_baseline_metrics`` is supplied, the NCU/roofline step + is skipped and the cached values are reused — this lets the manager + share a single baseline profile across sibling workers operating on + the same parent kernel. + Returns: Tuple of (best_time, baseline_results, pytorch_baseline_time, baseline_sol) """ @@ -683,8 +716,14 @@ def _benchmark_baseline( best_time = baseline_results["time_ms"] self.logger.info(f"📊 Baseline time: {best_time:.4f} ms") - # Profile baseline kernel for SOL metrics - baseline_metrics = self._profile_kernel_for_sol(kernel_code, problem_file, 0) + # Profile baseline kernel for SOL metrics (skip if cached) + if cached_baseline_metrics is not None: + baseline_metrics = cached_baseline_metrics + self.logger.info("📊 Baseline SOL: (using cached profile from manager)") + else: + baseline_metrics = self._profile_kernel_for_sol( + kernel_code, problem_file, 0 + ) if baseline_metrics: baseline_sol = baseline_metrics.get("efficiency_pct", 0.0) bottleneck = baseline_metrics.get("bottleneck", "unknown") @@ -726,22 +765,35 @@ def _profile_and_analyze( Tuple of (bottleneck_results, roofline_result, ncu_metrics). All can be None if profiling fails. """ - self.logger.info(f"[{round_num}] Profiling current kernel with NCU...") - kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" - kernel_file_round.write_text(current_kernel) + # If the manager pre-profiled the baseline for us, consume it in round 1 + # (when current_kernel is still the baseline) and skip the NCU run. + cached = self._pending_baseline_metrics + if cached is not None and round_num == 1 and cached.get("ncu_metrics"): + self.logger.info( + f"[{round_num}] Using cached baseline NCU profile (skipping NCU)" + ) + # Still write the kernel file so downstream artifact paths are stable. + kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" + kernel_file_round.write_text(current_kernel) + ncu_metrics = cached["ncu_metrics"] + self._pending_baseline_metrics = None # consume once + else: + self.logger.info(f"[{round_num}] Profiling current kernel with NCU...") + kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" + kernel_file_round.write_text(current_kernel) - profiler_results = self.profiler.profile_kernel( - kernel_file_round, problem_file, round_num - ) + profiler_results = self.profiler.profile_kernel( + kernel_file_round, problem_file, round_num + ) - if profiler_results is None: - self.logger.warning(f"[{round_num}] Profiling failed") - return None, None, None + if profiler_results is None: + self.logger.warning(f"[{round_num}] Profiling failed") + return None, None, None - ncu_metrics = profiler_results.metrics + ncu_metrics = profiler_results.metrics - if not ncu_metrics: - return None, None, ncu_metrics + if not ncu_metrics: + return None, None, ncu_metrics # Run roofline analysis flat_metrics = next(iter(ncu_metrics.values()), {}) if ncu_metrics else {} @@ -1138,6 +1190,7 @@ def _finalize_results( best_round: int = 0, early_stop_reason: str = "", any_verified: bool = False, + best_runtime_ptx_hash: str | None = None, ) -> tuple[bool, str, dict[str, Any]]: """Finalize and log optimization results. @@ -1187,6 +1240,7 @@ def _finalize_results( "baseline_time_ms": baseline_results["time_ms"], "best_time_ms": best_runtime_time, "best_runtime_sol_pct": best_runtime_sol, + "best_ptx_hash": best_runtime_ptx_hash, "speedup": baseline_speedup, "rounds": rounds, } diff --git a/triton_kernel_agent/opt_worker_component/searching/history/json_db.py b/triton_kernel_agent/opt_worker_component/searching/history/json_db.py index 652b5291..1102d7c1 100644 --- a/triton_kernel_agent/opt_worker_component/searching/history/json_db.py +++ b/triton_kernel_agent/opt_worker_component/searching/history/json_db.py @@ -139,6 +139,7 @@ def _entry_to_dict(self, entry: ProgramEntry) -> dict[str, Any]: "problem_id": entry.problem_id, "parent_id": entry.parent_id, "generation": entry.generation, + "ptx_hash": entry.ptx_hash, "created_at": entry.created_at.isoformat(), } @@ -162,5 +163,6 @@ def _dict_to_entry(self, d: dict[str, Any]) -> ProgramEntry: problem_id=d["problem_id"], parent_id=d.get("parent_id"), generation=d.get("generation", 0), + ptx_hash=d.get("ptx_hash"), created_at=created_at, ) diff --git a/triton_kernel_agent/opt_worker_component/searching/history/models.py b/triton_kernel_agent/opt_worker_component/searching/history/models.py index 96e2d189..3eb266f3 100644 --- a/triton_kernel_agent/opt_worker_component/searching/history/models.py +++ b/triton_kernel_agent/opt_worker_component/searching/history/models.py @@ -42,6 +42,11 @@ class ProgramEntry: parent_id: str | None = None generation: int = 0 + # Normalized-PTX fingerprint used for dedup at beam selection time. + # ``None`` when PTX capture failed — such entries are treated as + # singletons (never merged with anything else) by the dedup step. + ptx_hash: str | None = None + # Timestamps created_at: datetime = field(default_factory=datetime.now) diff --git a/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py b/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py new file mode 100644 index 00000000..5da6bc6a --- /dev/null +++ b/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py @@ -0,0 +1,171 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +"""PTX-based kernel fingerprinting for dedup. + +Two Triton kernels that lower to identical (normalized) PTX are doing the +same work on the GPU — a strong equivalence relation for "is this the same +kernel?" that is invariant to variable renaming, comments, whitespace, and +most source-level cosmetic changes. + +Usage: + fingerprint = ptx_hash_from_cache(Path(triton_cache_dir)) + if fingerprint is None: + # PTX capture failed — treat as a singleton for dedup purposes. + ... + +Caveats: +- Raw PTX is not fully canonical; this module applies a normalization pass + (strip comments, debug, headers; canonicalize register/label names) before + hashing. Normalization is best-effort: the failure mode is a false + negative (two true duplicates hashed differently), which only reduces the + dedup rate — it does not cause incorrect merges. +- A kernel module may define multiple ``@triton.jit`` functions. This + module hashes the sorted concatenation of all normalized PTX strings + under the cache dir, so the fingerprint reflects the whole module. +""" + +from __future__ import annotations + +import hashlib +import re +from pathlib import Path + +# --- Normalization regex patterns ------------------------------------------- + +# Strip ``//`` line comments and ``/* */`` block comments. +_RE_LINE_COMMENT = re.compile(r"//[^\n]*") +_RE_BLOCK_COMMENT = re.compile(r"/\*.*?\*/", re.DOTALL) + +# PTX directives we want to drop entirely (debug + version/target headers). +# These vary with compile-time context but not with kernel behavior. +_DROP_DIRECTIVE_PREFIXES = ( + ".version", + ".target", + ".address_size", + ".loc", + ".file", + ".section .debug", +) + +# PTX register classes we canonicalize. Each class gets its own dense +# numbering starting from 0, assigned in first-occurrence order. +# %r — 32-bit unsigned +# %rd — 64-bit unsigned +# %rs — 16-bit unsigned +# %f — 32-bit float +# %fd — 64-bit float +# %p — predicate +_REGISTER_CLASSES = ("rd", "rs", "fd", "r", "f", "p") +# Regex fragment matching any register of any class, with the class captured. +# Order matters: longer prefixes ("rd", "rs", "fd") before shorter ones so we +# do not partial-match %rd42 as %r+"d42". +_RE_REGISTER = re.compile(r"%(rd|rs|fd|r|f|p)(\d+)\b") + +# PTX labels look like ``$L__BB0_3`` or ``$Ltmp1``; normalize by interning. +_RE_LABEL = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*") + +# Collapse runs of whitespace (including at start of line) to a single space; +# drop pure-blank lines. +_RE_WS = re.compile(r"[ \t]+") + + +def normalize_ptx(ptx: str) -> str: + """Return a canonical form of *ptx* suitable for hashing. + + Stripped: comments, debug directives, version/target headers. + Canonicalized: register names and labels (renamed to dense sequences). + Normalized: whitespace. + + The output is deterministic given the same input and (importantly) + identical for PTX strings that differ only in those cosmetic axes. + """ + text = _RE_BLOCK_COMMENT.sub("", ptx) + text = _RE_LINE_COMMENT.sub("", text) + + kept_lines: list[str] = [] + for raw in text.splitlines(): + stripped = raw.strip() + if not stripped: + continue + if any(stripped.startswith(p) for p in _DROP_DIRECTIVE_PREFIXES): + continue + kept_lines.append(stripped) + body = "\n".join(kept_lines) + + # Canonicalize registers. We do a single pass building a {old: new} + # mapping per class; substitution is applied once using re.sub. + reg_maps: dict[str, dict[str, int]] = {cls: {} for cls in _REGISTER_CLASSES} + + def _register_sub(m: re.Match[str]) -> str: + cls, num = m.group(1), m.group(2) + mapping = reg_maps[cls] + if num not in mapping: + mapping[num] = len(mapping) + return f"%{cls}{mapping[num]}" + + body = _RE_REGISTER.sub(_register_sub, body) + + # Canonicalize labels. Same first-occurrence renaming scheme. + label_map: dict[str, int] = {} + + def _label_sub(m: re.Match[str]) -> str: + name = m.group(0) + if name not in label_map: + label_map[name] = len(label_map) + return f"$L{label_map[name]}" + + body = _RE_LABEL.sub(_label_sub, body) + + # Final whitespace collapse. + body = _RE_WS.sub(" ", body) + + return body + + +def _find_ptx_files(cache_dir: Path) -> list[Path]: + """Return every ``*.ptx`` file under *cache_dir* (recursive).""" + if not cache_dir.exists(): + return [] + return sorted(cache_dir.rglob("*.ptx")) + + +def ptx_hash_from_cache(cache_dir: Path) -> str | None: + """Compute a stable hash of all PTX files under *cache_dir*. + + Returns ``None`` if no PTX files are present (e.g. compilation failed + or the cache dir was not populated). A ``None`` result tells callers + to treat the kernel as a dedup singleton. + + The hash covers the normalized PTX of every compiled Triton function + in the module, sorted by relative path for determinism across runs. + """ + ptx_files = _find_ptx_files(cache_dir) + if not ptx_files: + return None + + hasher = hashlib.sha256() + # Sort by relative path so the fingerprint is stable regardless of + # filesystem iteration order. + rel_sorted = sorted((p.relative_to(cache_dir).as_posix(), p) for p in ptx_files) + for rel, path in rel_sorted: + try: + normalized = normalize_ptx(path.read_text(errors="replace")) + except OSError: + continue + # Include the relative filename in the hash so two kernels with the + # same PTX content under different function names still differ. + # Strip the leading hash-bucket directory (Triton caches files in + # content-addressed subdirs, which we do not want in the fingerprint). + basename = Path(rel).name + hasher.update(basename.encode("utf-8")) + hasher.update(b"\0") + hasher.update(normalized.encode("utf-8")) + hasher.update(b"\0") + + return hasher.hexdigest() diff --git a/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py b/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py index 984736ef..39a034bf 100644 --- a/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py +++ b/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py @@ -14,8 +14,10 @@ """Beam search optimization strategy. -Maintains top-N kernels and explores M bottlenecks per kernel each round. -Total workers = N × M. +Maintains top-N kernels and explores M bottlenecks per kernel each round, +optionally fanned out across K distinct LLMs and C independent samples per +prompt. Total workers = P × M × K × C, where P is the number of beam +members expanded each round (defaults to all of them). """ import logging @@ -30,10 +32,24 @@ class BeamSearchStrategy(SearchStrategy): """Beam search strategy for kernel optimization. This strategy maintains a beam of top-performing kernels and explores - multiple bottleneck directions for each. It mirrors the original - beam search behavior from optimization_manager.py. + multiple bottleneck directions for each. Expansion can fan out across + several dimensions for diversity: - Workers = num_top_kernels × num_bottlenecks + - ``num_top_kernels`` (N): beam width kept round-to-round. + - ``num_expanding_parents`` (P): how many of those are expanded from + each round (defaults to N). Use ``P < N`` to concentrate expansion + on the leaders while keeping a wider dedup buffer in the beam. + - ``num_bottlenecks`` (M): bottleneck directions per parent. + - ``models`` (K): LLM providers to fan across; each generates its own + bottleneck analysis and rewrite. + - ``samples_per_prompt`` (C): independent LLM draws per (parent, + bottleneck, model) triple, to harvest sampling-level diversity. + + Workers per round = P × M × K × C. + + After workers return, candidates are deduplicated by PTX fingerprint + (same normalized compiled PTX ⇒ same kernel) before being ranked and + truncated to ``num_top_kernels``. """ def __init__( @@ -42,6 +58,9 @@ def __init__( num_bottlenecks: int = 2, database: ProgramDatabase | None = None, logger: logging.Logger | None = None, + models: list[str] | None = None, + samples_per_prompt: int = 1, + num_expanding_parents: int | None = None, ): """Initialize beam search strategy. @@ -50,6 +69,16 @@ def __init__( num_bottlenecks: Number of bottleneck directions to explore per kernel database: Optional program database for persistence logger: Optional logger + models: Optional list of LLM model names. When provided, every + (kernel, bottleneck) pair is expanded once per model. + samples_per_prompt: Number of independent LLM samples to draw + per (parent, bottleneck, model) triple. Values >1 rely on + the LLM being non-deterministic (temperature >0) to yield + distinct candidates. Default 1 preserves prior behavior. + num_expanding_parents: How many of the top-N beam members to + expand from each round. ``None`` (default) expands from + all beam members. Use a small value (e.g. 1) to focus + expansion on the leader while keeping a wider dedup buffer. """ self.logger = logger or logging.getLogger(self.__class__.__name__) self.problem_id: str | None = None @@ -57,11 +86,28 @@ def __init__( self.num_bottlenecks = num_bottlenecks self.database = database self.top_kernels: list[ProgramEntry] = [] + self.models = models + self.samples_per_prompt = max(1, samples_per_prompt) + self.num_expanding_parents = num_expanding_parents + # Internal iteration list: [None] means "use runner default". + self._expansion_models: list[str | None] = list(models) if models else [None] + + @property + def _effective_num_parents(self) -> int: + """How many beam members actually get expanded each round.""" + if self.num_expanding_parents is None: + return self.num_top_kernels + return min(self.num_expanding_parents, self.num_top_kernels) @property def num_workers_needed(self) -> int: - """Number of workers = top_kernels × bottlenecks.""" - return self.num_top_kernels * self.num_bottlenecks + """Number of workers = parents × bottlenecks × models × samples.""" + return ( + self._effective_num_parents + * self.num_bottlenecks + * len(self._expansion_models) + * self.samples_per_prompt + ) def initialize(self, initial_program: ProgramEntry) -> None: """Initialize with starting program. @@ -72,15 +118,23 @@ def initialize(self, initial_program: ProgramEntry) -> None: self.problem_id = initial_program.problem_id # Start with N copies of initial (will be deduplicated on first update) self.top_kernels = [initial_program] * self.num_top_kernels + models_str = ( + ", ".join(str(m) for m in self.models) if self.models else "" + ) self.logger.info( - f"BeamSearch initialized: {self.num_top_kernels} kernels × " - f"{self.num_bottlenecks} bottlenecks = {self.num_workers_needed} workers" + f"BeamSearch initialized: beam={self.num_top_kernels} " + f"parents={self._effective_num_parents} × " + f"{self.num_bottlenecks} bottlenecks × {len(self._expansion_models)} " + f"models [{models_str}] × {self.samples_per_prompt} samples " + f"= {self.num_workers_needed} workers" ) def select_candidates(self, round_num: int) -> list[dict[str, Any]]: """Select candidates for this round. - Creates one candidate for each (kernel, bottleneck) pair. + Creates one candidate for each (parent, bottleneck, model, sample) + tuple. Only the top ``num_expanding_parents`` beam members are + expanded; the rest stay in the beam purely for dedup and backup. Args: round_num: Current round number @@ -88,16 +142,21 @@ def select_candidates(self, round_num: int) -> list[dict[str, Any]]: Returns: List of candidate specs for workers """ - candidates = [] - for rank, kernel in enumerate(self.top_kernels): + parents_to_expand = self.top_kernels[: self._effective_num_parents] + candidates: list[dict[str, Any]] = [] + for rank, kernel in enumerate(parents_to_expand): for bottleneck_id in range(1, self.num_bottlenecks + 1): - candidates.append( - { - "parent": kernel, - "bottleneck_id": bottleneck_id, - "kernel_rank": rank, - } - ) + for model in self._expansion_models: + for sample_idx in range(self.samples_per_prompt): + candidates.append( + { + "parent": kernel, + "bottleneck_id": bottleneck_id, + "kernel_rank": rank, + "openai_model": model, + "sample_idx": sample_idx, + } + ) return candidates def update_with_results( @@ -112,38 +171,89 @@ def update_with_results( results: Worker results round_num: Current round number """ + # Build a (kernel_code → ptx_hash) lookup from the current beam. + # If a worker returned an *unchanged* parent (i.e. its LLM did not + # improve the kernel), the worker reports ptx_hash=None even though + # the parent's hash is already known to the strategy. Falling back + # to that hash lets PTX dedup correctly merge such results with + # the existing parent entry. + ptx_by_kernel_code: dict[str, str] = { + p.kernel_code: p.ptx_hash for p in self.top_kernels if p.ptx_hash + } + # Add successful results - new_entries = [] + new_entries: list[ProgramEntry] = [] + entry_models: dict[str, str | None] = {} for result in results: if result.get("success"): + program_id = f"r{round_num}_w{result['worker_id']}" + ptx_hash = result.get("ptx_hash") or ptx_by_kernel_code.get( + result.get("kernel_code", "") + ) entry = ProgramEntry( - program_id=f"r{round_num}_w{result['worker_id']}", + program_id=program_id, kernel_code=result["kernel_code"], metrics=ProgramMetrics(time_ms=result["time_ms"]), problem_id=self.problem_id, parent_id=result.get("parent_id"), generation=round_num, + ptx_hash=ptx_hash, ) new_entries.append(entry) + entry_models[program_id] = result.get("openai_model") if self.database: self.database.add_program(entry) - # Update top-k (combine, sort, truncate) + # Combine old beam + new results into the candidate pool. all_candidates = self.top_kernels + new_entries - all_candidates.sort(key=lambda x: x.metrics.time_ms) - self.top_kernels = all_candidates[: self.num_top_kernels] + + # Dedup by PTX fingerprint: entries with the same normalized PTX are + # the same kernel at the compiler level, so keep only the fastest + # per fingerprint. Entries with ``ptx_hash is None`` (capture + # failed) are treated as singletons so we never accidentally merge + # them together. + dedup_before = len(all_candidates) + pooled = self._dedup_by_ptx(all_candidates) + dedup_after = len(pooled) + + # Sort surviving representatives by runtime and truncate to beam width. + pooled.sort(key=lambda x: x.metrics.time_ms) + self.top_kernels = pooled[: self.num_top_kernels] if self.database: self.database.save() # Log update if new_entries: - best_new = min(e.metrics.time_ms for e in new_entries) + best_new_entry = min(new_entries, key=lambda e: e.metrics.time_ms) + best_new_model = entry_models.get(best_new_entry.program_id) + model_tag = f" [{best_new_model}]" if best_new_model else "" self.logger.info( f"Round {round_num}: {len(new_entries)} successful, " - f"best new: {best_new:.4f}ms" + f"PTX dedup {dedup_before}→{dedup_after}, " + f"best new: {best_new_entry.metrics.time_ms:.4f}ms{model_tag}" ) + @staticmethod + def _dedup_by_ptx(entries: list[ProgramEntry]) -> list[ProgramEntry]: + """Collapse entries with identical PTX fingerprints to the fastest. + + Entries whose ``ptx_hash`` is ``None`` are kept as-is (treated as + singletons) so we never merge two kernels whose PTX we couldn't + capture. + """ + best_by_hash: dict[str, ProgramEntry] = {} + singletons: list[ProgramEntry] = [] + for e in entries: + h = e.ptx_hash + if h is None: + singletons.append(e) + continue + incumbent = best_by_hash.get(h) + if incumbent is None or e.metrics.time_ms < incumbent.metrics.time_ms: + best_by_hash[h] = e + return list(best_by_hash.values()) + singletons + def get_best_program(self) -> ProgramEntry | None: """Get the best performing kernel in the beam.""" if not self.top_kernels: diff --git a/triton_kernel_agent/platform/nvidia.py b/triton_kernel_agent/platform/nvidia.py index 0c41955f..e2ed359f 100644 --- a/triton_kernel_agent/platform/nvidia.py +++ b/triton_kernel_agent/platform/nvidia.py @@ -192,6 +192,8 @@ def __init__( high_reasoning_effort: bool, bottleneck_override: str | None, worker_kwargs: dict[str, Any], + gpu_ids: list[int] | None = None, + gpu_locks: dict[int, Any] | None = None, ) -> None: self.log_dir = log_dir self.logger = logger @@ -201,6 +203,13 @@ def __init__( self.high_reasoning_effort = high_reasoning_effort self.bottleneck_override = bottleneck_override self.worker_kwargs = worker_kwargs + # Multi-GPU pool: workers round-robin across these GPUs and each + # uses its assigned GPU's lock for both benchmark and NCU. Falls + # back to legacy single-GPU behavior on GPU 0 when not provided. + self.gpu_ids: list[int] = list(gpu_ids) if gpu_ids else [0] + self.gpu_locks: dict[int, Any] = ( + dict(gpu_locks) if gpu_locks else {0: benchmark_lock} + ) def run_workers( self, @@ -219,6 +228,16 @@ def run_workers( workdir = self.log_dir / "workers" / f"w{i}" / f"r{round_num}" workdir.mkdir(parents=True, exist_ok=True) + worker_model = candidate.get("openai_model") or self.openai_model + baseline_metrics = candidate.get("baseline_metrics") + + # Round-robin GPU assignment. The same per-GPU lock is passed + # as both ``benchmark_lock`` and ``profiling_semaphore`` to the + # worker — collapsing the two GPU-serialization knobs into a + # single per-GPU mutex (one operation per GPU at a time). + gpu_id = self.gpu_ids[i % len(self.gpu_ids)] + gpu_lock = self.gpu_locks[gpu_id] + args = ( i, # worker_id candidate["parent"].kernel_code, @@ -229,47 +248,79 @@ def run_workers( workdir, workdir / "logs", result_queue, - self.benchmark_lock, - self.profiling_semaphore, + gpu_lock, # benchmark_lock + gpu_lock, # profiling_semaphore (same object, per-GPU mutex) pytorch_baseline, candidate["bottleneck_id"], - self.openai_model, + worker_model, self.high_reasoning_effort, self.bottleneck_override, self.worker_kwargs, shared_history, shared_reflexions, + baseline_metrics, + gpu_id, ) p = mp.Process(target=_nvidia_worker_process, args=args) p.start() workers.append(p) - # Wait for completion with timeout + # Wait for completion with timeout, draining the result queue as we + # go. We must not let the queue's pipe buffer fill up: a worker's + # ``mp.Queue.put`` enqueues data and a feeder thread serializes it + # over a pipe; if we don't read, the pipe fills, the feeder blocks, + # and the worker can't exit — which deadlocks ``join`` indefinitely. + # Polling the queue while polling joins keeps the pipe drained. + import queue as _queue_mod + worker_timeout = 1800 # 30 minutes deadline = time.time() + worker_timeout - for w in workers: - remaining = max(0, deadline - time.time()) - w.join(timeout=remaining) - if w.is_alive(): - self.logger.warning(f"Worker {w.pid} timed out, terminating") - w.terminate() - w.join(timeout=5) + results: list[dict[str, Any]] = [] + remaining_workers = list(workers) + + while remaining_workers and time.time() < deadline: + # Drain anything currently in the queue (non-blocking). + while True: + try: + results.append(result_queue.get_nowait()) + except _queue_mod.Empty: + break + except Exception: + break + # Reap any workers that have exited. Short timeout so we cycle + # back to draining the queue quickly. + still_alive = [] + for w in remaining_workers: + w.join(timeout=0.5) if w.is_alive(): - self.logger.warning(f"Worker {w.pid} still alive, killing") - w.kill() - w.join(timeout=2) + still_alive.append(w) + else: + w.close() + remaining_workers = still_alive + + # Anything still alive past the deadline is hung — terminate it. + for w in remaining_workers: + self.logger.warning(f"Worker {w.pid} timed out, terminating") + w.terminate() + w.join(timeout=5) + if w.is_alive(): + self.logger.warning(f"Worker {w.pid} still alive, killing") + w.kill() + w.join(timeout=2) w.close() - # Collect results - results: list[dict[str, Any]] = [] - while not result_queue.empty(): + # Final drain after every worker is gone, in case anything was + # placed on the queue between our last poll and the worker exit. + while True: try: results.append(result_queue.get_nowait()) + except _queue_mod.Empty: + break except Exception: break - # Clean up queue resources to prevent thread hangs during GC + # Clean up queue resources to prevent thread hangs during GC. result_queue.close() result_queue.join_thread() @@ -307,12 +358,27 @@ def _nvidia_worker_process( worker_kwargs: dict, prior_history: list[dict], prior_reflexions: list[dict], + baseline_metrics: dict[str, Any] | None, + gpu_id: int, ) -> None: """Worker process function for NVIDIA GPUs. Runs in a separate process to optimise a single kernel variant using NCU profiling and CUDA benchmarking. """ + import os + + # Pin this worker process to a single GPU before any torch import or + # GPU-touching subprocess. Both the benchmark subprocess and NCU + # subprocess inherit the env, so they automatically run on this GPU. + os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) + # Print to harness log immediately so multi-GPU pinning is verifiable. + print( + f"[worker {worker_id}] pinned to GPU {gpu_id} " + f"(CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']})", + flush=True, + ) + import sys kernel_agent_path = Path(__file__).parent.parent.parent @@ -349,6 +415,7 @@ def _nvidia_worker_process( test_code=test_code, known_kernel_time=known_time, max_opt_rounds=1, + baseline_metrics=baseline_metrics, ) attempt_data = metrics.get("last_attempt") @@ -361,6 +428,8 @@ def _nvidia_worker_process( "kernel_code": best_kernel, "time_ms": metrics.get("best_time_ms", float("inf")), "parent_id": parent_id, + "openai_model": openai_model, + "ptx_hash": metrics.get("best_ptx_hash"), "attempt": attempt_data, "reflexion": reflexion_data, } @@ -371,6 +440,7 @@ def _nvidia_worker_process( { "success": False, "worker_id": worker_id, + "openai_model": openai_model, "error": str(e), "traceback": traceback.format_exc(), } @@ -501,11 +571,13 @@ def __init__( openai_model: str = "gpt-5", gpu_name: str | None = None, roofline_config: Any | None = None, + num_bottlenecks: int = 1, ) -> None: self._logger = logger or logging.getLogger(__name__) self._log_dir = Path(log_dir) if log_dir else None self._openai_model = openai_model self._gpu_name = gpu_name + self._num_bottlenecks = max(1, int(num_bottlenecks)) self._delegate: Any | None = None # Orchestrator accesses ``bottleneck_analyzer.roofline`` directly. self.roofline = NvidiaRooflineAnalyzer( @@ -533,6 +605,7 @@ def _get_delegate(self) -> Any: gpu_specs=gpu_specs, logs_dir=self._log_dir, logger=self._logger, + num_bottlenecks=self._num_bottlenecks, ) return self._delegate