diff --git a/docs/technique_clustering_design.md b/docs/technique_clustering_design.md new file mode 100644 index 00000000..5668b922 --- /dev/null +++ b/docs/technique_clustering_design.md @@ -0,0 +1,169 @@ +# Technique-vector clustering for beam-search dedup + +This document is the design for a *semantic* deduplication / diversity layer +that complements the existing PTX-hash dedup in `BeamSearchStrategy`. + +PTX-hash dedup answers "are two kernels byte-identical at the compiler +level?" — which is correct but conservative (different code that uses the +same techniques does not collapse). This layer answers "are two kernels +applying the same set of optimization techniques?", as judged by an LLM, +and uses that signal to keep beam diversity even after PTX-dedup. + +## Architecture + +``` +update_with_results pipeline (round N): + + pool = top_kernels + new_entries + │ + ▼ + _dedup_by_ptx ← byte-identical kernels collapse to fastest + │ + ▼ + classify survivors ← LLM emits binary technique vector per kernel + │ (only for entries without a cached vector) + ▼ + diversity-aware select ← walk sorted-by-time pool, keep one per cluster, + backfill remaining slots with fastest unaccepted + │ + ▼ + next round's top_kernels +``` + +PTX-dedup runs first because it's free (already cached on `ProgramEntry`) +and provably correct. The technique-vector layer runs only on the +survivors — typically 12–15 per round in our production runs — so the +extra LLM cost is small. + +## The technique taxonomy (expandable) + +Techniques are defined in a YAML file. Each entry has: + +```yaml +- name: split_k_reduce # short identifier (must be unique, stable) + description: | # one-line human description + Split-K with separate reduce kernel. + llm_hint: | # how the LLM should detect this technique + Two kernels in the same module: one writes partial K-slice dot + products to a scratch buffer; a separate reduce kernel sums them. 
+``` + +The default list (`examples/configs/techniques_default.yaml`) covers +Triton-visible techniques observed in our production runs and the +common-techniques checklist: + +1. split_k_reduce +2. tensor_cores (`tl.dot`, WMMA, WGMMA) +3. software_pipelining (`num_stages >= 2`) +4. swizzled_load (XOR / blocked-swizzle to dodge bank conflicts) +5. persistent_kernel (one-block-per-SM + work loop) +6. vectorized_load (vector dtype loads, `tl.load(...)` width >1) +7. shared_memory_tiling +8. register_tiling (small `BLOCK_M`/`BLOCK_N`, large `BLOCK_K`) +9. autotuned_config (`@triton.autotune`) +10. masked_access (`tl.load(..., mask=...)`) +11. atomic_reduction (`tl.atomic_add` for cross-block reductions) +12. precision_split (fp32 accumulator, lower-precision operands) +13. warp_specialization (producer/consumer split inside one kernel) +14. thread_block_clusters (Hopper cluster launch / DSMEM) +15. grid_swizzling (custom rasterization order for L2 reuse) +16. epilogue_fusion (post-matmul ops fused inside the kernel) +17. loop_unrolling (`#pragma unroll`, `tl.range(loop_unroll_factor=…)`) +18. async_copy_tma (`cp.async`, TMA descriptor loads) + +Adding a technique = adding a YAML entry. The vector dimension is +`len(techniques)` at runtime; persisted vectors with mismatched length +are discarded and re-classified on the next round. + +## When the LLM classification fires + +Manager-side, after PTX dedup. Only entries whose `technique_vector` is +`None` (newly arrived this round, or whose vector was invalidated by a +schema change) get classified. Existing beam members keep their cached +vector — surviving from prior rounds is free. + +Classifications run in a `ThreadPoolExecutor` with a small concurrency +cap so the manager doesn't wait on the LLM provider one call at a time. With +~12 new survivors per round and 4-way concurrency, expect 10–30 s of +manager-side wall time added per round. 
+ +## Clustering rule: diversity-aware selection + +Not exact-vector dedup (too aggressive — would discard same-technique +kernels even when they have different runtimes). Not Hamming-threshold +clustering (extra parameter to tune, brittle). Instead: **keep the +fastest representative of each cluster, then backfill remaining beam +slots from the fastest unaccepted entries**. + +Algorithm (`select_diverse_top_k`): + +``` +sort pool by time_ms ascending +beam, seen_clusters = [], set() +for entry in pool: + cluster_id = tuple(entry.technique_vector) if entry.technique_vector is not None else ("unclassified", id(entry)) + if cluster_id not in seen_clusters: + beam.append(entry); seen_clusters.add(cluster_id) + if len(beam) == K: break +# backfill if fewer than K distinct clusters exist +for entry in pool: + if len(beam) == K: break + if entry not in beam: beam.append(entry) +return beam +``` + +Equivalent semantics: every distinct technique vector represented in the +pool gets at least one beam slot (capped at K). This guarantees diverse +expansion in the next round. Entries whose `technique_vector` is `None` +(classification failed or disabled) are treated as their own singleton +cluster — never merged. + +## Configuration + +YAML preset (`examples/configs/beam_search_diverse_clustered.yaml`): + +```yaml +strategy: beam_search +strategy_config: + num_top_kernels: 10 + num_expanding_parents: 2 + num_bottlenecks: 3 + samples_per_prompt: 5 + models: [claude-opus-4.6, gpt-5-4, gemini-2-5-pro] + technique_clustering: + enabled: true + techniques_yaml: examples/configs/techniques_default.yaml + classifier_model: claude-opus-4.6 # one model for stable vectors + max_concurrency: 4 +``` + +Default behavior (no `technique_clustering` block): clustering is +disabled, behavior matches the prior PTX-only dedup. + +## Cost / payoff + +- **Cost.** ~12 new classifications × 8 rounds = ~96 extra LLM calls per + run, parallelized 4-way. ~5–10% of the run's existing LLM budget. 
+ Persistence of vectors across rounds means surviving beam members + are not re-classified. +- **Payoff (hypothesis).** In the concentrated production run, rounds + 5–7 just re-discovered the round-4 winner. PTX-dedup correctly + collapsed those re-discoveries, but the *beam* still filled with + near-clones because every kernel in the pool was the leader. + Diversity-aware selection forces the other 9 beam slots to represent + different technique vectors, which changes what gets expanded in + subsequent rounds — potentially producing a second improvement axis. + +## Open questions + +- Schema migration when techniques YAML changes: V1 just discards old + vectors with mismatched length. A more durable approach hashes the + YAML and stores `(schema_id, vector)` so old vectors are correctly + identified as stale. Add only if needed. +- LLM-vector inconsistency: the same kernel classified by two different + models can produce different vectors. Mitigated by using a single + `classifier_model` per run; can revisit if vectors look noisy. +- Future: replace LLM with regex/AST-based classifier for deterministic + detection of structural techniques (split-K, autotune, atomic). Cheap + and consistent. LLM remains the fallback for fuzzy cases like "warp + specialization" or "swizzled load pattern". diff --git a/examples/configs/beam_search.yaml b/examples/configs/beam_search.yaml index 1d447927..f2706501 100644 --- a/examples/configs/beam_search.yaml +++ b/examples/configs/beam_search.yaml @@ -22,7 +22,7 @@ strategy: beam_search num_workers: 4 strategy_config: - num_top_kernels: 2 + candidate_pool_size: 2 num_bottlenecks: 2 openai_model: claude-opus-4.5 high_reasoning_effort: true diff --git a/examples/configs/beam_search_diverse.yaml b/examples/configs/beam_search_diverse.yaml new file mode 100644 index 00000000..106f2b15 --- /dev/null +++ b/examples/configs/beam_search_diverse.yaml @@ -0,0 +1,67 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Diverse beam search — multi-LLM, multi-sample expansion with PTX dedup. +# +# SPREAD variant — broad beam expansion. +# +# Layout per round: +# 5 expanding parents (top half of beam) +# × 3 bottlenecks +# × 3 LLMs (Claude / GPT / Gemini) +# × 2 samples per prompt +# = 90 worker processes, each producing one candidate kernel. +# +# Pairs with examples/configs/beam_search_diverse_concentrated.yaml +# (top-2 parents × 5 samples each, same 90-worker budget) for A/B +# comparison of "spread vs. concentrated" expansion strategies. +# +# After workers return: +# 1. Candidates (and the existing 10-kernel beam) are deduplicated by +# normalized-PTX fingerprint — kernels that compile to identical PTX +# collapse to the fastest representative. +# 2. The surviving pool is sorted by runtime. +# 3. The top 10 become the next round's beam. +# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir examples/optimize_01_matvec \ +# --config examples/configs/beam_search_diverse.yaml + +strategy: beam_search +num_workers: 90 +strategy_config: + candidate_pool_size: 5 # every member of the pool is expanded + num_bottlenecks: 3 # 3 ranked bottlenecks per parent (post-plumbing-fix) + samples_per_prompt: 2 # two LLM draws per (parent, bottleneck, model) + # Three models routed via the Relay provider. 
Names not present in + # utils/providers/available_models.py are auto-routed to Relay by + # get_model_provider (see models.py:70–79), so the plugboard server + # resolves them. + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +# Default LLM (used when no per-candidate override is set; here it's +# overridden per-candidate via the `models` list above). +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +# Worker configuration +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_clustered.yaml b/examples/configs/beam_search_diverse_clustered.yaml new file mode 100644 index 00000000..8e91d838 --- /dev/null +++ b/examples/configs/beam_search_diverse_clustered.yaml @@ -0,0 +1,56 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Concentrated beam search + technique-vector clustering. +# +# Layered diversity: +# 1. Same per-round expansion as beam_search_diverse_concentrated.yaml +# (P=2, M=3, K=3, C=5 = 90 workers per round). +# 2. PTX-hash dedup runs first (collapses byte-identical kernels). +# 3. NEW: technique-vector classification on the survivors via LLM — +# diversity-aware truncation keeps the fastest representative of +# each distinct technique vector. The technique taxonomy lives in +# examples/configs/techniques_default.yaml and is expandable. +# +# Pairs with beam_search_diverse_concentrated.yaml for A/B comparison +# of "PTX dedup only" vs. "PTX dedup + technique clustering" beams. 
+# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir examples/optimize_01_matvec \ +# --strategy beam_search_diverse_clustered + +strategy: beam_search +num_workers: 90 +strategy_config: + candidate_pool_size: 2 # every member of the pool is expanded + num_bottlenecks: 3 # 3 ranked bottlenecks per parent + samples_per_prompt: 5 # 5 LLM draws per (parent, bottleneck, model) + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + technique_clustering: + enabled: true + # Path is resolved relative to the current working directory at + # run launch time. The default file ships with this repo. + techniques_yaml: examples/configs/techniques_default.yaml + # Use ONE classifier model for stable / comparable vectors across + # candidates regardless of which model produced the candidate. + classifier_model: claude-opus-4.6 + # Thread-pool concurrency for manager-side LLM classification. + max_concurrency: 4 + +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_clustered_xl.yaml b/examples/configs/beam_search_diverse_clustered_xl.yaml new file mode 100644 index 00000000..7f7f14af --- /dev/null +++ b/examples/configs/beam_search_diverse_clustered_xl.yaml @@ -0,0 +1,67 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Technique-clustered beam search at the wider 225-worker fanout. +# +# Layout per round: +# 5 expanding parents +# × 3 bottlenecks +# × 3 LLMs (Claude / GPT / Gemini) +# × 5 samples per prompt +# = 225 worker processes, each producing one candidate kernel. +# +# After workers return, the beam update runs: +# 1. 
PTX-hash dedup (collapses byte-identical kernels). +# 2. LLM-based technique-vector classification on the survivors. +# 3. Round-robin diversity-aware truncation: pass 0 takes the +# fastest of every cluster, pass 1 takes the second-fastest of +# every cluster, etc., until 10 beam slots are filled. Output is +# ordered by selection (most-diverse-first), so when the next +# round picks `candidate_pool_size=5` parents, those are 5 +# different technique clusters. +# +# Pairs with beam_search_diverse_concentrated.yaml for an A/B +# comparison of "concentrated, no clustering" vs. "wider fanout + +# diversity-preserving clustering". +# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir \ +# --strategy beam_search_diverse_clustered_xl + +strategy: beam_search +num_workers: 225 +# Dynamic spawn-on-free-GPU-slot scheduler: at most 4 worker processes +# pinned to a single GPU at one moment. Pool capacity is +# workers_per_gpu × len(gpu_ids) = 4 × 8 = 32. When a worker exits its +# slot frees and the next pending candidate is spawned with that GPU id +# — keeps GPUs saturated even when individual workers spend most of +# their time in LLM calls. 
+workers_per_gpu: 4 +strategy_config: + candidate_pool_size: 5 # every member of the pool is expanded + num_bottlenecks: 3 # 3 ranked bottlenecks per parent + samples_per_prompt: 5 # 5 LLM draws per (parent, bottleneck, model) + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + technique_clustering: + enabled: true + techniques_yaml: examples/configs/techniques_default.yaml + classifier_model: claude-opus-4.6 + max_concurrency: 4 + +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_concentrated.yaml b/examples/configs/beam_search_diverse_concentrated.yaml new file mode 100644 index 00000000..d3eb119b --- /dev/null +++ b/examples/configs/beam_search_diverse_concentrated.yaml @@ -0,0 +1,63 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# CONCENTRATED variant — push the leaders harder. +# +# Layout per round: +# 2 expanding parents (top of beam) +# × 3 bottlenecks +# × 3 LLMs (Claude / GPT / Gemini) +# × 5 samples per prompt +# = 90 worker processes, each producing one candidate kernel. +# +# Same 90-worker budget as beam_search_diverse.yaml (the spread variant) +# but reallocated toward more LLM draws per (parent, bottleneck, model) +# triple instead of more parents. Use this when you trust the leader- +# kernel pair and want to squeeze them rather than explore broadly. +# +# Motivation: in our smoke A/B, Design A (P=1, C=10) outperformed +# Design B (P=2, C=1) on the same problem and budget. 
This config is +# the production-scale version of that observation: keep enough beam- +# diversity to dedup against (P=2, beam=10) but spend most of the budget +# concentrating attempts on the current leaders. +# +# After workers return: +# 1. Candidates (and the existing 10-kernel beam) are deduplicated by +# normalized-PTX fingerprint — kernels that compile to identical PTX +# collapse to the fastest representative. +# 2. The surviving pool is sorted by runtime. +# 3. The top 10 become the next round's beam. +# +# Usage: +# python examples/run_opt_manager.py \ +# --kernel-dir examples/optimize_01_matvec \ +# --strategy beam_search_diverse_concentrated + +strategy: beam_search +num_workers: 90 +strategy_config: + candidate_pool_size: 2 # every member of the pool is expanded + num_bottlenecks: 3 # 3 ranked bottlenecks per parent + samples_per_prompt: 5 # five LLM draws per (parent, bottleneck, model) + models: + # Three models routed via the Relay provider. + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +# Default LLM (used when no per-candidate override is set; here it's +# overridden per-candidate via the `models` list above). +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +# Worker configuration +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/beam_search_diverse_smoke.yaml b/examples/configs/beam_search_diverse_smoke.yaml new file mode 100644 index 00000000..beb85fdc --- /dev/null +++ b/examples/configs/beam_search_diverse_smoke.yaml @@ -0,0 +1,34 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +# Smoke-test variant of beam_search_diverse.yaml. 
+# +# Per-round fanout: 1 parent × 3 bottlenecks × 3 models × 1 sample = 9 workers. +# Exercises the bottleneck-plumbing fix: for each model, the three +# bottleneck_ids should produce 3 distinct optimization directions, not +# 3 copies of the top-ranked bottleneck. + +strategy: beam_search +num_workers: 9 +strategy_config: + candidate_pool_size: 1 # every member of the pool is expanded + num_bottlenecks: 3 # ← drives BottleneckAnalyzer to request 3 ranked options + samples_per_prompt: 1 + models: + - claude-opus-4.6 + - gpt-5-4 + - gemini-2-5-pro + +openai_model: claude-opus-4.6 +high_reasoning_effort: true + +benchmark_warmup: 25 +benchmark_repeat: 100 +divergence_threshold: 50.0 +target_platform: cuda +gpu_name: "NVIDIA H100 NVL 94GB" diff --git a/examples/configs/nvidia.yaml b/examples/configs/nvidia.yaml index b2b66c66..76158e3f 100644 --- a/examples/configs/nvidia.yaml +++ b/examples/configs/nvidia.yaml @@ -26,7 +26,7 @@ strategy: beam_search num_workers: 4 strategy_config: - num_top_kernels: 2 + candidate_pool_size: 2 num_bottlenecks: 2 openai_model: gpt-5 high_reasoning_effort: true diff --git a/examples/configs/techniques_default.yaml b/examples/configs/techniques_default.yaml new file mode 100644 index 00000000..da244053 --- /dev/null +++ b/examples/configs/techniques_default.yaml @@ -0,0 +1,143 @@ +# Default kernel-optimization technique taxonomy used by the +# technique-vector clustering layer in BeamSearchStrategy. +# +# Each entry defines one bit of the binary technique vector emitted per +# kernel. Ordering of this list is the bit ordering of the vector — do +# not reorder existing entries unless you intend to invalidate persisted +# vectors. Append-only is safe (old vectors are discarded and re-classified +# on length mismatch). +# +# To add a new technique: append a new entry below. The strategy reads +# this YAML at run start and the vector dimension follows automatically. 
+ +techniques: + - name: split_k_reduce + description: Split-K with separate reduce kernel + llm_hint: | + The kernel module defines two @triton.jit functions: one that + computes partial dot products over K-slices and writes them to a + scratch buffer, and a separate reduce kernel that sums the + partials into the final output. + + - name: tensor_cores + description: Tensor-core MMA usage + llm_hint: | + The kernel uses tl.dot, WMMA, or WGMMA on bf16 / fp16 / fp8 + operands. Bare scalar accumulation in a Python loop does not + count even if the dtypes are bf16. + + - name: software_pipelining + description: Software pipelining via num_stages + llm_hint: | + The kernel launches with num_stages >= 2, or uses + tl.range(..., num_stages=...) inside the loop, intentionally + requesting compile-time pipelining of loads and compute. + + - name: swizzled_load + description: Swizzled / blocked load pattern for bank-conflict avoidance + llm_hint: | + The kernel applies a swizzling pattern (XOR, blocked stride + manipulation, or explicit shared-memory layout reshuffle) when + loading data, beyond the default Triton compiler-managed layout. + + - name: persistent_kernel + description: Persistent grid (one-block-per-SM with internal work loop) + llm_hint: | + The kernel launches a fixed grid sized to the GPU's SM count and + each block iterates internally over multiple work tiles, rather + than launching one block per output tile. + + - name: vectorized_load + description: Vectorized memory load / store + llm_hint: | + Loads or stores fetch multiple elements per instruction + (e.g. tl.load casting to vector types, explicit float4-shaped + pointers, LDS.128 patterns). Default scalar tl.load(pointer) + does not count. + + - name: shared_memory_tiling + description: Explicit shared-memory tiling + llm_hint: | + The kernel stages global-memory tiles into shared memory and + reuses them across multiple threads in a block, beyond what + Triton implicitly does. 
+ + - name: register_tiling + description: Register-tile shape (large K-tile, small M/N-tile) + llm_hint: | + Launch shape is dominated by accumulator-fits-in-registers + sizing — small BLOCK_M / BLOCK_N (≤32) with large BLOCK_K + (≥1024). + + - name: autotuned_config + description: Triton autotune + llm_hint: | + The kernel uses @triton.autotune with multiple Config entries. + + - name: masked_access + description: Masked / boundary-safe accesses + llm_hint: | + tl.load / tl.store calls explicitly pass mask=... to handle + boundary conditions instead of padding inputs. + + - name: atomic_reduction + description: Atomic-based cross-block reduction + llm_hint: | + The kernel uses tl.atomic_add (or similar) to accumulate + contributions from multiple blocks into the same output + location, instead of a separate reduce kernel. + + - name: precision_split + description: Mixed-precision compute (fp32 accumulator, lower-precision operands) + llm_hint: | + Inputs are bf16 / fp16 / fp8 but the accumulator is initialized + with dtype=tl.float32 (or wider than the input dtype) and the + final result is cast back down before store. + + - name: warp_specialization + description: Producer / consumer warp specialization + llm_hint: | + Within one kernel, distinct subsets of warps take different + roles — typically one warpgroup loads data while another + performs the MMA. Detected by tid-conditional branches that + split the thread block into role-specific code paths. + + - name: thread_block_clusters + description: Hopper thread-block clusters / DSMEM + llm_hint: | + The kernel uses cluster launch attributes or distributed shared + memory APIs that allow blocks within the same cluster to share + shared-memory directly. 
+ + - name: grid_swizzling + description: Custom grid rasterization order for L2 reuse + llm_hint: | + The kernel computes its (pid_m, pid_n) tile coordinates from + tl.program_id(0) using a custom traversal (snake / blocked / + swizzled) rather than the default row-major mapping, intended + to maximize L2 cache hit rate across blocks. + + - name: epilogue_fusion + description: Epilogue fusion (bias / activation / scale / cast) + llm_hint: | + The kernel performs additional element-wise operations + (bias add, ReLU/SiLU/GELU, scale, dtype cast, quantize) on + the accumulator inside the kernel before writing the final + result, instead of producing an intermediate tensor. + + - name: loop_unrolling + description: Explicit loop unrolling + llm_hint: | + The Python loop or `tl.range(...)` call carries an explicit + unroll directive (`tl.range(..., num_stages=...)` already + counts under software_pipelining; this bit is reserved for + explicit `# pragma unroll`-style hints, manual fully-unrolled + loops, or `tl.range(loop_unroll_factor=...)`). + + - name: async_copy_tma + description: Asynchronous copy / TMA-based load + llm_hint: | + The kernel uses cp.async or TMA-style asynchronous loads that + overlap with compute, rather than synchronous tl.load. In + Triton this is most often expressed via tl.experimental APIs + or TensorMemoryAccessor patterns. 
diff --git a/examples/run_opt_manager.py b/examples/run_opt_manager.py index af5a0dd9..375a3dfb 100644 --- a/examples/run_opt_manager.py +++ b/examples/run_opt_manager.py @@ -30,10 +30,19 @@ import sys from pathlib import Path -from dotenv import load_dotenv +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +try: + from dotenv import load_dotenv +except ModuleNotFoundError: + + def load_dotenv() -> None: + """Allow running without optional python-dotenv installed.""" + return None + + from triton_kernel_agent.opt_manager import OptimizationManager -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) load_dotenv() # Hardcoded config directory relative to this script. diff --git a/triton_kernel_agent/opt_manager.py b/triton_kernel_agent/opt_manager.py index 495b550d..7a4ec6b8 100644 --- a/triton_kernel_agent/opt_manager.py +++ b/triton_kernel_agent/opt_manager.py @@ -23,7 +23,7 @@ >>> manager = OptimizationManager( ... strategy="beam_search", ... num_workers=4, - ... strategy_config={"num_top_kernels": 2, "num_bottlenecks": 2}, + ... strategy_config={"candidate_pool_size": 2, "num_bottlenecks": 2}, ... ) >>> result = manager.run_optimization( ... initial_kernel=kernel_code, @@ -39,6 +39,7 @@ from pathlib import Path from typing import Any +from kernel_perf_agent.kernel_opt.diagnose_prompt.judger_prompt import BottleneckResult from triton_kernel_agent.opt_worker_component.searching.history.json_db import ( JSONProgramDatabase, ) @@ -61,6 +62,48 @@ _MANAGER_LEVEL_KEYS = {"verifier", "benchmarker", "worker_runner"} +def _detect_gpus() -> list[int]: + """Return physical GPU ids visible to this process. + + Crucially this MUST NOT initialize the CUDA context in the manager + process — workers are spawned via ``mp.Process`` (fork on Linux), and + if the manager has already touched ``torch.cuda`` the children inherit + that locked-in device list and ignore any post-fork + ``CUDA_VISIBLE_DEVICES`` override. 
We use ``nvidia-smi`` subprocess + detection instead. + + Order of resolution: + 1. ``CUDA_VISIBLE_DEVICES`` env (respect user restriction). + 2. ``nvidia-smi --query-gpu=index --format=csv,noheader``. + 3. Fallback to ``[0]`` (single GPU). + """ + import os + import subprocess + + cvd = os.environ.get("CUDA_VISIBLE_DEVICES") + if cvd: + try: + ids = [int(x.strip()) for x in cvd.split(",") if x.strip()] + if ids: + return ids + except ValueError: + pass + try: + out = subprocess.run( + ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"], + capture_output=True, + text=True, + timeout=5, + ) + if out.returncode == 0: + ids = [int(x.strip()) for x in out.stdout.splitlines() if x.strip()] + if ids: + return ids + except Exception: + pass + return [0] + + @config_injectable class OptimizationManager: """Manages parallel kernel optimization with pluggable strategies. @@ -88,6 +131,8 @@ def __init__( high_reasoning_effort: bool = True, bottleneck_override: str | None = None, platform: dict[str, str] | str | None = None, + gpu_ids: list[int] | None = None, + workers_per_gpu: int = 2, **worker_kwargs: Any, ): """Initialize the optimization manager. @@ -138,6 +183,20 @@ def __init__( strategy, strategy_config or {}, num_workers ) + # Forward the strategy's bottleneck-fanout knob to workers so the + # ``BottleneckAnalyzer`` actually requests that many ranked + # bottlenecks from the LLM. Without this, the strategy spawns N + # workers per parent (with bottleneck_id ∈ {1..N}) but the analyzer + # always returns 1, and workers with id>1 silently fall back to id=1. 
+ if ( + strategy_config + and "num_bottlenecks" in strategy_config + and "num_bottlenecks_to_request" not in self.worker_kwargs + ): + self.worker_kwargs["num_bottlenecks_to_request"] = strategy_config[ + "num_bottlenecks" + ] + # Validate worker count if num_workers != self.strategy.num_workers_needed: raise ValueError( @@ -146,10 +205,33 @@ def __init__( ) self.num_workers = num_workers - self.benchmark_lock = mp.Lock() - # Semaphore to serialize NCU profiling - NCU requires exclusive GPU access - # and has high memory overhead, so only one worker should profile at a time - self.profiling_semaphore = mp.Semaphore(1) + + # Per-GPU lock pool — one lock per GPU does double duty for both + # benchmarking and NCU profiling (the two GPU-serialized + # operations). Workers running on different GPUs proceed in + # parallel; workers on the same GPU serialize. This is the + # "collapsed" multi-GPU design: a single shared object per GPU + # acts as both ``benchmark_lock`` and ``profiling_semaphore``. + self.gpu_ids: list[int] = list(gpu_ids) if gpu_ids else _detect_gpus() + self.gpu_locks: dict[int, Any] = {g: mp.Lock() for g in self.gpu_ids} + # Bound on how many worker processes can be pinned to a single GPU + # at once. The runner uses this as a dynamic pool: a freed slot + # pulls the next pending candidate (with the freed GPU's id) so + # GPUs that finish quickly don't sit idle while their statically- + # assigned share of candidates trickles through LLM phase. + self.workers_per_gpu: int = max(1, int(workers_per_gpu)) + + # Manager-level GPU work (initial-kernel verify, PyTorch baselines, + # baseline NCU cache) uses *separate* locks from workers'. All + # manager GPU operations happen between rounds when no workers + # are running, so they don't actually need to coordinate with + # workers — and giving them dedicated locks means a worker that + # dies holding ``gpu_locks[g]`` can't strand the manager forever + # (mp.Lock has no stale-holder recovery). 
+ _first_gpu = self.gpu_ids[0] + self._mgr_gpu_locks: dict[int, Any] = {g: mp.Lock() for g in self.gpu_ids} + self.benchmark_lock = self._mgr_gpu_locks[_first_gpu] + self.profiling_semaphore = self._mgr_gpu_locks[_first_gpu] # Shared history across beam search iterations self.shared_history: list[ @@ -158,11 +240,23 @@ def __init__( self.shared_reflexions: list[dict] = [] # List of serialized Reflexion dicts self.history_size: int = 10 # Max history entries to pass to workers + # Per-parent baseline NCU/roofline cache, keyed by program_id. + # Populated in _run_workers; shared across all sibling workers of the + # same round (and surviving across rounds for beam members that stick). + self._baseline_profile_cache: dict[str, dict[str, Any] | None] = {} + # Per-parent bottleneck-analysis cache, keyed by program_id. + # Populated alongside the baseline profile so sibling workers reuse the + # same LLM bottleneck analysis instead of each issuing their own call. + self._bottleneck_analysis_cache: dict[str, list[dict[str, Any]] | None] = {} + # ── Platform components (resolved from registry) ───────── self._resolve_platform(platform) self.logger.info( - f"OptimizationManager initialized: strategy={strategy}, workers={num_workers}" + f"OptimizationManager initialized: strategy={strategy}, " + f"workers={num_workers}, gpus={self.gpu_ids}, " + f"workers_per_gpu={self.workers_per_gpu} " + f"(pool capacity {self.workers_per_gpu * len(self.gpu_ids)})" ) # ------------------------------------------------------------------ @@ -177,6 +271,11 @@ def _resolve_platform(self, platform: dict[str, str] | str | None) -> None: Worker-level component names are forwarded to worker processes via ``self.worker_kwargs["platform_config"]`` so each worker can resolve its own instances from the registry. 
+ + Additionally, the manager resolves its *own* ``profiler`` and + ``roofline_analyzer`` instances (without removing them from the + worker config) so it can profile baseline kernels once per round + and share the result across sibling workers. """ from triton_kernel_agent.platform.registry import registry @@ -205,11 +304,48 @@ def _resolve_platform(self, platform: dict[str, str] | str | None) -> None: high_reasoning_effort=self.high_reasoning_effort, bottleneck_override=self.bottleneck_override, worker_kwargs=self.worker_kwargs, + gpu_ids=self.gpu_ids, + gpu_locks=self.gpu_locks, + workers_per_gpu=self.workers_per_gpu, ) self.verifier = components["verifier"] self.benchmarker = components["benchmarker"] self.worker_runner = components["worker_runner"] + # Resolve a manager-owned profiler + roofline_analyzer for the + # baseline-caching step. These coexist with the worker-level + # instances (workers still build their own from ``worker_config``). + self._mgr_profiler: Any | None = None + self._mgr_roofline: Any | None = None + self._mgr_bottleneck_analyzer: Any | None = None + for key, setter_attr in ( + ("profiler", "_mgr_profiler"), + ("roofline_analyzer", "_mgr_roofline"), + ("bottleneck_analyzer", "_mgr_bottleneck_analyzer"), + ): + impl_name = config.get(key) + if impl_name and registry.has(key, impl_name): + try: + setattr( + self, + setter_attr, + registry.create( + key, + impl_name, + logger=self.logger, + log_dir=self.log_dir, + artifacts_dir=self.log_dir / "baseline_profiles", + profiling_semaphore=self.profiling_semaphore, + openai_model=self.openai_model, + gpu_name=self.worker_kwargs.get("gpu_name"), + ), + ) + except Exception as e: + self.logger.warning( + f"Failed to create manager-level {key}: {e}. " + f"Manager-side caching for {key} disabled." + ) + # Propagate worker-level config (string names) to worker # processes — each worker resolves its own instances via the # registry so there are no pickling issues. 
@@ -240,6 +376,61 @@ def _setup_logging(self) -> logging.Logger: return logger + def _build_technique_clustering_kwargs( + self, config: dict[str, Any] + ) -> dict[str, Any]: + """Resolve the ``technique_clustering`` block from strategy_config. + + Returns the kwargs to splat into ``BeamSearchStrategy.__init__`` + (empty dict when clustering is disabled or misconfigured — + callers fall back to the plain PTX-dedup behavior). + """ + cfg = config.get("technique_clustering") + if not cfg or not cfg.get("enabled", False): + return {} + + from triton_kernel_agent.opt_worker_component.searching.technique_vector import ( + load_techniques, + ) + from utils.providers import get_model_provider + + yaml_path = cfg.get("techniques_yaml") + if not yaml_path: + self.logger.warning( + "technique_clustering.enabled=True but techniques_yaml missing; " + "clustering disabled." + ) + return {} + try: + techniques = load_techniques(yaml_path) + except Exception as e: + self.logger.warning( + f"Failed to load techniques from {yaml_path}: {e}. " + "Technique clustering disabled." + ) + return {} + + classifier_model = cfg.get("classifier_model") or self.openai_model + try: + provider = get_model_provider(classifier_model) + except Exception as e: + self.logger.warning( + f"Cannot obtain provider for technique classifier " + f"{classifier_model!r}: {e}. Clustering disabled." 
+ ) + return {} + + self.logger.info( + f"Technique clustering enabled: {len(techniques)} techniques, " + f"classifier={classifier_model}" + ) + return { + "techniques": techniques, + "technique_classifier_provider": provider, + "technique_classifier_model": classifier_model, + "technique_classifier_concurrency": int(cfg.get("max_concurrency", 4)), + } + def _create_strategy( self, name: str, config: dict[str, Any], num_workers: int ) -> SearchStrategy: @@ -257,11 +448,15 @@ def _create_strategy( ValueError: If strategy name is unknown """ if name == "beam_search": + cluster_kwargs = self._build_technique_clustering_kwargs(config) return BeamSearchStrategy( - num_top_kernels=config.get("num_top_kernels", 2), + candidate_pool_size=config.get("candidate_pool_size", 2), num_bottlenecks=config.get("num_bottlenecks", 2), database=self.database, logger=self.logger, + models=config.get("models"), + samples_per_prompt=config.get("samples_per_prompt", 1), + **cluster_kwargs, ) elif name == "greedy": return GreedyStrategy( @@ -455,6 +650,18 @@ def _run_workers( pytorch_baseline: float, ) -> list[dict[str, Any]]: """Spawn workers for each candidate and collect results.""" + # Profile each distinct parent kernel once and share the NCU/roofline + # result across sibling workers. This avoids repeating an expensive, + # semaphore-serialized NCU run for every (bottleneck, model) fanout. 
+ self._populate_baseline_cache(candidates, problem_file, round_num) + self._populate_bottleneck_cache(candidates, round_num) + for cand in candidates: + parent_id = cand["parent"].program_id + cand["baseline_metrics"] = self._baseline_profile_cache.get(parent_id) + cand["precomputed_bottleneck_results"] = ( + self._bottleneck_analysis_cache.get(parent_id) + ) + results = self.worker_runner.run_workers( candidates=candidates, round_num=round_num, @@ -488,3 +695,163 @@ def _run_workers( self.logger.debug(f"Traceback:\n{r.get('traceback')}") return results + + def _populate_baseline_cache( + self, + candidates: list[dict[str, Any]], + problem_file: Path, + round_num: int, + ) -> None: + """Profile each distinct parent kernel once and cache the result. + + The cache is keyed by ``parent.program_id`` and persists across + rounds, so a beam member that survives multiple rounds is profiled + at most once. If the manager-level profiler or roofline analyzer + is unavailable, this is a no-op and workers fall back to profiling + their own baselines. + """ + if self._mgr_profiler is None or self._mgr_roofline is None: + return + + from triton_kernel_agent.opt_worker_component.orchestrator.optimization_orchestrator import ( + _get_triton_kernel_metrics, + ) + + baseline_dir = self.log_dir / "baseline_profiles" + baseline_dir.mkdir(parents=True, exist_ok=True) + + # Collect (program_id, kernel_code) for parents we haven't cached yet. + # De-dup by program_id since many candidates share the same parent. 
+ unseen: dict[str, str] = {} + for cand in candidates: + parent = cand["parent"] + pid = parent.program_id + if pid not in self._baseline_profile_cache and pid not in unseen: + unseen[pid] = parent.kernel_code + + for pid, kernel_code in unseen.items(): + try: + kernel_file = baseline_dir / f"{pid}.py" + kernel_file.write_text(kernel_code) + + profiler_results = self._mgr_profiler.profile_kernel( + kernel_file, problem_file, round_num + ) + if profiler_results is None or not getattr( + profiler_results, "metrics", None + ): + self._baseline_profile_cache[pid] = None + self.logger.warning( + f"Baseline profile failed for parent {pid}; " + f"workers will profile their own baselines." + ) + continue + + ncu_metrics = profiler_results.metrics + flat_metrics = _get_triton_kernel_metrics(ncu_metrics) + roofline_result = self._mgr_roofline.analyze(ncu_metrics=flat_metrics) + + self._baseline_profile_cache[pid] = { + "efficiency_pct": roofline_result.efficiency_pct, + "compute_sol_pct": roofline_result.compute_sol_pct, + "memory_sol_pct": roofline_result.memory_sol_pct, + "bottleneck": roofline_result.bottleneck, + "roofline_result": roofline_result, + "ncu_metrics": ncu_metrics, + } + self.logger.info( + f"Baseline profiled for parent {pid}: " + f"{roofline_result.bottleneck}-bound, " + f"{roofline_result.efficiency_pct:.1f}% SOL" + ) + except Exception as e: + self._baseline_profile_cache[pid] = None + self.logger.warning( + f"Baseline profile errored for parent {pid}: {e}; " + f"workers will profile their own baselines." 
+ ) + + def _populate_bottleneck_cache( + self, + candidates: list[dict[str, Any]], + round_num: int, + ) -> None: + """Analyze each distinct parent kernel once and cache the LLM result.""" + if self._mgr_bottleneck_analyzer is None: + return + + unseen: dict[str, ProgramEntry] = {} + for cand in candidates: + parent = cand["parent"] + pid = parent.program_id + if pid not in self._bottleneck_analysis_cache and pid not in unseen: + unseen[pid] = parent + + for pid, parent in unseen.items(): + baseline_metrics = self._baseline_profile_cache.get(pid) + if not baseline_metrics or not baseline_metrics.get("ncu_metrics"): + self._bottleneck_analysis_cache[pid] = None + continue + + try: + ncu_metrics = baseline_metrics["ncu_metrics"] + roofline_result = baseline_metrics.get("roofline_result") + llm_results = self._mgr_bottleneck_analyzer.analyze( + parent.kernel_code, + ncu_metrics, + round_num, + roofline_result, + ) + + if self.bottleneck_override: + if llm_results: + cached = [ + BottleneckResult( + category=self.bottleneck_override, + summary=( + f"Pre-computed: {self.bottleneck_override}-bound kernel" + ), + reasoning=result.reasoning, + root_causes=result.root_causes, + recommended_fixes=result.recommended_fixes, + ).to_dict() + for result in llm_results + ] + else: + cached = [ + BottleneckResult( + category=self.bottleneck_override, + summary=( + f"Pre-computed: {self.bottleneck_override}-bound kernel" + ), + reasoning=( + "Classification based on operation arithmetic intensity" + ), + root_causes=[], + recommended_fixes=[], + ).to_dict() + ] + else: + cached = ( + [result.to_dict() for result in llm_results] + if llm_results + else None + ) + + self._bottleneck_analysis_cache[pid] = cached + if cached: + categories = ", ".join(item["category"] for item in cached) + self.logger.info( + f"Baseline bottlenecks cached for parent {pid}: {categories}" + ) + else: + self.logger.warning( + f"Baseline bottleneck analysis failed for parent {pid}; " + f"workers will 
analyze independently." + ) + except Exception as e: + self._bottleneck_analysis_cache[pid] = None + self.logger.warning( + f"Baseline bottleneck analysis errored for parent {pid}: {e}; " + f"workers will analyze independently." + ) diff --git a/triton_kernel_agent/opt_worker.py b/triton_kernel_agent/opt_worker.py index 7703efc2..0c51d208 100644 --- a/triton_kernel_agent/opt_worker.py +++ b/triton_kernel_agent/opt_worker.py @@ -88,6 +88,12 @@ def __init__( # BeamSearch parameters (passed by opt_manager) bottleneck_id: int | None = None, bottleneck_override: str | None = None, + # How many bottlenecks the analyzer should request from the LLM. + # Strategies that fan out across multiple bottleneck ranks (e.g. beam + # search with num_bottlenecks > 1) need the analyzer to actually + # produce that many ranked options — otherwise sibling workers all + # silently fall back to the first bottleneck. + num_bottlenecks_to_request: int = 1, # Shared history from beam search manager prior_history: list[dict] | None = None, prior_reflexions: list[dict] | None = None, @@ -156,6 +162,7 @@ def __init__( # BeamSearch parameters self.bottleneck_id = bottleneck_id self.bottleneck_override = bottleneck_override + self.num_bottlenecks_to_request = max(1, int(num_bottlenecks_to_request)) # Shared history from beam search manager self.prior_history = prior_history or [] @@ -246,6 +253,7 @@ def _resolve_platform_config(self) -> None: openai_model=self.openai_model, gpu_name=self.gpu_name, roofline_config=self.roofline_config, + num_bottlenecks=self.num_bottlenecks_to_request, ) for k, v in resolved.items(): if k not in self._platform: @@ -308,6 +316,7 @@ def _init_components(self) -> None: gpu_specs=self.gpu_specs, logs_dir=self.log_dir, logger=self.logger, + num_bottlenecks=self.num_bottlenecks_to_request, ) # Verification worker (for correctness checks) @@ -361,6 +370,8 @@ def optimize_kernel( test_code: str | list[str], known_kernel_time: float | None = None, max_opt_rounds: int | 
None = None, + baseline_metrics: dict[str, Any] | None = None, + precomputed_bottleneck_results: list[dict[str, Any]] | None = None, ) -> tuple[bool, str, dict[str, Any]]: """ Run hardware-guided optimization on a kernel. @@ -373,6 +384,12 @@ def optimize_kernel( are additional tests. known_kernel_time: Known baseline time in ms (skip initial benchmark) max_opt_rounds: Maximum optimization rounds (defaults to self.max_rounds) + baseline_metrics: Optional pre-computed NCU profile + roofline for + ``kernel_code``. Forwarded to the orchestrator so it can skip + its own NCU run on the baseline. + precomputed_bottleneck_results: Optional manager-computed bottleneck + analysis for the parent kernel. When supplied, workers reuse + it instead of issuing their own bottleneck-analysis LLM call. Returns: Tuple of (success, best_kernel_code, performance_metrics) @@ -424,4 +441,6 @@ def optimize_kernel( test_code=test_code, known_kernel_time=known_kernel_time, max_opt_rounds=max_opt_rounds, + baseline_metrics=baseline_metrics, + precomputed_bottleneck_results=precomputed_bottleneck_results, ) diff --git a/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py b/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py index 9f8314ac..0b9faadc 100644 --- a/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py +++ b/triton_kernel_agent/opt_worker_component/benchmarking/benchmark.py @@ -20,14 +20,21 @@ import json import logging +import os +import shutil import subprocess import sys +import tempfile import traceback from pathlib import Path from typing import Any, Optional import torch +from triton_kernel_agent.opt_worker_component.searching.ptx_fingerprint import ( + ptx_hash_from_cache, +) + from triton_kernel_agent.opt_worker_component.benchmarking.timing import ( compute_timing_stats, prepare_pytorch_model, @@ -124,6 +131,7 @@ def benchmark_kernel( - time_ms: Mean time in ms - speedup: Speedup vs baseline """ + ptx_cache_dir = 
Path(tempfile.mkdtemp(prefix="triton_cache_bench_")) try: with self.lock_manager: results_json = self.artifacts_dir / "benchmark_results.json" @@ -148,11 +156,17 @@ def benchmark_kernel( if baseline_file: cmd.extend(["--baseline"]) + # Isolate this benchmark's Triton compilation cache so we can + # capture its PTX for fingerprint-based dedup without being + # contaminated by sibling workers' artifacts. + env = {**os.environ, "TRITON_CACHE_DIR": str(ptx_cache_dir)} + result = subprocess.run( cmd, capture_output=True, text=True, timeout=300, + env=env, ) if result.returncode != 0: @@ -162,7 +176,7 @@ def benchmark_kernel( or "Unknown error" ) self.logger.error(f"Kernel benchmark failed: {error_msg}") - return {"time_ms": float("inf"), "speedup": 0.0} + return {"time_ms": float("inf"), "speedup": 0.0, "ptx_hash": None} with open(results_json, "r") as f: results = json.load(f) @@ -170,14 +184,21 @@ def benchmark_kernel( kernel_name = kernel_file.stem kernel_results = results.get("kernels", {}).get(kernel_name, {}) + # Capture the PTX fingerprint from the isolated cache dir. + # A None result is handled gracefully: dedup treats it as a singleton.
+ ptx_hash = ptx_hash_from_cache(ptx_cache_dir) + return { "time_ms": kernel_results.get("time_ms", float("inf")), "speedup": kernel_results.get("speedup", 1.0), + "ptx_hash": ptx_hash, } except Exception as e: self.logger.error(f"Kernel benchmark failed: {e}") - return {"time_ms": float("inf"), "speedup": 0.0} + return {"time_ms": float("inf"), "speedup": 0.0, "ptx_hash": None} + finally: + shutil.rmtree(ptx_cache_dir, ignore_errors=True) def benchmark_pytorch( self, diff --git a/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py b/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py index 219b60c2..4b95a2d7 100644 --- a/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py +++ b/triton_kernel_agent/opt_worker_component/orchestrator/optimization_orchestrator.py @@ -315,6 +315,8 @@ def optimize_kernel( test_code: list[str], known_kernel_time: float | None = None, max_opt_rounds: int = 5, + baseline_metrics: dict[str, Any] | None = None, + precomputed_bottleneck_results: list[dict[str, Any]] | None = None, ) -> tuple[bool, str, dict[str, Any]]: """ Main optimization loop. @@ -325,6 +327,16 @@ def optimize_kernel( test_code: List of test code strings (primary + additional tests) known_kernel_time: Known performance of kernel_code in ms max_opt_rounds: Maximum optimization rounds + baseline_metrics: Optional pre-computed NCU profile + roofline result + for ``kernel_code``. When supplied, the orchestrator skips its + own NCU runs on the baseline (used when the manager shares a + single profile across sibling workers in the same round). + Expected keys: ``efficiency_pct``, ``compute_sol_pct``, + ``memory_sol_pct``, ``bottleneck``, ``roofline_result``, + ``ncu_metrics``. + precomputed_bottleneck_results: Optional manager-computed + bottleneck analysis for the parent kernel. When supplied, + workers reuse it instead of issuing their own bottleneck LLM call. 
Returns: Tuple of (success, best_kernel_code, performance_metrics) @@ -342,6 +354,11 @@ def optimize_kernel( early_stop_reason = "" any_verified = False + # Cached baseline NCU — consumed at most once by _profile_and_analyze + # when it runs on the identical baseline kernel (round 1 only). + self._pending_baseline_metrics: dict[str, Any] | None = baseline_metrics + self._pending_bottleneck_results = precomputed_bottleneck_results + # Reset roofline history for new optimization run self.roofline_analyzer.reset_history() @@ -354,7 +371,9 @@ def optimize_kernel( # Benchmark baseline and PyTorch (now includes baseline SOL profiling) best_time, baseline_results, pytorch_baseline_time, baseline_sol = ( - self._benchmark_baseline(kernel_code, problem_file, known_kernel_time) + self._benchmark_baseline( + kernel_code, problem_file, known_kernel_time, baseline_metrics + ) ) # Two-kernel tracking: track best-by-runtime and best-by-SOL independently @@ -362,6 +381,7 @@ def optimize_kernel( best_runtime_kernel = kernel_code best_runtime_time = best_time best_runtime_sol = baseline_sol + best_runtime_ptx_hash: str | None = None best_sol_kernel = kernel_code best_sol_time = best_time @@ -474,7 +494,18 @@ def optimize_kernel( # Generate optimized kernel optimized_kernel = self._generate_optimized_kernel(opt_prompt, round_num) if not optimized_kernel: - error_feedback = "Failed to extract valid kernel code. Please provide complete kernel wrapped in ```python blocks." + error_feedback = self._last_generation_failure_reason or ( + "Failed to extract valid kernel code. Please provide complete kernel wrapped in ```python blocks." 
+ ) + current_attempt.passed_verification = False + current_attempt.error_message = error_feedback + self.attempt_history.append(current_attempt) + if "Malformed LLM kernel response" in error_feedback: + self.logger.warning( + f"[{round_num}] Terminating worker due to malformed LLM response" + ) + early_stop_reason = "malformed_llm_response" + break continue # Verify and refine @@ -492,6 +523,12 @@ def optimize_kernel( current_attempt.passed_verification = False current_attempt.error_message = error_feedback self.attempt_history.append(current_attempt) + if "Malformed LLM kernel response" in error_feedback: + self.logger.warning( + f"[{round_num}] Terminating worker due to malformed LLM response" + ) + early_stop_reason = "malformed_llm_response" + break continue error_feedback = "" @@ -504,16 +541,12 @@ def optimize_kernel( kernel_file_round, problem_file ) new_time = bench_results["time_ms"] + new_ptx_hash = bench_results.get("ptx_hash") - # Profile the NEW kernel to get its SOL metrics - new_kernel_metrics = self._profile_kernel_for_sol( - optimized_kernel, problem_file, round_num - ) - new_sol = ( - new_kernel_metrics.get("efficiency_pct", 0.0) - if new_kernel_metrics - else 0.0 - ) + # Candidate-level NCU is intentionally deferred. The manager + # profiles only kernels that survive into the next round's parent set. 
+ new_kernel_metrics = None + new_sol = 0.0 # Complete the attempt with benchmark results new_config = extract_triton_config(optimized_kernel) @@ -536,16 +569,6 @@ def optimize_kernel( any_verified = True current_attempt.config_changes = config_changes - # Add SOL metrics from new kernel profiling - if new_kernel_metrics: - current_attempt.compute_sol_pct = new_kernel_metrics.get( - "compute_sol_pct", 0.0 - ) - current_attempt.memory_sol_pct = new_kernel_metrics.get( - "memory_sol_pct", 0.0 - ) - current_attempt.combined_sol_pct = new_sol - # Add attempt to history self.attempt_history.append(current_attempt) @@ -578,64 +601,22 @@ def optimize_kernel( round_num, ) - # Track metadata when new best runtime is found - if new_time < best_runtime_time or new_sol > best_sol_sol: + # Track metadata when new best runtime is found. Note: + # ``best_runtime_time`` has just been updated by + # ``_update_kernels``, so we compare via the post-update + # ``best_runtime_kernel`` identity instead of revisiting + # ``new_time < best_runtime_time`` (which would always be false + # at this point). 
+ if best_runtime_kernel == optimized_kernel: best_round_num = round_num best_bottleneck_category = primary.category - if new_kernel_metrics: - best_ncu_metrics = new_kernel_metrics.get("ncu_metrics") - - # Roofline check for early termination - # Use best_runtime kernel's SOL for early termination check - # We want a kernel that is both fast AND efficient - if new_kernel_metrics: - roofline_check = new_kernel_metrics.get("roofline_result") - if roofline_check: - self.logger.info( - f"[{round_num}] Roofline: {roofline_check.bottleneck}-bound, " - f"{roofline_check.efficiency_pct:.1f}% SOL " - f"(Compute: {roofline_check.compute_sol_pct:.1f}%, " - f"Memory: {roofline_check.memory_sol_pct:.1f}%)" - ) + if best_runtime_kernel == optimized_kernel: + best_runtime_ptx_hash = new_ptx_hash - # Only early terminate if the best runtime kernel is at roofline - # This prevents stopping with a slow but "efficient" kernel - if ( - best_runtime_kernel == optimized_kernel - and roofline_check.at_roofline - ): - should_stop, stop_reason = self.roofline_analyzer.should_stop( - roofline_check - ) - if should_stop and self.roofline_analyzer.config.early_stop: - self.logger.info( - f"[{round_num}] 🎯 Early termination: {stop_reason}" - ) - early_stop_reason = stop_reason - break - - # Profile the final best kernel to get its roofline - if best_round_num > 0: - final_kernel_file = self.artifact_dir / f"kernel_round_{best_round_num}.py" - if final_kernel_file.exists(): + if best_runtime_kernel == optimized_kernel: self.logger.info( - f"Profiling final best kernel (round {best_round_num})..." 
+ f"[{round_num}] Runtime improved; defer NCU until the next round if this kernel is expanded" ) - final_profiler_results = self.profiler.profile_kernel( - final_kernel_file, problem_file, best_round_num - ) - if final_profiler_results and final_profiler_results.metrics: - best_ncu_metrics = final_profiler_results.metrics - final_flat_metrics = _get_triton_kernel_metrics(best_ncu_metrics) - final_roofline = self.roofline_analyzer.analyze( - ncu_metrics=final_flat_metrics, - ) - self.logger.info( - f"Final roofline (kernel_round_{best_round_num}): " - f"{final_roofline.bottleneck}-bound, {final_roofline.efficiency_pct:.1f}% SOL " - f"(Compute: {final_roofline.compute_sol_pct:.1f}%, " - f"Memory: {final_roofline.memory_sol_pct:.1f}%)" - ) # Final results - use best runtime kernel as primary result return self._finalize_results( @@ -653,13 +634,23 @@ def optimize_kernel( best_round_num, early_stop_reason, any_verified, + best_runtime_ptx_hash=best_runtime_ptx_hash, ) def _benchmark_baseline( - self, kernel_code: str, problem_file: Path, known_kernel_time: float | None + self, + kernel_code: str, + problem_file: Path, + known_kernel_time: float | None, + cached_baseline_metrics: dict[str, Any] | None = None, ) -> tuple[float, dict[str, float], float | None, float]: """Benchmark baseline kernel and PyTorch, and profile baseline SOL. + When ``cached_baseline_metrics`` is supplied, the NCU/roofline step + is skipped and the cached values are reused — this lets the manager + share a single baseline profile across sibling workers operating on + the same parent kernel. 
+ Returns: Tuple of (best_time, baseline_results, pytorch_baseline_time, baseline_sol) """ @@ -683,8 +674,14 @@ def _benchmark_baseline( best_time = baseline_results["time_ms"] self.logger.info(f"📊 Baseline time: {best_time:.4f} ms") - # Profile baseline kernel for SOL metrics - baseline_metrics = self._profile_kernel_for_sol(kernel_code, problem_file, 0) + # Profile baseline kernel for SOL metrics (skip if cached) + if cached_baseline_metrics is not None: + baseline_metrics = cached_baseline_metrics + self.logger.info("📊 Baseline SOL: (using cached profile from manager)") + else: + baseline_metrics = self._profile_kernel_for_sol( + kernel_code, problem_file, 0 + ) if baseline_metrics: baseline_sol = baseline_metrics.get("efficiency_pct", 0.0) bottleneck = baseline_metrics.get("bottleneck", "unknown") @@ -726,29 +723,49 @@ def _profile_and_analyze( Tuple of (bottleneck_results, roofline_result, ncu_metrics). All can be None if profiling fails. """ - self.logger.info(f"[{round_num}] Profiling current kernel with NCU...") - kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" - kernel_file_round.write_text(current_kernel) + # If the manager pre-profiled the baseline for us, consume it in round 1 + # (when current_kernel is still the baseline) and skip the NCU run. + cached = self._pending_baseline_metrics + if cached is not None and round_num == 1 and cached.get("ncu_metrics"): + self.logger.info( + f"[{round_num}] Using cached baseline NCU profile (skipping NCU)" + ) + # Still write the kernel file so downstream artifact paths are stable. 
+ kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" + kernel_file_round.write_text(current_kernel) + ncu_metrics = cached["ncu_metrics"] + self._pending_baseline_metrics = None # consume once + else: + self.logger.info(f"[{round_num}] Profiling current kernel with NCU...") + kernel_file_round = self.artifact_dir / f"kernel_round_{round_num - 1}.py" + kernel_file_round.write_text(current_kernel) - profiler_results = self.profiler.profile_kernel( - kernel_file_round, problem_file, round_num - ) + profiler_results = self.profiler.profile_kernel( + kernel_file_round, problem_file, round_num + ) - if profiler_results is None: - self.logger.warning(f"[{round_num}] Profiling failed") - return None, None, None + if profiler_results is None: + self.logger.warning(f"[{round_num}] Profiling failed") + return None, None, None - ncu_metrics = profiler_results.metrics + ncu_metrics = profiler_results.metrics - if not ncu_metrics: - return None, None, ncu_metrics + if not ncu_metrics: + return None, None, ncu_metrics # Run roofline analysis flat_metrics = next(iter(ncu_metrics.values()), {}) if ncu_metrics else {} roofline_result = self.bottleneck_analyzer.roofline.analyze(flat_metrics) + precomputed = self._pending_bottleneck_results + if precomputed is not None and round_num == 1: + self.logger.info( + f"[{round_num}] Using pre-computed bottleneck analysis (skipping LLM)" + ) + self._pending_bottleneck_results = None + bottleneck_results = [BottleneckResult(**item) for item in precomputed] # Use pre-computed bottleneck if override is set - if self.bottleneck_override: + elif self.bottleneck_override: self.logger.info( f"[{round_num}] Using pre-computed bottleneck: {self.bottleneck_override}-bound (with LLM analysis for details)" ) @@ -804,21 +821,13 @@ def _profile_kernel_for_sol( problem_file: Path, round_num: int, ) -> dict[str, Any] | None: - """Profile a kernel to get its SOL metrics. + """Profile a kernel to get SOL metrics. 
- This is a lightweight profiling specifically for SOL measurement, - used to evaluate the new kernel after benchmarking. - - Args: - kernel_code: Kernel code to profile - problem_file: Path to problem file - round_num: Current round number - - Returns: - Dict with efficiency_pct, roofline_result, ncu_metrics, or None if profiling fails + This helper is retained for baseline kernels. Candidate kernels are + intentionally not profiled in-worker; the manager profiles only kernels + that survive into the next round's parent set. """ try: - # Write kernel to temp file for profiling kernel_file = self.artifact_dir / f"kernel_round_{round_num}_sol.py" kernel_file.write_text(kernel_code) @@ -831,8 +840,6 @@ def _profile_kernel_for_sol( ncu_metrics = profiler_results.metrics flat_metrics = _get_triton_kernel_metrics(ncu_metrics) - - # Run roofline analysis roofline_result = self.roofline_analyzer.analyze(ncu_metrics=flat_metrics) return { @@ -843,7 +850,6 @@ def _profile_kernel_for_sol( "roofline_result": roofline_result, "ncu_metrics": ncu_metrics, } - except Exception as e: self.logger.warning(f"[{round_num}] SOL profiling failed: {e}") return None @@ -851,6 +857,7 @@ def _profile_kernel_for_sol( def _generate_optimized_kernel(self, opt_prompt: str, round_num: int) -> str | None: """Generate optimized kernel from LLM.""" self.logger.info(f"[{round_num}] Generating optimized kernel...") + self._last_generation_failure_reason = None try: messages = [{"role": "user", "content": opt_prompt}] response_text = self.verification_worker._call_llm( @@ -869,11 +876,26 @@ def _generate_optimized_kernel(self, opt_prompt: str, round_num: int) -> str | N ) if not optimized_kernel or len(optimized_kernel) < 100: + self._last_generation_failure_reason = ( + "Failed to extract valid kernel code from model response" + ) self.logger.warning( f"[{round_num}] Failed to extract valid kernel code" ) return None + malformed_reason = self.verification_worker._validate_kernel_candidate( + 
optimized_kernel + ) + if malformed_reason: + self._last_generation_failure_reason = ( + f"Malformed LLM kernel response: {malformed_reason}" + ) + self.logger.warning( + f"[{round_num}] {self._last_generation_failure_reason}" + ) + return None + return optimized_kernel except Exception as e: @@ -1072,7 +1094,8 @@ def _update_kernels( self.logger.info(f"[{round_num}] 📊 SOL: {new_sol:.1f}%") updated_runtime_kernel = optimized_kernel updated_runtime_time = new_time - updated_runtime_sol = new_sol # This kernel's SOL (consistent!) + if new_sol > 0: + updated_runtime_sol = new_sol # This kernel's SOL (consistent!) # Check for SOL improvement (independent of runtime) if new_sol > best_sol_sol: @@ -1138,6 +1161,7 @@ def _finalize_results( best_round: int = 0, early_stop_reason: str = "", any_verified: bool = False, + best_runtime_ptx_hash: str | None = None, ) -> tuple[bool, str, dict[str, Any]]: """Finalize and log optimization results. @@ -1187,6 +1211,7 @@ def _finalize_results( "baseline_time_ms": baseline_results["time_ms"], "best_time_ms": best_runtime_time, "best_runtime_sol_pct": best_runtime_sol, + "best_ptx_hash": best_runtime_ptx_hash, "speedup": baseline_speedup, "rounds": rounds, } diff --git a/triton_kernel_agent/opt_worker_component/profiling/kernel_profiler.py b/triton_kernel_agent/opt_worker_component/profiling/kernel_profiler.py index 6f3f6ffb..c0e3240f 100644 --- a/triton_kernel_agent/opt_worker_component/profiling/kernel_profiler.py +++ b/triton_kernel_agent/opt_worker_component/profiling/kernel_profiler.py @@ -37,9 +37,13 @@ # Default timeout for NCU profiling in seconds DEFAULT_NCU_TIMEOUT_SECONDS = 300 -# Default timeout for waiting on profiling semaphore (15 minutes) -# If exceeded, profiling is skipped for this round to prevent deadlocks -DEFAULT_SEMAPHORE_TIMEOUT_SECONDS = 900 +# Default timeout for waiting on profiling semaphore (60 seconds). +# If exceeded, profiling is skipped for this round and the caller falls +# back to per-worker NCU. 
The shorter timeout matters because mp.Lock can +# leak when a holder process dies with the lock held; under the old +# 15-minute timeout, a stuck lock would hang every subsequent acquire +# for 15 min each before this code unblocks. +DEFAULT_SEMAPHORE_TIMEOUT_SECONDS = 60 @dataclass diff --git a/triton_kernel_agent/opt_worker_component/searching/history/json_db.py b/triton_kernel_agent/opt_worker_component/searching/history/json_db.py index 652b5291..35c6cae8 100644 --- a/triton_kernel_agent/opt_worker_component/searching/history/json_db.py +++ b/triton_kernel_agent/opt_worker_component/searching/history/json_db.py @@ -139,6 +139,12 @@ def _entry_to_dict(self, entry: ProgramEntry) -> dict[str, Any]: "problem_id": entry.problem_id, "parent_id": entry.parent_id, "generation": entry.generation, + "ptx_hash": entry.ptx_hash, + "technique_vector": ( + list(entry.technique_vector) + if entry.technique_vector is not None + else None + ), "created_at": entry.created_at.isoformat(), } @@ -155,6 +161,11 @@ def _dict_to_entry(self, d: dict[str, Any]) -> ProgramEntry: else: created_at = datetime.now() + raw_vec = d.get("technique_vector") + technique_vector = ( + tuple(int(x) for x in raw_vec) if raw_vec is not None else None + ) + return ProgramEntry( program_id=d["program_id"], kernel_code=d["kernel_code"], @@ -162,5 +173,7 @@ def _dict_to_entry(self, d: dict[str, Any]) -> ProgramEntry: problem_id=d["problem_id"], parent_id=d.get("parent_id"), generation=d.get("generation", 0), + ptx_hash=d.get("ptx_hash"), + technique_vector=technique_vector, created_at=created_at, ) diff --git a/triton_kernel_agent/opt_worker_component/searching/history/models.py b/triton_kernel_agent/opt_worker_component/searching/history/models.py index 96e2d189..eb3e150e 100644 --- a/triton_kernel_agent/opt_worker_component/searching/history/models.py +++ b/triton_kernel_agent/opt_worker_component/searching/history/models.py @@ -42,6 +42,16 @@ class ProgramEntry: parent_id: str | None = None generation: int = 0 + #
Normalized-PTX fingerprint used for dedup at beam selection time. + # ``None`` when PTX capture failed — such entries are treated as + # singletons (never merged with anything else) by the dedup step. + ptx_hash: str | None = None + + # Optional technique-vector classification (binary vector, one bit + # per entry of the configured technique taxonomy). ``None`` when + # clustering is disabled or classification failed. + technique_vector: tuple[int, ...] | None = None + # Timestamps created_at: datetime = field(default_factory=datetime.now) diff --git a/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py b/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py new file mode 100644 index 00000000..5da6bc6a --- /dev/null +++ b/triton_kernel_agent/opt_worker_component/searching/ptx_fingerprint.py @@ -0,0 +1,171 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +"""PTX-based kernel fingerprinting for dedup. + +Two Triton kernels that lower to identical (normalized) PTX are doing the +same work on the GPU — a strong equivalence relation for "is this the same +kernel?" that is invariant to variable renaming, comments, whitespace, and +most source-level cosmetic changes. + +Usage: + fingerprint = ptx_hash_from_cache(Path(triton_cache_dir)) + if fingerprint is None: + # PTX capture failed — treat as a singleton for dedup purposes. + ... + +Caveats: +- Raw PTX is not fully canonical; this module applies a normalization pass + (strip comments, debug, headers; canonicalize register/label names) before + hashing. Normalization is best-effort: the failure mode is a false + negative (two true duplicates hashed differently), which only reduces the + dedup rate — it does not cause incorrect merges. 
+- A kernel module may define multiple ``@triton.jit`` functions. This + module hashes the sorted concatenation of all normalized PTX strings + under the cache dir, so the fingerprint reflects the whole module. +""" + +from __future__ import annotations + +import hashlib +import re +from pathlib import Path + +# --- Normalization regex patterns ------------------------------------------- + +# Strip ``//`` line comments and ``/* */`` block comments. +_RE_LINE_COMMENT = re.compile(r"//[^\n]*") +_RE_BLOCK_COMMENT = re.compile(r"/\*.*?\*/", re.DOTALL) + +# PTX directives we want to drop entirely (debug + version/target headers). +# These vary with compile-time context but not with kernel behavior. +_DROP_DIRECTIVE_PREFIXES = ( + ".version", + ".target", + ".address_size", + ".loc", + ".file", + ".section .debug", +) + +# PTX register classes we canonicalize. Each class gets its own dense +# numbering starting from 0, assigned in first-occurrence order. +# %r — 32-bit unsigned +# %rd — 64-bit unsigned +# %rs — 16-bit unsigned +# %f — 32-bit float +# %fd — 64-bit float +# %p — predicate +_REGISTER_CLASSES = ("rd", "rs", "fd", "r", "f", "p") +# Regex fragment matching any register of any class, with the class captured. +# Order matters: longer prefixes ("rd", "rs", "fd") before shorter ones so we +# do not partial-match %rd42 as %r+"d42". +_RE_REGISTER = re.compile(r"%(rd|rs|fd|r|f|p)(\d+)\b") + +# PTX labels look like ``$L__BB0_3`` or ``$Ltmp1``; normalize by interning. +_RE_LABEL = re.compile(r"\$[A-Za-z_][A-Za-z0-9_]*") + +# Collapse runs of whitespace (including at start of line) to a single space; +# drop pure-blank lines. +_RE_WS = re.compile(r"[ \t]+") + + +def normalize_ptx(ptx: str) -> str: + """Return a canonical form of *ptx* suitable for hashing. + + Stripped: comments, debug directives, version/target headers. + Canonicalized: register names and labels (renamed to dense sequences). + Normalized: whitespace. 
+ + The output is deterministic given the same input and (importantly) + identical for PTX strings that differ only in those cosmetic axes. + """ + text = _RE_BLOCK_COMMENT.sub("", ptx) + text = _RE_LINE_COMMENT.sub("", text) + + kept_lines: list[str] = [] + for raw in text.splitlines(): + stripped = raw.strip() + if not stripped: + continue + if any(stripped.startswith(p) for p in _DROP_DIRECTIVE_PREFIXES): + continue + kept_lines.append(stripped) + body = "\n".join(kept_lines) + + # Canonicalize registers. We do a single pass building a {old: new} + # mapping per class; substitution is applied once using re.sub. + reg_maps: dict[str, dict[str, int]] = {cls: {} for cls in _REGISTER_CLASSES} + + def _register_sub(m: re.Match[str]) -> str: + cls, num = m.group(1), m.group(2) + mapping = reg_maps[cls] + if num not in mapping: + mapping[num] = len(mapping) + return f"%{cls}{mapping[num]}" + + body = _RE_REGISTER.sub(_register_sub, body) + + # Canonicalize labels. Same first-occurrence renaming scheme. + label_map: dict[str, int] = {} + + def _label_sub(m: re.Match[str]) -> str: + name = m.group(0) + if name not in label_map: + label_map[name] = len(label_map) + return f"$L{label_map[name]}" + + body = _RE_LABEL.sub(_label_sub, body) + + # Final whitespace collapse. + body = _RE_WS.sub(" ", body) + + return body + + +def _find_ptx_files(cache_dir: Path) -> list[Path]: + """Return every ``*.ptx`` file under *cache_dir* (recursive).""" + if not cache_dir.exists(): + return [] + return sorted(cache_dir.rglob("*.ptx")) + + +def ptx_hash_from_cache(cache_dir: Path) -> str | None: + """Compute a stable hash of all PTX files under *cache_dir*. + + Returns ``None`` if no PTX files are present (e.g. compilation failed + or the cache dir was not populated). A ``None`` result tells callers + to treat the kernel as a dedup singleton. + + The hash covers the normalized PTX of every compiled Triton function + in the module, sorted by relative path for determinism across runs. 
+ """ + ptx_files = _find_ptx_files(cache_dir) + if not ptx_files: + return None + + hasher = hashlib.sha256() + # Sort by relative path so the fingerprint is stable regardless of + # filesystem iteration order. + rel_sorted = sorted((p.relative_to(cache_dir).as_posix(), p) for p in ptx_files) + for rel, path in rel_sorted: + try: + normalized = normalize_ptx(path.read_text(errors="replace")) + except OSError: + continue + # Include the relative filename in the hash so two kernels with the + # same PTX content under different function names still differ. + # Strip the leading hash-bucket directory (Triton caches files in + # content-addressed subdirs, which we do not want in the fingerprint). + basename = Path(rel).name + hasher.update(basename.encode("utf-8")) + hasher.update(b"\0") + hasher.update(normalized.encode("utf-8")) + hasher.update(b"\0") + + return hasher.hexdigest() diff --git a/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py b/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py index 984736ef..aa30e637 100644 --- a/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py +++ b/triton_kernel_agent/opt_worker_component/searching/strategy/beam_search.py @@ -14,54 +14,115 @@ """Beam search optimization strategy. -Maintains top-N kernels and explores M bottlenecks per kernel each round. -Total workers = N × M. +Maintains a candidate pool of top-N kernels and explores M bottlenecks +per kernel each round, optionally fanned out across K distinct LLMs and +C independent samples per prompt. Every member of the pool is expanded +each round; total workers = candidate_pool_size × M × K × C. 
""" import logging -from typing import Any +from typing import Any, Sequence from ..history.models import ProgramEntry, ProgramMetrics from ..history.store import ProgramDatabase +from ..technique_vector import ( + TechniqueDefinition, + classify_many, + select_diverse_top_k, +) from .strategy import SearchStrategy class BeamSearchStrategy(SearchStrategy): """Beam search strategy for kernel optimization. - This strategy maintains a beam of top-performing kernels and explores - multiple bottleneck directions for each. It mirrors the original - beam search behavior from optimization_manager.py. + This strategy maintains a candidate pool of top-performing kernels + and expands every member of the pool each round. Expansion can fan + out across several dimensions for diversity: - Workers = num_top_kernels × num_bottlenecks + - ``candidate_pool_size`` (N): pool size carried round-to-round; every + member is expanded. + - ``num_bottlenecks`` (M): bottleneck directions per parent. + - ``models`` (K): LLM providers to fan across; each generates its own + bottleneck analysis and rewrite. + - ``samples_per_prompt`` (C): independent LLM draws per (parent, + bottleneck, model) triple, to harvest sampling-level diversity. + + Workers per round = candidate_pool_size × M × K × C. + + After workers return, candidates are deduplicated by PTX fingerprint + (same normalized compiled PTX ⇒ same kernel) before being ranked and + truncated to ``candidate_pool_size``. """ def __init__( self, - num_top_kernels: int = 2, + candidate_pool_size: int = 2, num_bottlenecks: int = 2, database: ProgramDatabase | None = None, logger: logging.Logger | None = None, + models: list[str] | None = None, + samples_per_prompt: int = 1, + techniques: Sequence[TechniqueDefinition] | None = None, + technique_classifier_provider: Any | None = None, + technique_classifier_model: str | None = None, + technique_classifier_concurrency: int = 4, ): """Initialize beam search strategy. 
Args: - num_top_kernels: Number of top kernels to maintain in beam + candidate_pool_size: Number of kernels carried in the pool + round-to-round. Every member of the pool is expanded + each round. num_bottlenecks: Number of bottleneck directions to explore per kernel database: Optional program database for persistence logger: Optional logger + models: Optional list of LLM model names. When provided, every + (kernel, bottleneck) pair is expanded once per model. + samples_per_prompt: Number of independent LLM samples to draw + per (parent, bottleneck, model) triple. Values >1 rely on + the LLM being non-deterministic (temperature >0) to yield + distinct candidates. Default 1 preserves prior behavior. """ self.logger = logger or logging.getLogger(self.__class__.__name__) self.problem_id: str | None = None - self.num_top_kernels = num_top_kernels + self.candidate_pool_size = max(1, int(candidate_pool_size)) self.num_bottlenecks = num_bottlenecks self.database = database self.top_kernels: list[ProgramEntry] = [] + self.models = models + self.samples_per_prompt = max(1, samples_per_prompt) + # Internal iteration list: [None] means "use runner default". + self._expansion_models: list[str | None] = list(models) if models else [None] + + # Technique-vector clustering (opt-in): only active when both + # ``techniques`` and a classifier provider/model are supplied. + # When inactive, the strategy falls back to the previous + # PTX-dedup-then-sort-and-truncate behavior. + self._techniques: tuple[TechniqueDefinition, ...] 
= ( + tuple(techniques) if techniques else () + ) + self._classifier_provider = technique_classifier_provider + self._classifier_model = technique_classifier_model + self._classifier_concurrency = max(1, int(technique_classifier_concurrency)) + + @property + def technique_clustering_enabled(self) -> bool: + return bool( + self._techniques + and self._classifier_provider is not None + and self._classifier_model + ) @property def num_workers_needed(self) -> int: - """Number of workers = top_kernels × bottlenecks.""" - return self.num_top_kernels * self.num_bottlenecks + """Workers per round = pool × bottlenecks × models × samples.""" + return ( + self.candidate_pool_size + * self.num_bottlenecks + * len(self._expansion_models) + * self.samples_per_prompt + ) def initialize(self, initial_program: ProgramEntry) -> None: """Initialize with starting program. @@ -71,16 +132,22 @@ def initialize(self, initial_program: ProgramEntry) -> None: """ self.problem_id = initial_program.problem_id # Start with N copies of initial (will be deduplicated on first update) - self.top_kernels = [initial_program] * self.num_top_kernels + self.top_kernels = [initial_program] * self.candidate_pool_size + models_str = ( + ", ".join(str(m) for m in self.models) if self.models else "" + ) self.logger.info( - f"BeamSearch initialized: {self.num_top_kernels} kernels × " - f"{self.num_bottlenecks} bottlenecks = {self.num_workers_needed} workers" + f"BeamSearch initialized: pool={self.candidate_pool_size} × " + f"{self.num_bottlenecks} bottlenecks × {len(self._expansion_models)} " + f"models [{models_str}] × {self.samples_per_prompt} samples " + f"= {self.num_workers_needed} workers" ) def select_candidates(self, round_num: int) -> list[dict[str, Any]]: """Select candidates for this round. - Creates one candidate for each (kernel, bottleneck) pair. + Creates one candidate for each (parent, bottleneck, model, sample) + tuple. Every member of ``self.top_kernels`` is expanded. 
Args: round_num: Current round number @@ -88,16 +155,20 @@ def select_candidates(self, round_num: int) -> list[dict[str, Any]]: Returns: List of candidate specs for workers """ - candidates = [] + candidates: list[dict[str, Any]] = [] for rank, kernel in enumerate(self.top_kernels): for bottleneck_id in range(1, self.num_bottlenecks + 1): - candidates.append( - { - "parent": kernel, - "bottleneck_id": bottleneck_id, - "kernel_rank": rank, - } - ) + for model in self._expansion_models: + for sample_idx in range(self.samples_per_prompt): + candidates.append( + { + "parent": kernel, + "bottleneck_id": bottleneck_id, + "kernel_rank": rank, + "openai_model": model, + "sample_idx": sample_idx, + } + ) return candidates def update_with_results( @@ -112,38 +183,152 @@ def update_with_results( results: Worker results round_num: Current round number """ + # Build a (kernel_code → ptx_hash) lookup from the current beam. + # If a worker returned an *unchanged* parent (i.e. its LLM did not + # improve the kernel), the worker reports ptx_hash=None even though + # the parent's hash is already known to the strategy. Falling back + # to that hash lets PTX dedup correctly merge such results with + # the existing parent entry. 
+ ptx_by_kernel_code: dict[str, str] = { + p.kernel_code: p.ptx_hash for p in self.top_kernels if p.ptx_hash + } + # Add successful results - new_entries = [] + new_entries: list[ProgramEntry] = [] + entry_models: dict[str, str | None] = {} for result in results: if result.get("success"): + program_id = f"r{round_num}_w{result['worker_id']}" + ptx_hash = result.get("ptx_hash") or ptx_by_kernel_code.get( + result.get("kernel_code", "") + ) entry = ProgramEntry( - program_id=f"r{round_num}_w{result['worker_id']}", + program_id=program_id, kernel_code=result["kernel_code"], metrics=ProgramMetrics(time_ms=result["time_ms"]), problem_id=self.problem_id, parent_id=result.get("parent_id"), generation=round_num, + ptx_hash=ptx_hash, ) new_entries.append(entry) + entry_models[program_id] = result.get("openai_model") if self.database: self.database.add_program(entry) - # Update top-k (combine, sort, truncate) + # Combine old beam + new results into the candidate pool. all_candidates = self.top_kernels + new_entries - all_candidates.sort(key=lambda x: x.metrics.time_ms) - self.top_kernels = all_candidates[: self.num_top_kernels] + + # Dedup by PTX fingerprint: entries with the same normalized PTX are + # the same kernel at the compiler level, so keep only the fastest + # per fingerprint. Entries with ``ptx_hash is None`` (capture + # failed) are treated as singletons so we never accidentally merge + # them together. + dedup_before = len(all_candidates) + pooled = self._dedup_by_ptx(all_candidates) + dedup_after = len(pooled) + + # Optional technique-vector clustering: classify newly-arrived + # survivors via LLM, then pick top-N with diversity preserved + # across distinct vectors. When disabled, fall back to the + # plain sort+truncate path. 
+ cluster_log = "" + if self.technique_clustering_enabled: + self._classify_pooled(pooled) + self.top_kernels = select_diverse_top_k(pooled, self.candidate_pool_size) + distinct_clusters = len( + { + tuple(e.technique_vector) + if e.technique_vector is not None + else ("__none__", e.program_id) + for e in pooled + } + ) + cluster_log = f", clusters={distinct_clusters}" + else: + pooled.sort(key=lambda x: x.metrics.time_ms) + self.top_kernels = pooled[: self.candidate_pool_size] if self.database: self.database.save() # Log update if new_entries: - best_new = min(e.metrics.time_ms for e in new_entries) + best_new_entry = min(new_entries, key=lambda e: e.metrics.time_ms) + best_new_model = entry_models.get(best_new_entry.program_id) + model_tag = f" [{best_new_model}]" if best_new_model else "" self.logger.info( f"Round {round_num}: {len(new_entries)} successful, " - f"best new: {best_new:.4f}ms" + f"PTX dedup {dedup_before}→{dedup_after}{cluster_log}, " + f"best new: {best_new_entry.metrics.time_ms:.4f}ms{model_tag}" ) + def _classify_pooled(self, pooled: list[ProgramEntry]) -> None: + """Fill in missing / stale ``technique_vector`` on each entry. + + Entries whose vector is already present and matches the current + taxonomy length are left alone (cached from prior rounds or + prior runs via the JSON database). Everything else gets a + fresh LLM classification, fanned out via a thread pool. + """ + if not self.technique_clustering_enabled: + return + expected_dim = len(self._techniques) + # Identify entries that need (re)classification. + to_classify: list[tuple[str, str]] = [] # (program_id, kernel_code) + index: dict[str, ProgramEntry] = {} + for entry in pooled: + v = entry.technique_vector + if v is None or len(v) != expected_dim: + # Stale-length vectors (taxonomy changed) get reclassified. 
+ if v is not None and len(v) != expected_dim: + entry.technique_vector = None + to_classify.append((entry.program_id, entry.kernel_code)) + index[entry.program_id] = entry + if not to_classify: + return + + self.logger.info( + f"Technique clustering: classifying {len(to_classify)} kernel(s) " + f"with {self._classifier_model} (taxonomy dim={expected_dim})" + ) + results = classify_many( + to_classify, + self._techniques, + self._classifier_provider, + self._classifier_model, + logger=self.logger, + max_concurrency=self._classifier_concurrency, + ) + for pid, vec in results.items(): + entry = index.get(pid) + if entry is None: + continue + entry.technique_vector = vec # may be None if classification failed + if self.database: + # Re-add to persist the updated vector. + self.database.add_program(entry) + + @staticmethod + def _dedup_by_ptx(entries: list[ProgramEntry]) -> list[ProgramEntry]: + """Collapse entries with identical PTX fingerprints to the fastest. + + Entries whose ``ptx_hash`` is ``None`` are kept as-is (treated as + singletons) so we never merge two kernels whose PTX we couldn't + capture. + """ + best_by_hash: dict[str, ProgramEntry] = {} + singletons: list[ProgramEntry] = [] + for e in entries: + h = e.ptx_hash + if h is None: + singletons.append(e) + continue + incumbent = best_by_hash.get(h) + if incumbent is None or e.metrics.time_ms < incumbent.metrics.time_ms: + best_by_hash[h] = e + return list(best_by_hash.values()) + singletons + def get_best_program(self) -> ProgramEntry | None: """Get the best performing kernel in the beam.""" if not self.top_kernels: diff --git a/triton_kernel_agent/opt_worker_component/searching/technique_vector.py b/triton_kernel_agent/opt_worker_component/searching/technique_vector.py new file mode 100644 index 00000000..763f77a3 --- /dev/null +++ b/triton_kernel_agent/opt_worker_component/searching/technique_vector.py @@ -0,0 +1,314 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Technique-vector clustering for beam-search dedup. + +Companion to ``ptx_fingerprint.py``. Where PTX-hash dedup answers +"are two kernels byte-identical at the compiler level?", this module +answers "do two kernels apply the same set of optimization techniques?", +as judged by an LLM, and uses that signal to keep beam diversity even +after PTX-dedup has fired. + +The technique taxonomy is loaded from a YAML file at run start and is +expandable: appending a new entry to the YAML widens the vector +dimension by one. Persisted vectors with mismatched length are +discarded and re-classified on the next round. + +Public API: +- ``load_techniques(path)`` — read taxonomy from YAML. +- ``classify_kernel(kernel_code, techniques, provider, model, ...)`` — + one LLM call, returns a tuple of 0/1 ints of length ``len(techniques)``, + or ``None`` if the call fails. +- ``classify_many(...)`` — thread-pooled fan-out for a batch of kernels. +- ``select_diverse_top_k(entries, k)`` — diversity-aware truncation that + keeps the fastest representative of each technique cluster. +""" + +from __future__ import annotations + +import json +import logging +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable, Sequence + +from .history.models import ProgramEntry + + +# --- Taxonomy ---------------------------------------------------------------- + + +@dataclass(frozen=True) +class TechniqueDefinition: + """One bit of the binary technique vector. + + The ``name`` is a stable identifier persisted with vectors. 
The + ``llm_hint`` is the prompt fragment that tells the classifier model + what to look for; it should be a short, concrete description that + distinguishes this technique from others on the list. + """ + + name: str + description: str + llm_hint: str + + +def load_techniques(path: Path | str) -> list[TechniqueDefinition]: + """Read the technique taxonomy from a YAML file. + + The YAML must have a top-level ``techniques`` key whose value is a + list of ``{name, description, llm_hint}`` entries. Order of the + list is the bit ordering of the vector — appending is safe; + reordering invalidates persisted vectors. + """ + import yaml + + text = Path(path).read_text() + raw = yaml.safe_load(text) + if not isinstance(raw, dict) or "techniques" not in raw: + raise ValueError( + f"Techniques YAML at {path} must have a top-level 'techniques' key" + ) + out: list[TechniqueDefinition] = [] + seen: set[str] = set() + for i, entry in enumerate(raw["techniques"]): + name = entry.get("name") + if not name: + raise ValueError(f"Technique entry #{i} in {path} is missing 'name'") + if name in seen: + raise ValueError(f"Duplicate technique name {name!r} in {path}") + seen.add(name) + out.append( + TechniqueDefinition( + name=name, + description=str(entry.get("description", "")).strip(), + llm_hint=str(entry.get("llm_hint", "")).strip(), + ) + ) + return out + + +# --- Prompt + parser -------------------------------------------------------- + + +_PROMPT_HEADER = """\ +You are classifying a Triton GPU kernel against a fixed list of optimization +techniques. For each technique, answer 0 (not used) or 1 (used) based on +the kernel source below. Be conservative — only mark a bit as 1 if the +technique is unambiguously present in the source. + +Return your answer as a JSON object with one key per technique, in the +exact same order the techniques are listed. Do not include any +commentary, prose, or markdown fencing. 
Example shape: + + {"split_k_reduce": 0, "tensor_cores": 1, ...} +""" + + +def _build_prompt(kernel_code: str, techniques: Sequence[TechniqueDefinition]) -> str: + lines = [_PROMPT_HEADER, "", "Techniques:"] + for i, t in enumerate(techniques): + lines.append(f"{i + 1}. {t.name}: {t.description}") + if t.llm_hint: + for hint_line in t.llm_hint.splitlines(): + lines.append(f" {hint_line.strip()}") + lines.append("") + lines.append("Kernel source:") + lines.append("```python") + lines.append(kernel_code) + lines.append("```") + return "\n".join(lines) + + +_JSON_BLOCK_RE = re.compile(r"\{.*\}", re.DOTALL) + + +def _parse_response( + text: str, techniques: Sequence[TechniqueDefinition] +) -> tuple[int, ...] | None: + """Extract a length-N binary vector from the LLM response. + + Tolerates surrounding prose and common markdown fences. Returns + ``None`` if the response can't be parsed into the expected shape. + """ + match = _JSON_BLOCK_RE.search(text) + if not match: + return None + try: + obj = json.loads(match.group(0)) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(obj, dict): + return None + bits: list[int] = [] + for t in techniques: + v = obj.get(t.name) + if v is None: + return None + try: + bit = int(v) + except (TypeError, ValueError): + return None + if bit not in (0, 1): + return None + bits.append(bit) + return tuple(bits) + + +# --- Classification --------------------------------------------------------- + + +def classify_kernel( + kernel_code: str, + techniques: Sequence[TechniqueDefinition], + provider: Any, + model: str, + logger: logging.Logger | None = None, + max_tokens: int = 1024, +) -> tuple[int, ...] | None: + """One LLM call → binary technique vector for a single kernel. + + Returns ``None`` on any failure (LLM error, parse error, dimension + mismatch). Callers should treat ``None`` as "unclassified" — these + kernels are kept as singleton clusters by ``select_diverse_top_k``. 
+ """ + log = logger or logging.getLogger(__name__) + try: + from triton_kernel_agent.worker_util import _call_llm + except Exception as e: # pragma: no cover — only when the import path moves + log.warning(f"technique_vector: cannot import _call_llm ({e})") + return None + + prompt = _build_prompt(kernel_code, techniques) + try: + response = _call_llm( + provider=provider, + model=model, + messages=[{"role": "user", "content": prompt}], + logger=log, + max_tokens=max_tokens, + ) + except Exception as e: + log.warning(f"technique_vector: LLM call failed ({e})") + return None + + vec = _parse_response(response, techniques) + if vec is None: + log.warning( + f"technique_vector: failed to parse response (first 200 chars: " + f"{response[:200]!r})" + ) + return vec + + +def classify_many( + items: Iterable[tuple[str, str]], + techniques: Sequence[TechniqueDefinition], + provider: Any, + model: str, + logger: logging.Logger | None = None, + max_concurrency: int = 4, +) -> dict[str, tuple[int, ...] | None]: + """Classify a batch of (program_id, kernel_code) pairs in parallel. + + Returns ``{program_id: vector_or_None}``. Uses a small thread pool + so the manager's wall-clock isn't dominated by sequential LLM + latency. + """ + log = logger or logging.getLogger(__name__) + items_list = list(items) + out: dict[str, tuple[int, ...] 
| None] = {} + if not items_list: + return out + + workers = max(1, min(max_concurrency, len(items_list))) + with ThreadPoolExecutor(max_workers=workers) as ex: + futures = { + ex.submit( + classify_kernel, + kernel_code, + techniques, + provider, + model, + log, + ): pid + for pid, kernel_code in items_list + } + for fut in as_completed(futures): + pid = futures[fut] + try: + out[pid] = fut.result() + except Exception as e: + log.warning(f"technique_vector: classify failed for {pid}: {e}") + out[pid] = None + return out + + +# --- Diversity-aware selection --------------------------------------------- + + +def select_diverse_top_k(entries: Sequence[ProgramEntry], k: int) -> list[ProgramEntry]: + """Pick top-k from ``entries`` while preserving cluster diversity. + + Round-robin by depth across clusters: pass 0 takes the fastest + member of every cluster (ordered by time across clusters); if that + still hasn't filled the beam, pass 1 takes the second-fastest of + every cluster (again ordered by time across clusters); and so on + until ``k`` members are accumulated or the pool is exhausted. + + The output is *not* sorted by time — it is ordered by selection + order — so that consumers slicing ``top_kernels[:N]`` get the + *most diverse* prefix rather than the fastest-by-time prefix + from one big cluster. + + Entries with ``technique_vector is None`` are treated as their own + singleton clusters (never merged with anything else). + """ + if k <= 0 or not entries: + return [] + + # Group by cluster and sort each cluster ascending by time. 
+ clusters: dict[Any, list[ProgramEntry]] = {} + none_seq = 0 + for e in entries: + v = e.technique_vector + if v is None: + none_seq += 1 + cid: Any = ("__none__", none_seq) + else: + cid = tuple(v) + clusters.setdefault(cid, []).append(e) + for members in clusters.values(): + members.sort(key=lambda e: e.metrics.time_ms) + + if not clusters: + return [] + + # Walk depth-by-depth: at each depth, pull every cluster's depth-th + # member, sort across clusters by time, accept until we hit k. + accepted: list[ProgramEntry] = [] + accepted_ids: set[str] = set() + max_depth = max(len(m) for m in clusters.values()) + for depth in range(max_depth): + if len(accepted) >= k: + break + slate = [ + members[depth] for members in clusters.values() if depth < len(members) + ] + slate.sort(key=lambda e: e.metrics.time_ms) + for entry in slate: + if len(accepted) >= k: + break + if entry.program_id in accepted_ids: + continue + accepted.append(entry) + accepted_ids.add(entry.program_id) + + return accepted diff --git a/triton_kernel_agent/platform/nvidia.py b/triton_kernel_agent/platform/nvidia.py index 0c41955f..c94d3d7c 100644 --- a/triton_kernel_agent/platform/nvidia.py +++ b/triton_kernel_agent/platform/nvidia.py @@ -192,6 +192,9 @@ def __init__( high_reasoning_effort: bool, bottleneck_override: str | None, worker_kwargs: dict[str, Any], + gpu_ids: list[int] | None = None, + gpu_locks: dict[int, Any] | None = None, + workers_per_gpu: int = 2, ) -> None: self.log_dir = log_dir self.logger = logger @@ -201,6 +204,22 @@ def __init__( self.high_reasoning_effort = high_reasoning_effort self.bottleneck_override = bottleneck_override self.worker_kwargs = worker_kwargs + # Multi-GPU pool: workers round-robin across these GPUs and each + # uses its assigned GPU's lock for both benchmark and NCU. Falls + # back to legacy single-GPU behavior on GPU 0 when not provided. 
+ self.gpu_ids: list[int] = list(gpu_ids) if gpu_ids else [0] + self.gpu_locks: dict[int, Any] = ( + dict(gpu_locks) if gpu_locks else {0: benchmark_lock} + ) + # Dynamic scheduler bound: at most ``workers_per_gpu`` worker + # processes pinned to any single GPU at one moment. The total + # active pool is ``workers_per_gpu * len(gpu_ids)``. When a + # worker exits its GPU slot is freed and the next pending + # candidate is spawned with that GPU id — so workers that + # finish their LLM phase quickly free up GPU capacity for + # workers still pending, and slow LLM calls don't block + # otherwise-idle GPUs. + self.workers_per_gpu: int = max(1, int(workers_per_gpu)) def run_workers( self, @@ -212,15 +231,76 @@ def run_workers( shared_history: list[dict], shared_reflexions: list[dict], ) -> list[dict[str, Any]]: + """Dynamic spawn-on-free-GPU-slot scheduler. + + Up to ``workers_per_gpu`` workers run concurrently per GPU, for a + total active pool of ``workers_per_gpu * len(gpu_ids)``. When a + worker exits, its GPU slot frees and the next pending candidate + is spawned pinned to that same GPU. This keeps GPUs saturated + even when individual workers spend most of their time in LLM + calls — quick-finishing GPUs immediately get new work, instead + of sitting idle while their statically-assigned share of + candidates trickles through the LLM phase. + """ + import queue as _queue_mod + result_queue = mp.Queue() - workers = [] - for i, candidate in enumerate(candidates): - workdir = self.log_dir / "workers" / f"w{i}" / f"r{round_num}" - workdir.mkdir(parents=True, exist_ok=True) + # Per-GPU free-slot counter. Spawn-when-positive, decrement on + # spawn, increment on worker exit. + free_slots: dict[int, int] = {g: self.workers_per_gpu for g in self.gpu_ids} + pool_capacity = self.workers_per_gpu * len(self.gpu_ids) + + # Pending queue (FIFO by candidate index — preserves the + # strategy's intended ordering of fanout). 
+        pending: list[tuple[int, dict[str, Any]]] = list(enumerate(candidates))
+        pending_idx = 0  # next index to spawn
+
+        # Currently-running: (Process, gpu_id, worker_id).
+        running: list[tuple[mp.Process, int, int]] = []
+
+        worker_timeout = 1800  # 30 minutes wall-clock cap
+        deadline = time.time() + worker_timeout
+        results: list[dict[str, Any]] = []
+
+        self.logger.info(
+            f"Round {round_num}: scheduling {len(candidates)} candidates "
+            f"across {len(self.gpu_ids)} GPU(s) × {self.workers_per_gpu} "
+            f"slots/GPU = pool capacity {pool_capacity}"
+        )
 
+        def _pick_free_gpu() -> int | None:
+            """Return GPU with the most free slots, or None if pool full."""
+            best: int | None = None
+            best_free = 0
+            for g in self.gpu_ids:
+                f = free_slots[g]
+                if f > best_free:
+                    best, best_free = g, f
+            return best
+
+        def _drain_queue() -> None:
+            while True:
+                try:
+                    results.append(result_queue.get_nowait())
+                except _queue_mod.Empty:
+                    break
+                except Exception:
+                    break
+
+        def _spawn(
+            worker_id: int, candidate: dict[str, Any], gpu_id: int
+        ) -> mp.Process:
+            workdir = self.log_dir / "workers" / f"w{worker_id}" / f"r{round_num}"
+            workdir.mkdir(parents=True, exist_ok=True)
+            worker_model = candidate.get("openai_model") or self.openai_model
+            baseline_metrics = candidate.get("baseline_metrics")
+            precomputed_bottleneck_results = candidate.get(
+                "precomputed_bottleneck_results"
+            )
+            gpu_lock = self.gpu_locks[gpu_id]
             args = (
-                i,  # worker_id
+                worker_id,
                 candidate["parent"].kernel_code,
                 candidate["parent"].metrics.time_ms,
                 candidate["parent"].program_id,
@@ -229,47 +309,73 @@ def run_workers(
                 workdir,
                 workdir / "logs",
                 result_queue,
-                self.benchmark_lock,
-                self.profiling_semaphore,
+                gpu_lock,
+                gpu_lock,
                 pytorch_baseline,
                 candidate["bottleneck_id"],
-                self.openai_model,
+                worker_model,
                 self.high_reasoning_effort,
                 self.bottleneck_override,
                 self.worker_kwargs,
                 shared_history,
                 shared_reflexions,
+                baseline_metrics,
+                precomputed_bottleneck_results,
+                gpu_id,
             )
-
             p =
mp.Process(target=_nvidia_worker_process, args=args)
             p.start()
-            workers.append(p)
-
-        # Wait for completion with timeout
-        worker_timeout = 1800  # 30 minutes
-        deadline = time.time() + worker_timeout
-        for w in workers:
-            remaining = max(0, deadline - time.time())
-            w.join(timeout=remaining)
-            if w.is_alive():
-                self.logger.warning(f"Worker {w.pid} timed out, terminating")
-                w.terminate()
-                w.join(timeout=5)
+            return p
+
+        # Main scheduling loop.
+        while (pending_idx < len(pending) or running) and time.time() < deadline:
+            # 1. Top up pool: spawn pending candidates onto free GPU slots.
+            while pending_idx < len(pending):
+                gpu_id = _pick_free_gpu()
+                if gpu_id is None:
+                    break
+                worker_id, candidate = pending[pending_idx]
+                p = _spawn(worker_id, candidate, gpu_id)
+                free_slots[gpu_id] -= 1
+                running.append((p, gpu_id, worker_id))
+                pending_idx += 1
+
+            # 2. Drain the result queue (prevents pipe-buffer deadlock).
+            _drain_queue()
+
+            # 3. Reap finished workers; free their GPU slots.
+            still_running: list[tuple[mp.Process, int, int]] = []
+            for w, gpu_id, wid in running:
+                w.join(timeout=0.5)
                 if w.is_alive():
-                    self.logger.warning(f"Worker {w.pid} still alive, killing")
-                    w.kill()
-                    w.join(timeout=2)
+                    still_running.append((w, gpu_id, wid))
+                else:
+                    free_slots[gpu_id] += 1
+                    w.close()
+            running = still_running
+
+        # Past deadline: terminate any stragglers.
+        for w, gpu_id, wid in running:
+            self.logger.warning(
+                f"Worker {wid} (pid {w.pid}, gpu {gpu_id}) timed out, terminating"
+            )
+            w.terminate()
+            w.join(timeout=5)
+            if w.is_alive():
+                self.logger.warning(f"Worker {wid} still alive, killing")
+                w.kill()
+                w.join(timeout=2)
             w.close()
 
-        # Collect results
-        results: list[dict[str, Any]] = []
-        while not result_queue.empty():
+        # Final drain after every worker is gone.
+        while True:
             try:
                 results.append(result_queue.get_nowait())
+            except _queue_mod.Empty:
+                break
             except Exception:
                 break
 
-        # Clean up queue resources to prevent thread hangs during GC
         result_queue.close()
         result_queue.join_thread()
 
@@ -307,12 +413,28 @@ def _nvidia_worker_process(
     worker_kwargs: dict,
     prior_history: list[dict],
     prior_reflexions: list[dict],
+    baseline_metrics: dict[str, Any] | None,
+    precomputed_bottleneck_results: list[dict[str, Any]] | None,
+    gpu_id: int,
 ) -> None:
     """Worker process function for NVIDIA GPUs.
 
     Runs in a separate process to optimise a single kernel variant using
     NCU profiling and CUDA benchmarking.
     """
+    import os
+
+    # Pin this worker process to a single GPU before any torch import or
+    # GPU-touching subprocess. Both the benchmark subprocess and NCU
+    # subprocess inherit the env, so they automatically run on this GPU.
+    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
+    # Print to harness log immediately so multi-GPU pinning is verifiable.
+    print(
+        f"[worker {worker_id}] pinned to GPU {gpu_id} "
+        f"(CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']})",
+        flush=True,
+    )
+
     import sys
 
     kernel_agent_path = Path(__file__).parent.parent.parent
@@ -349,6 +471,8 @@ def _nvidia_worker_process(
             test_code=test_code,
             known_kernel_time=known_time,
             max_opt_rounds=1,
+            baseline_metrics=baseline_metrics,
+            precomputed_bottleneck_results=precomputed_bottleneck_results,
         )
 
         attempt_data = metrics.get("last_attempt")
@@ -361,6 +485,8 @@ def _nvidia_worker_process(
                 "kernel_code": best_kernel,
                 "time_ms": metrics.get("best_time_ms", float("inf")),
                 "parent_id": parent_id,
+                "openai_model": openai_model,
+                "ptx_hash": metrics.get("best_ptx_hash"),
                 "attempt": attempt_data,
                 "reflexion": reflexion_data,
             }
@@ -371,6 +497,7 @@ def _nvidia_worker_process(
             {
                 "success": False,
                 "worker_id": worker_id,
+                "openai_model": openai_model,
                 "error": str(e),
                 "traceback": traceback.format_exc(),
             }
@@ -501,11 +628,13 @@ def __init__(
         openai_model: str = "gpt-5",
         gpu_name: str | None = None,
         roofline_config: Any | None = None,
+        num_bottlenecks: int = 1,
     ) -> None:
         self._logger = logger or logging.getLogger(__name__)
         self._log_dir = Path(log_dir) if log_dir else None
         self._openai_model = openai_model
         self._gpu_name = gpu_name
+        self._num_bottlenecks = max(1, int(num_bottlenecks))
         self._delegate: Any | None = None
         # Orchestrator accesses ``bottleneck_analyzer.roofline`` directly.
         self.roofline = NvidiaRooflineAnalyzer(
@@ -533,6 +662,7 @@ def _get_delegate(self) -> Any:
                 gpu_specs=gpu_specs,
                 logs_dir=self._log_dir,
                 logger=self._logger,
+                num_bottlenecks=self._num_bottlenecks,
             )
         return self._delegate
 
diff --git a/triton_kernel_agent/worker.py b/triton_kernel_agent/worker.py
index ea5d982f..e594d74c 100644
--- a/triton_kernel_agent/worker.py
+++ b/triton_kernel_agent/worker.py
@@ -14,6 +14,7 @@
 
 """Verification Worker for testing and refining individual kernels."""
 
+import ast
 import json
 import logging
 import multiprocessing as mp
@@ -261,6 +262,27 @@ def _extract_code_from_response(
         self.logger.warning("No code block found in LLM response")
         return None
 
+    def _validate_kernel_candidate(self, kernel_code: str | None) -> str | None:
+        """Return a reason when extracted kernel code is structurally malformed."""
+        if not kernel_code or not kernel_code.strip():
+            return "no Python kernel code was extracted from the model response"
+
+        try:
+            module = ast.parse(kernel_code)
+        except SyntaxError as exc:
+            line = f"line {exc.lineno}" if exc.lineno else "unknown line"
+            return f"invalid Python syntax ({line}: {exc.msg})"
+
+        has_kernel_function = any(
+            isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
+            and node.name == "kernel_function"
+            for node in module.body
+        )
+        if not has_kernel_function:
+            return "missing required top-level kernel_function definition"
+
+        return None
+
     def _write_kernel(self, kernel_code: str):
         """Write only the kernel code to file."""
         self.kernel_file.write_text(kernel_code)
@@ -413,6 +435,12 @@ def
_refine_kernel(
                     response_text,
                     prefer_kernel_function=getattr(self, "_has_multiple_tests", False),
                 )
+                malformed_reason = self._validate_kernel_candidate(refined_kernel)
+
+                if malformed_reason:
+                    raise ValueError(
+                        f"Malformed LLM kernel response: {malformed_reason}"
+                    )
 
                 if refined_kernel:
                     self.logger.info(
@@ -425,6 +453,8 @@ def _refine_kernel(
                 return kernel_code
 
         except Exception as e:
+            if "Malformed LLM kernel response" in str(e):
+                raise
             self.logger.error(f"Error refining kernel with LLM API: {e}")
             # Fall back to mock refinement
 
@@ -482,6 +512,17 @@ def run(
         self._has_multiple_tests = len(test_code) > 1
 
         current_kernel = kernel_code
+        malformed_reason = self._validate_kernel_candidate(current_kernel)
+        if malformed_reason:
+            error_feedback = f"Malformed LLM kernel response: {malformed_reason}"
+            self.logger.warning(f"❌ {error_feedback}")
+            return {
+                "worker_id": self.worker_id,
+                "success": False,
+                "rounds": 0,
+                "error": error_feedback,
+                "history": list(self.history),
+            }
 
         for round_num in range(self.max_rounds):
             # Check if another worker has succeeded
@@ -516,12 +557,38 @@ def run(
                         "stderr": violation,
                         "history": list(self.history),
                     }
-                    current_kernel = self._refine_kernel(
-                        current_kernel,
-                        error_info,
-                        problem_description,
-                        format_test_code_for_llm(test_code),
-                    )
+                    try:
+                        current_kernel = self._refine_kernel(
+                            current_kernel,
+                            error_info,
+                            problem_description,
+                            format_test_code_for_llm(test_code),
+                        )
+                    except ValueError as exc:
+                        if "Malformed LLM kernel response" not in str(exc):
+                            raise
+                        error_feedback = str(exc)
+                        self.logger.warning(f"❌ {error_feedback}")
+                        return {
+                            "worker_id": self.worker_id,
+                            "success": False,
+                            "rounds": round_num + 1,
+                            "error": error_feedback,
+                            "history": list(self.history),
+                        }
+                    malformed_reason = self._validate_kernel_candidate(current_kernel)
+                    if malformed_reason:
+                        error_feedback = (
+                            f"Malformed LLM kernel response: {malformed_reason}"
+                        )
+                        self.logger.warning(f"❌ {error_feedback}")
+                        return {
+                            "worker_id": self.worker_id,
+                            "success": False,
+                            "rounds": round_num + 1,
+                            "error": error_feedback,
+                            "history": list(self.history),
+                        }
                     continue
 
             # Log round
@@ -546,12 +613,36 @@ def run(
                 "history": list(self.history),
             }
 
-            current_kernel = self._refine_kernel(
-                current_kernel,
-                error_info,
-                problem_description,
-                format_test_code_for_llm(test_code),
-            )
+            try:
+                current_kernel = self._refine_kernel(
+                    current_kernel,
+                    error_info,
+                    problem_description,
+                    format_test_code_for_llm(test_code),
+                )
+            except ValueError as exc:
+                if "Malformed LLM kernel response" not in str(exc):
+                    raise
+                error_feedback = str(exc)
+                self.logger.warning(f"❌ {error_feedback}")
+                return {
+                    "worker_id": self.worker_id,
+                    "success": False,
+                    "rounds": round_num + 1,
+                    "error": error_feedback,
+                    "history": list(self.history),
+                }
+            malformed_reason = self._validate_kernel_candidate(current_kernel)
+            if malformed_reason:
+                error_feedback = f"Malformed LLM kernel response: {malformed_reason}"
+                self.logger.warning(f"❌ {error_feedback}")
+                return {
+                    "worker_id": self.worker_id,
+                    "success": False,
+                    "rounds": round_num + 1,
+                    "error": error_feedback,
+                    "history": list(self.history),
+                }
 
         # Max rounds reached without success
         self.logger.warning(f"Max rounds ({self.max_rounds}) reached without success")
@@ -619,6 +710,12 @@ def verify_with_refinement(
         current_kernel = kernel_code
         self._has_multiple_tests = len(test_code) > 1
 
+        malformed_reason = self._validate_kernel_candidate(current_kernel)
+        if malformed_reason:
+            error_feedback = f"Malformed LLM kernel response: {malformed_reason}"
+            self.logger.warning(f"❌ {error_feedback}")
+            return False, current_kernel, error_feedback
+
         # Write files for testing (primary + additional tests)
         self._write_files(current_kernel, test_code)
 
@@ -653,12 +750,24 @@ def verify_with_refinement(
             }
 
             # Refine kernel
-            refined_kernel = self._refine_kernel(
-                current_kernel,
-                error_info,
-                problem_description,
-                format_test_code_for_llm(test_code),
-            )
+            try:
+
refined_kernel = self._refine_kernel(
+                    current_kernel,
+                    error_info,
+                    problem_description,
+                    format_test_code_for_llm(test_code),
+                )
+            except ValueError as exc:
+                if "Malformed LLM kernel response" not in str(exc):
+                    raise
+                error_feedback = str(exc)
+                self.logger.warning(f"❌ {error_feedback}")
+                return False, current_kernel, error_feedback
+            malformed_reason = self._validate_kernel_candidate(refined_kernel)
+            if malformed_reason:
+                error_feedback = f"Malformed LLM kernel response: {malformed_reason}"
+                self.logger.warning(f"❌ {error_feedback}")
+                return False, current_kernel, error_feedback
 
             # Write and test refined kernel
             self._write_kernel(refined_kernel)