From c384e8526cb895fed843543893c2c264c6b62991 Mon Sep 17 00:00:00 2001 From: "John C. Burnham" Date: Mon, 8 Jun 2026 04:11:13 -0400 Subject: [PATCH 1/3] Sharding: out-of-circuit kernel profiler + balanced env partitioner Add the measure -> partition -> manifest pipeline for splitting a compiled `.ixe` environment into balanced, low-overhead shards, so large environments (mathlib is ~631k blocks) can be proven in zero-knowledge as independent conditional proofs rather than one infeasible monolith. Two new CLI verbs; the kernel runs once to profile, then sharding is cheap to re-tune for any shard count: ix profile -> env.ixesp (per-block heartbeats + delta graph) ix shard -> env.ixes (N-shard manifest + what-if metrics) Phase A -- out-of-circuit kernel profiler (gated; zero overhead when off): - Record, per checked constant, its heartbeats (recursive fuel) and the set of constants whose definition bodies it delta-unfolds. The *delta-unfold* graph, not the static reference graph, is the real cross-shard cost: a shard must ingress the body of any foreign block its members unfold. - KEnv gains an optional ProfileSink (per-worker accumulator). Capture sites are the delta-unfold commit points in whnf.rs and per-constant fuel accounting in tc.rs; begin_const markers in check.rs/inductive.rs attribute work to the right constant. Per-constant cache isolation makes recording sound (no unfolds skipped by cross-constant memo) and faithful to in-circuit, un-memoized cost. - src/ix/profile.rs: the `.ixesp` block-profile model (heartbeats + serialized size + CSR delta graph), the ProfileSink accumulator, and explicit little-endian (de)serialization (no serde dependency). Phase B -- partitioner (src/ix/shard.rs, pure): - Weighted hypergraph partitioning under the connectivity-1 (km1) metric: vertices = blocks weighted by heartbeats (balance), nets = blocks weighted by serialized size (ingress cost); objective = total cross-shard ingress bytes. - Recursive bisection via greedy graph-growing + Fiduccia-Mattheyses refinement, with cut-net splitting on recursion and a hub-net cap. Leaf-count allocation distributes the N-shard budget by heartbeat mass clamped to [1, side size], so any N works (not only powers of two) and every shard is non-empty when #blocks >= N. A balance-weight cap (heartbeats capped at total/N) keeps splits even while accepting the unavoidable imbalance from atomic mutual blocks. - Parallelized with rayon across independent subtrees (deterministic: identical result to serial), with live progress output so a slow run is never mistaken for a stuck one. Phase C -- manifest (src/ix/shard.rs): - `.ixes` per-shard manifest: member blocks, heartbeat/own-size sums, foreign (delta-dependency) block sets, delta-based assumption-tree roots (merkle_root_canonical), and a what-if summary (balance, total cross-shard ingress, atomic-block floor). FFI + CLI: - src/ffi/kernel.rs: profile_anon_ixe (runs the anon kernel over a `.ixe` with recording, maps constants to home blocks, writes `.ixesp`) and shard_esp (partition + manifest); rs_kernel_profile_anon / rs_shard_esp FFI. - Ix/Cli/{ProfileCmd,ShardCmd}.lean, Ix/KernelCheck.lean externs, Main wiring. No external graph-library dependency; the partitioner is self-contained. Kernel changes are gated behind KEnv::profile_sink, so production checking is unaffected. Validated end-to-end on initstd/lean/mathlib (64/128/256 shards, 0 empty shards). Follow-up: multilevel coarsening to make mathlib-scale partitions run in seconds and recover cut quality. --- Ix/Cli/ProfileCmd.lean | 67 +++ Ix/Cli/ShardCmd.lean | 62 ++ Ix/KernelCheck.lean | 24 + Main.lean | 4 + src/ffi/kernel.rs | 270 +++++++++ src/ix.rs | 2 + src/ix/kernel/check.rs | 52 ++ src/ix/kernel/env.rs | 32 ++ src/ix/kernel/inductive.rs | 5 + src/ix/kernel/tc.rs | 46 ++ src/ix/kernel/whnf.rs | 2 + src/ix/profile.rs | 485 ++++++++++++++++ src/ix/shard.rs | 1119 ++++++++++++++++++++++++++++++++++++ 13 files changed, 2170 insertions(+) create mode 100644 Ix/Cli/ProfileCmd.lean create mode 100644 Ix/Cli/ShardCmd.lean create mode 100644 src/ix/profile.rs create mode 100644 src/ix/shard.rs diff --git a/Ix/Cli/ProfileCmd.lean b/Ix/Cli/ProfileCmd.lean new file mode 100644 index 00000000..28584673 --- /dev/null +++ b/Ix/Cli/ProfileCmd.lean @@ -0,0 +1,67 @@ +/- + `ix profile `: run the Ix kernel out of circuit over a serialized + `.ixe` environment, recording per-block heartbeats and the delta-unfold graph + into a `.ixesp` sidecar. This is the cost model consumed by `ix shard` + (see `plans/sharding.md`). + + Recording defaults to *cache-isolated* mode: the kernel clears its + cross-constant reduction-memo caches between constants so every delta-unfold + re-executes (sound recording) and recorded heartbeats reflect the in-circuit + (un-memoized) cost. `--keep-caches` trades fidelity for speed. +-/ +module +public import Cli +public import Ix.Common +public import Ix.KernelCheck +public import Std.Internal.UV.System + +public section + +open Ix.KernelCheck + +namespace Ix.Cli.ProfileCmd + +def runProfileCmd (p : Cli.Parsed) : IO UInt32 := do + let some pathArg := p.positionalArg? "path" + | p.printError "error: must specify to a .ixe file" + return 1 + let envPath := pathArg.as! String + let outPath : String := + match p.flag? "out" with + | some flag => flag.as! String + | none => envPath ++ ".ixesp" + let isolate := !(p.flag? "keep-caches" |>.isSome) + let quiet := !(p.flag? "verbose" |>.isSome) + + if let some flag := p.flag? "workers" then + let n := flag.as! Nat + if n == 0 then + p.printError "error: --workers must be > 0" + return 1 + Std.Internal.UV.System.osSetenv "IX_KERNEL_CHECK_WORKERS" (toString n) + + IO.println s!"Profiling {envPath} → {outPath} (isolate={isolate})" + let start ← IO.monoMsNow + rsProfileAnonFFI envPath outPath isolate quiet + let elapsed := (← IO.monoMsNow) - start + IO.println s!"[profile] wrote {outPath} in {elapsed.formatMs}" + return 0 + +end Ix.Cli.ProfileCmd + +open Ix.Cli.ProfileCmd in +def profileCmd : Cli.Cmd := `[Cli| + "profile" VIA runProfileCmd; + "Profile a `.ixe` out of circuit → `.ixesp` (sharding cost + delta graph)" + + FLAGS: + out : String; "Output .ixesp path (default: .ixesp)" + "keep-caches"; "Keep cross-constant caches: faster, lower-fidelity, may under-record" + workers : Nat; "Parallel kernel workers (default: available_parallelism). Plumbs IX_KERNEL_CHECK_WORKERS." + verbose; "Log every constant (default: quiet)" + + ARGS: + path : String; "Path to a serialized .ixe environment" +] + +end diff --git a/Ix/Cli/ShardCmd.lean b/Ix/Cli/ShardCmd.lean new file mode 100644 index 00000000..12ce04ae --- /dev/null +++ b/Ix/Cli/ShardCmd.lean @@ -0,0 +1,62 @@ +/- + `ix shard --shards N`: partition a profiled environment into `N` + shards via recursive balanced min-cut bisection, minimizing cross-shard + delta-unfold ingress (see `plans/sharding.md`). + + Reads the `.ixesp` produced by `ix profile` (pure offline graph work, so `N` + is cheap to re-tune without re-running the kernel). Writes a `.ixes` manifest + and prints a what-if report (per-shard heartbeat balance + total cross-shard + ingress). The partitioner is self-contained — no external graph-library + dependency. +-/ +module +public import Cli +public import Ix.KernelCheck + +public section + +open Ix.KernelCheck + +namespace Ix.Cli.ShardCmd + +def runShardCmd (p : Cli.Parsed) : IO UInt32 := do + let some pathArg := p.positionalArg? "path" + | p.printError "error: must specify to a .ixesp file" + return 1 + let espPath := pathArg.as! String + let numShards : Nat := + match p.flag? "shards" with + | some flag => flag.as! Nat + | none => 1 + let balancePct : Nat := + match p.flag? "balance" with + | some flag => flag.as! Nat + | none => 5 + let outPath : String := + match p.flag? "out" with + | some flag => flag.as! String + | none => espPath ++ ".ixes" + + IO.println s!"Sharding {espPath} into {numShards} shards (balance ±{balancePct}%)" + rsShardEspFFI espPath (toString numShards) (toString balancePct) outPath + if !outPath.isEmpty then + IO.println s!"[shard] wrote {outPath}" + return 0 + +end Ix.Cli.ShardCmd + +open Ix.Cli.ShardCmd in +def shardCmd : Cli.Cmd := `[Cli| + "shard" VIA runShardCmd; + "Partition a `.ixesp` into N balanced shards minimizing cross-shard delta" + + FLAGS: + shards : Nat; "Number of shards N (default 1 = a single shard)" + balance : Nat; "Per-bisection balance tolerance, percent (default 5)" + out : String; "Output .ixes manifest path (default: .ixes)" + + ARGS: + path : String; "Path to a .ixesp produced by `ix profile`" +] + +end diff --git a/Ix/KernelCheck.lean b/Ix/KernelCheck.lean index b5cf3a05..ee0c9ca5 100644 --- a/Ix/KernelCheck.lean +++ b/Ix/KernelCheck.lean @@ -142,6 +142,30 @@ opaque rsCheckAnonFFI : @& String → -- fail-out path ("" = none) IO (Array (String × Option CheckError)) +/-- FFI: profile a `.ixe` out of circuit, writing a `.ixesp` sidecar with + per-block heartbeats + the delta-unfold graph (the sharding cost model, + see `plans/sharding.md`). Runs the anon kernel over every checkable target. + `isolate` clears the kernel's reduction-memo caches between constants for + sound/faithful recording; `quiet` suppresses per-constant progress. -/ +@[extern "rs_kernel_profile_anon"] +opaque rsProfileAnonFFI : + @& String → -- .ixe path + @& String → -- .ixesp output path + @& Bool → -- isolate caches + @& Bool → -- quiet + IO Unit + +/-- FFI: partition a `.ixesp` into `numShards` shards, writing a `.ixes` + manifest. `numShards` and `balancePct` are decimal strings (kept ABI-simple). + Empty `outPath` skips the manifest. Prints a what-if report to stderr. -/ +@[extern "rs_shard_esp"] +opaque rsShardEspFFI : + @& String → -- .ixesp path + @& String → -- num_shards (N) + @& String → -- balance percent + @& String → -- .ixes output path ("" = skip) + IO Unit + end Ix.KernelCheck end diff --git a/Main.lean b/Main.lean index 75ed3623..e72ae1e8 100644 --- a/Main.lean +++ b/Main.lean @@ -5,7 +5,9 @@ import Ix.Cli.CheckRsCmd import Ix.Cli.ClaimCmd import Ix.Cli.CompileCmd import Ix.Cli.IngressCmd +import Ix.Cli.ProfileCmd import Ix.Cli.ProveCmd +import Ix.Cli.ShardCmd import Ix.Cli.TreeCmd import Ix.Cli.ValidateCmd import Ix.Cli.VerifyCmd @@ -27,7 +29,9 @@ def ixCmd : Cli.Cmd := `[Cli| checkRsCmd; claimCmd; treeCmd; + profileCmd; proveCmd; + shardCmd; verifyCmd; addrOfCmd; ingressCmd; diff --git a/src/ffi/kernel.rs b/src/ffi/kernel.rs index 6d8bb3a2..e3dcbeb6 100644 --- a/src/ffi/kernel.rs +++ b/src/ffi/kernel.rs @@ -81,6 +81,7 @@ use crate::ix::kernel::ingress::{ use crate::ix::kernel::ingress::{ixon_ingress, lean_ingress}; use crate::ix::kernel::mode::{Anon, CheckDupLevelParams, KernelMode, Meta}; use crate::ix::kernel::tc::TypeChecker; +use crate::ix::profile::{BlockProfile, ProfileBuilder, ProfileSink}; unsafe extern "C" { fn lean_name_mk_string( @@ -1682,6 +1683,275 @@ pub extern "C" fn rs_kernel_check_anon( build_anon_result_array(&addrs_for_return, &results) } +// =========================================================================== +// Sharding profiler: run the anon kernel out of circuit over a `.ixe`, +// recording per-block heartbeats + the delta-unfold graph into a `.ixesp`. +// See `plans/sharding.md`. +// =========================================================================== + +/// Summary returned by [`profile_anon_ixe`]. +pub struct ProfileStats { + pub targets: usize, + pub passed: usize, + pub failed: usize, + pub blocks: usize, + pub edges: usize, + pub bytes: usize, +} + +/// Map a constant address to its *block* (ingress unit): the `block` address of +/// a projection constant, otherwise the address itself. +fn profile_block_of(env: &IxonEnv, addr: &Address) -> Address { + match env.get_const(addr) { + Some(c) => match &c.info { + IxonCI::IPrj(p) => p.block.clone(), + IxonCI::CPrj(p) => p.block.clone(), + IxonCI::RPrj(p) => p.block.clone(), + IxonCI::DPrj(p) => p.block.clone(), + _ => addr.clone(), + }, + None => addr.clone(), + } +} + +/// Serialized byte length of a block constant (the ingress-cost / net weight). +fn profile_block_size(env: &IxonEnv, block: &Address) -> u32 { + env + .get_const_bytes(block) + .map(|b| b.len().min(u32::MAX as usize) as u32) + .unwrap_or(0) +} + +/// Aggregate per-constant records into a block-level [`BlockProfile`]: map each +/// consumer/producer constant to its home block, attach serialized sizes, and +/// accumulate heartbeats + delta edges. +fn build_block_profile(env: &IxonEnv, merged: &ProfileSink) -> BlockProfile { + let mut builder = ProfileBuilder::new(); + // addr -> (home block, block serialized size), memoized. + let mut cache: FxHashMap = FxHashMap::default(); + let mut resolve = |a: &Address| -> (Address, u32) { + if let Some(v) = cache.get(a) { + return v.clone(); + } + let block = profile_block_of(env, a); + let size = profile_block_size(env, &block); + let v = (block, size); + cache.insert(a.clone(), v.clone()); + v + }; + for (consumer, rec) in &merged.records { + let (cblock, csize) = resolve(consumer); + builder.block(cblock.clone(), rec.fuel, csize, 1); + for prod in &rec.producers { + let (pblock, psize) = resolve(prod); + builder.block(pblock.clone(), 0, psize, 0); + builder.delta_edge(cblock.clone(), pblock); + } + } + builder.finish() +} + +/// Run the anon kernel over `work`, with per-worker profile recording, and +/// return `(passed, failed, merged_sink)`. +#[allow(clippy::needless_pass_by_value)] +fn run_anon_profile_parallel( + env: Arc, + work: Vec, + isolate: bool, + quiet: bool, +) -> Result<(usize, usize, ProfileSink), String> { + let work_total = work.len(); + let worker_count = resolve_kernel_check_workers(work_total, quiet); + eprintln!( + "[rs_kernel_profile] profiling {work_total} work item(s) with {worker_count} worker(s), isolate={isolate}..." + ); + let work = Arc::new(work); + let next_index = Arc::new(AtomicUsize::new(0)); + let passed = Arc::new(AtomicUsize::new(0)); + let failed = Arc::new(AtomicUsize::new(0)); + let sinks: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let mut handles: Vec> = + Vec::with_capacity(worker_count); + for worker_idx in 0..worker_count { + let env = Arc::clone(&env); + let work = Arc::clone(&work); + let next_index = Arc::clone(&next_index); + let passed = Arc::clone(&passed); + let failed = Arc::clone(&failed); + let sinks = Arc::clone(&sinks); + let handle = thread::Builder::new() + .name(format!("ix-kernel-profile-{worker_idx}")) + .stack_size(KERNEL_CHECK_STACK_SIZE) + .spawn(move || { + let mut kenv = KEnv::::new(); + kenv.profile_sink = Some(ProfileSink::new(isolate)); + let clear_every = kernel_check_clear_every(); + let mut checks_since_clear = clear_every; + loop { + let work_idx = next_index.fetch_add(1, Ordering::Relaxed); + if work_idx >= work_total { + break; + } + // `clear_releasing_memory` preserves `profile_sink`, so recording + // accumulates across scheduled-block boundaries. + if checks_since_clear >= clear_every { + kenv.clear_releasing_memory(); + checks_since_clear = 0; + } + let primary_addr = match &work[work_idx] { + AnonWorkItem::Standalone { addr, .. } => addr.clone(), + AnonWorkItem::Block { primary_addr, .. } => primary_addr.clone(), + }; + let kid = KId::::new(primary_addr, ()); + let res = { + let mut tc = + TypeChecker::::new_with_lazy_anon(&mut kenv, &env); + let r = tc.check_const(&kid); + // The TypeChecker is recreated per work item, so the final + // constant's record would never be flushed by a trailing reset — + // flush it explicitly. + tc.finish_constant_accounting(); + r + }; + if res.is_ok() { + passed.fetch_add(1, Ordering::Relaxed); + } else { + failed.fetch_add(1, Ordering::Relaxed); + } + checks_since_clear += 1; + } + if let Some(sink) = kenv.profile_sink.take() { + sinks.lock().unwrap().push(sink); + } + }) + .map_err(|e| format!("spawn profile worker: {e}"))?; + handles.push(handle); + } + + let mut panicked = false; + for h in handles { + if h.join().is_err() { + panicked = true; + } + } + if panicked { + return Err("profile worker panicked".to_string()); + } + + let mut merged = ProfileSink::new(isolate); + let collected = Arc::try_unwrap(sinks) + .map_err(|_| "profile sinks still shared".to_string())? + .into_inner() + .map_err(|_| "profile sinks mutex poisoned".to_string())?; + for s in collected { + merged.merge(s); + } + Ok((passed.load(Ordering::Relaxed), failed.load(Ordering::Relaxed), merged)) +} + +/// Profile a `.ixe` out of circuit and write the `.ixesp` sidecar. Pure-Rust +/// entry point (used by the FFI wrapper and Rust tests). +pub fn profile_anon_ixe( + path: &str, + out: &str, + isolate: bool, + quiet: bool, +) -> Result { + let load_start = Instant::now(); + let ixon_env = IxonEnv::get_anon_mmap(std::path::Path::new(path)) + .map_err(|e| format!("profile: mmap+deserialize {path}: {e}"))?; + eprintln!( + "[rs_kernel_profile] loaded {} consts in {:.1?}", + ixon_env.const_count(), + load_start.elapsed() + ); + let (work, addrs) = build_anon_work(&ixon_env)?; + let targets = addrs.len(); + let env_arc = Arc::new(ixon_env); + let run_start = Instant::now(); + let (passed, failed, merged) = + run_anon_profile_parallel(Arc::clone(&env_arc), work, isolate, quiet)?; + eprintln!( + "[rs_kernel_profile] checked {passed}/{} ({} failed) in {:.1?}", + passed + failed, + failed, + run_start.elapsed() + ); + let profile = build_block_profile(&env_arc, &merged); + let bytes = profile.to_bytes(); + std::fs::write(out, &bytes).map_err(|e| format!("write {out}: {e}"))?; + eprintln!( + "[rs_kernel_profile] wrote {out}: {} blocks, {} delta edges, {} bytes", + profile.num_blocks(), + profile.num_edges(), + bytes.len() + ); + Ok(ProfileStats { + targets, + passed, + failed, + blocks: profile.num_blocks(), + edges: profile.num_edges(), + bytes: bytes.len(), + }) +} + +/// FFI: profile a `.ixe` out of circuit and write a `.ixesp` sidecar. +#[unsafe(no_mangle)] +pub extern "C" fn rs_kernel_profile_anon( + env_path: LeanString>, + out_path: LeanString>, + isolate: LeanBool>, + quiet: LeanBool>, +) -> LeanIOResult { + match profile_anon_ixe( + &env_path.to_string(), + &out_path.to_string(), + isolate.to_bool(), + quiet.to_bool(), + ) { + Ok(s) => { + eprintln!( + "[rs_kernel_profile] done: {} targets, {} blocks, {} edges, {} failed", + s.targets, s.blocks, s.edges, s.failed + ); + LeanIOResult::ok(LeanOwned::box_usize(0)) + }, + Err(e) => { + LeanIOResult::error_string(&format!("rs_kernel_profile_anon: {e}")) + }, + } +} + +/// FFI: partition a `.ixesp` into `num_shards` shards and write a `.ixes` +/// manifest. Prints a what-if report to stderr. +#[unsafe(no_mangle)] +pub extern "C" fn rs_shard_esp( + esp_path: LeanString>, + num_shards: LeanString>, + balance_pct: LeanString>, + out_path: LeanString>, +) -> LeanIOResult { + let num_shards = num_shards.to_string().parse::().unwrap_or(1); + let balance_pct = balance_pct.to_string().parse::().unwrap_or(5); + let out = out_path.to_string(); + let out_opt = if out.is_empty() { None } else { Some(out.as_str()) }; + let balance = (balance_pct as f64) / 100.0; + match crate::ix::shard::shard_esp( + &esp_path.to_string(), + num_shards, + balance, + out_opt, + ) { + Ok(report) => { + eprintln!("[rs_shard]\n{report}"); + LeanIOResult::ok(LeanOwned::box_usize(0)) + }, + Err(e) => LeanIOResult::error_string(&format!("rs_shard_esp: {e}")), + } +} + #[cfg(test)] mod tests { use super::{compact_in_flight_label, resolve_kernel_check_workers_from}; diff --git a/src/ix.rs b/src/ix.rs index af5d3329..a1fd600b 100644 --- a/src/ix.rs +++ b/src/ix.rs @@ -15,5 +15,7 @@ pub mod ground; pub mod ixon; pub mod kernel; pub mod mutual; +pub mod profile; +pub mod shard; pub mod store; pub mod strong_ordering; diff --git a/src/ix/kernel/check.rs b/src/ix/kernel/check.rs index 4d3dad1c..0b0f9764 100644 --- a/src/ix/kernel/check.rs +++ b/src/ix/kernel/check.rs @@ -87,6 +87,7 @@ impl TypeChecker<'_, M> { M::MField>: CheckDupLevelParams, { self.reset(); + self.begin_const(id); let c = self.get_const(id)?; self.check_const_member(id, &c) @@ -870,6 +871,57 @@ mod tests { use crate::ix::env::{DefinitionSafety, ReducibilityHints}; use crate::ix::ixon::constant::DefKind; + #[test] + fn profile_sink_records_delta_edge_and_fuel() { + use crate::ix::kernel::mode::Meta; + use crate::ix::kernel::testing as t; + use crate::ix::profile::ProfileSink; + + // g : Sort 2 := Sort 1 — a Definition, delta-reducible to Sort 1. + let (g_id, g) = t::mk_defn( + "g", + 0, + vec![], + t::sort(t::usucc(t::usucc(t::uzero()))), + t::sort1(), + ReducibilityHints::Regular(5), + ); + // f : g := Sort 0 — checking f must whnf (delta-unfold) g → Sort 1 to match + // infer(Sort 0) = Sort 1, so the recorder must capture the edge f→g. + let (f_id, f) = t::mk_defn( + "f", + 0, + vec![], + t::cnst("g", &[]), + t::sort0(), + ReducibilityHints::Regular(5), + ); + + let mut env = KEnv::::new(); + env.insert(g_id.clone(), g); + env.insert(f_id.clone(), f); + env.profile_sink = Some(ProfileSink::new(true)); + + { + let mut tc = TypeChecker::new(&mut env); + tc.check_const(&g_id).unwrap(); + tc.check_const(&f_id).unwrap(); + tc.finish_constant_accounting(); // flush the last constant's record + } + + let sink = env.profile_sink.as_ref().unwrap(); + let f_rec = sink.records.get(&f_id.addr).expect("f should be recorded"); + assert!( + f_rec.producers.contains(&g_id.addr), + "checking f must record a delta-unfold of g" + ); + assert!(f_rec.fuel > 0, "checking f consumes heartbeats"); + // g unfolds nothing of its own. + if let Some(g_rec) = sink.records.get(&g_id.addr) { + assert!(!g_rec.producers.contains(&g_id.addr)); + } + } + type AE = KExpr; type AU = KUniv; diff --git a/src/ix/kernel/env.rs b/src/ix/kernel/env.rs index 0383ab3f..1fdda019 100644 --- a/src/ix/kernel/env.rs +++ b/src/ix/kernel/env.rs @@ -352,6 +352,13 @@ pub struct KEnv { /// `IX_PERF_COUNTERS=1`. When the env var is unset the counters are /// no-ops; when set, the totals are dumped from the `Drop` impl below. pub perf: PerfCounters, + + /// Out-of-circuit profile recorder for sharding (see `plans/sharding.md`). + /// `Some` enables per-constant heartbeat + delta-unfold recording on this + /// worker; `None` (the default) has zero overhead. Deliberately preserved + /// across `clear`/`clear_releasing_memory` so recording survives scheduled + /// block boundaries within a run. + pub profile_sink: Option, } impl Default for KEnv { @@ -407,6 +414,7 @@ impl KEnv { block_check_results: FxHashMap::default(), next_fvar_id: 0, perf: PerfCounters::default(), + profile_sink: None, } } @@ -580,6 +588,30 @@ impl KEnv { self.block_check_results = FxHashMap::default(); self.next_fvar_id = 0; } + + /// Clear only the reduction-memo caches (whnf / infer / def-eq / unfold / + /// is-prop). Structural caches (`consts`, `blocks`, `intern`, recursor + /// caches, `block_check_results`) and the profile sink are preserved. + /// + /// Used by the profile recorder's per-constant isolation mode: clearing the + /// cross-constant memo between constants forces every delta-unfold to + /// re-execute (sound delta recording) and makes recorded heartbeats reflect + /// the in-circuit cost, which has no cross-constant memoization. Clearing a + /// pure memo never affects correctness — only performance. + pub fn clear_reduction_caches(&mut self) { + self.whnf_cache.clear(); + self.whnf_no_delta_cache.clear(); + self.whnf_no_delta_cheap_cache.clear(); + self.whnf_core_cache.clear(); + self.whnf_core_cheap_cache.clear(); + self.infer_cache.clear(); + self.infer_only_cache.clear(); + self.def_eq_cache.clear(); + self.def_eq_cheap_cache.clear(); + self.def_eq_failure.clear(); + self.unfold_cache.clear(); + self.is_prop_cache.clear(); + } } #[cfg(test)] diff --git a/src/ix/kernel/inductive.rs b/src/ix/kernel/inductive.rs index b1ee08f9..4cba8c1c 100644 --- a/src/ix/kernel/inductive.rs +++ b/src/ix/kernel/inductive.rs @@ -107,6 +107,7 @@ impl TypeChecker<'_, M> { for member in members { self.reset(); + self.begin_const(member); let c = self.get_const(member)?; self.validate_const_well_scoped(&c)?; match c { @@ -130,6 +131,7 @@ impl TypeChecker<'_, M> { for ind_id in &ind_ids { self.reset(); + self.begin_const(ind_id); self.check_inductive_member(ind_id)?; } for ctor_id in &ctor_ids { @@ -138,6 +140,7 @@ impl TypeChecker<'_, M> { _ => continue, }; self.reset(); + self.begin_const(ctor_id); self.check_ctor_against_inductive_member(ctor_id, &induct)?; } Ok(()) @@ -4040,6 +4043,7 @@ re-run with `IX_RECURSOR_DUMP={}` for the full breakdown.", ) -> Result<(), TcError> { for member in members { self.reset(); + self.begin_const(member); let c = self.get_const(member)?; self.validate_const_well_scoped(&c)?; match c { @@ -4057,6 +4061,7 @@ re-run with `IX_RECURSOR_DUMP={}` for the full breakdown.", for member in members { self.reset(); + self.begin_const(member); self.check_recursor_member(member)?; } Ok(()) diff --git a/src/ix/kernel/tc.rs b/src/ix/kernel/tc.rs index 3ab0bb12..3cccfa8d 100644 --- a/src/ix/kernel/tc.rs +++ b/src/ix/kernel/tc.rs @@ -160,6 +160,15 @@ pub struct TypeChecker<'a, M: KernelMode> { pub rec_fuel: u64, /// Optional diagnostic label for the current top-level constant. pub debug_label: Option, + + // -- Sharding profiler (see `plans/sharding.md`) -- + /// Address of the constant currently being checked, used to attribute + /// recorded heartbeats and delta-unfold edges. `None` unless a `profile_sink` + /// is installed on `env`; set by `begin_const` after each per-constant reset. + pub(crate) cur_const: Option
, + /// Addresses of constants whose bodies were delta-unfolded during the current + /// constant's check. Drained per constant by `record_current_fuel_used`. + pub(crate) delta_targets: FxHashSet
, /// Gated miss sampler for fuel-exhaustion diagnostics. Populated only when /// `IX_HOT_MISSES=1`, keyed by a compact phase/head/lbr shape. hot_misses: FxHashMap, @@ -208,6 +217,8 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { def_eq_peak: 0, rec_fuel: max_rec_fuel(), debug_label: None, + cur_const: None, + delta_targets: FxHashSet::default(), hot_misses: FxHashMap::default(), ctx_addr_cache: FxHashMap::default(), lctx: super::lctx::LocalContext::new(), @@ -821,6 +832,12 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { // Record fuel consumed by the *previous* constant check (if any) before // wiping it. `Drop` records the final check in a TypeChecker's lifetime. self.record_current_fuel_used(); + // Per-constant cache isolation for sound/faithful profile recording: clear + // the cross-constant reduction memo so the next constant re-executes every + // delta-unfold and its heartbeats reflect in-circuit (un-memoized) cost. + if self.env.profile_sink.as_ref().is_some_and(|s| s.isolate) { + self.env.clear_reduction_caches(); + } self.rec_fuel = max_rec_fuel(); self.hot_misses.clear(); // Reset the local context (it must always be empty between constants). @@ -885,6 +902,35 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { if used > 0 { self.env.perf.record_constant_fuel_used(used); } + // Sharding profiler: flush the just-finished constant's heartbeats and + // delta-unfold edges. `delta_targets` is always drained (even when + // `cur_const` is unset) so producers never leak into the next constant. + if self.env.profile_sink.is_some() { + let producers = std::mem::take(&mut self.delta_targets); + if let Some(addr) = self.cur_const.take() { + let sink = self.env.profile_sink.as_mut().unwrap(); + sink.record(addr, used, producers); + } + } + } + + /// Sharding profiler: mark `id` as the constant now being checked, so its + /// heartbeats and delta-unfold edges are attributed correctly. No-op unless a + /// `profile_sink` is installed. Called right after each per-constant `reset`. + #[inline] + pub(crate) fn begin_const(&mut self, id: &KId) { + if self.env.profile_sink.is_some() { + self.cur_const = Some(id.addr.clone()); + } + } + + /// Sharding profiler: record that the current constant delta-unfolds the body + /// of `id`. No-op unless a `profile_sink` is installed. + #[inline] + pub(crate) fn record_delta_target(&mut self, id: &KId) { + if self.env.profile_sink.is_some() { + self.delta_targets.insert(id.addr.clone()); + } } // ----------------------------------------------------------------------- diff --git a/src/ix/kernel/whnf.rs b/src/ix/kernel/whnf.rs index 03d53b08..16d74f4d 100644 --- a/src/ix/kernel/whnf.rs +++ b/src/ix/kernel/whnf.rs @@ -766,6 +766,7 @@ impl TypeChecker<'_, M> { && matches!(kind, DefKind::Definition | DefKind::Theorem) { self.dump_delta_trace(id, 0, e); + self.record_delta_target(id); let val = val.clone(); let us: Vec<_> = us.to_vec(); return Ok(Some(self.unfold_const_value(e, &val, &us)?)); @@ -792,6 +793,7 @@ impl TypeChecker<'_, M> { .. }) => { self.dump_delta_trace(id, args.len(), e); + self.record_delta_target(id); val.clone() }, _ => return Ok(None), diff --git a/src/ix/profile.rs b/src/ix/profile.rs new file mode 100644 index 00000000..fe8e0273 --- /dev/null +++ b/src/ix/profile.rs @@ -0,0 +1,485 @@ +//! Out-of-circuit kernel profile (`.ixesp`) — the cost + delta-graph hints +//! that drive the sharding strategy (see `plans/sharding.md`). +//! +//! A profile is computed by running the Rust kernel **out of circuit** over an +//! environment and recording, per *block*: +//! +//! - `heartbeats`: total recursive fuel consumed checking the block's members +//! (the balance metric for partitioning), +//! - `serialized_size`: the block's serialized byte length (the ingress-cost +//! metric — net weight in the partition hypergraph), +//! - `const_count`: number of constants in the block, +//! - the set of *other blocks whose definition bodies the block delta-unfolds* +//! (the delta edges — the cost graph the partitioner cuts on). +//! +//! A "block" is the ingress unit: a `Muts` mutual block or a standalone +//! constant. Projection constants (`*Prj { block, .. }`) are attributed to +//! their `block` address; everything else is its own block. +//! +//! The delta graph is stored in compressed-sparse-row (CSR) form keyed by +//! stable block ids (assigned by sorting block addresses), and the on-disk +//! `.ixesp` format is an explicit little-endian binary so it does not depend +//! on the optional `serde`/`bincode` feature. + +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::ix::address::Address; + +/// Magic bytes at the head of every `.ixesp` file. +const MAGIC: &[u8; 8] = b"IXESP\0\0\0"; +/// On-disk format version. Bump on any incompatible layout change. +const VERSION: u32 = 1; + +/// Per-block recorded statistics. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BlockEntry { + /// Content address of the block (a `Muts` block or a standalone constant). + pub addr: Address, + /// Total recursive fuel (heartbeats) consumed checking this block's members. + pub heartbeats: u64, + /// Serialized byte length of the block (ingress cost / net weight). + pub serialized_size: u32, + /// Number of constants in the block (1 for standalone constants). + pub const_count: u32, +} + +/// A recorded kernel profile over an environment. +/// +/// Blocks are indexed by stable id `0..num_blocks`. The delta graph is stored +/// in CSR form: `producers(c)` are the block ids whose definition bodies block +/// `c` delta-unfolds (the "consumer → producer" direction). Self-edges are +/// dropped; producer lists are sorted and deduplicated. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BlockProfile { + blocks: Vec, + /// CSR row offsets into `delta_col`, length `blocks.len() + 1`. + delta_row: Vec, + /// CSR column indices: producer block ids, grouped by consumer. + delta_col: Vec, +} + +impl BlockProfile { + /// Number of blocks (vertices). + pub fn num_blocks(&self) -> usize { + self.blocks.len() + } + + /// Number of delta edges (consumer → producer pairs). + pub fn num_edges(&self) -> usize { + self.delta_col.len() + } + + /// The block entries, indexed by block id. + pub fn blocks(&self) -> &[BlockEntry] { + &self.blocks + } + + /// The entry for block id `i`. + pub fn block(&self, i: u32) -> &BlockEntry { + &self.blocks[i as usize] + } + + /// Producer block ids unfolded by consumer block `c` (sorted, deduped, no + /// self-edges). + pub fn producers(&self, c: u32) -> &[u32] { + let lo = self.delta_row[c as usize]; + let hi = self.delta_row[c as usize + 1]; + &self.delta_col[lo..hi] + } + + /// Build the reverse delta adjacency: for each producer block, the sorted set + /// of consumer blocks that unfold it. This is the natural form for the + /// partition hypergraph, where `net(p) = {p} ∪ consumers_of(p)`. + pub fn consumers_csr(&self) -> (Vec, Vec) { + let n = self.num_blocks(); + let mut counts = vec![0usize; n + 1]; + for &p in &self.delta_col { + counts[p as usize + 1] += 1; + } + for i in 0..n { + counts[i + 1] += counts[i]; + } + let row = counts.clone(); + let mut col = vec![0u32; self.delta_col.len()]; + let mut cursor = counts; + for c in 0..n as u32 { + for &p in self.producers(c) { + let slot = cursor[p as usize]; + col[slot] = c; + cursor[p as usize] += 1; + } + } + (row, col) + } + + /// Total heartbeats across all blocks. + pub fn total_heartbeats(&self) -> u128 { + self.blocks.iter().map(|b| b.heartbeats as u128).sum() + } + + /// Serialize to the `.ixesp` binary format. + pub fn to_bytes(&self) -> Vec { + let n = self.blocks.len(); + let mut out = Vec::with_capacity( + 8 + 4 + 4 + n * 48 + 8 + (n + 1) * 8 + self.delta_col.len() * 4, + ); + out.extend_from_slice(MAGIC); + out.extend_from_slice(&VERSION.to_le_bytes()); + out.extend_from_slice(&(n as u32).to_le_bytes()); + for b in &self.blocks { + out.extend_from_slice(b.addr.as_bytes()); + out.extend_from_slice(&b.heartbeats.to_le_bytes()); + out.extend_from_slice(&b.serialized_size.to_le_bytes()); + out.extend_from_slice(&b.const_count.to_le_bytes()); + } + out.extend_from_slice(&(self.delta_col.len() as u64).to_le_bytes()); + // CSR row offsets (n+1 entries) as u64. + for &off in &self.delta_row { + out.extend_from_slice(&(off as u64).to_le_bytes()); + } + for &p in &self.delta_col { + out.extend_from_slice(&p.to_le_bytes()); + } + out + } + + /// Deserialize from the `.ixesp` binary format. + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut r = Reader::new(bytes); + let magic = r.take(8)?; + if magic != MAGIC { + return Err(ProfileError::BadMagic); + } + let version = r.u32()?; + if version != VERSION { + return Err(ProfileError::BadVersion(version)); + } + let n = r.u32()? as usize; + let mut blocks = Vec::with_capacity(n); + for _ in 0..n { + let addr = Address::from_slice(r.take(32)?) + .map_err(|_| ProfileError::Truncated)?; + let heartbeats = r.u64()?; + let serialized_size = r.u32()?; + let const_count = r.u32()?; + blocks.push(BlockEntry { + addr, + heartbeats, + serialized_size, + const_count, + }); + } + let num_edges = r.u64()? as usize; + let mut delta_row = Vec::with_capacity(n + 1); + for _ in 0..n + 1 { + delta_row.push(r.u64()? as usize); + } + let mut delta_col = Vec::with_capacity(num_edges); + for _ in 0..num_edges { + delta_col.push(r.u32()?); + } + // Structural validation: monotone offsets bounded by edge count, in-range ids. + if delta_row.len() != n + 1 + || delta_row.first() != Some(&0) + || delta_row.last() != Some(&num_edges) + { + return Err(ProfileError::Corrupt); + } + for w in delta_row.windows(2) { + if w[0] > w[1] { + return Err(ProfileError::Corrupt); + } + } + for &p in &delta_col { + if p as usize >= n { + return Err(ProfileError::Corrupt); + } + } + Ok(BlockProfile { blocks, delta_row, delta_col }) + } +} + +/// Errors from decoding a `.ixesp` file. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProfileError { + BadMagic, + BadVersion(u32), + Truncated, + Corrupt, +} + +impl std::fmt::Display for ProfileError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ProfileError::BadMagic => write!(f, "not an .ixesp file (bad magic)"), + ProfileError::BadVersion(v) => { + write!(f, "unsupported .ixesp version {v}") + }, + ProfileError::Truncated => write!(f, "truncated .ixesp file"), + ProfileError::Corrupt => write!(f, "corrupt .ixesp file"), + } + } +} + +impl std::error::Error for ProfileError {} + +/// Minimal little-endian byte reader with bounds checking. +struct Reader<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> Reader<'a> { + fn new(buf: &'a [u8]) -> Self { + Reader { buf, pos: 0 } + } + fn take(&mut self, n: usize) -> Result<&'a [u8], ProfileError> { + let end = self.pos.checked_add(n).ok_or(ProfileError::Truncated)?; + if end > self.buf.len() { + return Err(ProfileError::Truncated); + } + let s = &self.buf[self.pos..end]; + self.pos = end; + Ok(s) + } + fn u32(&mut self) -> Result { + Ok(u32::from_le_bytes(self.take(4)?.try_into().unwrap())) + } + fn u64(&mut self) -> Result { + Ok(u64::from_le_bytes(self.take(8)?.try_into().unwrap())) + } +} + +/// Accumulates block-level statistics and delta edges (keyed by address), then +/// freezes into a [`BlockProfile`] with stable, address-sorted block ids. +/// +/// Phase A (the kernel recorder) feeds this with one `block(..)` per checked +/// block and one `delta_edge(consumer, producer)` per recorded cross-block +/// unfold. The builder is the merge point for per-worker accumulators: calling +/// `block`/`delta_edge` is commutative and idempotent w.r.t. edge sets, so +/// merge order does not affect the result. +#[derive(Default)] +pub struct ProfileBuilder { + blocks: FxHashMap, +} + +#[derive(Default)] +struct Accum { + heartbeats: u64, + serialized_size: u32, + const_count: u32, + producers: FxHashSet
, +} + +impl ProfileBuilder { + pub fn new() -> Self { + Self::default() + } + + /// Record (or accumulate into) a block's statistics. Heartbeats and + /// const_count accumulate additively; serialized_size is set (idempotent for + /// a fixed block). + pub fn block( + &mut self, + addr: Address, + heartbeats: u64, + serialized_size: u32, + const_count: u32, + ) { + let e = self.blocks.entry(addr).or_default(); + e.heartbeats = e.heartbeats.saturating_add(heartbeats); + e.serialized_size = serialized_size; + e.const_count = e.const_count.saturating_add(const_count); + } + + /// Record that `consumer` delta-unfolds the body of `producer`. Self-edges + /// are ignored. Ensures both endpoints exist as blocks (with zeroed stats if + /// not yet seen) so the graph is well-formed even if a producer is only ever + /// referenced, never directly checked. + pub fn delta_edge(&mut self, consumer: Address, producer: Address) { + if consumer == producer { + return; + } + self.blocks.entry(producer.clone()).or_default(); + self.blocks.entry(consumer).or_default().producers.insert(producer); + } + + /// Freeze into an immutable [`BlockProfile`]. Block ids are assigned by + /// sorting addresses, so the result is deterministic regardless of insertion + /// order. + pub fn finish(self) -> BlockProfile { + let mut addrs: Vec
= self.blocks.keys().cloned().collect(); + addrs.sort(); + let id_of: FxHashMap = + addrs.iter().enumerate().map(|(i, a)| (a.clone(), i as u32)).collect(); + + let mut blocks = Vec::with_capacity(addrs.len()); + let mut delta_row = Vec::with_capacity(addrs.len() + 1); + let mut delta_col = Vec::new(); + delta_row.push(0usize); + + for addr in &addrs { + let a = &self.blocks[addr]; + blocks.push(BlockEntry { + addr: addr.clone(), + heartbeats: a.heartbeats, + serialized_size: a.serialized_size, + const_count: a.const_count, + }); + let mut prods: Vec = a.producers.iter().map(|p| id_of[p]).collect(); + prods.sort_unstable(); + prods.dedup(); + delta_col.extend_from_slice(&prods); + delta_row.push(delta_col.len()); + } + + BlockProfile { blocks, delta_row, delta_col } + } +} + +/// Per-worker raw accumulator filled by the out-of-circuit kernel recorder: for +/// each *constant* (by address) checked on this worker, its heartbeats and the +/// set of constant addresses whose definition bodies it delta-unfolded. The +/// env-aware layer later maps these constant addresses to their home blocks and +/// attaches serialized sizes to produce a [`BlockProfile`]. +#[derive(Default, Debug)] +pub struct ProfileSink { + /// When true, the kernel clears its reduction-memo caches between constants + /// so recording is sound (no unfolds skipped by cross-constant cache hits) + /// and heartbeats reflect the no-cross-constant-memo in-circuit cost. + pub isolate: bool, + /// Consumer constant address → record. + pub records: FxHashMap, +} + +/// One constant's recorded statistics (pre block-aggregation). +#[derive(Default, Debug, Clone)] +pub struct ConstRecord { + /// Recursive fuel (heartbeats) consumed checking this constant. + pub fuel: u64, + /// Constant addresses whose bodies were delta-unfolded during the check. + pub producers: FxHashSet
, +} + +impl ProfileSink { + pub fn new(isolate: bool) -> Self { + ProfileSink { isolate, records: FxHashMap::default() } + } + + /// Accumulate one constant's record (additive in fuel, set-union in + /// producers) so repeated flushes for the same constant combine correctly. + pub fn record( + &mut self, + consumer: Address, + fuel: u64, + producers: impl IntoIterator, + ) { + let rec = self.records.entry(consumer).or_default(); + rec.fuel = rec.fuel.saturating_add(fuel); + rec.producers.extend(producers); + } + + /// Merge another worker's sink into this one (order-independent). + pub fn merge(&mut self, other: ProfileSink) { + for (addr, rec) in other.records { + let e = self.records.entry(addr).or_default(); + e.fuel = e.fuel.saturating_add(rec.fuel); + e.producers.extend(rec.producers); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(byte: u8) -> Address { + Address::from_slice(&[byte; 32]).unwrap() + } + + fn sample() -> BlockProfile { + let mut b = ProfileBuilder::new(); + // Three blocks a = col[lo..hi].to_vec(); + got.sort_unstable(); + assert_eq!(got, vec![0, 2]); + } + + #[test] + fn roundtrip_serialization() { + let p = sample(); + let bytes = p.to_bytes(); + let q = BlockProfile::from_bytes(&bytes).unwrap(); + assert_eq!(p, q); + } + + #[test] + fn rejects_bad_magic_and_truncation() { + assert_eq!( + BlockProfile::from_bytes(b"nope").unwrap_err(), + ProfileError::Truncated + ); + let mut bytes = sample().to_bytes(); + bytes[0] = b'X'; + assert_eq!( + BlockProfile::from_bytes(&bytes).unwrap_err(), + ProfileError::BadMagic + ); + } + + #[test] + fn merge_order_independent() { + // Build the same logical profile with edges added in a different order and + // via separate builders merged conceptually; result must be identical. + let mut b = ProfileBuilder::new(); + b.delta_edge(addr(3), addr(2)); + b.block(addr(3), 300, 30, 1); + b.delta_edge(addr(1), addr(3)); + b.block(addr(2), 200, 20, 3); + b.delta_edge(addr(1), addr(2)); + b.block(addr(1), 100, 10, 1); + b.delta_edge(addr(2), addr(2)); + assert_eq!(b.finish(), sample()); + } +} diff --git a/src/ix/shard.rs b/src/ix/shard.rs new file mode 100644 index 00000000..59ea7658 --- /dev/null +++ b/src/ix/shard.rs @@ -0,0 +1,1119 @@ +//! Sharding: partition an environment's blocks into `N` shards that are +//! balanced by heartbeats and minimize cross-shard delta-unfold ingress +//! (see `plans/sharding.md`). +//! +//! ## Model +//! +//! We solve a **weighted hypergraph partitioning** problem under the +//! connectivity-1 (km1) metric: +//! +//! - **Vertices** are blocks, weighted by `heartbeats` (the balance metric). +//! - **Nets** are blocks `p` that get delta-unfolded by someone; the net +//! connects `p`'s home plus every block that unfolds `p`, and is weighted by +//! `serialized_size(p)` (the ingress cost paid to pull `p`'s body into a +//! foreign shard). +//! - The objective is `Σ_p size(p) · (λ(p) − 1)`, where `λ(p)` is the number +//! of distinct shards spanning `net(p)`. Because the home block is always a +//! pin, `λ(p) − 1` counts exactly the foreign shards that must ingress `p` — +//! i.e. total cross-shard ingress bytes. +//! +//! ## Algorithm +//! +//! **Recursive bisection.** Balanced min-cut bisection recursively splits the +//! block hypergraph, allocating the `N`-shard budget between the two sides at +//! each step (so any `N` works, not just powers of two); the bisection tree is +//! the future proof-aggregation tree. +//! Each bisection uses greedy graph-growing for a locality-aware initial cut +//! followed by Fiduccia–Mattheyses (FM) refinement. On recursion we apply +//! **cut-net splitting**: a net already cut at an upper level is restricted to +//! its pins within each half so it is not recounted deeper. +//! +//! Multilevel coarsening (a further quality boost) is intentionally left as a +//! follow-up; the recursive bisection + FM here is fully self-contained (no +//! external partitioner dependency) and ample for the ~100k-block environments +//! this targets. + +use rustc_hash::FxHashSet; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::time::Instant; + +use crate::ix::address::Address; +use crate::ix::profile::BlockProfile; + +/// Recurse the two halves of a bisection in parallel (rayon) when the parent +/// sub-problem is at least this large; below it the join overhead isn't worth +/// it. The two halves own disjoint block sets, so their writes to the shared +/// (atomic) assignment buffer never alias. +const PARALLEL_THRESHOLD: usize = 4_000; + +/// Sub-problems larger than this skip Fiduccia–Mattheyses refinement entirely +/// and rely on the greedy graph-growing cut alone. FM is ~O(V·d²) per pass, so +/// on dense graphs (mathlib ≈ 20 edges/block) running it at the top of the +/// recursion is the dominant cost; the few biggest cuts are coarse anyway. +const FM_SKIP_VERTICES: usize = 50_000; + +/// Vertex count below which FM runs to (capped) convergence; between this and +/// [`FM_SKIP_VERTICES`] it runs a few passes only. +const FM_FULL_VERTICES: usize = 10_000; + +/// Nets with more pins than this are "hubs" (e.g. a core lemma delta-unfolded by +/// thousands of blocks). They are cut in essentially any balanced partition, so +/// they carry no useful refinement gradient, and visiting all their pins per +/// move would make FM O(pins-of-hub) per step. Refinement (greedy growth + FM) +/// therefore ignores them; the reported [`Hypergraph::connectivity_objective`] +/// still counts every net in full. After cut-net splitting a former hub may +/// shrink below the cap at deeper recursion levels and become active again. +const FM_NET_CAP: usize = 400; + +/// Maximum FM passes per bisection (a safety bound against pathological +/// oscillation; passes normally converge well before this). +const MAX_FM_PASSES: u32 = 10; + +/// A weighted hypergraph derived from a [`BlockProfile`]. Vertex ids are block +/// ids (identical to the profile's). Nets are stored with global pins. +pub struct Hypergraph { + /// Vertex (block) weights = heartbeats. + vweight: Vec, + /// Net weights = serialized_size of the unfolded block. + net_weight: Vec, + /// Net pins (global block ids). `pins[i]` are the pins of net `i`. + net_pins: Vec>, +} + +impl Hypergraph { + /// Build the partition hypergraph from a profile. A net is created for every + /// block that has at least one *external* consumer (i.e. is delta-unfolded by + /// some other block); singleton nets are omitted as they can never be cut. + pub fn from_profile(profile: &BlockProfile) -> Hypergraph { + let n = profile.num_blocks(); + let vweight: Vec = + (0..n as u32).map(|i| profile.block(i).heartbeats).collect(); + let (row, col) = profile.consumers_csr(); + let mut net_weight = Vec::new(); + let mut net_pins = Vec::new(); + for p in 0..n { + let consumers = &col[row[p]..row[p + 1]]; + if consumers.is_empty() { + continue; + } + let mut pins = Vec::with_capacity(consumers.len() + 1); + pins.push(p as u32); // home pin + pins.extend_from_slice(consumers); + net_weight.push(profile.block(p as u32).serialized_size as u64); + net_pins.push(pins); + } + Hypergraph { vweight, net_weight, net_pins } + } + + pub fn num_vertices(&self) -> usize { + self.vweight.len() + } + + pub fn num_nets(&self) -> usize { + self.net_pins.len() + } + + /// The connectivity-1 (km1) objective for a given shard assignment: + /// `Σ_net weight · (λ − 1)`, where `λ` is the number of distinct shards the + /// net's pins fall into. This is the total cross-shard ingress in bytes. + pub fn connectivity_objective(&self, shard_of: &[u32]) -> u128 { + let mut total: u128 = 0; + let mut seen: FxHashSet = FxHashSet::default(); + for (i, pins) in self.net_pins.iter().enumerate() { + seen.clear(); + for &v in pins { + seen.insert(shard_of[v as usize]); + } + let lambda = seen.len() as u128; + if lambda > 1 { + total += self.net_weight[i] as u128 * (lambda - 1); + } + } + total + } + + /// Partition the blocks into `num_shards` shards via recursive bisection. + /// Returns the shard id (in `0..num_shards`) for every block id. `epsilon` is + /// the per-bisection balance tolerance (e.g. `0.05` for ±5%). `num_shards` + /// need not be a power of two. + pub fn partition(&self, num_shards: usize, epsilon: f64) -> Vec { + let n = self.num_vertices(); + if num_shards <= 1 { + return vec![0u32; n]; // everything in shard 0 + } + // Cap each block's balance weight at the ideal per-shard heartbeats + // (total / num_shards). This keeps balancing heartbeat-aware while a balanced + // split is achievable (few shards), but stops a single oversized *atomic* + // block from skewing every split toward it once there are many shards. The + // resulting heartbeat imbalance is accepted; the goal is `num_shards` + // **non-empty** parallel shards with minimal cross-shard ingress. + let total_hb: u128 = self.vweight.iter().map(|&w| w as u128).sum(); + let cap = + (total_hb / num_shards as u128).max(1).min(u64::MAX as u128) as u64; + let sub = SubHyper::full(self, cap); + // Atomic assignment buffer: independent subtrees own disjoint block sets, so + // they can be partitioned on separate threads without their writes aliasing. + let shard_of: Vec = (0..n).map(|_| AtomicU32::new(0)).collect(); + let prog = PartitionProgress::new(num_shards); + rec_bisect(&sub, num_shards, 0, epsilon, &shard_of, &prog); + prog.done(); + shard_of.into_iter().map(AtomicU32::into_inner).collect() + } +} + +/// A sub-hypergraph used during recursion, with vertices re-indexed locally and +/// nets restricted (cut-net split) to the vertices it contains. +struct SubHyper { + /// local vertex -> global block id + global: Vec, + /// local vertex weights (heartbeats) + vw: Vec, + /// balance weights: each block's heartbeats capped at `cap` (or all 1 if every + /// capped weight is zero in this sub) + bw: Vec, + /// restricted nets: (weight, local pins) + nets: Vec<(u64, Vec)>, + /// local vertex -> incident net indices + vnets: Vec>, + /// global per-shard heartbeat target used to cap `bw` (propagated unchanged + /// through recursion) + cap: u64, +} + +impl SubHyper { + fn full(h: &Hypergraph, cap: u64) -> SubHyper { + let n = h.num_vertices(); + let global: Vec = (0..n as u32).collect(); + let vw = h.vweight.clone(); + let nets: Vec<(u64, Vec)> = h + .net_pins + .iter() + .enumerate() + .map(|(i, pins)| (h.net_weight[i], pins.clone())) + .collect(); + SubHyper::assemble(global, vw, nets, cap) + } + + fn assemble( + global: Vec, + vw: Vec, + nets: Vec<(u64, Vec)>, + cap: u64, + ) -> SubHyper { + let n = vw.len(); + let mut bw: Vec = vw.iter().map(|&w| w.min(cap)).collect(); + let total: u128 = bw.iter().map(|&w| w as u128).sum(); + if total == 0 { + bw = vec![1; n]; + } + let mut vnets = vec![Vec::new(); n]; + for (i, (_, pins)) in nets.iter().enumerate() { + for &v in pins { + vnets[v as usize].push(i as u32); + } + } + SubHyper { global, vw, bw, nets, vnets, cap } + } + + fn num_vertices(&self) -> usize { + self.vw.len() + } + + /// Induce the sub-hypergraph on the local vertices selected by `keep`, + /// applying cut-net splitting: each net is restricted to its kept pins and + /// dropped if fewer than two remain. + fn induce(&self, keep: &[bool]) -> SubHyper { + let mut remap = vec![u32::MAX; self.num_vertices()]; + let mut global = Vec::new(); + let mut vw = Vec::new(); + for v in 0..self.num_vertices() { + if keep[v] { + remap[v] = global.len() as u32; + global.push(self.global[v]); + vw.push(self.vw[v]); + } + } + let mut nets = Vec::new(); + for (w, pins) in &self.nets { + let kept: Vec = pins + .iter() + .filter(|&&p| keep[p as usize]) + .map(|&p| remap[p as usize]) + .collect(); + if kept.len() >= 2 { + nets.push((*w, kept)); + } + } + SubHyper::assemble(global, vw, nets, self.cap) + } +} + +/// Recursively split `sub` into `k` shards (contiguous ids `base..base+k`), +/// writing shard ids into `shard_of` (indexed by global block id). +/// +/// The `k` leaf budget is allocated between the two sides ~proportional to +/// heartbeat mass, but clamped so each side receives at least 1 leaf and at most +/// its own vertex count. That guarantees **every shard is non-empty** whenever +/// the subtree holds at least `k` vertices (at the top, `#blocks ≥ 2ⁿ`), so the +/// full parallelism is realized; the heartbeat-proportional split also keeps +/// per-shard work as even as the atomic blocks allow. The resulting bisection +/// tree is a (possibly unbalanced) binary tree — still a valid aggregation tree. +fn rec_bisect( + sub: &SubHyper, + k: usize, + base: u32, + epsilon: f64, + shard_of: &[AtomicU32], + prog: &PartitionProgress, +) { + if k <= 1 || sub.num_vertices() <= 1 { + // A leaf (or a subtree with ≤1 vertex): everything here is one shard. + for &g in &sub.global { + shard_of[g as usize].store(base, Ordering::Relaxed); + } + prog.leaf(); + return; + } + prog.bisecting(sub.num_vertices(), k); + let side = bisect(sub, epsilon); + let keep0: Vec = side.iter().map(|&s| s == 0).collect(); + let keep1: Vec = side.iter().map(|&s| s == 1).collect(); + let left = sub.induce(&keep0); + let right = sub.induce(&keep1); + let nl = left.num_vertices(); + let nr = right.num_vertices(); + if nl == 0 || nr == 0 { + // Degenerate split (shouldn't happen for |sub| ≥ 2): keep together. + for &g in &sub.global { + shard_of[g as usize].store(base, Ordering::Relaxed); + } + prog.leaf(); + return; + } + // Allocate leaves ~proportional to heartbeat mass, clamped to [1, side size]. + // `lo`/`hi` are feasible (lo ≤ hi) because nl + nr ≥ k with nl, nr ≥ 1. + let wl: u128 = left.vw.iter().map(|&w| w as u128).sum(); + let wr: u128 = right.vw.iter().map(|&w| w as u128).sum(); + let wsum = (wl + wr).max(1); + let ideal = ((k as u128 * wl + wsum / 2) / wsum) as usize; + let lo = 1.max(k.saturating_sub(nr)); + let hi = (k - 1).min(nl); + let k_left = ideal.clamp(lo, hi); + let rbase = base + k_left as u32; + // The two halves are independent (disjoint blocks) — recurse them in parallel + // when the work is large enough to amortize the join. The result is identical + // to serial execution (each subtree is deterministic). + if nl + nr >= PARALLEL_THRESHOLD { + rayon::join( + || rec_bisect(&left, k_left, base, epsilon, shard_of, prog), + || rec_bisect(&right, k - k_left, rbase, epsilon, shard_of, prog), + ); + } else { + rec_bisect(&left, k_left, base, epsilon, shard_of, prog); + rec_bisect(&right, k - k_left, rbase, epsilon, shard_of, prog); + } +} + +/// Progress reporter for [`Hypergraph::partition`]. The partitioner is silent +/// otherwise (it only writes its output at the end), which makes a slow run +/// indistinguishable from a stuck one; this logs each large bisection and the +/// running shards-finalized count (to stderr, unless `IX_SHARD_QUIET` is set). +struct PartitionProgress { + total: usize, + finalized: AtomicUsize, + start: Instant, + enabled: bool, +} + +impl PartitionProgress { + fn new(total: usize) -> Self { + PartitionProgress { + total, + finalized: AtomicUsize::new(0), + start: Instant::now(), + enabled: std::env::var("IX_SHARD_QUIET").is_err(), + } + } + + /// Log a large bisection about to run (the slow steps). `&self` so it can be + /// shared across the parallel recursion. + fn bisecting(&self, n: usize, k: usize) { + if self.enabled && n >= FM_SKIP_VERTICES { + eprintln!( + "[shard] bisecting {n} vertices → {k} shards ({:.1?} elapsed, {}/{} shards done)", + self.start.elapsed(), + self.finalized.load(Ordering::Relaxed), + self.total + ); + } + } + + /// A shard leaf was finalized; log roughly every 1/16 of the shards. Uses an + /// atomic counter so concurrent subtrees report coherently. + fn leaf(&self) { + let done = self.finalized.fetch_add(1, Ordering::Relaxed) + 1; + let step = (self.total / 16).max(1); + if self.enabled && (done % step == 0 || done == self.total) { + eprintln!( + "[shard] {}/{} shards finalized ({:.1?} elapsed)", + done, + self.total, + self.start.elapsed() + ); + } + } + + fn done(&self) { + if self.enabled { + eprintln!( + "[shard] partitioned into {} shards in {:.1?}", + self.finalized.load(Ordering::Relaxed), + self.start.elapsed() + ); + } + } +} + +/// Bisect `sub` into two balanced parts minimizing cut weight. Returns a side +/// (0/1) per local vertex. Greedy graph-growing initial partition + FM. +fn bisect(sub: &SubHyper, epsilon: f64) -> Vec { + let n = sub.num_vertices(); + if n == 0 { + return Vec::new(); + } + if n == 1 { + return vec![0]; + } + let total_bw: u64 = sub.bw.iter().sum(); + // Balance bounds for side weights. + let wmax = ((0.5 + epsilon) * total_bw as f64).ceil() as u64; + let wmin = ((0.5 - epsilon) * total_bw as f64).floor() as u64; + + let mut side = greedy_grow(sub, total_bw); + fm_refine(sub, &mut side, wmin, wmax); + side +} + +/// Greedy graph-growing: start from the heaviest vertex and repeatedly absorb +/// the boundary vertex most strongly connected (by net weight) to side 0 until +/// side 0 reaches ~half the balance weight. Remaining vertices go to side 1. +fn greedy_grow(sub: &SubHyper, total_bw: u64) -> Vec { + let n = sub.num_vertices(); + let mut side = vec![1u8; n]; + // Seed: heaviest vertex (deterministic; ties broken by lowest id). + let seed = (0..n) + .max_by(|&a, &b| { + sub.vw[a].cmp(&sub.vw[b]).then(Reverse(a).cmp(&Reverse(b))) + }) + .unwrap(); + + // connection[v] = total weight of nets that already touch side 0 and include v. + let mut connection = vec![0u64; n]; + let mut in_side0 = vec![false; n]; + // Whether a net already contributes to side 0. A net bumps its pins exactly + // once (on first connection), so growth is O(Σ net_size), not O(Σ net_size²) + // — essential on dense graphs (and it also stops over-counting connection). + let mut net_touched = vec![false; sub.nets.len()]; + let mut heap: BinaryHeap<(u64, Reverse)> = BinaryHeap::new(); + let mut side0_bw = 0u64; + let target = total_bw / 2; + // Forward cursor for the disconnected-fallback path (amortized O(n) total, + // vs. an O(n) max-scan per fallback which is O(n²) on delta-sparse envs). + let mut fallback_cursor = 0usize; + + let mut add = |v: usize, + side: &mut [u8], + in_side0: &mut [bool], + connection: &mut [u64], + heap: &mut BinaryHeap<(u64, Reverse)>, + side0_bw: &mut u64| { + side[v] = 0; + in_side0[v] = true; + *side0_bw += sub.bw[v]; + for &ni in &sub.vnets[v] { + let nis = ni as usize; + let (w, pins) = &sub.nets[nis]; + // Skip hub nets, and nets already counted toward side 0. + if pins.len() > FM_NET_CAP || net_touched[nis] { + continue; + } + net_touched[nis] = true; + for &u in pins { + let u = u as usize; + if !in_side0[u] { + connection[u] += *w; + heap.push((connection[u], Reverse(u as u32))); + } + } + } + }; + + add( + seed, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + + while side0_bw < target { + // Pop the best-connected vertex not yet in side 0 (lazy-validated). + let mut next = None; + while let Some((c, Reverse(v))) = heap.pop() { + let v = v as usize; + if !in_side0[v] && c == connection[v] { + next = Some(v); + break; + } + } + match next { + Some(v) => { + add( + v, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + }, + None => { + // No connected frontier left (graph is disconnected here); fill toward + // balance with the next unassigned vertex by id. Disconnected vertices + // have no incident (non-hub) nets, so their placement order is + // cut-neutral. + while fallback_cursor < n && in_side0[fallback_cursor] { + fallback_cursor += 1; + } + if fallback_cursor < n { + add( + fallback_cursor, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + } else { + break; + } + }, + } + } + side +} + +/// Per-net pin counts on each side, for incremental cut maintenance. +struct NetState { + cnt: Vec<[u32; 2]>, +} + +impl NetState { + fn new(sub: &SubHyper, side: &[u8]) -> (NetState, u128) { + let mut cnt = vec![[0u32; 2]; sub.nets.len()]; + let mut cut: u128 = 0; + for (i, (w, pins)) in sub.nets.iter().enumerate() { + if pins.len() > FM_NET_CAP { + continue; // hub net: invisible to FM (counts stay zero) + } + for &v in pins { + cnt[i][side[v as usize] as usize] += 1; + } + if cnt[i][0] > 0 && cnt[i][1] > 0 { + cut += *w as u128; + } + } + (NetState { cnt }, cut) + } +} + +/// FM gain of moving local vertex `v` from its side to the other. +/// `+w` for each net where `v`'s side has exactly one pin (the move uncuts it); +/// `−w` for each net fully on `v`'s side with ≥2 pins (the move newly cuts it). +fn fm_gain(sub: &SubHyper, ns: &NetState, side: &[u8], v: usize) -> i128 { + let a = side[v] as usize; + let b = 1 - a; + let mut g: i128 = 0; + for &ni in &sub.vnets[v] { + let (w, pins) = &sub.nets[ni as usize]; + if pins.len() > FM_NET_CAP { + continue; // hub net: ignored by FM + } + let ca = ns.cnt[ni as usize][a]; + let cb = ns.cnt[ni as usize][b]; + if cb >= 1 && ca == 1 { + g += *w as i128; + } else if cb == 0 && ca >= 2 { + g -= *w as i128; + } + } + g +} + +/// Fiduccia–Mattheyses refinement: repeated passes, each moving every vertex at +/// most once in decreasing-gain order subject to the balance window, tracking +/// the lowest-cut prefix and rolling back to it. Stops when a pass yields no +/// improvement. +fn fm_refine(sub: &SubHyper, side: &mut [u8], wmin: u64, wmax: u64) { + let n = sub.num_vertices(); + // Greedy-only above the skip threshold (FM is too slow on huge dense subs); + // a few passes in the middle band; full (capped) convergence when small. + if n <= 1 || n > FM_SKIP_VERTICES { + return; + } + let budget = if n > FM_FULL_VERTICES { 3 } else { MAX_FM_PASSES }; + let mut pass = 0u32; + loop { + let (mut ns, mut cut) = NetState::new(sub, side); + let mut side0_bw: u64 = + (0..n).filter(|&v| side[v] == 0).map(|v| sub.bw[v]).sum(); + + let mut heap: BinaryHeap<(i128, Reverse)> = BinaryHeap::new(); + let mut cur_gain = vec![0i128; n]; + for v in 0..n { + let g = fm_gain(sub, &ns, side, v); + cur_gain[v] = g; + heap.push((g, Reverse(v as u32))); + } + + let mut locked = vec![false; n]; + let mut moves: Vec = Vec::new(); + let mut best_cut = cut; + let mut best_prefix = 0usize; // number of moves to keep + + while let Some((g, Reverse(v))) = heap.pop() { + let v = v as usize; + if locked[v] || g != cur_gain[v] { + continue; // stale entry + } + // Balance check: moving v shifts bw[v] across the cut. + let (s0_after, ok) = if side[v] == 0 { + let s0 = side0_bw - sub.bw[v]; + (s0, s0 >= wmin) + } else { + let s0 = side0_bw + sub.bw[v]; + (s0, s0 <= wmax) + }; + if !ok { + // Infeasible right now; lock it out of this pass. + locked[v] = true; + continue; + } + // Apply the move. + let a = side[v] as usize; + let b = 1 - a; + for &ni in &sub.vnets[v] { + let (w, pins) = &sub.nets[ni as usize]; + if pins.len() > FM_NET_CAP { + continue; // hub net: not tracked, skip cnt update + } + let before = ns.cnt[ni as usize][0] > 0 && ns.cnt[ni as usize][1] > 0; + ns.cnt[ni as usize][a] -= 1; + ns.cnt[ni as usize][b] += 1; + let after = ns.cnt[ni as usize][0] > 0 && ns.cnt[ni as usize][1] > 0; + if before && !after { + cut -= *w as u128; + } else if !before && after { + cut += *w as u128; + } + } + side[v] = b as u8; + side0_bw = s0_after; + locked[v] = true; + moves.push(v); + + // Recompute gains of neighbors (and v's net co-pins). + let mut touched: FxHashSet = FxHashSet::default(); + for &ni in &sub.vnets[v] { + let pins = &sub.nets[ni as usize].1; + if pins.len() > FM_NET_CAP { + continue; // hub net: ignored by FM + } + for &u in pins { + if !locked[u as usize] { + touched.insert(u); + } + } + } + for u in touched { + let ng = fm_gain(sub, &ns, side, u as usize); + if ng != cur_gain[u as usize] { + cur_gain[u as usize] = ng; + heap.push((ng, Reverse(u))); + } + } + + if cut < best_cut { + best_cut = cut; + best_prefix = moves.len(); + } + } + + // Roll back moves after the best prefix. + for &v in moves.iter().skip(best_prefix) { + side[v] ^= 1; + } + + pass += 1; + if best_prefix == 0 || pass >= budget { + break; // converged, or hit the pass bound + } + } +} + +// ============================================================================ +// Manifest +// ============================================================================ + +/// Per-shard summary in a [`ShardManifest`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardInfo { + /// Shard id (an `n`-bit path through the bisection tree). + pub id: u32, + /// Member block addresses. + pub blocks: Vec
, + /// Sum of member heartbeats (balance metric). + pub heartbeats: u64, + /// Sum of member serialized sizes (the shard's own ingress). + pub own_size: u64, + /// Foreign blocks delta-unfolded by members but proven in other shards. + pub foreign_blocks: Vec
, + /// Sum of `serialized_size` over `foreign_blocks` (this shard's share of the + /// cross-shard ingress objective). + pub cross_ingress: u64, + /// Sound assumption-tree root for this shard's conditional proof: the Merkle + /// root over the foreign part of the shard's static reference closure. `None` + /// until populated by the env-aware layer (the pure partitioner has no `Env`). + pub assumption_root: Option
, +} + +/// The sharding manifest: the partition plus its cost metrics. Assumption-tree +/// roots (which require the static reference closure from the `Env`) are filled +/// in by the env-aware CLI layer, not here. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardManifest { + /// Number of shards (the partition target == `shards.len()`). + pub num_shards: u32, + pub shards: Vec, + /// Total cross-shard ingress bytes (the km1 objective). + pub total_cross_ingress: u128, +} + +impl ShardManifest { + /// Build the manifest from a profile and a shard assignment (shard id in + /// `0..num_shards` per block id). + pub fn build( + profile: &BlockProfile, + shard_of: &[u32], + num_shards: usize, + ) -> ShardManifest { + let mut members: Vec> = vec![Vec::new(); num_shards]; + for (b, &s) in shard_of.iter().enumerate() { + members[s as usize].push(b as u32); + } + // Distinct foreign blocks per shard. + let mut foreign: Vec> = + vec![FxHashSet::default(); num_shards]; + for b in 0..profile.num_blocks() as u32 { + let s = shard_of[b as usize] as usize; + for &p in profile.producers(b) { + if shard_of[p as usize] as usize != s { + foreign[s].insert(p); + } + } + } + let mut shards = Vec::with_capacity(num_shards); + let mut total_cross: u128 = 0; + for s in 0..num_shards { + let blocks: Vec
= + members[s].iter().map(|&b| profile.block(b).addr.clone()).collect(); + let heartbeats: u64 = + members[s].iter().map(|&b| profile.block(b).heartbeats).sum(); + let own_size: u64 = members[s] + .iter() + .map(|&b| profile.block(b).serialized_size as u64) + .sum(); + let mut fb: Vec = foreign[s].iter().copied().collect(); + fb.sort_unstable(); + let cross_ingress: u64 = + fb.iter().map(|&p| profile.block(p).serialized_size as u64).sum(); + total_cross += cross_ingress as u128; + let foreign_blocks: Vec
= + fb.iter().map(|&p| profile.block(p).addr.clone()).collect(); + shards.push(ShardInfo { + id: s as u32, + blocks, + heartbeats, + own_size, + foreign_blocks, + cross_ingress, + assumption_root: None, + }); + } + ShardManifest { + num_shards: num_shards as u32, + shards, + total_cross_ingress: total_cross, + } + } + + /// A human-readable what-if summary line. + pub fn summary(&self) -> String { + let hbs: Vec = self.shards.iter().map(|s| s.heartbeats).collect(); + let nonempty: Vec = self + .shards + .iter() + .filter(|s| !s.blocks.is_empty()) + .map(|s| s.heartbeats) + .collect(); + let max = hbs.iter().copied().max().unwrap_or(0); + let min = nonempty.iter().copied().min().unwrap_or(0); + let total: u128 = hbs.iter().map(|&h| h as u128).sum(); + let mean = if self.shards.is_empty() { + 0 + } else { + (total / self.shards.len() as u128) as u64 + }; + let empty = self.shards.iter().filter(|s| s.blocks.is_empty()).count(); + let max_cross = + self.shards.iter().map(|s| s.cross_ingress).max().unwrap_or(0); + format!( + "shards={} (empty={}) heartbeats[min={} mean={} max={}] imbalance={:.2}x \ + cross_ingress_total={} max_shard_cross={}", + self.shards.len(), + empty, + min, + mean, + max, + if mean == 0 { 0.0 } else { max as f64 / mean as f64 }, + self.total_cross_ingress, + max_cross, + ) + } + + /// Serialize to the `.ixes` binary format. + pub fn to_bytes(&self) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(SHARD_MAGIC); + out.extend_from_slice(&self.total_cross_ingress.to_le_bytes()); + out.extend_from_slice(&(self.shards.len() as u32).to_le_bytes()); + let put_addrs = |out: &mut Vec, addrs: &[Address]| { + out.extend_from_slice(&(addrs.len() as u32).to_le_bytes()); + for a in addrs { + out.extend_from_slice(a.as_bytes()); + } + }; + for sh in &self.shards { + out.extend_from_slice(&sh.id.to_le_bytes()); + out.extend_from_slice(&sh.heartbeats.to_le_bytes()); + out.extend_from_slice(&sh.own_size.to_le_bytes()); + out.extend_from_slice(&sh.cross_ingress.to_le_bytes()); + match &sh.assumption_root { + Some(a) => { + out.push(1); + out.extend_from_slice(a.as_bytes()); + }, + None => out.push(0), + } + put_addrs(&mut out, &sh.blocks); + put_addrs(&mut out, &sh.foreign_blocks); + } + out + } + + /// Deserialize from the `.ixes` binary format. + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut c = Cur { buf: bytes, pos: 0 }; + if c.take(8)? != SHARD_MAGIC { + return Err("not an .ixes file (bad magic)".into()); + } + let total_cross_ingress = c.u128()?; + let num_shards = c.u32()? as usize; + let mut shards = Vec::with_capacity(num_shards); + for _ in 0..num_shards { + let id = c.u32()?; + let heartbeats = c.u64()?; + let own_size = c.u64()?; + let cross_ingress = c.u64()?; + let assumption_root = if c.u8()? == 1 { Some(c.addr()?) } else { None }; + let blocks = c.addrs()?; + let foreign_blocks = c.addrs()?; + shards.push(ShardInfo { + id, + blocks, + heartbeats, + own_size, + foreign_blocks, + cross_ingress, + assumption_root, + }); + } + Ok(ShardManifest { + num_shards: num_shards as u32, + shards, + total_cross_ingress, + }) + } +} + +/// Magic bytes at the head of every `.ixes` file. +const SHARD_MAGIC: &[u8; 8] = b"IXES\0\0\0\0"; + +/// Minimal little-endian cursor for manifest decoding. +struct Cur<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> Cur<'a> { + fn take(&mut self, n: usize) -> Result<&'a [u8], String> { + let end = self.pos.checked_add(n).ok_or("truncated .ixes")?; + if end > self.buf.len() { + return Err("truncated .ixes".into()); + } + let s = &self.buf[self.pos..end]; + self.pos = end; + Ok(s) + } + fn u8(&mut self) -> Result { + Ok(self.take(1)?[0]) + } + fn u32(&mut self) -> Result { + Ok(u32::from_le_bytes(self.take(4)?.try_into().unwrap())) + } + fn u64(&mut self) -> Result { + Ok(u64::from_le_bytes(self.take(8)?.try_into().unwrap())) + } + fn u128(&mut self) -> Result { + Ok(u128::from_le_bytes(self.take(16)?.try_into().unwrap())) + } + fn addr(&mut self) -> Result { + Address::from_slice(self.take(32)?).map_err(|_| "bad address".into()) + } + fn addrs(&mut self) -> Result, String> { + let n = self.u32()? as usize; + let mut v = Vec::with_capacity(n); + for _ in 0..n { + v.push(self.addr()?); + } + Ok(v) + } +} + +/// Read a `.ixesp`, partition into `num_shards` shards, and emit a manifest with +/// per-shard cost metrics, foreign-block sets, and (delta-based) assumption +/// roots. Optionally writes the manifest (`.ixes`). Returns a what-if report. +/// +/// The assumption root here is the Merkle root over the foreign *blocks* whose +/// bodies the shard must ingress (the cost-model-tight set). The fully-sound +/// variant for proof generation uses the static reference closure over member +/// constants and additionally needs the `.ixe`; see `plans/sharding.md` §4. +pub fn shard_esp( + esp_path: &str, + num_shards: usize, + balance: f64, + out_path: Option<&str>, +) -> Result { + let bytes = + std::fs::read(esp_path).map_err(|e| format!("read {esp_path}: {e}"))?; + let profile = BlockProfile::from_bytes(&bytes) + .map_err(|e| format!("parse {esp_path}: {e}"))?; + let h = Hypergraph::from_profile(&profile); + let shard_of = h.partition(num_shards, balance); + let mut manifest = ShardManifest::build(&profile, &shard_of, num_shards); + for shard in &mut manifest.shards { + shard.assumption_root = + crate::ix::ixon::merkle::merkle_root_canonical(&shard.foreign_blocks); + } + if let Some(op) = out_path { + std::fs::write(op, manifest.to_bytes()) + .map_err(|e| format!("write {op}: {e}"))?; + } + // The largest single block's heartbeats is the *floor* on achievable + // per-shard balance: a mutual block is atomic and cannot be split, so no + // partition can drive max-shard heartbeats below it. When the heaviest shard + // is pinned at this floor, adding more shards only worsens imbalance — a + // signal to stop raising the shard count or to split the block. + let max_block_hb = + profile.blocks().iter().map(|b| b.heartbeats).max().unwrap_or(0); + let max_shard_hb = + manifest.shards.iter().map(|s| s.heartbeats).max().unwrap_or(0); + let floored = + num_shards > 1 && max_shard_hb <= max_block_hb.saturating_mul(11) / 10; + let note = if floored { + " [balance floored by largest atomic block — more shards won't help]" + } else { + "" + }; + Ok(format!( + "blocks={} delta_edges={} nets={}\n{}\nlargest_block_hb={}{}", + profile.num_blocks(), + profile.num_edges(), + h.num_nets(), + manifest.summary(), + max_block_hb, + note, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ix::profile::ProfileBuilder; + + fn addr(byte: u8) -> Address { + Address::from_slice(&[byte; 32]).unwrap() + } + + /// Two tight clusters {1,2,3} and {4,5,6} with heavy intra-cluster delta and a + /// single thin cross edge. A good bisection cuts only the thin edge. + fn two_clusters() -> BlockProfile { + let mut b = ProfileBuilder::new(); + for i in 1..=6u8 { + b.block(addr(i), 100, 1000, 1); + } + // intra cluster A + b.delta_edge(addr(1), addr(2)); + b.delta_edge(addr(2), addr(3)); + b.delta_edge(addr(3), addr(1)); + // intra cluster B + b.delta_edge(addr(4), addr(5)); + b.delta_edge(addr(5), addr(6)); + b.delta_edge(addr(6), addr(4)); + // thin cross edge A->B + b.delta_edge(addr(3), addr(4)); + b.finish() + } + + #[test] + fn bisect_separates_clusters() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + // Blocks 0,1,2 (addr 1,2,3) should share a shard; 3,4,5 (addr 4,5,6) the other. + assert_eq!(shard_of[0], shard_of[1]); + assert_eq!(shard_of[1], shard_of[2]); + assert_eq!(shard_of[3], shard_of[4]); + assert_eq!(shard_of[4], shard_of[5]); + assert_ne!(shard_of[0], shard_of[3]); + // Objective: only the cross net (addr 4, size 1000) is cut, λ=2 → 1000. + assert_eq!(h.connectivity_objective(&shard_of), 1000); + } + + #[test] + fn objective_matches_manifest_cross_ingress() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let m = ShardManifest::build(&p, &shard_of, 2); + // Σ per-shard cross-ingress must equal the km1 objective. + assert_eq!(m.total_cross_ingress, h.connectivity_objective(&shard_of)); + } + + #[test] + fn balanced_partition() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let m = ShardManifest::build(&p, &shard_of, 2); + assert_eq!(m.shards.len(), 2); + // Each cluster has 3 blocks × 100 heartbeats = 300; perfectly balanced. + assert_eq!(m.shards[0].heartbeats, 300); + assert_eq!(m.shards[1].heartbeats, 300); + } + + #[test] + fn four_shards_perfect_split() { + // Four clusters of 4 blocks each, heavy intra, no cross: perfect 4-way split. + let mut b = ProfileBuilder::new(); + for c in 0..4u8 { + let base = c * 4 + 1; + for k in 0..4u8 { + b.block(addr(base + k), 100, 500, 1); + } + b.delta_edge(addr(base), addr(base + 1)); + b.delta_edge(addr(base + 1), addr(base + 2)); + b.delta_edge(addr(base + 2), addr(base + 3)); + b.delta_edge(addr(base + 3), addr(base)); + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(4, 0.10); + let m = ShardManifest::build(&p, &shard_of, 4); + assert_eq!(m.shards.len(), 4); + // No cross-cluster edges → zero cross ingress achievable. + assert_eq!(m.total_cross_ingress, 0); + // Each non-empty shard should hold exactly one cluster (4×100). + for s in &m.shards { + assert_eq!(s.heartbeats, 400); + } + } + + #[test] + fn manifest_roundtrip_serialization() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let mut m = ShardManifest::build(&p, &shard_of, 2); + // Simulate the env layer filling in an assumption root on one shard. + m.shards[0].assumption_root = Some(addr(42)); + let bytes = m.to_bytes(); + let q = ShardManifest::from_bytes(&bytes).unwrap(); + assert_eq!(m, q); + assert_eq!(q.shards[0].assumption_root, Some(addr(42))); + assert_eq!(q.shards[1].assumption_root, None); + } + + #[test] + fn cap_keeps_shards_non_empty_under_skew() { + // A heavy atomic block among many light ones. Capping the balance weight at + // total/num_shards (plus leaf-count allocation) must keep every shard + // non-empty (parallelism is the goal), even though heartbeat balance is + // impossible. + let mut b = ProfileBuilder::new(); + b.block(addr(1), 30_000, 100, 1); // ~30x a light block + for i in 2..=65u8 { + b.block(addr(i), 1000, 100, 1); + } + for i in 2..=64u8 { + b.delta_edge(addr(i), addr(i + 1)); + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(8, 0.20); // 8 shards from 65 blocks + let m = ShardManifest::build(&p, &shard_of, 8); + assert_eq!(m.shards.len(), 8); + assert_eq!( + m.shards.iter().filter(|s| s.blocks.is_empty()).count(), + 0, + "capping must keep all shards non-empty" + ); + } + + #[test] + fn shard_esp_file_roundtrip() { + // Exercise the CLI's shard path: write a .ixesp, run shard_esp, read back + // the .ixes manifest. + let p = two_clusters(); + let dir = std::env::temp_dir(); + let pid = std::process::id(); + let prof = dir.join(format!("ix_shard_rt_{pid}.ixesp")); + let shard = dir.join(format!("ix_shard_rt_{pid}.ixes")); + std::fs::write(&prof, p.to_bytes()).unwrap(); + + let report = + shard_esp(prof.to_str().unwrap(), 2, 0.10, Some(shard.to_str().unwrap())) + .unwrap(); + assert!(report.contains("shards=2"), "report was: {report}"); + + let m = ShardManifest::from_bytes(&std::fs::read(&shard).unwrap()).unwrap(); + assert_eq!(m.shards.len(), 2); + assert_eq!(m.total_cross_ingress, 1000); + + let _ = std::fs::remove_file(&prof); + let _ = std::fs::remove_file(&shard); + } +} From fd06667ca4babdce0db203aab746d283b99c17e0 Mon Sep 17 00:00:00 2001 From: "John C. Burnham" Date: Mon, 8 Jun 2026 05:37:30 -0400 Subject: [PATCH 2/3] Sharding: multilevel coarsening for the env partitioner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the flat greedy+FM body of `bisect()` with a multilevel V-cycle (coarsen → partition the tiny coarsest graph → uncoarsen + refine), so large environments partition in a fraction of the time *and* at markedly lower cross-shard ingress. Self-contained to `src/ix/shard.rs`; recursion, leaf-count allocation, cut-net splitting, rayon parallelism, and the balance-weight cap are unchanged. Each bisection now: - coarsens the sub-hypergraph by heavy-edge matching (merge blocks that co-occur in small, heavy delta-nets) under a cluster-weight cap, down to ~256 supervertices; - decides the global cut once on that tiny graph (greedy graph-growing + FM to convergence, from several diverse seeds, keeping the lowest balanced cut); - uncoarsens, projecting the cut down and boundary-refining with FM at each level. Deciding global structure on the small graph and only ever refining locally on the big ones is what improves both axes at once. Benchmarks (profiled `.ixe` → partition; balance ±5%; flat baseline → multilevel; 0 empty shards, deterministic in every case): env n partition time cross-shard ingress init 64 5.0s → 2.7s 8.77M → 6.52M (-26%) init 128 6.0s → 2.7s 12.44M → 9.83M (-21%) init 256 6.0s → 3.0s 17.71M → 15.03M (-15%) lean 64 8.0s → 3.4s 9.59M → 7.42M (-23%) lean 128 8.0s → 3.5s 12.59M → 10.48M (-17%) lean 256 7.0s → 3.6s 18.73M → 15.49M (-17%) mathlib 64 99s → 30.8s 185.63M → 93.60M (-50%) mathlib 128 101s → 31.4s 233.58M →131.84M (-44%) mathlib 256 104s → 33.8s 294.85M →184.32M (-37%) mathlib (631k blocks, 12.4M delta edges) partitions ~3.2x faster with roughly half the cross-shard ingress; its heartbeat imbalance also drops (1.66x → 1.53x at n=64). The non-empty-shard guarantee and determinism hold throughout. Implementation notes — what real-env validation required beyond the textbook scheme: - Fallback pairing in matching: a vertex with no heavy-edge partner (delta-sparse or only in hub nets) is merged with the next unmatched vertex. They share no tracked net, so the merge is cut-neutral, but it keeps coarsening shrinking ~2x/pass instead of stalling far above the target (which had left an expensive initial partition on a ~51k-vertex graph). - Incremental FM gains: a move updates only the changed net's contribution to each co-pin (O(Σ pins of the moved vertex's nets)) instead of recomputing neighbour gains from scratch (O(Σ neighbour degrees)). Dominant speedup — refining the finest level dropped from ~11s to ~0.9s on init. - Degenerate-split rejection: graph-growing seeded at a light vertex can sweep an entire sub onto one side (cut 0) when one atomic block holds more than half the balance weight; the initial-partition selector now rejects empty-sided candidates and prefers balanced-then-min-cut, so no shard is left empty. - One boundary-refinement pass per uncoarsen level (measured 1.6–1.8x faster than two for only ~3–6% more ingress — a clear win given the margin). Removes the FM-skip heuristic (`FM_SKIP_VERTICES` / `FM_FULL_VERTICES`): full refinement is now affordable at every size because it only ever runs on already-good partitions. Adds unit tests for one-level contraction, cluster-cap matching, large-cluster separation through the full V-cycle, determinism, and the non-empty guarantee under a dominant block. --- src/ix/shard.rs | 714 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 610 insertions(+), 104 deletions(-) diff --git a/src/ix/shard.rs b/src/ix/shard.rs index 59ea7658..193ae59c 100644 --- a/src/ix/shard.rs +++ b/src/ix/shard.rs @@ -22,16 +22,20 @@ //! **Recursive bisection.** Balanced min-cut bisection recursively splits the //! block hypergraph, allocating the `N`-shard budget between the two sides at //! each step (so any `N` works, not just powers of two); the bisection tree is -//! the future proof-aggregation tree. -//! Each bisection uses greedy graph-growing for a locality-aware initial cut -//! followed by Fiduccia–Mattheyses (FM) refinement. On recursion we apply -//! **cut-net splitting**: a net already cut at an upper level is restricted to -//! its pins within each half so it is not recounted deeper. +//! the future proof-aggregation tree. On recursion we apply **cut-net +//! splitting**: a net already cut at an upper level is restricted to its pins +//! within each half so it is not recounted deeper. //! -//! Multilevel coarsening (a further quality boost) is intentionally left as a -//! follow-up; the recursive bisection + FM here is fully self-contained (no -//! external partitioner dependency) and ample for the ~100k-block environments -//! this targets. +//! **Multilevel coarsening.** Each individual bisection is solved with a +//! multilevel V-cycle rather than flat local search: we *coarsen* the +//! sub-hypergraph by repeatedly merging tightly-coupled blocks into +//! supervertices (heavy-edge matching) until only a few hundred remain, decide +//! the cut once on that tiny graph (greedy graph-growing + Fiduccia–Mattheyses +//! to convergence), then *uncoarsen* — projecting the cut back down and +//! boundary-refining with FM at each level. Deciding global structure on the +//! small graph and only ever refining locally on the big ones is what keeps +//! large environments (mathlib ≈ 631k blocks) partitioning in seconds. The +//! partitioner is fully self-contained — no external partitioner dependency. use rustc_hash::FxHashSet; use std::cmp::Reverse; @@ -48,15 +52,41 @@ use crate::ix::profile::BlockProfile; /// (atomic) assignment buffer never alias. const PARALLEL_THRESHOLD: usize = 4_000; -/// Sub-problems larger than this skip Fiduccia–Mattheyses refinement entirely -/// and rely on the greedy graph-growing cut alone. FM is ~O(V·d²) per pass, so -/// on dense graphs (mathlib ≈ 20 edges/block) running it at the top of the -/// recursion is the dominant cost; the few biggest cuts are coarse anyway. -const FM_SKIP_VERTICES: usize = 50_000; - -/// Vertex count below which FM runs to (capped) convergence; between this and -/// [`FM_SKIP_VERTICES`] it runs a few passes only. -const FM_FULL_VERTICES: usize = 10_000; +/// Target vertex count for the coarsest graph in the multilevel V-cycle. We +/// coarsen down to roughly this many supervertices, decide the global cut there +/// (cheaply, at high quality), then uncoarsen and locally refine. Small enough +/// that greedy + full FM on it is instant; large enough to still admit a +/// balanced bisection. +const COARSEST_TARGET: usize = 256; + +/// Stop coarsening if a matching pass fails to shrink the graph by at least +/// `1 − COARSEN_STALL_RATIO` (here 10%). Guards against spinning on graphs with +/// few matchable pairs (very sparse or hub-dominated subs); [`bisect`] then +/// falls back to refining whatever level was reached. +const COARSEN_STALL_RATIO: f64 = 0.90; + +/// Nets larger than this are skipped when rating vertices for heavy-edge +/// matching: a co-pin of an `m`-pin net earns only `weight/(m−1)` toward a +/// match, so large nets carry negligible clustering signal while costing `O(m)` +/// to scan. Keeping the matching rating near-linear bounds coarsening time. +const MATCH_NET_CAP: usize = 100; + +/// FM passes per level during uncoarsening. The projected partition is already +/// globally good, so a single boundary-refinement pass suffices to clean up each +/// level's cut frontier (passes also stop early once one yields no improvement). +const REFINE_PASSES: u32 = 1; + +/// Initial-partition restarts on the (tiny) coarsest graph: greedy graph-growing +/// from this many diverse deterministic seeds, each FM-refined, keeping the +/// lowest-cut result. Graph-growing from a single seed can "leak" across a thin +/// bridge between two clusters; diverse seeds (one of which starts far from the +/// bridge) recover the clean cut. Cheap — the coarsest graph is ≈ +/// [`COARSEST_TARGET`] vertices and this runs once per bisection. +const INITIAL_RESTARTS: usize = 4; + +/// Bisections of at least this many vertices are logged by [`PartitionProgress`] +/// (purely an observability threshold — a slow run should never look stuck). +const PROGRESS_BISECT_LOG: usize = 50_000; /// Nets with more pins than this are "hubs" (e.g. a core lemma delta-unfolded by /// thousands of blocks). They are cut in essentially any balanced partition, so @@ -163,6 +193,29 @@ impl Hypergraph { } } +/// A borrowed, cut-relevant view of one hypergraph level — either a top-level +/// [`SubHyper`] or a coarsened [`CoarseLevel`]. The greedy / FM machinery is +/// written against this view so it runs unchanged at every level of the +/// multilevel hierarchy. `Copy` (four slice references), so it is passed by +/// value freely. +#[derive(Clone, Copy)] +struct Level<'a> { + /// vertex weights (heartbeats) + vw: &'a [u64], + /// balance weights (capped heartbeats) + bw: &'a [u64], + /// nets: (weight, pins) + nets: &'a [(u64, Vec)], + /// vertex -> incident net indices + vnets: &'a [Vec], +} + +impl<'a> Level<'a> { + fn num_vertices(&self) -> usize { + self.vw.len() + } +} + /// A sub-hypergraph used during recursion, with vertices re-indexed locally and /// nets restricted (cut-net split) to the vertices it contains. struct SubHyper { @@ -221,6 +274,11 @@ impl SubHyper { self.vw.len() } + /// Borrow this sub-hypergraph as a [`Level`] for the greedy / FM machinery. + fn level(&self) -> Level<'_> { + Level { vw: &self.vw, bw: &self.bw, nets: &self.nets, vnets: &self.vnets } + } + /// Induce the sub-hypergraph on the local vertices selected by `keep`, /// applying cut-net splitting: each net is restricted to its kept pins and /// dropped if fewer than two remain. @@ -285,7 +343,9 @@ fn rec_bisect( let nl = left.num_vertices(); let nr = right.num_vertices(); if nl == 0 || nr == 0 { - // Degenerate split (shouldn't happen for |sub| ≥ 2): keep together. + // Degenerate split: `bisect` guarantees both sides non-empty for |sub| ≥ 2 + // (its initial partition rejects empty-sided candidates), so this is a + // defensive backstop only — keep the subtree together as one shard. for &g in &sub.global { shard_of[g as usize].store(base, Ordering::Relaxed); } @@ -340,7 +400,7 @@ impl PartitionProgress { /// Log a large bisection about to run (the slow steps). `&self` so it can be /// shared across the parallel recursion. fn bisecting(&self, n: usize, k: usize) { - if self.enabled && n >= FM_SKIP_VERTICES { + if self.enabled && n >= PROGRESS_BISECT_LOG { eprintln!( "[shard] bisecting {n} vertices → {k} shards ({:.1?} elapsed, {}/{} shards done)", self.start.elapsed(), @@ -377,7 +437,13 @@ impl PartitionProgress { } /// Bisect `sub` into two balanced parts minimizing cut weight. Returns a side -/// (0/1) per local vertex. Greedy graph-growing initial partition + FM. +/// (0/1) per local vertex. +/// +/// Multilevel V-cycle: coarsen `sub` into a hierarchy, decide the cut on the +/// tiny coarsest graph (greedy graph-growing + FM to convergence), then +/// uncoarsen — projecting the cut back down and boundary-refining each level. +/// When `sub` is small or won't coarsen, `levels` is empty and we partition it +/// directly, so the worst case is never worse than a flat bisection. fn bisect(sub: &SubHyper, epsilon: f64) -> Vec { let n = sub.num_vertices(); if n == 0 { @@ -387,27 +453,24 @@ fn bisect(sub: &SubHyper, epsilon: f64) -> Vec { return vec![0]; } let total_bw: u64 = sub.bw.iter().sum(); - // Balance bounds for side weights. + // Balance bounds for side weights (invariant across levels — coarsening sums + // balance weight, so the total is preserved at every level). let wmax = ((0.5 + epsilon) * total_bw as f64).ceil() as u64; let wmin = ((0.5 - epsilon) * total_bw as f64).floor() as u64; - let mut side = greedy_grow(sub, total_bw); - fm_refine(sub, &mut side, wmin, wmax); - side + let levels = coarsen(sub); + let coarsest = + levels.last().map(|l| l.level()).unwrap_or_else(|| sub.level()); + let side = initial_partition(coarsest, wmin, wmax); + uncoarsen_refine(sub, &levels, side, wmin, wmax) } -/// Greedy graph-growing: start from the heaviest vertex and repeatedly absorb -/// the boundary vertex most strongly connected (by net weight) to side 0 until -/// side 0 reaches ~half the balance weight. Remaining vertices go to side 1. -fn greedy_grow(sub: &SubHyper, total_bw: u64) -> Vec { - let n = sub.num_vertices(); +/// Greedy graph-growing: start from `seed` and repeatedly absorb the boundary +/// vertex most strongly connected (by net weight) to side 0 until side 0 reaches +/// ~half the balance weight. Remaining vertices go to side 1. +fn greedy_grow(lv: Level<'_>, total_bw: u64, seed: usize) -> Vec { + let n = lv.num_vertices(); let mut side = vec![1u8; n]; - // Seed: heaviest vertex (deterministic; ties broken by lowest id). - let seed = (0..n) - .max_by(|&a, &b| { - sub.vw[a].cmp(&sub.vw[b]).then(Reverse(a).cmp(&Reverse(b))) - }) - .unwrap(); // connection[v] = total weight of nets that already touch side 0 and include v. let mut connection = vec![0u64; n]; @@ -415,7 +478,7 @@ fn greedy_grow(sub: &SubHyper, total_bw: u64) -> Vec { // Whether a net already contributes to side 0. A net bumps its pins exactly // once (on first connection), so growth is O(Σ net_size), not O(Σ net_size²) // — essential on dense graphs (and it also stops over-counting connection). - let mut net_touched = vec![false; sub.nets.len()]; + let mut net_touched = vec![false; lv.nets.len()]; let mut heap: BinaryHeap<(u64, Reverse)> = BinaryHeap::new(); let mut side0_bw = 0u64; let target = total_bw / 2; @@ -431,10 +494,10 @@ fn greedy_grow(sub: &SubHyper, total_bw: u64) -> Vec { side0_bw: &mut u64| { side[v] = 0; in_side0[v] = true; - *side0_bw += sub.bw[v]; - for &ni in &sub.vnets[v] { + *side0_bw += lv.bw[v]; + for &ni in &lv.vnets[v] { let nis = ni as usize; - let (w, pins) = &sub.nets[nis]; + let (w, pins) = &lv.nets[nis]; // Skip hub nets, and nets already counted toward side 0. if pins.len() > FM_NET_CAP || net_touched[nis] { continue; @@ -512,10 +575,10 @@ struct NetState { } impl NetState { - fn new(sub: &SubHyper, side: &[u8]) -> (NetState, u128) { - let mut cnt = vec![[0u32; 2]; sub.nets.len()]; + fn new(lv: Level<'_>, side: &[u8]) -> (NetState, u128) { + let mut cnt = vec![[0u32; 2]; lv.nets.len()]; let mut cut: u128 = 0; - for (i, (w, pins)) in sub.nets.iter().enumerate() { + for (i, (w, pins)) in lv.nets.iter().enumerate() { if pins.len() > FM_NET_CAP { continue; // hub net: invisible to FM (counts stay zero) } @@ -533,12 +596,12 @@ impl NetState { /// FM gain of moving local vertex `v` from its side to the other. /// `+w` for each net where `v`'s side has exactly one pin (the move uncuts it); /// `−w` for each net fully on `v`'s side with ≥2 pins (the move newly cuts it). -fn fm_gain(sub: &SubHyper, ns: &NetState, side: &[u8], v: usize) -> i128 { +fn fm_gain(lv: Level<'_>, ns: &NetState, side: &[u8], v: usize) -> i128 { let a = side[v] as usize; let b = 1 - a; let mut g: i128 = 0; - for &ni in &sub.vnets[v] { - let (w, pins) = &sub.nets[ni as usize]; + for &ni in &lv.vnets[v] { + let (w, pins) = &lv.nets[ni as usize]; if pins.len() > FM_NET_CAP { continue; // hub net: ignored by FM } @@ -553,48 +616,87 @@ fn fm_gain(sub: &SubHyper, ns: &NetState, side: &[u8], v: usize) -> i128 { g } -/// Fiduccia–Mattheyses refinement: repeated passes, each moving every vertex at -/// most once in decreasing-gain order subject to the balance window, tracking -/// the lowest-cut prefix and rolling back to it. Stops when a pass yields no -/// improvement. -fn fm_refine(sub: &SubHyper, side: &mut [u8], wmin: u64, wmax: u64) { - let n = sub.num_vertices(); - // Greedy-only above the skip threshold (FM is too slow on huge dense subs); - // a few passes in the middle band; full (capped) convergence when small. - if n <= 1 || n > FM_SKIP_VERTICES { +/// Fiduccia–Mattheyses refinement (boundary variant): up to `max_passes` passes, +/// each moving every vertex at most once in decreasing-gain order subject to the +/// balance window, tracking the lowest-cut prefix and rolling back to it. Stops +/// when a pass yields no improvement. +/// +/// Only **boundary** vertices (incident to a cut net) are seeded into the gain +/// heap: an interior vertex has non-positive gain (moving it can only newly-cut +/// nets), so it never improves the cut; vertices exposed to the boundary by an +/// applied move are added as that move's neighbors. This bounds a pass at +/// roughly `O(boundary + moves·degree)` rather than `O(V·degree)`, which is what +/// makes full refinement affordable even on the finest (≈`V`-sized) level. +fn fm_refine(lv: Level<'_>, side: &mut [u8], wmin: u64, wmax: u64, max_passes: u32) { + let n = lv.num_vertices(); + if n <= 1 { return; } - let budget = if n > FM_FULL_VERTICES { 3 } else { MAX_FM_PASSES }; + // Contribution of one net (counts `ca` on the vertex's side, `cb` on the + // other, weight `w`) to the gain of moving that vertex: `+w` if it is the last + // pin on its side (the move uncuts the net), `−w` if its side is full with ≥2 + // pins (the move newly cuts it), else 0. + let contrib = |ca: u32, cb: u32, w: i128| -> i128 { + if cb >= 1 && ca == 1 { + w + } else if cb == 0 && ca >= 2 { + -w + } else { + 0 + } + }; + // Buffers reused across passes (no per-move allocation). + let mut gain = vec![0i128; n]; + let mut queued = vec![false; n]; + let mut locked = vec![false; n]; + let mut heap: BinaryHeap<(i128, Reverse)> = BinaryHeap::new(); + let mut moves: Vec = Vec::new(); let mut pass = 0u32; loop { - let (mut ns, mut cut) = NetState::new(sub, side); + let (mut ns, mut cut) = NetState::new(lv, side); let mut side0_bw: u64 = - (0..n).filter(|&v| side[v] == 0).map(|v| sub.bw[v]).sum(); - - let mut heap: BinaryHeap<(i128, Reverse)> = BinaryHeap::new(); - let mut cur_gain = vec![0i128; n]; - for v in 0..n { - let g = fm_gain(sub, &ns, side, v); - cur_gain[v] = g; - heap.push((g, Reverse(v as u32))); + (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); + + // Full gain initialization (O(pins), cheap) so the incremental neighbor + // updates below — which only adjust the changed net's contribution — stay + // exact for interior vertices that later reach the boundary. + for (v, g) in gain.iter_mut().enumerate() { + *g = fm_gain(lv, &ns, side, v); + } + heap.clear(); + moves.clear(); + queued.iter_mut().for_each(|q| *q = false); + locked.iter_mut().for_each(|l| *l = false); + // Seed the cut frontier only (interior vertices have non-positive gain). + for (i, (_, pins)) in lv.nets.iter().enumerate() { + if pins.len() > FM_NET_CAP { + continue; // hub net: invisible to FM + } + if ns.cnt[i][0] > 0 && ns.cnt[i][1] > 0 { + for &v in pins { + let v = v as usize; + if !queued[v] { + queued[v] = true; + heap.push((gain[v], Reverse(v as u32))); + } + } + } } - let mut locked = vec![false; n]; - let mut moves: Vec = Vec::new(); let mut best_cut = cut; let mut best_prefix = 0usize; // number of moves to keep while let Some((g, Reverse(v))) = heap.pop() { let v = v as usize; - if locked[v] || g != cur_gain[v] { + if locked[v] || g != gain[v] { continue; // stale entry } // Balance check: moving v shifts bw[v] across the cut. let (s0_after, ok) = if side[v] == 0 { - let s0 = side0_bw - sub.bw[v]; + let s0 = side0_bw - lv.bw[v]; (s0, s0 >= wmin) } else { - let s0 = side0_bw + sub.bw[v]; + let s0 = side0_bw + lv.bw[v]; (s0, s0 <= wmax) }; if !ok { @@ -602,50 +704,50 @@ fn fm_refine(sub: &SubHyper, side: &mut [u8], wmin: u64, wmax: u64) { locked[v] = true; continue; } - // Apply the move. + // Apply the move, updating the cut and the gains of co-pins incrementally: + // only this net's contribution to each pin changes, so a move costs + // O(Σ pins of v's nets), not O(Σ neighbour degrees). let a = side[v] as usize; - let b = 1 - a; - for &ni in &sub.vnets[v] { - let (w, pins) = &sub.nets[ni as usize]; + for &ni in &lv.vnets[v] { + let nis = ni as usize; + let (w, pins) = &lv.nets[nis]; if pins.len() > FM_NET_CAP { - continue; // hub net: not tracked, skip cnt update + continue; // hub net: not tracked } - let before = ns.cnt[ni as usize][0] > 0 && ns.cnt[ni as usize][1] > 0; - ns.cnt[ni as usize][a] -= 1; - ns.cnt[ni as usize][b] += 1; - let after = ns.cnt[ni as usize][0] > 0 && ns.cnt[ni as usize][1] > 0; - if before && !after { + let wi = *w as i128; + let c0 = ns.cnt[nis][0]; + let c1 = ns.cnt[nis][1]; + let (d0, d1) = if a == 0 { (c0 - 1, c1 + 1) } else { (c0 + 1, c1 - 1) }; + ns.cnt[nis][0] = d0; + ns.cnt[nis][1] = d1; + let before_cut = c0 > 0 && c1 > 0; + let after_cut = d0 > 0 && d1 > 0; + if before_cut && !after_cut { cut -= *w as u128; - } else if !before && after { + } else if !before_cut && after_cut { cut += *w as u128; } + for &uu in pins { + let u = uu as usize; + if u == v || locked[u] { + continue; + } + let su = side[u] as usize; + let (ca, cb) = if su == 0 { (c0, c1) } else { (c1, c0) }; + let (ca2, cb2) = if su == 0 { (d0, d1) } else { (d1, d0) }; + let delta = contrib(ca2, cb2, wi) - contrib(ca, cb, wi); + if delta != 0 { + gain[u] += delta; + queued[u] = true; + heap.push((gain[u], Reverse(uu))); + } + } } - side[v] = b as u8; + side[v] = (1 - a) as u8; side0_bw = s0_after; locked[v] = true; moves.push(v); - // Recompute gains of neighbors (and v's net co-pins). - let mut touched: FxHashSet = FxHashSet::default(); - for &ni in &sub.vnets[v] { - let pins = &sub.nets[ni as usize].1; - if pins.len() > FM_NET_CAP { - continue; // hub net: ignored by FM - } - for &u in pins { - if !locked[u as usize] { - touched.insert(u); - } - } - } - for u in touched { - let ng = fm_gain(sub, &ns, side, u as usize); - if ng != cur_gain[u as usize] { - cur_gain[u as usize] = ng; - heap.push((ng, Reverse(u))); - } - } - if cut < best_cut { best_cut = cut; best_prefix = moves.len(); @@ -658,12 +760,273 @@ fn fm_refine(sub: &SubHyper, side: &mut [u8], wmin: u64, wmax: u64) { } pass += 1; - if best_prefix == 0 || pass >= budget { + if best_prefix == 0 || pass >= max_passes { break; // converged, or hit the pass bound } } } +// ============================================================================ +// Multilevel coarsening +// ============================================================================ + +/// One coarser level of a single bisection's multilevel hierarchy. Mirrors the +/// cut-relevant fields of [`SubHyper`] (`vw`/`bw`/`nets`/`vnets`) plus the +/// `match_map` that records, for each vertex of the *next-finer* level, which +/// supervertex here it was merged into — the inverse map used to project a +/// coarse partition back down during uncoarsening. +struct CoarseLevel { + vw: Vec, + bw: Vec, + nets: Vec<(u64, Vec)>, + vnets: Vec>, + match_map: Vec, +} + +impl CoarseLevel { + fn level(&self) -> Level<'_> { + Level { vw: &self.vw, bw: &self.bw, nets: &self.nets, vnets: &self.vnets } + } +} + +/// Build the coarsening hierarchy for one bisection: `levels[0]` is the first +/// contraction of `sub`, `levels.last()` is the coarsest graph (≈ +/// [`COARSEST_TARGET`] vertices). The finest level (`sub` itself) is *not* +/// duplicated into the vector — it is supplied separately during uncoarsening. +/// Returns an empty vector when `sub` is already small or can't be coarsened, in +/// which case [`bisect`] partitions `sub` directly. +fn coarsen(sub: &SubHyper) -> Vec { + let total_bw: u64 = sub.bw.iter().sum(); + // Cap a supervertex's balance weight so the coarsest graph keeps at least + // ~COARSEST_TARGET pieces and stays balanceable; never merge past it. + let max_cluster = (total_bw / COARSEST_TARGET as u64).max(1); + let mut levels: Vec = Vec::new(); + loop { + let cur: Level<'_> = match levels.last() { + Some(l) => l.level(), + None => sub.level(), + }; + let n = cur.num_vertices(); + if n <= COARSEST_TARGET { + break; + } + let (super_id, next) = match_vertices(cur, max_cluster); + if (next as f64) > COARSEN_STALL_RATIO * n as f64 { + break; // stalled: too few matchable pairs to make progress + } + let level = contract(cur, &super_id, next); + levels.push(level); + if next <= COARSEST_TARGET { + break; + } + } + levels +} + +/// Heavy-edge matching pass: pair each still-unmatched vertex (visited in id +/// order) with the unmatched neighbor it co-occurs with in the heaviest small +/// nets, subject to the cluster-weight cap. A vertex with no such neighbor +/// (delta-sparse, or only in hub nets) is instead paired with the next +/// unmatched vertex — a cut-neutral merge (they share no tracked net) that keeps +/// the graph shrinking ~2× per pass so coarsening reaches [`COARSEST_TARGET`] +/// instead of stalling far above it (which would make the initial partition +/// expensive). Returns `(super_id, num_super)` where `super_id[v]` is `v`'s +/// supervertex in the next-coarser level. Deterministic — ties break to lowest +/// id. +fn match_vertices(lv: Level<'_>, max_cluster: u64) -> (Vec, usize) { + let n = lv.num_vertices(); + let mut super_id = vec![u32::MAX; n]; + let mut next: u32 = 0; + // Dense score accumulator, reset per vertex via the `touched` list. + let mut score = vec![0u64; n]; + let mut touched: Vec = Vec::new(); + // Forward cursor over still-unmatched vertices for the fallback pairing + // (advances monotonically → amortized O(n) over the whole pass). + let mut fb = 0usize; + for v in 0..n { + if super_id[v] != u32::MAX { + continue; // already claimed as an earlier vertex's partner + } + for &ni in &lv.vnets[v] { + let (w, pins) = &lv.nets[ni as usize]; + let deg = pins.len(); + if deg < 2 || deg > MATCH_NET_CAP { + continue; // singleton or hub: no clustering signal worth scanning + } + let contrib = w / (deg as u64 - 1); + if contrib == 0 { + continue; + } + for &u in pins { + let u = u as usize; + if u == v || super_id[u] != u32::MAX { + continue; // self, or already-matched vertex + } + if score[u] == 0 { + touched.push(u as u32); + } + score[u] = score[u].saturating_add(contrib); + } + } + // Pick the best-scoring partner that fits the cluster-weight cap. The + // explicit `u < best_u` tie-break makes the result independent of the + // (net/pin) iteration order. + let mut best_u = usize::MAX; + let mut best_score = 0u64; + for &ut in &touched { + let u = ut as usize; + let s = score[u]; + let fits = lv.bw[v].saturating_add(lv.bw[u]) <= max_cluster; + if fits && (s > best_score || (s == best_score && u < best_u)) { + best_score = s; + best_u = u; + } + } + for &ut in &touched { + score[ut as usize] = 0; + } + touched.clear(); + // Fallback: no heavy-edge partner — pair with the next unmatched vertex so + // coarsening keeps making progress. Cut-neutral (no shared tracked net). + // The cursor skips matched vertices and everything ≤ v (so never v itself). + if best_u == usize::MAX { + while fb < n && (super_id[fb] != u32::MAX || fb <= v) { + fb += 1; + } + if fb < n && lv.bw[v].saturating_add(lv.bw[fb]) <= max_cluster { + best_u = fb; + } + } + super_id[v] = next; + if best_u != usize::MAX { + super_id[best_u] = next; // matched pair shares a supervertex + } + next += 1; + } + (super_id, next as usize) +} + +/// Contract `lv` by `super_id` (which maps each vertex to one of `next` +/// supervertices) into the next coarser [`CoarseLevel`]. Supervertex weights are +/// member sums; each net's pins are remapped, sorted, and deduplicated, and the +/// net is dropped if it has fewer than two distinct coarse pins (it became +/// internal to a supervertex). Net weights are preserved. `super_id` is stored +/// as the level's `match_map` for the uncoarsening projection. +fn contract(lv: Level<'_>, super_id: &[u32], next: usize) -> CoarseLevel { + let mut vw = vec![0u64; next]; + let mut bw = vec![0u64; next]; + for v in 0..lv.num_vertices() { + let s = super_id[v] as usize; + vw[s] = vw[s].saturating_add(lv.vw[v]); + bw[s] = bw[s].saturating_add(lv.bw[v]); + } + let mut nets: Vec<(u64, Vec)> = Vec::with_capacity(lv.nets.len()); + for (w, pins) in lv.nets { + let mut cp: Vec = + pins.iter().map(|&p| super_id[p as usize]).collect(); + cp.sort_unstable(); + cp.dedup(); + if cp.len() >= 2 { + nets.push((*w, cp)); + } + } + let mut vnets = vec![Vec::new(); next]; + for (i, (_, pins)) in nets.iter().enumerate() { + for &v in pins { + vnets[v as usize].push(i as u32); + } + } + CoarseLevel { vw, bw, nets, vnets, match_map: super_id.to_vec() } +} + +/// Decide the bisection on the coarsest graph: greedy graph-growing for a +/// locality-aware initial cut, then FM to (capped) convergence. The graph is +/// tiny (≈[`COARSEST_TARGET`] vertices), so this is where we spend on quality — +/// the *global* cut is decided here and only locally refined while uncoarsening. +/// We restart from several diverse seeds (see [`INITIAL_RESTARTS`]) and keep the +/// lowest-cut partition, which makes graph-growing robust to "leaking" across a +/// thin bridge between two clusters. +fn initial_partition(lv: Level<'_>, wmin: u64, wmax: u64) -> Vec { + let n = lv.num_vertices(); + if n == 0 { + return Vec::new(); + } + if n == 1 { + return vec![0]; + } + let total_bw: u64 = lv.bw.iter().sum(); + // Diverse deterministic seeds: the heaviest vertex plus points spread evenly + // across the id range (one tends to start far from any inter-cluster bridge). + let heaviest = (0..n) + .max_by(|&a, &b| lv.vw[a].cmp(&lv.vw[b]).then(Reverse(a).cmp(&Reverse(b)))) + .unwrap(); + let mut seeds = vec![heaviest]; + for i in 0..INITIAL_RESTARTS { + let s = (i * n) / INITIAL_RESTARTS; + if !seeds.contains(&s) { + seeds.push(s); + } + } + // Select the lowest-cut candidate, but *only* among non-degenerate splits + // (both sides non-empty) and preferring those within the balance window. A + // graph-growing run seeded at a light vertex can sweep an entire sub onto one + // side (cut 0) when a single atomic block holds more than half the balance + // weight; such a split is both unbalanced and degenerate, and accepting it + // would leave a shard empty downstream. Key: (unbalanced?, cut), minimized. + let mut best: Option<((u8, u128), Vec)> = None; + for &seed in &seeds { + let mut side = greedy_grow(lv, total_bw, seed); + fm_refine(lv, &mut side, wmin, wmax, MAX_FM_PASSES); + let s0 = (0..n).filter(|&v| side[v] == 0).count(); + if s0 == 0 || s0 == n { + continue; // degenerate (one side empty) — never select + } + let side0_bw: u64 = (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); + let unbalanced = u8::from(side0_bw < wmin || side0_bw > wmax); + let (_, cut) = NetState::new(lv, &side); + let key = (unbalanced, cut); + if best.as_ref().is_none_or(|(bk, _)| key < *bk) { + best = Some((key, side)); + } + } + // Fallback (no non-degenerate candidate — pathological): split by id so both + // sides are non-empty and recursion can still realize every shard. + best.map(|(_, s)| s).unwrap_or_else(|| { + (0..n).map(|v| u8::from(v >= n / 2)).collect() + }) +} + +/// Project the coarse partition down to the finest level, boundary-refining at +/// each step. `side` enters indexed by coarsest supervertex; each finer level +/// inherits its supervertex's side (`side[fine] = coarse[match_map[fine]]`) and +/// is FM-refined ([`REFINE_PASSES`] passes) to fix only its cut frontier. The +/// returned `side` is indexed by `sub` vertex — the same contract a flat +/// bisection returns. +fn uncoarsen_refine( + sub: &SubHyper, + levels: &[CoarseLevel], + mut side: Vec, + wmin: u64, + wmax: u64, +) -> Vec { + for i in (0..levels.len()).rev() { + // The level finer than `levels[i]` is `levels[i-1]`, or `sub` at the bottom. + let (finer_n, finer_lv) = if i == 0 { + (sub.num_vertices(), sub.level()) + } else { + (levels[i - 1].vw.len(), levels[i - 1].level()) + }; + let mm = &levels[i].match_map; // finer vertex -> levels[i] supervertex + let mut finer_side = vec![0u8; finer_n]; + for v in 0..finer_n { + finer_side[v] = side[mm[v] as usize]; + } + fm_refine(finer_lv, &mut finer_side, wmin, wmax, REFINE_PASSES); + side = finer_side; + } + side +} + // ============================================================================ // Manifest // ============================================================================ @@ -968,6 +1331,16 @@ mod tests { Address::from_slice(&[byte; 32]).unwrap() } + /// A distinct address for each `n` (more than the 256 `addr(u8)` affords), + /// for fixtures large enough to exercise the multilevel coarsening path. + /// Big-endian so that address-sort order (which fixes block ids) matches + /// numeric order, keeping cluster members id-contiguous in the fixtures below. + fn addr_u32(n: u32) -> Address { + let mut b = [0u8; 32]; + b[..4].copy_from_slice(&n.to_be_bytes()); + Address::from_slice(&b).unwrap() + } + /// Two tight clusters {1,2,3} and {4,5,6} with heavy intra-cluster delta and a /// single thin cross edge. A good bisection cuts only the thin edge. fn two_clusters() -> BlockProfile { @@ -1116,4 +1489,137 @@ mod tests { let _ = std::fs::remove_file(&prof); let _ = std::fs::remove_file(&shard); } + + #[test] + fn contract_sums_weights_and_drops_internal_nets() { + // 4 vertices, 3 nets. Merge {0,1}->super0 and {2,3}->super1. + let global = vec![0u32, 1, 2, 3]; + let vw = vec![10u64, 20, 30, 40]; + let nets = vec![ + (100u64, vec![0u32, 1]), // becomes internal to super0 -> dropped + (200u64, vec![0u32, 2]), // crosses -> kept + (300u64, vec![2u32, 3]), // becomes internal to super1 -> dropped + ]; + let sub = SubHyper::assemble(global, vw, nets, 1_000_000); + let super_id = vec![0u32, 0, 1, 1]; + let cl = contract(sub.level(), &super_id, 2); + + assert_eq!(cl.vw, vec![30, 70], "supervertex weights are member sums"); + assert_eq!(cl.bw, vec![30, 70]); + assert_eq!(cl.match_map, super_id); + // Only the crossing net survives, with its weight preserved. + assert_eq!(cl.nets.len(), 1); + assert_eq!(cl.nets[0].0, 200); + assert_eq!(cl.nets[0].1, vec![0u32, 1]); + // vnets rebuilt to reference the surviving net. + assert_eq!(cl.vnets, vec![vec![0u32], vec![0u32]]); + } + + #[test] + fn match_respects_cluster_weight_cap() { + // 0 and 1 share a heavy net; 2 is isolated. bw is each 10. + let sub = SubHyper::assemble( + vec![0u32, 1, 2], + vec![10u64, 10, 10], + vec![(100u64, vec![0u32, 1])], + 1_000_000, + ); + // Generous cap: 0 and 1 merge, 2 stays singleton -> 2 supervertices. + let (sid, k) = match_vertices(sub.level(), 100); + assert_eq!(sid[0], sid[1]); + assert_ne!(sid[0], sid[2]); + assert_eq!(k, 2); + // Cap below the combined weight (10+10=20): no merge -> 3 supervertices. + let (sid2, k2) = match_vertices(sub.level(), 19); + assert_ne!(sid2[0], sid2[1]); + assert_eq!(k2, 3); + } + + /// Two tight clusters of `m` blocks each (intra-cluster delta cycles) joined by + /// a single thin cross edge — large enough (`2m > COARSEST_TARGET`) to drive + /// the full coarsen → uncoarsen path. + fn two_big_clusters(m: u32) -> BlockProfile { + let mut b = ProfileBuilder::new(); + for i in 0..2 * m { + b.block(addr_u32(i + 1), 100, 1000, 1); + } + for i in 0..m { + // cluster A cycle over addrs 1..=m + b.delta_edge(addr_u32(1 + i), addr_u32(1 + (i + 1) % m)); + // cluster B cycle over addrs m+1..=2m + b.delta_edge(addr_u32(1 + m + i), addr_u32(1 + m + (i + 1) % m)); + } + // single thin cross edge: A_0 unfolds B_0 + b.delta_edge(addr_u32(1), addr_u32(1 + m)); + b.finish() + } + + #[test] + fn multilevel_separates_large_clusters() { + // 1024 blocks: past the cap "dead-zone" (n < 2·COARSEST_TARGET), so heavy- + // edge matching contracts the graph across multiple levels. + let m = 512u32; + let p = two_big_clusters(m); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + + // Optimal cut is exactly the single cross net (size 1000). Any split that + // also breaks a cluster cycle would cut an intra net (also 1000), so this + // equality certifies clean cluster separation. + assert_eq!(h.connectivity_objective(&shard_of), 1000); + + // Cluster coherence, mapping addresses to (sorted) block ids. + let id_of: std::collections::HashMap = (0..p.num_blocks() + as u32) + .map(|i| (p.block(i).addr.clone(), i)) + .collect(); + let sid = |n: u32| shard_of[id_of[&addr_u32(n)] as usize]; + let shard_a = sid(1); + let shard_b = sid(1 + m); + assert_ne!(shard_a, shard_b); + for i in 0..m { + assert_eq!(sid(1 + i), shard_a, "cluster A must be coherent"); + assert_eq!(sid(1 + m + i), shard_b, "cluster B must be coherent"); + } + } + + #[test] + fn multilevel_partition_is_deterministic() { + // 360 blocks > COARSEST_TARGET, partitioned into 4 (exercises recursion + + // coarsening). The partition must be byte-for-byte reproducible. + let p = two_big_clusters(180); + let h = Hypergraph::from_profile(&p); + let a = h.partition(4, 0.10); + let b = h.partition(4, 0.10); + assert_eq!(a, b); + } + + #[test] + fn multilevel_no_empty_shards_with_dominant_block() { + // One block far heavier than all others, among enough light blocks to drive + // coarsening. During recursion a sub can be dominated by the giant; a + // restart seeded at a light block then sweeps the whole sub onto one side + // (cut 0). The initial-partition selection must reject such degenerate + // candidates, or a shard ends up empty. (Regression guard.) + let mut b = ProfileBuilder::new(); + let m = 800u32; + b.block(addr_u32(1), 5_000_000, 100, 1); // the giant + for i in 2..=m { + b.block(addr_u32(i), 1000, 100, 1); + } + for i in 1..m { + b.delta_edge(addr_u32(i), addr_u32(i + 1)); // a chain (incl. the giant) + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + for n in [16usize, 64, 200] { + let shard_of = h.partition(n, 0.05); + let mani = ShardManifest::build(&p, &shard_of, n); + assert_eq!( + mani.shards.iter().filter(|s| s.blocks.is_empty()).count(), + 0, + "n={n}: every shard must be non-empty despite the dominant block" + ); + } + } } From 4def34c5dc09cc3b914dcb526b460f0f0d7ac283 Mon Sep 17 00:00:00 2001 From: "John C. Burnham" Date: Mon, 8 Jun 2026 06:33:25 -0400 Subject: [PATCH 3/3] clippy & fmt --- src/ffi/kernel.rs | 8 +++-- src/ix/profile.rs | 6 +++- src/ix/shard.rs | 79 +++++++++++++++++++++++++++++------------------ 3 files changed, 59 insertions(+), 34 deletions(-) diff --git a/src/ffi/kernel.rs b/src/ffi/kernel.rs index e3dcbeb6..84ba7d84 100644 --- a/src/ffi/kernel.rs +++ b/src/ffi/kernel.rs @@ -1715,11 +1715,11 @@ fn profile_block_of(env: &IxonEnv, addr: &Address) -> Address { } /// Serialized byte length of a block constant (the ingress-cost / net weight). +#[allow(clippy::cast_possible_truncation)] // clamped to u32::MAX above fn profile_block_size(env: &IxonEnv, block: &Address) -> u32 { env .get_const_bytes(block) - .map(|b| b.len().min(u32::MAX as usize) as u32) - .unwrap_or(0) + .map_or(0, |b| b.len().min(u32::MAX as usize) as u32) } /// Aggregate per-constant records into a block-level [`BlockProfile`]: map each @@ -1753,7 +1753,8 @@ fn build_block_profile(env: &IxonEnv, merged: &ProfileSink) -> BlockProfile { /// Run the anon kernel over `work`, with per-worker profile recording, and /// return `(passed, failed, merged_sink)`. -#[allow(clippy::needless_pass_by_value)] +// `map_err_ignore`: the discarded `Arc`/`PoisonError` carry no useful context. +#[allow(clippy::needless_pass_by_value, clippy::map_err_ignore)] fn run_anon_profile_parallel( env: Arc, work: Vec, @@ -1926,6 +1927,7 @@ pub extern "C" fn rs_kernel_profile_anon( /// FFI: partition a `.ixesp` into `num_shards` shards and write a `.ixes` /// manifest. Prints a what-if report to stderr. +#[allow(clippy::cast_precision_loss)] // balance_pct is a small percentage #[unsafe(no_mangle)] pub extern "C" fn rs_shard_esp( esp_path: LeanString>, diff --git a/src/ix/profile.rs b/src/ix/profile.rs index fe8e0273..ed05c539 100644 --- a/src/ix/profile.rs +++ b/src/ix/profile.rs @@ -21,6 +21,10 @@ //! `.ixesp` format is an explicit little-endian binary so it does not depend //! on the optional `serde`/`bincode` feature. +// Block ids and counts are `u32` (envs are far below the limit); the binary +// decoder maps decode failures to a single message. Both are intentional here. +#![allow(clippy::cast_possible_truncation, clippy::map_err_ignore)] + use rustc_hash::{FxHashMap, FxHashSet}; use crate::ix::address::Address; @@ -114,7 +118,7 @@ impl BlockProfile { /// Total heartbeats across all blocks. pub fn total_heartbeats(&self) -> u128 { - self.blocks.iter().map(|b| b.heartbeats as u128).sum() + self.blocks.iter().map(|b| u128::from(b.heartbeats)).sum() } /// Serialize to the `.ixesp` binary format. diff --git a/src/ix/shard.rs b/src/ix/shard.rs index 193ae59c..0fd9682e 100644 --- a/src/ix/shard.rs +++ b/src/ix/shard.rs @@ -37,6 +37,21 @@ //! large environments (mathlib ≈ 631k blocks) partitioning in seconds. The //! partitioner is fully self-contained — no external partitioner dependency. +// This module works throughout with `u32` block/vertex ids, `u64`/`u128` weights +// (heartbeats, ingress bytes), and `f64` balance ratios. The narrowing/precision +// casts between them are pervasive and intentional — envs are far below `u32` +// limits and the balance arithmetic tolerates `f64` rounding — so the lossy-cast +// lints are allowed module-wide rather than annotated at ~30 sites. The binary +// manifest decoder maps decode failures to a single message (`map_err_ignore`), +// and a few matching loops mutate the array they index (`needless_range_loop`). +#![allow( + clippy::cast_possible_truncation, + clippy::cast_precision_loss, + clippy::cast_sign_loss, + clippy::map_err_ignore, + clippy::needless_range_loop +)] + use rustc_hash::FxHashSet; use std::cmp::Reverse; use std::collections::BinaryHeap; @@ -131,7 +146,7 @@ impl Hypergraph { let mut pins = Vec::with_capacity(consumers.len() + 1); pins.push(p as u32); // home pin pins.extend_from_slice(consumers); - net_weight.push(profile.block(p as u32).serialized_size as u64); + net_weight.push(u64::from(profile.block(p as u32).serialized_size)); net_pins.push(pins); } Hypergraph { vweight, net_weight, net_pins } @@ -158,7 +173,7 @@ impl Hypergraph { } let lambda = seen.len() as u128; if lambda > 1 { - total += self.net_weight[i] as u128 * (lambda - 1); + total += u128::from(self.net_weight[i]) * (lambda - 1); } } total @@ -179,9 +194,9 @@ impl Hypergraph { // block from skewing every split toward it once there are many shards. The // resulting heartbeat imbalance is accepted; the goal is `num_shards` // **non-empty** parallel shards with minimal cross-shard ingress. - let total_hb: u128 = self.vweight.iter().map(|&w| w as u128).sum(); + let total_hb: u128 = self.vweight.iter().map(|&w| u128::from(w)).sum(); let cap = - (total_hb / num_shards as u128).max(1).min(u64::MAX as u128) as u64; + (total_hb / num_shards as u128).max(1).min(u128::from(u64::MAX)) as u64; let sub = SubHyper::full(self, cap); // Atomic assignment buffer: independent subtrees own disjoint block sets, so // they can be partitioned on separate threads without their writes aliasing. @@ -257,7 +272,7 @@ impl SubHyper { ) -> SubHyper { let n = vw.len(); let mut bw: Vec = vw.iter().map(|&w| w.min(cap)).collect(); - let total: u128 = bw.iter().map(|&w| w as u128).sum(); + let total: u128 = bw.iter().map(|&w| u128::from(w)).sum(); if total == 0 { bw = vec![1; n]; } @@ -354,8 +369,8 @@ fn rec_bisect( } // Allocate leaves ~proportional to heartbeat mass, clamped to [1, side size]. // `lo`/`hi` are feasible (lo ≤ hi) because nl + nr ≥ k with nl, nr ≥ 1. - let wl: u128 = left.vw.iter().map(|&w| w as u128).sum(); - let wr: u128 = right.vw.iter().map(|&w| w as u128).sum(); + let wl: u128 = left.vw.iter().map(|&w| u128::from(w)).sum(); + let wr: u128 = right.vw.iter().map(|&w| u128::from(w)).sum(); let wsum = (wl + wr).max(1); let ideal = ((k as u128 * wl + wsum / 2) / wsum) as usize; let lo = 1.max(k.saturating_sub(nr)); @@ -415,7 +430,7 @@ impl PartitionProgress { fn leaf(&self) { let done = self.finalized.fetch_add(1, Ordering::Relaxed) + 1; let step = (self.total / 16).max(1); - if self.enabled && (done % step == 0 || done == self.total) { + if self.enabled && (done.is_multiple_of(step) || done == self.total) { eprintln!( "[shard] {}/{} shards finalized ({:.1?} elapsed)", done, @@ -459,8 +474,7 @@ fn bisect(sub: &SubHyper, epsilon: f64) -> Vec { let wmin = ((0.5 - epsilon) * total_bw as f64).floor() as u64; let levels = coarsen(sub); - let coarsest = - levels.last().map(|l| l.level()).unwrap_or_else(|| sub.level()); + let coarsest = levels.last().map_or_else(|| sub.level(), |l| l.level()); let side = initial_partition(coarsest, wmin, wmax); uncoarsen_refine(sub, &levels, side, wmin, wmax) } @@ -586,7 +600,7 @@ impl NetState { cnt[i][side[v as usize] as usize] += 1; } if cnt[i][0] > 0 && cnt[i][1] > 0 { - cut += *w as u128; + cut += u128::from(*w); } } (NetState { cnt }, cut) @@ -608,9 +622,9 @@ fn fm_gain(lv: Level<'_>, ns: &NetState, side: &[u8], v: usize) -> i128 { let ca = ns.cnt[ni as usize][a]; let cb = ns.cnt[ni as usize][b]; if cb >= 1 && ca == 1 { - g += *w as i128; + g += i128::from(*w); } else if cb == 0 && ca >= 2 { - g -= *w as i128; + g -= i128::from(*w); } } g @@ -627,7 +641,13 @@ fn fm_gain(lv: Level<'_>, ns: &NetState, side: &[u8], v: usize) -> i128 { /// applied move are added as that move's neighbors. This bounds a pass at /// roughly `O(boundary + moves·degree)` rather than `O(V·degree)`, which is what /// makes full refinement affordable even on the finest (≈`V`-sized) level. -fn fm_refine(lv: Level<'_>, side: &mut [u8], wmin: u64, wmax: u64, max_passes: u32) { +fn fm_refine( + lv: Level<'_>, + side: &mut [u8], + wmin: u64, + wmax: u64, + max_passes: u32, +) { let n = lv.num_vertices(); if n <= 1 { return; @@ -665,8 +685,8 @@ fn fm_refine(lv: Level<'_>, side: &mut [u8], wmin: u64, wmax: u64, max_passes: u } heap.clear(); moves.clear(); - queued.iter_mut().for_each(|q| *q = false); - locked.iter_mut().for_each(|l| *l = false); + queued.fill(false); + locked.fill(false); // Seed the cut frontier only (interior vertices have non-positive gain). for (i, (_, pins)) in lv.nets.iter().enumerate() { if pins.len() > FM_NET_CAP { @@ -714,7 +734,7 @@ fn fm_refine(lv: Level<'_>, side: &mut [u8], wmin: u64, wmax: u64, max_passes: u if pins.len() > FM_NET_CAP { continue; // hub net: not tracked } - let wi = *w as i128; + let wi = i128::from(*w); let c0 = ns.cnt[nis][0]; let c1 = ns.cnt[nis][1]; let (d0, d1) = if a == 0 { (c0 - 1, c1 + 1) } else { (c0 + 1, c1 - 1) }; @@ -723,9 +743,9 @@ fn fm_refine(lv: Level<'_>, side: &mut [u8], wmin: u64, wmax: u64, max_passes: u let before_cut = c0 > 0 && c1 > 0; let after_cut = d0 > 0 && d1 > 0; if before_cut && !after_cut { - cut -= *w as u128; + cut -= u128::from(*w); } else if !before_cut && after_cut { - cut += *w as u128; + cut += u128::from(*w); } for &uu in pins { let u = uu as usize; @@ -850,7 +870,7 @@ fn match_vertices(lv: Level<'_>, max_cluster: u64) -> (Vec, usize) { for &ni in &lv.vnets[v] { let (w, pins) = &lv.nets[ni as usize]; let deg = pins.len(); - if deg < 2 || deg > MATCH_NET_CAP { + if !(2..=MATCH_NET_CAP).contains(°) { continue; // singleton or hub: no clustering signal worth scanning } let contrib = w / (deg as u64 - 1); @@ -922,8 +942,7 @@ fn contract(lv: Level<'_>, super_id: &[u32], next: usize) -> CoarseLevel { } let mut nets: Vec<(u64, Vec)> = Vec::with_capacity(lv.nets.len()); for (w, pins) in lv.nets { - let mut cp: Vec = - pins.iter().map(|&p| super_id[p as usize]).collect(); + let mut cp: Vec = pins.iter().map(|&p| super_id[p as usize]).collect(); cp.sort_unstable(); cp.dedup(); if cp.len() >= 2 { @@ -981,7 +1000,8 @@ fn initial_partition(lv: Level<'_>, wmin: u64, wmax: u64) -> Vec { if s0 == 0 || s0 == n { continue; // degenerate (one side empty) — never select } - let side0_bw: u64 = (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); + let side0_bw: u64 = + (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); let unbalanced = u8::from(side0_bw < wmin || side0_bw > wmax); let (_, cut) = NetState::new(lv, &side); let key = (unbalanced, cut); @@ -991,9 +1011,8 @@ fn initial_partition(lv: Level<'_>, wmin: u64, wmax: u64) -> Vec { } // Fallback (no non-degenerate candidate — pathological): split by id so both // sides are non-empty and recursion can still realize every shard. - best.map(|(_, s)| s).unwrap_or_else(|| { - (0..n).map(|v| u8::from(v >= n / 2)).collect() - }) + best + .map_or_else(|| (0..n).map(|v| u8::from(v >= n / 2)).collect(), |(_, s)| s) } /// Project the coarse partition down to the finest level, boundary-refining at @@ -1097,13 +1116,13 @@ impl ShardManifest { members[s].iter().map(|&b| profile.block(b).heartbeats).sum(); let own_size: u64 = members[s] .iter() - .map(|&b| profile.block(b).serialized_size as u64) + .map(|&b| u64::from(profile.block(b).serialized_size)) .sum(); let mut fb: Vec = foreign[s].iter().copied().collect(); fb.sort_unstable(); let cross_ingress: u64 = - fb.iter().map(|&p| profile.block(p).serialized_size as u64).sum(); - total_cross += cross_ingress as u128; + fb.iter().map(|&p| u64::from(profile.block(p).serialized_size)).sum(); + total_cross += u128::from(cross_ingress); let foreign_blocks: Vec
= fb.iter().map(|&p| profile.block(p).addr.clone()).collect(); shards.push(ShardInfo { @@ -1134,7 +1153,7 @@ impl ShardManifest { .collect(); let max = hbs.iter().copied().max().unwrap_or(0); let min = nonempty.iter().copied().min().unwrap_or(0); - let total: u128 = hbs.iter().map(|&h| h as u128).sum(); + let total: u128 = hbs.iter().map(|&h| u128::from(h)).sum(); let mean = if self.shards.is_empty() { 0 } else {