diff --git a/Ix/Cli/ProfileCmd.lean b/Ix/Cli/ProfileCmd.lean new file mode 100644 index 00000000..28584673 --- /dev/null +++ b/Ix/Cli/ProfileCmd.lean @@ -0,0 +1,67 @@ +/- + `ix profile `: run the Ix kernel out of circuit over a serialized + `.ixe` environment, recording per-block heartbeats and the delta-unfold graph + into a `.ixesp` sidecar. This is the cost model consumed by `ix shard` + (see `plans/sharding.md`). + + Recording defaults to *cache-isolated* mode: the kernel clears its + cross-constant reduction-memo caches between constants so every delta-unfold + re-executes (sound recording) and recorded heartbeats reflect the in-circuit + (un-memoized) cost. `--keep-caches` trades fidelity for speed. +-/ +module +public import Cli +public import Ix.Common +public import Ix.KernelCheck +public import Std.Internal.UV.System + +public section + +open Ix.KernelCheck + +namespace Ix.Cli.ProfileCmd + +def runProfileCmd (p : Cli.Parsed) : IO UInt32 := do + let some pathArg := p.positionalArg? "path" + | p.printError "error: must specify to a .ixe file" + return 1 + let envPath := pathArg.as! String + let outPath : String := + match p.flag? "out" with + | some flag => flag.as! String + | none => envPath ++ ".ixesp" + let isolate := !(p.flag? "keep-caches" |>.isSome) + let quiet := !(p.flag? "verbose" |>.isSome) + + if let some flag := p.flag? "workers" then + let n := flag.as! Nat + if n == 0 then + p.printError "error: --workers must be > 0" + return 1 + Std.Internal.UV.System.osSetenv "IX_KERNEL_CHECK_WORKERS" (toString n) + + IO.println s!"Profiling {envPath} → {outPath} (isolate={isolate})" + let start ← IO.monoMsNow + rsProfileAnonFFI envPath outPath isolate quiet + let elapsed := (← IO.monoMsNow) - start + IO.println s!"[profile] wrote {outPath} in {elapsed.formatMs}" + return 0 + +end Ix.Cli.ProfileCmd + +open Ix.Cli.ProfileCmd in +def profileCmd : Cli.Cmd := `[Cli| + "profile" VIA runProfileCmd; + "Profile a `.ixe` out of circuit → `.ixesp` (sharding cost + delta graph)" + + FLAGS: + out : String; "Output .ixesp path (default: .ixesp)" + "keep-caches"; "Keep cross-constant caches: faster, lower-fidelity, may under-record" + workers : Nat; "Parallel kernel workers (default: available_parallelism). Plumbs IX_KERNEL_CHECK_WORKERS." + verbose; "Log every constant (default: quiet)" + + ARGS: + path : String; "Path to a serialized .ixe environment" +] + +end diff --git a/Ix/Cli/ShardCmd.lean b/Ix/Cli/ShardCmd.lean new file mode 100644 index 00000000..12ce04ae --- /dev/null +++ b/Ix/Cli/ShardCmd.lean @@ -0,0 +1,62 @@ +/- + `ix shard --shards N`: partition a profiled environment into `N` + shards via recursive balanced min-cut bisection, minimizing cross-shard + delta-unfold ingress (see `plans/sharding.md`). + + Reads the `.ixesp` produced by `ix profile` (pure offline graph work, so `N` + is cheap to re-tune without re-running the kernel). Writes a `.ixes` manifest + and prints a what-if report (per-shard heartbeat balance + total cross-shard + ingress). The partitioner is self-contained — no external graph-library + dependency. +-/ +module +public import Cli +public import Ix.KernelCheck + +public section + +open Ix.KernelCheck + +namespace Ix.Cli.ShardCmd + +def runShardCmd (p : Cli.Parsed) : IO UInt32 := do + let some pathArg := p.positionalArg? "path" + | p.printError "error: must specify to a .ixesp file" + return 1 + let espPath := pathArg.as! String + let numShards : Nat := + match p.flag? "shards" with + | some flag => flag.as! Nat + | none => 1 + let balancePct : Nat := + match p.flag? "balance" with + | some flag => flag.as! Nat + | none => 5 + let outPath : String := + match p.flag? "out" with + | some flag => flag.as! String + | none => espPath ++ ".ixes" + + IO.println s!"Sharding {espPath} into {numShards} shards (balance ±{balancePct}%)" + rsShardEspFFI espPath (toString numShards) (toString balancePct) outPath + if !outPath.isEmpty then + IO.println s!"[shard] wrote {outPath}" + return 0 + +end Ix.Cli.ShardCmd + +open Ix.Cli.ShardCmd in +def shardCmd : Cli.Cmd := `[Cli| + "shard" VIA runShardCmd; + "Partition a `.ixesp` into N balanced shards minimizing cross-shard delta" + + FLAGS: + shards : Nat; "Number of shards N (default 1 = a single shard)" + balance : Nat; "Per-bisection balance tolerance, percent (default 5)" + out : String; "Output .ixes manifest path (default: .ixes)" + + ARGS: + path : String; "Path to a .ixesp produced by `ix profile`" +] + +end diff --git a/Ix/KernelCheck.lean b/Ix/KernelCheck.lean index b5cf3a05..ee0c9ca5 100644 --- a/Ix/KernelCheck.lean +++ b/Ix/KernelCheck.lean @@ -142,6 +142,30 @@ opaque rsCheckAnonFFI : @& String → -- fail-out path ("" = none) IO (Array (String × Option CheckError)) +/-- FFI: profile a `.ixe` out of circuit, writing a `.ixesp` sidecar with + per-block heartbeats + the delta-unfold graph (the sharding cost model, + see `plans/sharding.md`). Runs the anon kernel over every checkable target. + `isolate` clears the kernel's reduction-memo caches between constants for + sound/faithful recording; `quiet` suppresses per-constant progress. -/ +@[extern "rs_kernel_profile_anon"] +opaque rsProfileAnonFFI : + @& String → -- .ixe path + @& String → -- .ixesp output path + @& Bool → -- isolate caches + @& Bool → -- quiet + IO Unit + +/-- FFI: partition a `.ixesp` into `numShards` shards, writing a `.ixes` + manifest. `numShards` and `balancePct` are decimal strings (kept ABI-simple). + Empty `outPath` skips the manifest. Prints a what-if report to stderr. -/ +@[extern "rs_shard_esp"] +opaque rsShardEspFFI : + @& String → -- .ixesp path + @& String → -- num_shards (N) + @& String → -- balance percent + @& String → -- .ixes output path ("" = skip) + IO Unit + end Ix.KernelCheck end diff --git a/Main.lean b/Main.lean index 75ed3623..e72ae1e8 100644 --- a/Main.lean +++ b/Main.lean @@ -5,7 +5,9 @@ import Ix.Cli.CheckRsCmd import Ix.Cli.ClaimCmd import Ix.Cli.CompileCmd import Ix.Cli.IngressCmd +import Ix.Cli.ProfileCmd import Ix.Cli.ProveCmd +import Ix.Cli.ShardCmd import Ix.Cli.TreeCmd import Ix.Cli.ValidateCmd import Ix.Cli.VerifyCmd @@ -27,7 +29,9 @@ def ixCmd : Cli.Cmd := `[Cli| checkRsCmd; claimCmd; treeCmd; + profileCmd; proveCmd; + shardCmd; verifyCmd; addrOfCmd; ingressCmd; diff --git a/src/ffi/kernel.rs b/src/ffi/kernel.rs index 6d8bb3a2..84ba7d84 100644 --- a/src/ffi/kernel.rs +++ b/src/ffi/kernel.rs @@ -81,6 +81,7 @@ use crate::ix::kernel::ingress::{ use crate::ix::kernel::ingress::{ixon_ingress, lean_ingress}; use crate::ix::kernel::mode::{Anon, CheckDupLevelParams, KernelMode, Meta}; use crate::ix::kernel::tc::TypeChecker; +use crate::ix::profile::{BlockProfile, ProfileBuilder, ProfileSink}; unsafe extern "C" { fn lean_name_mk_string( @@ -1682,6 +1683,277 @@ pub extern "C" fn rs_kernel_check_anon( build_anon_result_array(&addrs_for_return, &results) } +// =========================================================================== +// Sharding profiler: run the anon kernel out of circuit over a `.ixe`, +// recording per-block heartbeats + the delta-unfold graph into a `.ixesp`. +// See `plans/sharding.md`. +// =========================================================================== + +/// Summary returned by [`profile_anon_ixe`]. +pub struct ProfileStats { + pub targets: usize, + pub passed: usize, + pub failed: usize, + pub blocks: usize, + pub edges: usize, + pub bytes: usize, +} + +/// Map a constant address to its *block* (ingress unit): the `block` address of +/// a projection constant, otherwise the address itself. +fn profile_block_of(env: &IxonEnv, addr: &Address) -> Address { + match env.get_const(addr) { + Some(c) => match &c.info { + IxonCI::IPrj(p) => p.block.clone(), + IxonCI::CPrj(p) => p.block.clone(), + IxonCI::RPrj(p) => p.block.clone(), + IxonCI::DPrj(p) => p.block.clone(), + _ => addr.clone(), + }, + None => addr.clone(), + } +} + +/// Serialized byte length of a block constant (the ingress-cost / net weight). +#[allow(clippy::cast_possible_truncation)] // clamped to u32::MAX above +fn profile_block_size(env: &IxonEnv, block: &Address) -> u32 { + env + .get_const_bytes(block) + .map_or(0, |b| b.len().min(u32::MAX as usize) as u32) +} + +/// Aggregate per-constant records into a block-level [`BlockProfile`]: map each +/// consumer/producer constant to its home block, attach serialized sizes, and +/// accumulate heartbeats + delta edges. +fn build_block_profile(env: &IxonEnv, merged: &ProfileSink) -> BlockProfile { + let mut builder = ProfileBuilder::new(); + // addr -> (home block, block serialized size), memoized. + let mut cache: FxHashMap = FxHashMap::default(); + let mut resolve = |a: &Address| -> (Address, u32) { + if let Some(v) = cache.get(a) { + return v.clone(); + } + let block = profile_block_of(env, a); + let size = profile_block_size(env, &block); + let v = (block, size); + cache.insert(a.clone(), v.clone()); + v + }; + for (consumer, rec) in &merged.records { + let (cblock, csize) = resolve(consumer); + builder.block(cblock.clone(), rec.fuel, csize, 1); + for prod in &rec.producers { + let (pblock, psize) = resolve(prod); + builder.block(pblock.clone(), 0, psize, 0); + builder.delta_edge(cblock.clone(), pblock); + } + } + builder.finish() +} + +/// Run the anon kernel over `work`, with per-worker profile recording, and +/// return `(passed, failed, merged_sink)`. +// `map_err_ignore`: the discarded `Arc`/`PoisonError` carry no useful context. +#[allow(clippy::needless_pass_by_value, clippy::map_err_ignore)] +fn run_anon_profile_parallel( + env: Arc, + work: Vec, + isolate: bool, + quiet: bool, +) -> Result<(usize, usize, ProfileSink), String> { + let work_total = work.len(); + let worker_count = resolve_kernel_check_workers(work_total, quiet); + eprintln!( + "[rs_kernel_profile] profiling {work_total} work item(s) with {worker_count} worker(s), isolate={isolate}..." + ); + let work = Arc::new(work); + let next_index = Arc::new(AtomicUsize::new(0)); + let passed = Arc::new(AtomicUsize::new(0)); + let failed = Arc::new(AtomicUsize::new(0)); + let sinks: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let mut handles: Vec> = + Vec::with_capacity(worker_count); + for worker_idx in 0..worker_count { + let env = Arc::clone(&env); + let work = Arc::clone(&work); + let next_index = Arc::clone(&next_index); + let passed = Arc::clone(&passed); + let failed = Arc::clone(&failed); + let sinks = Arc::clone(&sinks); + let handle = thread::Builder::new() + .name(format!("ix-kernel-profile-{worker_idx}")) + .stack_size(KERNEL_CHECK_STACK_SIZE) + .spawn(move || { + let mut kenv = KEnv::::new(); + kenv.profile_sink = Some(ProfileSink::new(isolate)); + let clear_every = kernel_check_clear_every(); + let mut checks_since_clear = clear_every; + loop { + let work_idx = next_index.fetch_add(1, Ordering::Relaxed); + if work_idx >= work_total { + break; + } + // `clear_releasing_memory` preserves `profile_sink`, so recording + // accumulates across scheduled-block boundaries. + if checks_since_clear >= clear_every { + kenv.clear_releasing_memory(); + checks_since_clear = 0; + } + let primary_addr = match &work[work_idx] { + AnonWorkItem::Standalone { addr, .. } => addr.clone(), + AnonWorkItem::Block { primary_addr, .. } => primary_addr.clone(), + }; + let kid = KId::::new(primary_addr, ()); + let res = { + let mut tc = + TypeChecker::::new_with_lazy_anon(&mut kenv, &env); + let r = tc.check_const(&kid); + // The TypeChecker is recreated per work item, so the final + // constant's record would never be flushed by a trailing reset — + // flush it explicitly. + tc.finish_constant_accounting(); + r + }; + if res.is_ok() { + passed.fetch_add(1, Ordering::Relaxed); + } else { + failed.fetch_add(1, Ordering::Relaxed); + } + checks_since_clear += 1; + } + if let Some(sink) = kenv.profile_sink.take() { + sinks.lock().unwrap().push(sink); + } + }) + .map_err(|e| format!("spawn profile worker: {e}"))?; + handles.push(handle); + } + + let mut panicked = false; + for h in handles { + if h.join().is_err() { + panicked = true; + } + } + if panicked { + return Err("profile worker panicked".to_string()); + } + + let mut merged = ProfileSink::new(isolate); + let collected = Arc::try_unwrap(sinks) + .map_err(|_| "profile sinks still shared".to_string())? + .into_inner() + .map_err(|_| "profile sinks mutex poisoned".to_string())?; + for s in collected { + merged.merge(s); + } + Ok((passed.load(Ordering::Relaxed), failed.load(Ordering::Relaxed), merged)) +} + +/// Profile a `.ixe` out of circuit and write the `.ixesp` sidecar. Pure-Rust +/// entry point (used by the FFI wrapper and Rust tests). +pub fn profile_anon_ixe( + path: &str, + out: &str, + isolate: bool, + quiet: bool, +) -> Result { + let load_start = Instant::now(); + let ixon_env = IxonEnv::get_anon_mmap(std::path::Path::new(path)) + .map_err(|e| format!("profile: mmap+deserialize {path}: {e}"))?; + eprintln!( + "[rs_kernel_profile] loaded {} consts in {:.1?}", + ixon_env.const_count(), + load_start.elapsed() + ); + let (work, addrs) = build_anon_work(&ixon_env)?; + let targets = addrs.len(); + let env_arc = Arc::new(ixon_env); + let run_start = Instant::now(); + let (passed, failed, merged) = + run_anon_profile_parallel(Arc::clone(&env_arc), work, isolate, quiet)?; + eprintln!( + "[rs_kernel_profile] checked {passed}/{} ({} failed) in {:.1?}", + passed + failed, + failed, + run_start.elapsed() + ); + let profile = build_block_profile(&env_arc, &merged); + let bytes = profile.to_bytes(); + std::fs::write(out, &bytes).map_err(|e| format!("write {out}: {e}"))?; + eprintln!( + "[rs_kernel_profile] wrote {out}: {} blocks, {} delta edges, {} bytes", + profile.num_blocks(), + profile.num_edges(), + bytes.len() + ); + Ok(ProfileStats { + targets, + passed, + failed, + blocks: profile.num_blocks(), + edges: profile.num_edges(), + bytes: bytes.len(), + }) +} + +/// FFI: profile a `.ixe` out of circuit and write a `.ixesp` sidecar. +#[unsafe(no_mangle)] +pub extern "C" fn rs_kernel_profile_anon( + env_path: LeanString>, + out_path: LeanString>, + isolate: LeanBool>, + quiet: LeanBool>, +) -> LeanIOResult { + match profile_anon_ixe( + &env_path.to_string(), + &out_path.to_string(), + isolate.to_bool(), + quiet.to_bool(), + ) { + Ok(s) => { + eprintln!( + "[rs_kernel_profile] done: {} targets, {} blocks, {} edges, {} failed", + s.targets, s.blocks, s.edges, s.failed + ); + LeanIOResult::ok(LeanOwned::box_usize(0)) + }, + Err(e) => { + LeanIOResult::error_string(&format!("rs_kernel_profile_anon: {e}")) + }, + } +} + +/// FFI: partition a `.ixesp` into `num_shards` shards and write a `.ixes` +/// manifest. Prints a what-if report to stderr. +#[allow(clippy::cast_precision_loss)] // balance_pct is a small percentage +#[unsafe(no_mangle)] +pub extern "C" fn rs_shard_esp( + esp_path: LeanString>, + num_shards: LeanString>, + balance_pct: LeanString>, + out_path: LeanString>, +) -> LeanIOResult { + let num_shards = num_shards.to_string().parse::().unwrap_or(1); + let balance_pct = balance_pct.to_string().parse::().unwrap_or(5); + let out = out_path.to_string(); + let out_opt = if out.is_empty() { None } else { Some(out.as_str()) }; + let balance = (balance_pct as f64) / 100.0; + match crate::ix::shard::shard_esp( + &esp_path.to_string(), + num_shards, + balance, + out_opt, + ) { + Ok(report) => { + eprintln!("[rs_shard]\n{report}"); + LeanIOResult::ok(LeanOwned::box_usize(0)) + }, + Err(e) => LeanIOResult::error_string(&format!("rs_shard_esp: {e}")), + } +} + #[cfg(test)] mod tests { use super::{compact_in_flight_label, resolve_kernel_check_workers_from}; diff --git a/src/ix.rs b/src/ix.rs index af5d3329..a1fd600b 100644 --- a/src/ix.rs +++ b/src/ix.rs @@ -15,5 +15,7 @@ pub mod ground; pub mod ixon; pub mod kernel; pub mod mutual; +pub mod profile; +pub mod shard; pub mod store; pub mod strong_ordering; diff --git a/src/ix/kernel/check.rs b/src/ix/kernel/check.rs index 4d3dad1c..0b0f9764 100644 --- a/src/ix/kernel/check.rs +++ b/src/ix/kernel/check.rs @@ -87,6 +87,7 @@ impl TypeChecker<'_, M> { M::MField>: CheckDupLevelParams, { self.reset(); + self.begin_const(id); let c = self.get_const(id)?; self.check_const_member(id, &c) @@ -870,6 +871,57 @@ mod tests { use crate::ix::env::{DefinitionSafety, ReducibilityHints}; use crate::ix::ixon::constant::DefKind; + #[test] + fn profile_sink_records_delta_edge_and_fuel() { + use crate::ix::kernel::mode::Meta; + use crate::ix::kernel::testing as t; + use crate::ix::profile::ProfileSink; + + // g : Sort 2 := Sort 1 — a Definition, delta-reducible to Sort 1. + let (g_id, g) = t::mk_defn( + "g", + 0, + vec![], + t::sort(t::usucc(t::usucc(t::uzero()))), + t::sort1(), + ReducibilityHints::Regular(5), + ); + // f : g := Sort 0 — checking f must whnf (delta-unfold) g → Sort 1 to match + // infer(Sort 0) = Sort 1, so the recorder must capture the edge f→g. + let (f_id, f) = t::mk_defn( + "f", + 0, + vec![], + t::cnst("g", &[]), + t::sort0(), + ReducibilityHints::Regular(5), + ); + + let mut env = KEnv::::new(); + env.insert(g_id.clone(), g); + env.insert(f_id.clone(), f); + env.profile_sink = Some(ProfileSink::new(true)); + + { + let mut tc = TypeChecker::new(&mut env); + tc.check_const(&g_id).unwrap(); + tc.check_const(&f_id).unwrap(); + tc.finish_constant_accounting(); // flush the last constant's record + } + + let sink = env.profile_sink.as_ref().unwrap(); + let f_rec = sink.records.get(&f_id.addr).expect("f should be recorded"); + assert!( + f_rec.producers.contains(&g_id.addr), + "checking f must record a delta-unfold of g" + ); + assert!(f_rec.fuel > 0, "checking f consumes heartbeats"); + // g unfolds nothing of its own. + if let Some(g_rec) = sink.records.get(&g_id.addr) { + assert!(!g_rec.producers.contains(&g_id.addr)); + } + } + type AE = KExpr; type AU = KUniv; diff --git a/src/ix/kernel/env.rs b/src/ix/kernel/env.rs index 0383ab3f..1fdda019 100644 --- a/src/ix/kernel/env.rs +++ b/src/ix/kernel/env.rs @@ -352,6 +352,13 @@ pub struct KEnv { /// `IX_PERF_COUNTERS=1`. When the env var is unset the counters are /// no-ops; when set, the totals are dumped from the `Drop` impl below. pub perf: PerfCounters, + + /// Out-of-circuit profile recorder for sharding (see `plans/sharding.md`). + /// `Some` enables per-constant heartbeat + delta-unfold recording on this + /// worker; `None` (the default) has zero overhead. Deliberately preserved + /// across `clear`/`clear_releasing_memory` so recording survives scheduled + /// block boundaries within a run. + pub profile_sink: Option, } impl Default for KEnv { @@ -407,6 +414,7 @@ impl KEnv { block_check_results: FxHashMap::default(), next_fvar_id: 0, perf: PerfCounters::default(), + profile_sink: None, } } @@ -580,6 +588,30 @@ impl KEnv { self.block_check_results = FxHashMap::default(); self.next_fvar_id = 0; } + + /// Clear only the reduction-memo caches (whnf / infer / def-eq / unfold / + /// is-prop). Structural caches (`consts`, `blocks`, `intern`, recursor + /// caches, `block_check_results`) and the profile sink are preserved. + /// + /// Used by the profile recorder's per-constant isolation mode: clearing the + /// cross-constant memo between constants forces every delta-unfold to + /// re-execute (sound delta recording) and makes recorded heartbeats reflect + /// the in-circuit cost, which has no cross-constant memoization. Clearing a + /// pure memo never affects correctness — only performance. + pub fn clear_reduction_caches(&mut self) { + self.whnf_cache.clear(); + self.whnf_no_delta_cache.clear(); + self.whnf_no_delta_cheap_cache.clear(); + self.whnf_core_cache.clear(); + self.whnf_core_cheap_cache.clear(); + self.infer_cache.clear(); + self.infer_only_cache.clear(); + self.def_eq_cache.clear(); + self.def_eq_cheap_cache.clear(); + self.def_eq_failure.clear(); + self.unfold_cache.clear(); + self.is_prop_cache.clear(); + } } #[cfg(test)] diff --git a/src/ix/kernel/inductive.rs b/src/ix/kernel/inductive.rs index b1ee08f9..4cba8c1c 100644 --- a/src/ix/kernel/inductive.rs +++ b/src/ix/kernel/inductive.rs @@ -107,6 +107,7 @@ impl TypeChecker<'_, M> { for member in members { self.reset(); + self.begin_const(member); let c = self.get_const(member)?; self.validate_const_well_scoped(&c)?; match c { @@ -130,6 +131,7 @@ impl TypeChecker<'_, M> { for ind_id in &ind_ids { self.reset(); + self.begin_const(ind_id); self.check_inductive_member(ind_id)?; } for ctor_id in &ctor_ids { @@ -138,6 +140,7 @@ impl TypeChecker<'_, M> { _ => continue, }; self.reset(); + self.begin_const(ctor_id); self.check_ctor_against_inductive_member(ctor_id, &induct)?; } Ok(()) @@ -4040,6 +4043,7 @@ re-run with `IX_RECURSOR_DUMP={}` for the full breakdown.", ) -> Result<(), TcError> { for member in members { self.reset(); + self.begin_const(member); let c = self.get_const(member)?; self.validate_const_well_scoped(&c)?; match c { @@ -4057,6 +4061,7 @@ re-run with `IX_RECURSOR_DUMP={}` for the full breakdown.", for member in members { self.reset(); + self.begin_const(member); self.check_recursor_member(member)?; } Ok(()) diff --git a/src/ix/kernel/tc.rs b/src/ix/kernel/tc.rs index 3ab0bb12..3cccfa8d 100644 --- a/src/ix/kernel/tc.rs +++ b/src/ix/kernel/tc.rs @@ -160,6 +160,15 @@ pub struct TypeChecker<'a, M: KernelMode> { pub rec_fuel: u64, /// Optional diagnostic label for the current top-level constant. pub debug_label: Option, + + // -- Sharding profiler (see `plans/sharding.md`) -- + /// Address of the constant currently being checked, used to attribute + /// recorded heartbeats and delta-unfold edges. `None` unless a `profile_sink` + /// is installed on `env`; set by `begin_const` after each per-constant reset. + pub(crate) cur_const: Option
, + /// Addresses of constants whose bodies were delta-unfolded during the current + /// constant's check. Drained per constant by `record_current_fuel_used`. + pub(crate) delta_targets: FxHashSet
, /// Gated miss sampler for fuel-exhaustion diagnostics. Populated only when /// `IX_HOT_MISSES=1`, keyed by a compact phase/head/lbr shape. hot_misses: FxHashMap, @@ -208,6 +217,8 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { def_eq_peak: 0, rec_fuel: max_rec_fuel(), debug_label: None, + cur_const: None, + delta_targets: FxHashSet::default(), hot_misses: FxHashMap::default(), ctx_addr_cache: FxHashMap::default(), lctx: super::lctx::LocalContext::new(), @@ -821,6 +832,12 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { // Record fuel consumed by the *previous* constant check (if any) before // wiping it. `Drop` records the final check in a TypeChecker's lifetime. self.record_current_fuel_used(); + // Per-constant cache isolation for sound/faithful profile recording: clear + // the cross-constant reduction memo so the next constant re-executes every + // delta-unfold and its heartbeats reflect in-circuit (un-memoized) cost. + if self.env.profile_sink.as_ref().is_some_and(|s| s.isolate) { + self.env.clear_reduction_caches(); + } self.rec_fuel = max_rec_fuel(); self.hot_misses.clear(); // Reset the local context (it must always be empty between constants). @@ -885,6 +902,35 @@ impl<'a, M: KernelMode> TypeChecker<'a, M> { if used > 0 { self.env.perf.record_constant_fuel_used(used); } + // Sharding profiler: flush the just-finished constant's heartbeats and + // delta-unfold edges. `delta_targets` is always drained (even when + // `cur_const` is unset) so producers never leak into the next constant. + if self.env.profile_sink.is_some() { + let producers = std::mem::take(&mut self.delta_targets); + if let Some(addr) = self.cur_const.take() { + let sink = self.env.profile_sink.as_mut().unwrap(); + sink.record(addr, used, producers); + } + } + } + + /// Sharding profiler: mark `id` as the constant now being checked, so its + /// heartbeats and delta-unfold edges are attributed correctly. No-op unless a + /// `profile_sink` is installed. Called right after each per-constant `reset`. + #[inline] + pub(crate) fn begin_const(&mut self, id: &KId) { + if self.env.profile_sink.is_some() { + self.cur_const = Some(id.addr.clone()); + } + } + + /// Sharding profiler: record that the current constant delta-unfolds the body + /// of `id`. No-op unless a `profile_sink` is installed. + #[inline] + pub(crate) fn record_delta_target(&mut self, id: &KId) { + if self.env.profile_sink.is_some() { + self.delta_targets.insert(id.addr.clone()); + } } // ----------------------------------------------------------------------- diff --git a/src/ix/kernel/whnf.rs b/src/ix/kernel/whnf.rs index 03d53b08..16d74f4d 100644 --- a/src/ix/kernel/whnf.rs +++ b/src/ix/kernel/whnf.rs @@ -766,6 +766,7 @@ impl TypeChecker<'_, M> { && matches!(kind, DefKind::Definition | DefKind::Theorem) { self.dump_delta_trace(id, 0, e); + self.record_delta_target(id); let val = val.clone(); let us: Vec<_> = us.to_vec(); return Ok(Some(self.unfold_const_value(e, &val, &us)?)); @@ -792,6 +793,7 @@ impl TypeChecker<'_, M> { .. }) => { self.dump_delta_trace(id, args.len(), e); + self.record_delta_target(id); val.clone() }, _ => return Ok(None), diff --git a/src/ix/profile.rs b/src/ix/profile.rs new file mode 100644 index 00000000..ed05c539 --- /dev/null +++ b/src/ix/profile.rs @@ -0,0 +1,489 @@ +//! Out-of-circuit kernel profile (`.ixesp`) — the cost + delta-graph hints +//! that drive the sharding strategy (see `plans/sharding.md`). +//! +//! A profile is computed by running the Rust kernel **out of circuit** over an +//! environment and recording, per *block*: +//! +//! - `heartbeats`: total recursive fuel consumed checking the block's members +//! (the balance metric for partitioning), +//! - `serialized_size`: the block's serialized byte length (the ingress-cost +//! metric — net weight in the partition hypergraph), +//! - `const_count`: number of constants in the block, +//! - the set of *other blocks whose definition bodies the block delta-unfolds* +//! (the delta edges — the cost graph the partitioner cuts on). +//! +//! A "block" is the ingress unit: a `Muts` mutual block or a standalone +//! constant. Projection constants (`*Prj { block, .. }`) are attributed to +//! their `block` address; everything else is its own block. +//! +//! The delta graph is stored in compressed-sparse-row (CSR) form keyed by +//! stable block ids (assigned by sorting block addresses), and the on-disk +//! `.ixesp` format is an explicit little-endian binary so it does not depend +//! on the optional `serde`/`bincode` feature. + +// Block ids and counts are `u32` (envs are far below the limit); the binary +// decoder maps decode failures to a single message. Both are intentional here. +#![allow(clippy::cast_possible_truncation, clippy::map_err_ignore)] + +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::ix::address::Address; + +/// Magic bytes at the head of every `.ixesp` file. +const MAGIC: &[u8; 8] = b"IXESP\0\0\0"; +/// On-disk format version. Bump on any incompatible layout change. +const VERSION: u32 = 1; + +/// Per-block recorded statistics. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BlockEntry { + /// Content address of the block (a `Muts` block or a standalone constant). + pub addr: Address, + /// Total recursive fuel (heartbeats) consumed checking this block's members. + pub heartbeats: u64, + /// Serialized byte length of the block (ingress cost / net weight). + pub serialized_size: u32, + /// Number of constants in the block (1 for standalone constants). + pub const_count: u32, +} + +/// A recorded kernel profile over an environment. +/// +/// Blocks are indexed by stable id `0..num_blocks`. The delta graph is stored +/// in CSR form: `producers(c)` are the block ids whose definition bodies block +/// `c` delta-unfolds (the "consumer → producer" direction). Self-edges are +/// dropped; producer lists are sorted and deduplicated. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BlockProfile { + blocks: Vec, + /// CSR row offsets into `delta_col`, length `blocks.len() + 1`. + delta_row: Vec, + /// CSR column indices: producer block ids, grouped by consumer. + delta_col: Vec, +} + +impl BlockProfile { + /// Number of blocks (vertices). + pub fn num_blocks(&self) -> usize { + self.blocks.len() + } + + /// Number of delta edges (consumer → producer pairs). + pub fn num_edges(&self) -> usize { + self.delta_col.len() + } + + /// The block entries, indexed by block id. + pub fn blocks(&self) -> &[BlockEntry] { + &self.blocks + } + + /// The entry for block id `i`. + pub fn block(&self, i: u32) -> &BlockEntry { + &self.blocks[i as usize] + } + + /// Producer block ids unfolded by consumer block `c` (sorted, deduped, no + /// self-edges). + pub fn producers(&self, c: u32) -> &[u32] { + let lo = self.delta_row[c as usize]; + let hi = self.delta_row[c as usize + 1]; + &self.delta_col[lo..hi] + } + + /// Build the reverse delta adjacency: for each producer block, the sorted set + /// of consumer blocks that unfold it. This is the natural form for the + /// partition hypergraph, where `net(p) = {p} ∪ consumers_of(p)`. + pub fn consumers_csr(&self) -> (Vec, Vec) { + let n = self.num_blocks(); + let mut counts = vec![0usize; n + 1]; + for &p in &self.delta_col { + counts[p as usize + 1] += 1; + } + for i in 0..n { + counts[i + 1] += counts[i]; + } + let row = counts.clone(); + let mut col = vec![0u32; self.delta_col.len()]; + let mut cursor = counts; + for c in 0..n as u32 { + for &p in self.producers(c) { + let slot = cursor[p as usize]; + col[slot] = c; + cursor[p as usize] += 1; + } + } + (row, col) + } + + /// Total heartbeats across all blocks. + pub fn total_heartbeats(&self) -> u128 { + self.blocks.iter().map(|b| u128::from(b.heartbeats)).sum() + } + + /// Serialize to the `.ixesp` binary format. + pub fn to_bytes(&self) -> Vec { + let n = self.blocks.len(); + let mut out = Vec::with_capacity( + 8 + 4 + 4 + n * 48 + 8 + (n + 1) * 8 + self.delta_col.len() * 4, + ); + out.extend_from_slice(MAGIC); + out.extend_from_slice(&VERSION.to_le_bytes()); + out.extend_from_slice(&(n as u32).to_le_bytes()); + for b in &self.blocks { + out.extend_from_slice(b.addr.as_bytes()); + out.extend_from_slice(&b.heartbeats.to_le_bytes()); + out.extend_from_slice(&b.serialized_size.to_le_bytes()); + out.extend_from_slice(&b.const_count.to_le_bytes()); + } + out.extend_from_slice(&(self.delta_col.len() as u64).to_le_bytes()); + // CSR row offsets (n+1 entries) as u64. + for &off in &self.delta_row { + out.extend_from_slice(&(off as u64).to_le_bytes()); + } + for &p in &self.delta_col { + out.extend_from_slice(&p.to_le_bytes()); + } + out + } + + /// Deserialize from the `.ixesp` binary format. + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut r = Reader::new(bytes); + let magic = r.take(8)?; + if magic != MAGIC { + return Err(ProfileError::BadMagic); + } + let version = r.u32()?; + if version != VERSION { + return Err(ProfileError::BadVersion(version)); + } + let n = r.u32()? as usize; + let mut blocks = Vec::with_capacity(n); + for _ in 0..n { + let addr = Address::from_slice(r.take(32)?) + .map_err(|_| ProfileError::Truncated)?; + let heartbeats = r.u64()?; + let serialized_size = r.u32()?; + let const_count = r.u32()?; + blocks.push(BlockEntry { + addr, + heartbeats, + serialized_size, + const_count, + }); + } + let num_edges = r.u64()? as usize; + let mut delta_row = Vec::with_capacity(n + 1); + for _ in 0..n + 1 { + delta_row.push(r.u64()? as usize); + } + let mut delta_col = Vec::with_capacity(num_edges); + for _ in 0..num_edges { + delta_col.push(r.u32()?); + } + // Structural validation: monotone offsets bounded by edge count, in-range ids. + if delta_row.len() != n + 1 + || delta_row.first() != Some(&0) + || delta_row.last() != Some(&num_edges) + { + return Err(ProfileError::Corrupt); + } + for w in delta_row.windows(2) { + if w[0] > w[1] { + return Err(ProfileError::Corrupt); + } + } + for &p in &delta_col { + if p as usize >= n { + return Err(ProfileError::Corrupt); + } + } + Ok(BlockProfile { blocks, delta_row, delta_col }) + } +} + +/// Errors from decoding a `.ixesp` file. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProfileError { + BadMagic, + BadVersion(u32), + Truncated, + Corrupt, +} + +impl std::fmt::Display for ProfileError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ProfileError::BadMagic => write!(f, "not an .ixesp file (bad magic)"), + ProfileError::BadVersion(v) => { + write!(f, "unsupported .ixesp version {v}") + }, + ProfileError::Truncated => write!(f, "truncated .ixesp file"), + ProfileError::Corrupt => write!(f, "corrupt .ixesp file"), + } + } +} + +impl std::error::Error for ProfileError {} + +/// Minimal little-endian byte reader with bounds checking. +struct Reader<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> Reader<'a> { + fn new(buf: &'a [u8]) -> Self { + Reader { buf, pos: 0 } + } + fn take(&mut self, n: usize) -> Result<&'a [u8], ProfileError> { + let end = self.pos.checked_add(n).ok_or(ProfileError::Truncated)?; + if end > self.buf.len() { + return Err(ProfileError::Truncated); + } + let s = &self.buf[self.pos..end]; + self.pos = end; + Ok(s) + } + fn u32(&mut self) -> Result { + Ok(u32::from_le_bytes(self.take(4)?.try_into().unwrap())) + } + fn u64(&mut self) -> Result { + Ok(u64::from_le_bytes(self.take(8)?.try_into().unwrap())) + } +} + +/// Accumulates block-level statistics and delta edges (keyed by address), then +/// freezes into a [`BlockProfile`] with stable, address-sorted block ids. +/// +/// Phase A (the kernel recorder) feeds this with one `block(..)` per checked +/// block and one `delta_edge(consumer, producer)` per recorded cross-block +/// unfold. The builder is the merge point for per-worker accumulators: calling +/// `block`/`delta_edge` is commutative and idempotent w.r.t. edge sets, so +/// merge order does not affect the result. +#[derive(Default)] +pub struct ProfileBuilder { + blocks: FxHashMap, +} + +#[derive(Default)] +struct Accum { + heartbeats: u64, + serialized_size: u32, + const_count: u32, + producers: FxHashSet
, +} + +impl ProfileBuilder { + pub fn new() -> Self { + Self::default() + } + + /// Record (or accumulate into) a block's statistics. Heartbeats and + /// const_count accumulate additively; serialized_size is set (idempotent for + /// a fixed block). + pub fn block( + &mut self, + addr: Address, + heartbeats: u64, + serialized_size: u32, + const_count: u32, + ) { + let e = self.blocks.entry(addr).or_default(); + e.heartbeats = e.heartbeats.saturating_add(heartbeats); + e.serialized_size = serialized_size; + e.const_count = e.const_count.saturating_add(const_count); + } + + /// Record that `consumer` delta-unfolds the body of `producer`. Self-edges + /// are ignored. Ensures both endpoints exist as blocks (with zeroed stats if + /// not yet seen) so the graph is well-formed even if a producer is only ever + /// referenced, never directly checked. + pub fn delta_edge(&mut self, consumer: Address, producer: Address) { + if consumer == producer { + return; + } + self.blocks.entry(producer.clone()).or_default(); + self.blocks.entry(consumer).or_default().producers.insert(producer); + } + + /// Freeze into an immutable [`BlockProfile`]. Block ids are assigned by + /// sorting addresses, so the result is deterministic regardless of insertion + /// order. + pub fn finish(self) -> BlockProfile { + let mut addrs: Vec
= self.blocks.keys().cloned().collect(); + addrs.sort(); + let id_of: FxHashMap = + addrs.iter().enumerate().map(|(i, a)| (a.clone(), i as u32)).collect(); + + let mut blocks = Vec::with_capacity(addrs.len()); + let mut delta_row = Vec::with_capacity(addrs.len() + 1); + let mut delta_col = Vec::new(); + delta_row.push(0usize); + + for addr in &addrs { + let a = &self.blocks[addr]; + blocks.push(BlockEntry { + addr: addr.clone(), + heartbeats: a.heartbeats, + serialized_size: a.serialized_size, + const_count: a.const_count, + }); + let mut prods: Vec = a.producers.iter().map(|p| id_of[p]).collect(); + prods.sort_unstable(); + prods.dedup(); + delta_col.extend_from_slice(&prods); + delta_row.push(delta_col.len()); + } + + BlockProfile { blocks, delta_row, delta_col } + } +} + +/// Per-worker raw accumulator filled by the out-of-circuit kernel recorder: for +/// each *constant* (by address) checked on this worker, its heartbeats and the +/// set of constant addresses whose definition bodies it delta-unfolded. The +/// env-aware layer later maps these constant addresses to their home blocks and +/// attaches serialized sizes to produce a [`BlockProfile`]. +#[derive(Default, Debug)] +pub struct ProfileSink { + /// When true, the kernel clears its reduction-memo caches between constants + /// so recording is sound (no unfolds skipped by cross-constant cache hits) + /// and heartbeats reflect the no-cross-constant-memo in-circuit cost. + pub isolate: bool, + /// Consumer constant address → record. + pub records: FxHashMap, +} + +/// One constant's recorded statistics (pre block-aggregation). +#[derive(Default, Debug, Clone)] +pub struct ConstRecord { + /// Recursive fuel (heartbeats) consumed checking this constant. + pub fuel: u64, + /// Constant addresses whose bodies were delta-unfolded during the check. + pub producers: FxHashSet
, +} + +impl ProfileSink { + pub fn new(isolate: bool) -> Self { + ProfileSink { isolate, records: FxHashMap::default() } + } + + /// Accumulate one constant's record (additive in fuel, set-union in + /// producers) so repeated flushes for the same constant combine correctly. + pub fn record( + &mut self, + consumer: Address, + fuel: u64, + producers: impl IntoIterator, + ) { + let rec = self.records.entry(consumer).or_default(); + rec.fuel = rec.fuel.saturating_add(fuel); + rec.producers.extend(producers); + } + + /// Merge another worker's sink into this one (order-independent). + pub fn merge(&mut self, other: ProfileSink) { + for (addr, rec) in other.records { + let e = self.records.entry(addr).or_default(); + e.fuel = e.fuel.saturating_add(rec.fuel); + e.producers.extend(rec.producers); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(byte: u8) -> Address { + Address::from_slice(&[byte; 32]).unwrap() + } + + fn sample() -> BlockProfile { + let mut b = ProfileBuilder::new(); + // Three blocks a = col[lo..hi].to_vec(); + got.sort_unstable(); + assert_eq!(got, vec![0, 2]); + } + + #[test] + fn roundtrip_serialization() { + let p = sample(); + let bytes = p.to_bytes(); + let q = BlockProfile::from_bytes(&bytes).unwrap(); + assert_eq!(p, q); + } + + #[test] + fn rejects_bad_magic_and_truncation() { + assert_eq!( + BlockProfile::from_bytes(b"nope").unwrap_err(), + ProfileError::Truncated + ); + let mut bytes = sample().to_bytes(); + bytes[0] = b'X'; + assert_eq!( + BlockProfile::from_bytes(&bytes).unwrap_err(), + ProfileError::BadMagic + ); + } + + #[test] + fn merge_order_independent() { + // Build the same logical profile with edges added in a different order and + // via separate builders merged conceptually; result must be identical. + let mut b = ProfileBuilder::new(); + b.delta_edge(addr(3), addr(2)); + b.block(addr(3), 300, 30, 1); + b.delta_edge(addr(1), addr(3)); + b.block(addr(2), 200, 20, 3); + b.delta_edge(addr(1), addr(2)); + b.block(addr(1), 100, 10, 1); + b.delta_edge(addr(2), addr(2)); + assert_eq!(b.finish(), sample()); + } +} diff --git a/src/ix/shard.rs b/src/ix/shard.rs new file mode 100644 index 00000000..0fd9682e --- /dev/null +++ b/src/ix/shard.rs @@ -0,0 +1,1644 @@ +//! Sharding: partition an environment's blocks into `N` shards that are +//! balanced by heartbeats and minimize cross-shard delta-unfold ingress +//! (see `plans/sharding.md`). +//! +//! ## Model +//! +//! We solve a **weighted hypergraph partitioning** problem under the +//! connectivity-1 (km1) metric: +//! +//! - **Vertices** are blocks, weighted by `heartbeats` (the balance metric). +//! - **Nets** are blocks `p` that get delta-unfolded by someone; the net +//! connects `p`'s home plus every block that unfolds `p`, and is weighted by +//! `serialized_size(p)` (the ingress cost paid to pull `p`'s body into a +//! foreign shard). +//! - The objective is `Σ_p size(p) · (λ(p) − 1)`, where `λ(p)` is the number +//! of distinct shards spanning `net(p)`. Because the home block is always a +//! pin, `λ(p) − 1` counts exactly the foreign shards that must ingress `p` — +//! i.e. total cross-shard ingress bytes. +//! +//! ## Algorithm +//! +//! **Recursive bisection.** Balanced min-cut bisection recursively splits the +//! block hypergraph, allocating the `N`-shard budget between the two sides at +//! each step (so any `N` works, not just powers of two); the bisection tree is +//! the future proof-aggregation tree. On recursion we apply **cut-net +//! splitting**: a net already cut at an upper level is restricted to its pins +//! within each half so it is not recounted deeper. +//! +//! **Multilevel coarsening.** Each individual bisection is solved with a +//! multilevel V-cycle rather than flat local search: we *coarsen* the +//! sub-hypergraph by repeatedly merging tightly-coupled blocks into +//! supervertices (heavy-edge matching) until only a few hundred remain, decide +//! the cut once on that tiny graph (greedy graph-growing + Fiduccia–Mattheyses +//! to convergence), then *uncoarsen* — projecting the cut back down and +//! boundary-refining with FM at each level. Deciding global structure on the +//! small graph and only ever refining locally on the big ones is what keeps +//! large environments (mathlib ≈ 631k blocks) partitioning in seconds. The +//! partitioner is fully self-contained — no external partitioner dependency. + +// This module works throughout with `u32` block/vertex ids, `u64`/`u128` weights +// (heartbeats, ingress bytes), and `f64` balance ratios. The narrowing/precision +// casts between them are pervasive and intentional — envs are far below `u32` +// limits and the balance arithmetic tolerates `f64` rounding — so the lossy-cast +// lints are allowed module-wide rather than annotated at ~30 sites. The binary +// manifest decoder maps decode failures to a single message (`map_err_ignore`), +// and a few matching loops mutate the array they index (`needless_range_loop`). +#![allow( + clippy::cast_possible_truncation, + clippy::cast_precision_loss, + clippy::cast_sign_loss, + clippy::map_err_ignore, + clippy::needless_range_loop +)] + +use rustc_hash::FxHashSet; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::time::Instant; + +use crate::ix::address::Address; +use crate::ix::profile::BlockProfile; + +/// Recurse the two halves of a bisection in parallel (rayon) when the parent +/// sub-problem is at least this large; below it the join overhead isn't worth +/// it. The two halves own disjoint block sets, so their writes to the shared +/// (atomic) assignment buffer never alias. +const PARALLEL_THRESHOLD: usize = 4_000; + +/// Target vertex count for the coarsest graph in the multilevel V-cycle. We +/// coarsen down to roughly this many supervertices, decide the global cut there +/// (cheaply, at high quality), then uncoarsen and locally refine. Small enough +/// that greedy + full FM on it is instant; large enough to still admit a +/// balanced bisection. +const COARSEST_TARGET: usize = 256; + +/// Stop coarsening if a matching pass fails to shrink the graph by at least +/// `1 − COARSEN_STALL_RATIO` (here 10%). Guards against spinning on graphs with +/// few matchable pairs (very sparse or hub-dominated subs); [`bisect`] then +/// falls back to refining whatever level was reached. +const COARSEN_STALL_RATIO: f64 = 0.90; + +/// Nets larger than this are skipped when rating vertices for heavy-edge +/// matching: a co-pin of an `m`-pin net earns only `weight/(m−1)` toward a +/// match, so large nets carry negligible clustering signal while costing `O(m)` +/// to scan. Keeping the matching rating near-linear bounds coarsening time. +const MATCH_NET_CAP: usize = 100; + +/// FM passes per level during uncoarsening. The projected partition is already +/// globally good, so a single boundary-refinement pass suffices to clean up each +/// level's cut frontier (passes also stop early once one yields no improvement). +const REFINE_PASSES: u32 = 1; + +/// Initial-partition restarts on the (tiny) coarsest graph: greedy graph-growing +/// from this many diverse deterministic seeds, each FM-refined, keeping the +/// lowest-cut result. Graph-growing from a single seed can "leak" across a thin +/// bridge between two clusters; diverse seeds (one of which starts far from the +/// bridge) recover the clean cut. Cheap — the coarsest graph is ≈ +/// [`COARSEST_TARGET`] vertices and this runs once per bisection. +const INITIAL_RESTARTS: usize = 4; + +/// Bisections of at least this many vertices are logged by [`PartitionProgress`] +/// (purely an observability threshold — a slow run should never look stuck). +const PROGRESS_BISECT_LOG: usize = 50_000; + +/// Nets with more pins than this are "hubs" (e.g. a core lemma delta-unfolded by +/// thousands of blocks). They are cut in essentially any balanced partition, so +/// they carry no useful refinement gradient, and visiting all their pins per +/// move would make FM O(pins-of-hub) per step. Refinement (greedy growth + FM) +/// therefore ignores them; the reported [`Hypergraph::connectivity_objective`] +/// still counts every net in full. After cut-net splitting a former hub may +/// shrink below the cap at deeper recursion levels and become active again. +const FM_NET_CAP: usize = 400; + +/// Maximum FM passes per bisection (a safety bound against pathological +/// oscillation; passes normally converge well before this). +const MAX_FM_PASSES: u32 = 10; + +/// A weighted hypergraph derived from a [`BlockProfile`]. Vertex ids are block +/// ids (identical to the profile's). Nets are stored with global pins. +pub struct Hypergraph { + /// Vertex (block) weights = heartbeats. + vweight: Vec, + /// Net weights = serialized_size of the unfolded block. + net_weight: Vec, + /// Net pins (global block ids). `pins[i]` are the pins of net `i`. + net_pins: Vec>, +} + +impl Hypergraph { + /// Build the partition hypergraph from a profile. A net is created for every + /// block that has at least one *external* consumer (i.e. is delta-unfolded by + /// some other block); singleton nets are omitted as they can never be cut. + pub fn from_profile(profile: &BlockProfile) -> Hypergraph { + let n = profile.num_blocks(); + let vweight: Vec = + (0..n as u32).map(|i| profile.block(i).heartbeats).collect(); + let (row, col) = profile.consumers_csr(); + let mut net_weight = Vec::new(); + let mut net_pins = Vec::new(); + for p in 0..n { + let consumers = &col[row[p]..row[p + 1]]; + if consumers.is_empty() { + continue; + } + let mut pins = Vec::with_capacity(consumers.len() + 1); + pins.push(p as u32); // home pin + pins.extend_from_slice(consumers); + net_weight.push(u64::from(profile.block(p as u32).serialized_size)); + net_pins.push(pins); + } + Hypergraph { vweight, net_weight, net_pins } + } + + pub fn num_vertices(&self) -> usize { + self.vweight.len() + } + + pub fn num_nets(&self) -> usize { + self.net_pins.len() + } + + /// The connectivity-1 (km1) objective for a given shard assignment: + /// `Σ_net weight · (λ − 1)`, where `λ` is the number of distinct shards the + /// net's pins fall into. This is the total cross-shard ingress in bytes. + pub fn connectivity_objective(&self, shard_of: &[u32]) -> u128 { + let mut total: u128 = 0; + let mut seen: FxHashSet = FxHashSet::default(); + for (i, pins) in self.net_pins.iter().enumerate() { + seen.clear(); + for &v in pins { + seen.insert(shard_of[v as usize]); + } + let lambda = seen.len() as u128; + if lambda > 1 { + total += u128::from(self.net_weight[i]) * (lambda - 1); + } + } + total + } + + /// Partition the blocks into `num_shards` shards via recursive bisection. + /// Returns the shard id (in `0..num_shards`) for every block id. `epsilon` is + /// the per-bisection balance tolerance (e.g. `0.05` for ±5%). `num_shards` + /// need not be a power of two. + pub fn partition(&self, num_shards: usize, epsilon: f64) -> Vec { + let n = self.num_vertices(); + if num_shards <= 1 { + return vec![0u32; n]; // everything in shard 0 + } + // Cap each block's balance weight at the ideal per-shard heartbeats + // (total / num_shards). This keeps balancing heartbeat-aware while a balanced + // split is achievable (few shards), but stops a single oversized *atomic* + // block from skewing every split toward it once there are many shards. The + // resulting heartbeat imbalance is accepted; the goal is `num_shards` + // **non-empty** parallel shards with minimal cross-shard ingress. + let total_hb: u128 = self.vweight.iter().map(|&w| u128::from(w)).sum(); + let cap = + (total_hb / num_shards as u128).max(1).min(u128::from(u64::MAX)) as u64; + let sub = SubHyper::full(self, cap); + // Atomic assignment buffer: independent subtrees own disjoint block sets, so + // they can be partitioned on separate threads without their writes aliasing. + let shard_of: Vec = (0..n).map(|_| AtomicU32::new(0)).collect(); + let prog = PartitionProgress::new(num_shards); + rec_bisect(&sub, num_shards, 0, epsilon, &shard_of, &prog); + prog.done(); + shard_of.into_iter().map(AtomicU32::into_inner).collect() + } +} + +/// A borrowed, cut-relevant view of one hypergraph level — either a top-level +/// [`SubHyper`] or a coarsened [`CoarseLevel`]. The greedy / FM machinery is +/// written against this view so it runs unchanged at every level of the +/// multilevel hierarchy. `Copy` (four slice references), so it is passed by +/// value freely. +#[derive(Clone, Copy)] +struct Level<'a> { + /// vertex weights (heartbeats) + vw: &'a [u64], + /// balance weights (capped heartbeats) + bw: &'a [u64], + /// nets: (weight, pins) + nets: &'a [(u64, Vec)], + /// vertex -> incident net indices + vnets: &'a [Vec], +} + +impl<'a> Level<'a> { + fn num_vertices(&self) -> usize { + self.vw.len() + } +} + +/// A sub-hypergraph used during recursion, with vertices re-indexed locally and +/// nets restricted (cut-net split) to the vertices it contains. +struct SubHyper { + /// local vertex -> global block id + global: Vec, + /// local vertex weights (heartbeats) + vw: Vec, + /// balance weights: each block's heartbeats capped at `cap` (or all 1 if every + /// capped weight is zero in this sub) + bw: Vec, + /// restricted nets: (weight, local pins) + nets: Vec<(u64, Vec)>, + /// local vertex -> incident net indices + vnets: Vec>, + /// global per-shard heartbeat target used to cap `bw` (propagated unchanged + /// through recursion) + cap: u64, +} + +impl SubHyper { + fn full(h: &Hypergraph, cap: u64) -> SubHyper { + let n = h.num_vertices(); + let global: Vec = (0..n as u32).collect(); + let vw = h.vweight.clone(); + let nets: Vec<(u64, Vec)> = h + .net_pins + .iter() + .enumerate() + .map(|(i, pins)| (h.net_weight[i], pins.clone())) + .collect(); + SubHyper::assemble(global, vw, nets, cap) + } + + fn assemble( + global: Vec, + vw: Vec, + nets: Vec<(u64, Vec)>, + cap: u64, + ) -> SubHyper { + let n = vw.len(); + let mut bw: Vec = vw.iter().map(|&w| w.min(cap)).collect(); + let total: u128 = bw.iter().map(|&w| u128::from(w)).sum(); + if total == 0 { + bw = vec![1; n]; + } + let mut vnets = vec![Vec::new(); n]; + for (i, (_, pins)) in nets.iter().enumerate() { + for &v in pins { + vnets[v as usize].push(i as u32); + } + } + SubHyper { global, vw, bw, nets, vnets, cap } + } + + fn num_vertices(&self) -> usize { + self.vw.len() + } + + /// Borrow this sub-hypergraph as a [`Level`] for the greedy / FM machinery. + fn level(&self) -> Level<'_> { + Level { vw: &self.vw, bw: &self.bw, nets: &self.nets, vnets: &self.vnets } + } + + /// Induce the sub-hypergraph on the local vertices selected by `keep`, + /// applying cut-net splitting: each net is restricted to its kept pins and + /// dropped if fewer than two remain. + fn induce(&self, keep: &[bool]) -> SubHyper { + let mut remap = vec![u32::MAX; self.num_vertices()]; + let mut global = Vec::new(); + let mut vw = Vec::new(); + for v in 0..self.num_vertices() { + if keep[v] { + remap[v] = global.len() as u32; + global.push(self.global[v]); + vw.push(self.vw[v]); + } + } + let mut nets = Vec::new(); + for (w, pins) in &self.nets { + let kept: Vec = pins + .iter() + .filter(|&&p| keep[p as usize]) + .map(|&p| remap[p as usize]) + .collect(); + if kept.len() >= 2 { + nets.push((*w, kept)); + } + } + SubHyper::assemble(global, vw, nets, self.cap) + } +} + +/// Recursively split `sub` into `k` shards (contiguous ids `base..base+k`), +/// writing shard ids into `shard_of` (indexed by global block id). +/// +/// The `k` leaf budget is allocated between the two sides ~proportional to +/// heartbeat mass, but clamped so each side receives at least 1 leaf and at most +/// its own vertex count. That guarantees **every shard is non-empty** whenever +/// the subtree holds at least `k` vertices (at the top, `#blocks ≥ 2ⁿ`), so the +/// full parallelism is realized; the heartbeat-proportional split also keeps +/// per-shard work as even as the atomic blocks allow. The resulting bisection +/// tree is a (possibly unbalanced) binary tree — still a valid aggregation tree. +fn rec_bisect( + sub: &SubHyper, + k: usize, + base: u32, + epsilon: f64, + shard_of: &[AtomicU32], + prog: &PartitionProgress, +) { + if k <= 1 || sub.num_vertices() <= 1 { + // A leaf (or a subtree with ≤1 vertex): everything here is one shard. + for &g in &sub.global { + shard_of[g as usize].store(base, Ordering::Relaxed); + } + prog.leaf(); + return; + } + prog.bisecting(sub.num_vertices(), k); + let side = bisect(sub, epsilon); + let keep0: Vec = side.iter().map(|&s| s == 0).collect(); + let keep1: Vec = side.iter().map(|&s| s == 1).collect(); + let left = sub.induce(&keep0); + let right = sub.induce(&keep1); + let nl = left.num_vertices(); + let nr = right.num_vertices(); + if nl == 0 || nr == 0 { + // Degenerate split: `bisect` guarantees both sides non-empty for |sub| ≥ 2 + // (its initial partition rejects empty-sided candidates), so this is a + // defensive backstop only — keep the subtree together as one shard. + for &g in &sub.global { + shard_of[g as usize].store(base, Ordering::Relaxed); + } + prog.leaf(); + return; + } + // Allocate leaves ~proportional to heartbeat mass, clamped to [1, side size]. + // `lo`/`hi` are feasible (lo ≤ hi) because nl + nr ≥ k with nl, nr ≥ 1. + let wl: u128 = left.vw.iter().map(|&w| u128::from(w)).sum(); + let wr: u128 = right.vw.iter().map(|&w| u128::from(w)).sum(); + let wsum = (wl + wr).max(1); + let ideal = ((k as u128 * wl + wsum / 2) / wsum) as usize; + let lo = 1.max(k.saturating_sub(nr)); + let hi = (k - 1).min(nl); + let k_left = ideal.clamp(lo, hi); + let rbase = base + k_left as u32; + // The two halves are independent (disjoint blocks) — recurse them in parallel + // when the work is large enough to amortize the join. The result is identical + // to serial execution (each subtree is deterministic). + if nl + nr >= PARALLEL_THRESHOLD { + rayon::join( + || rec_bisect(&left, k_left, base, epsilon, shard_of, prog), + || rec_bisect(&right, k - k_left, rbase, epsilon, shard_of, prog), + ); + } else { + rec_bisect(&left, k_left, base, epsilon, shard_of, prog); + rec_bisect(&right, k - k_left, rbase, epsilon, shard_of, prog); + } +} + +/// Progress reporter for [`Hypergraph::partition`]. The partitioner is silent +/// otherwise (it only writes its output at the end), which makes a slow run +/// indistinguishable from a stuck one; this logs each large bisection and the +/// running shards-finalized count (to stderr, unless `IX_SHARD_QUIET` is set). +struct PartitionProgress { + total: usize, + finalized: AtomicUsize, + start: Instant, + enabled: bool, +} + +impl PartitionProgress { + fn new(total: usize) -> Self { + PartitionProgress { + total, + finalized: AtomicUsize::new(0), + start: Instant::now(), + enabled: std::env::var("IX_SHARD_QUIET").is_err(), + } + } + + /// Log a large bisection about to run (the slow steps). `&self` so it can be + /// shared across the parallel recursion. + fn bisecting(&self, n: usize, k: usize) { + if self.enabled && n >= PROGRESS_BISECT_LOG { + eprintln!( + "[shard] bisecting {n} vertices → {k} shards ({:.1?} elapsed, {}/{} shards done)", + self.start.elapsed(), + self.finalized.load(Ordering::Relaxed), + self.total + ); + } + } + + /// A shard leaf was finalized; log roughly every 1/16 of the shards. Uses an + /// atomic counter so concurrent subtrees report coherently. + fn leaf(&self) { + let done = self.finalized.fetch_add(1, Ordering::Relaxed) + 1; + let step = (self.total / 16).max(1); + if self.enabled && (done.is_multiple_of(step) || done == self.total) { + eprintln!( + "[shard] {}/{} shards finalized ({:.1?} elapsed)", + done, + self.total, + self.start.elapsed() + ); + } + } + + fn done(&self) { + if self.enabled { + eprintln!( + "[shard] partitioned into {} shards in {:.1?}", + self.finalized.load(Ordering::Relaxed), + self.start.elapsed() + ); + } + } +} + +/// Bisect `sub` into two balanced parts minimizing cut weight. Returns a side +/// (0/1) per local vertex. +/// +/// Multilevel V-cycle: coarsen `sub` into a hierarchy, decide the cut on the +/// tiny coarsest graph (greedy graph-growing + FM to convergence), then +/// uncoarsen — projecting the cut back down and boundary-refining each level. +/// When `sub` is small or won't coarsen, `levels` is empty and we partition it +/// directly, so the worst case is never worse than a flat bisection. +fn bisect(sub: &SubHyper, epsilon: f64) -> Vec { + let n = sub.num_vertices(); + if n == 0 { + return Vec::new(); + } + if n == 1 { + return vec![0]; + } + let total_bw: u64 = sub.bw.iter().sum(); + // Balance bounds for side weights (invariant across levels — coarsening sums + // balance weight, so the total is preserved at every level). + let wmax = ((0.5 + epsilon) * total_bw as f64).ceil() as u64; + let wmin = ((0.5 - epsilon) * total_bw as f64).floor() as u64; + + let levels = coarsen(sub); + let coarsest = levels.last().map_or_else(|| sub.level(), |l| l.level()); + let side = initial_partition(coarsest, wmin, wmax); + uncoarsen_refine(sub, &levels, side, wmin, wmax) +} + +/// Greedy graph-growing: start from `seed` and repeatedly absorb the boundary +/// vertex most strongly connected (by net weight) to side 0 until side 0 reaches +/// ~half the balance weight. Remaining vertices go to side 1. +fn greedy_grow(lv: Level<'_>, total_bw: u64, seed: usize) -> Vec { + let n = lv.num_vertices(); + let mut side = vec![1u8; n]; + + // connection[v] = total weight of nets that already touch side 0 and include v. + let mut connection = vec![0u64; n]; + let mut in_side0 = vec![false; n]; + // Whether a net already contributes to side 0. A net bumps its pins exactly + // once (on first connection), so growth is O(Σ net_size), not O(Σ net_size²) + // — essential on dense graphs (and it also stops over-counting connection). + let mut net_touched = vec![false; lv.nets.len()]; + let mut heap: BinaryHeap<(u64, Reverse)> = BinaryHeap::new(); + let mut side0_bw = 0u64; + let target = total_bw / 2; + // Forward cursor for the disconnected-fallback path (amortized O(n) total, + // vs. an O(n) max-scan per fallback which is O(n²) on delta-sparse envs). + let mut fallback_cursor = 0usize; + + let mut add = |v: usize, + side: &mut [u8], + in_side0: &mut [bool], + connection: &mut [u64], + heap: &mut BinaryHeap<(u64, Reverse)>, + side0_bw: &mut u64| { + side[v] = 0; + in_side0[v] = true; + *side0_bw += lv.bw[v]; + for &ni in &lv.vnets[v] { + let nis = ni as usize; + let (w, pins) = &lv.nets[nis]; + // Skip hub nets, and nets already counted toward side 0. + if pins.len() > FM_NET_CAP || net_touched[nis] { + continue; + } + net_touched[nis] = true; + for &u in pins { + let u = u as usize; + if !in_side0[u] { + connection[u] += *w; + heap.push((connection[u], Reverse(u as u32))); + } + } + } + }; + + add( + seed, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + + while side0_bw < target { + // Pop the best-connected vertex not yet in side 0 (lazy-validated). + let mut next = None; + while let Some((c, Reverse(v))) = heap.pop() { + let v = v as usize; + if !in_side0[v] && c == connection[v] { + next = Some(v); + break; + } + } + match next { + Some(v) => { + add( + v, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + }, + None => { + // No connected frontier left (graph is disconnected here); fill toward + // balance with the next unassigned vertex by id. Disconnected vertices + // have no incident (non-hub) nets, so their placement order is + // cut-neutral. + while fallback_cursor < n && in_side0[fallback_cursor] { + fallback_cursor += 1; + } + if fallback_cursor < n { + add( + fallback_cursor, + &mut side, + &mut in_side0, + &mut connection, + &mut heap, + &mut side0_bw, + ); + } else { + break; + } + }, + } + } + side +} + +/// Per-net pin counts on each side, for incremental cut maintenance. +struct NetState { + cnt: Vec<[u32; 2]>, +} + +impl NetState { + fn new(lv: Level<'_>, side: &[u8]) -> (NetState, u128) { + let mut cnt = vec![[0u32; 2]; lv.nets.len()]; + let mut cut: u128 = 0; + for (i, (w, pins)) in lv.nets.iter().enumerate() { + if pins.len() > FM_NET_CAP { + continue; // hub net: invisible to FM (counts stay zero) + } + for &v in pins { + cnt[i][side[v as usize] as usize] += 1; + } + if cnt[i][0] > 0 && cnt[i][1] > 0 { + cut += u128::from(*w); + } + } + (NetState { cnt }, cut) + } +} + +/// FM gain of moving local vertex `v` from its side to the other. +/// `+w` for each net where `v`'s side has exactly one pin (the move uncuts it); +/// `−w` for each net fully on `v`'s side with ≥2 pins (the move newly cuts it). +fn fm_gain(lv: Level<'_>, ns: &NetState, side: &[u8], v: usize) -> i128 { + let a = side[v] as usize; + let b = 1 - a; + let mut g: i128 = 0; + for &ni in &lv.vnets[v] { + let (w, pins) = &lv.nets[ni as usize]; + if pins.len() > FM_NET_CAP { + continue; // hub net: ignored by FM + } + let ca = ns.cnt[ni as usize][a]; + let cb = ns.cnt[ni as usize][b]; + if cb >= 1 && ca == 1 { + g += i128::from(*w); + } else if cb == 0 && ca >= 2 { + g -= i128::from(*w); + } + } + g +} + +/// Fiduccia–Mattheyses refinement (boundary variant): up to `max_passes` passes, +/// each moving every vertex at most once in decreasing-gain order subject to the +/// balance window, tracking the lowest-cut prefix and rolling back to it. Stops +/// when a pass yields no improvement. +/// +/// Only **boundary** vertices (incident to a cut net) are seeded into the gain +/// heap: an interior vertex has non-positive gain (moving it can only newly-cut +/// nets), so it never improves the cut; vertices exposed to the boundary by an +/// applied move are added as that move's neighbors. This bounds a pass at +/// roughly `O(boundary + moves·degree)` rather than `O(V·degree)`, which is what +/// makes full refinement affordable even on the finest (≈`V`-sized) level. +fn fm_refine( + lv: Level<'_>, + side: &mut [u8], + wmin: u64, + wmax: u64, + max_passes: u32, +) { + let n = lv.num_vertices(); + if n <= 1 { + return; + } + // Contribution of one net (counts `ca` on the vertex's side, `cb` on the + // other, weight `w`) to the gain of moving that vertex: `+w` if it is the last + // pin on its side (the move uncuts the net), `−w` if its side is full with ≥2 + // pins (the move newly cuts it), else 0. + let contrib = |ca: u32, cb: u32, w: i128| -> i128 { + if cb >= 1 && ca == 1 { + w + } else if cb == 0 && ca >= 2 { + -w + } else { + 0 + } + }; + // Buffers reused across passes (no per-move allocation). + let mut gain = vec![0i128; n]; + let mut queued = vec![false; n]; + let mut locked = vec![false; n]; + let mut heap: BinaryHeap<(i128, Reverse)> = BinaryHeap::new(); + let mut moves: Vec = Vec::new(); + let mut pass = 0u32; + loop { + let (mut ns, mut cut) = NetState::new(lv, side); + let mut side0_bw: u64 = + (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); + + // Full gain initialization (O(pins), cheap) so the incremental neighbor + // updates below — which only adjust the changed net's contribution — stay + // exact for interior vertices that later reach the boundary. + for (v, g) in gain.iter_mut().enumerate() { + *g = fm_gain(lv, &ns, side, v); + } + heap.clear(); + moves.clear(); + queued.fill(false); + locked.fill(false); + // Seed the cut frontier only (interior vertices have non-positive gain). + for (i, (_, pins)) in lv.nets.iter().enumerate() { + if pins.len() > FM_NET_CAP { + continue; // hub net: invisible to FM + } + if ns.cnt[i][0] > 0 && ns.cnt[i][1] > 0 { + for &v in pins { + let v = v as usize; + if !queued[v] { + queued[v] = true; + heap.push((gain[v], Reverse(v as u32))); + } + } + } + } + + let mut best_cut = cut; + let mut best_prefix = 0usize; // number of moves to keep + + while let Some((g, Reverse(v))) = heap.pop() { + let v = v as usize; + if locked[v] || g != gain[v] { + continue; // stale entry + } + // Balance check: moving v shifts bw[v] across the cut. + let (s0_after, ok) = if side[v] == 0 { + let s0 = side0_bw - lv.bw[v]; + (s0, s0 >= wmin) + } else { + let s0 = side0_bw + lv.bw[v]; + (s0, s0 <= wmax) + }; + if !ok { + // Infeasible right now; lock it out of this pass. + locked[v] = true; + continue; + } + // Apply the move, updating the cut and the gains of co-pins incrementally: + // only this net's contribution to each pin changes, so a move costs + // O(Σ pins of v's nets), not O(Σ neighbour degrees). + let a = side[v] as usize; + for &ni in &lv.vnets[v] { + let nis = ni as usize; + let (w, pins) = &lv.nets[nis]; + if pins.len() > FM_NET_CAP { + continue; // hub net: not tracked + } + let wi = i128::from(*w); + let c0 = ns.cnt[nis][0]; + let c1 = ns.cnt[nis][1]; + let (d0, d1) = if a == 0 { (c0 - 1, c1 + 1) } else { (c0 + 1, c1 - 1) }; + ns.cnt[nis][0] = d0; + ns.cnt[nis][1] = d1; + let before_cut = c0 > 0 && c1 > 0; + let after_cut = d0 > 0 && d1 > 0; + if before_cut && !after_cut { + cut -= u128::from(*w); + } else if !before_cut && after_cut { + cut += u128::from(*w); + } + for &uu in pins { + let u = uu as usize; + if u == v || locked[u] { + continue; + } + let su = side[u] as usize; + let (ca, cb) = if su == 0 { (c0, c1) } else { (c1, c0) }; + let (ca2, cb2) = if su == 0 { (d0, d1) } else { (d1, d0) }; + let delta = contrib(ca2, cb2, wi) - contrib(ca, cb, wi); + if delta != 0 { + gain[u] += delta; + queued[u] = true; + heap.push((gain[u], Reverse(uu))); + } + } + } + side[v] = (1 - a) as u8; + side0_bw = s0_after; + locked[v] = true; + moves.push(v); + + if cut < best_cut { + best_cut = cut; + best_prefix = moves.len(); + } + } + + // Roll back moves after the best prefix. + for &v in moves.iter().skip(best_prefix) { + side[v] ^= 1; + } + + pass += 1; + if best_prefix == 0 || pass >= max_passes { + break; // converged, or hit the pass bound + } + } +} + +// ============================================================================ +// Multilevel coarsening +// ============================================================================ + +/// One coarser level of a single bisection's multilevel hierarchy. Mirrors the +/// cut-relevant fields of [`SubHyper`] (`vw`/`bw`/`nets`/`vnets`) plus the +/// `match_map` that records, for each vertex of the *next-finer* level, which +/// supervertex here it was merged into — the inverse map used to project a +/// coarse partition back down during uncoarsening. +struct CoarseLevel { + vw: Vec, + bw: Vec, + nets: Vec<(u64, Vec)>, + vnets: Vec>, + match_map: Vec, +} + +impl CoarseLevel { + fn level(&self) -> Level<'_> { + Level { vw: &self.vw, bw: &self.bw, nets: &self.nets, vnets: &self.vnets } + } +} + +/// Build the coarsening hierarchy for one bisection: `levels[0]` is the first +/// contraction of `sub`, `levels.last()` is the coarsest graph (≈ +/// [`COARSEST_TARGET`] vertices). The finest level (`sub` itself) is *not* +/// duplicated into the vector — it is supplied separately during uncoarsening. +/// Returns an empty vector when `sub` is already small or can't be coarsened, in +/// which case [`bisect`] partitions `sub` directly. +fn coarsen(sub: &SubHyper) -> Vec { + let total_bw: u64 = sub.bw.iter().sum(); + // Cap a supervertex's balance weight so the coarsest graph keeps at least + // ~COARSEST_TARGET pieces and stays balanceable; never merge past it. + let max_cluster = (total_bw / COARSEST_TARGET as u64).max(1); + let mut levels: Vec = Vec::new(); + loop { + let cur: Level<'_> = match levels.last() { + Some(l) => l.level(), + None => sub.level(), + }; + let n = cur.num_vertices(); + if n <= COARSEST_TARGET { + break; + } + let (super_id, next) = match_vertices(cur, max_cluster); + if (next as f64) > COARSEN_STALL_RATIO * n as f64 { + break; // stalled: too few matchable pairs to make progress + } + let level = contract(cur, &super_id, next); + levels.push(level); + if next <= COARSEST_TARGET { + break; + } + } + levels +} + +/// Heavy-edge matching pass: pair each still-unmatched vertex (visited in id +/// order) with the unmatched neighbor it co-occurs with in the heaviest small +/// nets, subject to the cluster-weight cap. A vertex with no such neighbor +/// (delta-sparse, or only in hub nets) is instead paired with the next +/// unmatched vertex — a cut-neutral merge (they share no tracked net) that keeps +/// the graph shrinking ~2× per pass so coarsening reaches [`COARSEST_TARGET`] +/// instead of stalling far above it (which would make the initial partition +/// expensive). Returns `(super_id, num_super)` where `super_id[v]` is `v`'s +/// supervertex in the next-coarser level. Deterministic — ties break to lowest +/// id. +fn match_vertices(lv: Level<'_>, max_cluster: u64) -> (Vec, usize) { + let n = lv.num_vertices(); + let mut super_id = vec![u32::MAX; n]; + let mut next: u32 = 0; + // Dense score accumulator, reset per vertex via the `touched` list. + let mut score = vec![0u64; n]; + let mut touched: Vec = Vec::new(); + // Forward cursor over still-unmatched vertices for the fallback pairing + // (advances monotonically → amortized O(n) over the whole pass). + let mut fb = 0usize; + for v in 0..n { + if super_id[v] != u32::MAX { + continue; // already claimed as an earlier vertex's partner + } + for &ni in &lv.vnets[v] { + let (w, pins) = &lv.nets[ni as usize]; + let deg = pins.len(); + if !(2..=MATCH_NET_CAP).contains(°) { + continue; // singleton or hub: no clustering signal worth scanning + } + let contrib = w / (deg as u64 - 1); + if contrib == 0 { + continue; + } + for &u in pins { + let u = u as usize; + if u == v || super_id[u] != u32::MAX { + continue; // self, or already-matched vertex + } + if score[u] == 0 { + touched.push(u as u32); + } + score[u] = score[u].saturating_add(contrib); + } + } + // Pick the best-scoring partner that fits the cluster-weight cap. The + // explicit `u < best_u` tie-break makes the result independent of the + // (net/pin) iteration order. + let mut best_u = usize::MAX; + let mut best_score = 0u64; + for &ut in &touched { + let u = ut as usize; + let s = score[u]; + let fits = lv.bw[v].saturating_add(lv.bw[u]) <= max_cluster; + if fits && (s > best_score || (s == best_score && u < best_u)) { + best_score = s; + best_u = u; + } + } + for &ut in &touched { + score[ut as usize] = 0; + } + touched.clear(); + // Fallback: no heavy-edge partner — pair with the next unmatched vertex so + // coarsening keeps making progress. Cut-neutral (no shared tracked net). + // The cursor skips matched vertices and everything ≤ v (so never v itself). + if best_u == usize::MAX { + while fb < n && (super_id[fb] != u32::MAX || fb <= v) { + fb += 1; + } + if fb < n && lv.bw[v].saturating_add(lv.bw[fb]) <= max_cluster { + best_u = fb; + } + } + super_id[v] = next; + if best_u != usize::MAX { + super_id[best_u] = next; // matched pair shares a supervertex + } + next += 1; + } + (super_id, next as usize) +} + +/// Contract `lv` by `super_id` (which maps each vertex to one of `next` +/// supervertices) into the next coarser [`CoarseLevel`]. Supervertex weights are +/// member sums; each net's pins are remapped, sorted, and deduplicated, and the +/// net is dropped if it has fewer than two distinct coarse pins (it became +/// internal to a supervertex). Net weights are preserved. `super_id` is stored +/// as the level's `match_map` for the uncoarsening projection. +fn contract(lv: Level<'_>, super_id: &[u32], next: usize) -> CoarseLevel { + let mut vw = vec![0u64; next]; + let mut bw = vec![0u64; next]; + for v in 0..lv.num_vertices() { + let s = super_id[v] as usize; + vw[s] = vw[s].saturating_add(lv.vw[v]); + bw[s] = bw[s].saturating_add(lv.bw[v]); + } + let mut nets: Vec<(u64, Vec)> = Vec::with_capacity(lv.nets.len()); + for (w, pins) in lv.nets { + let mut cp: Vec = pins.iter().map(|&p| super_id[p as usize]).collect(); + cp.sort_unstable(); + cp.dedup(); + if cp.len() >= 2 { + nets.push((*w, cp)); + } + } + let mut vnets = vec![Vec::new(); next]; + for (i, (_, pins)) in nets.iter().enumerate() { + for &v in pins { + vnets[v as usize].push(i as u32); + } + } + CoarseLevel { vw, bw, nets, vnets, match_map: super_id.to_vec() } +} + +/// Decide the bisection on the coarsest graph: greedy graph-growing for a +/// locality-aware initial cut, then FM to (capped) convergence. The graph is +/// tiny (≈[`COARSEST_TARGET`] vertices), so this is where we spend on quality — +/// the *global* cut is decided here and only locally refined while uncoarsening. +/// We restart from several diverse seeds (see [`INITIAL_RESTARTS`]) and keep the +/// lowest-cut partition, which makes graph-growing robust to "leaking" across a +/// thin bridge between two clusters. +fn initial_partition(lv: Level<'_>, wmin: u64, wmax: u64) -> Vec { + let n = lv.num_vertices(); + if n == 0 { + return Vec::new(); + } + if n == 1 { + return vec![0]; + } + let total_bw: u64 = lv.bw.iter().sum(); + // Diverse deterministic seeds: the heaviest vertex plus points spread evenly + // across the id range (one tends to start far from any inter-cluster bridge). + let heaviest = (0..n) + .max_by(|&a, &b| lv.vw[a].cmp(&lv.vw[b]).then(Reverse(a).cmp(&Reverse(b)))) + .unwrap(); + let mut seeds = vec![heaviest]; + for i in 0..INITIAL_RESTARTS { + let s = (i * n) / INITIAL_RESTARTS; + if !seeds.contains(&s) { + seeds.push(s); + } + } + // Select the lowest-cut candidate, but *only* among non-degenerate splits + // (both sides non-empty) and preferring those within the balance window. A + // graph-growing run seeded at a light vertex can sweep an entire sub onto one + // side (cut 0) when a single atomic block holds more than half the balance + // weight; such a split is both unbalanced and degenerate, and accepting it + // would leave a shard empty downstream. Key: (unbalanced?, cut), minimized. + let mut best: Option<((u8, u128), Vec)> = None; + for &seed in &seeds { + let mut side = greedy_grow(lv, total_bw, seed); + fm_refine(lv, &mut side, wmin, wmax, MAX_FM_PASSES); + let s0 = (0..n).filter(|&v| side[v] == 0).count(); + if s0 == 0 || s0 == n { + continue; // degenerate (one side empty) — never select + } + let side0_bw: u64 = + (0..n).filter(|&v| side[v] == 0).map(|v| lv.bw[v]).sum(); + let unbalanced = u8::from(side0_bw < wmin || side0_bw > wmax); + let (_, cut) = NetState::new(lv, &side); + let key = (unbalanced, cut); + if best.as_ref().is_none_or(|(bk, _)| key < *bk) { + best = Some((key, side)); + } + } + // Fallback (no non-degenerate candidate — pathological): split by id so both + // sides are non-empty and recursion can still realize every shard. + best + .map_or_else(|| (0..n).map(|v| u8::from(v >= n / 2)).collect(), |(_, s)| s) +} + +/// Project the coarse partition down to the finest level, boundary-refining at +/// each step. `side` enters indexed by coarsest supervertex; each finer level +/// inherits its supervertex's side (`side[fine] = coarse[match_map[fine]]`) and +/// is FM-refined ([`REFINE_PASSES`] passes) to fix only its cut frontier. The +/// returned `side` is indexed by `sub` vertex — the same contract a flat +/// bisection returns. +fn uncoarsen_refine( + sub: &SubHyper, + levels: &[CoarseLevel], + mut side: Vec, + wmin: u64, + wmax: u64, +) -> Vec { + for i in (0..levels.len()).rev() { + // The level finer than `levels[i]` is `levels[i-1]`, or `sub` at the bottom. + let (finer_n, finer_lv) = if i == 0 { + (sub.num_vertices(), sub.level()) + } else { + (levels[i - 1].vw.len(), levels[i - 1].level()) + }; + let mm = &levels[i].match_map; // finer vertex -> levels[i] supervertex + let mut finer_side = vec![0u8; finer_n]; + for v in 0..finer_n { + finer_side[v] = side[mm[v] as usize]; + } + fm_refine(finer_lv, &mut finer_side, wmin, wmax, REFINE_PASSES); + side = finer_side; + } + side +} + +// ============================================================================ +// Manifest +// ============================================================================ + +/// Per-shard summary in a [`ShardManifest`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardInfo { + /// Shard id (an `n`-bit path through the bisection tree). + pub id: u32, + /// Member block addresses. + pub blocks: Vec
, + /// Sum of member heartbeats (balance metric). + pub heartbeats: u64, + /// Sum of member serialized sizes (the shard's own ingress). + pub own_size: u64, + /// Foreign blocks delta-unfolded by members but proven in other shards. + pub foreign_blocks: Vec
, + /// Sum of `serialized_size` over `foreign_blocks` (this shard's share of the + /// cross-shard ingress objective). + pub cross_ingress: u64, + /// Sound assumption-tree root for this shard's conditional proof: the Merkle + /// root over the foreign part of the shard's static reference closure. `None` + /// until populated by the env-aware layer (the pure partitioner has no `Env`). + pub assumption_root: Option
, +} + +/// The sharding manifest: the partition plus its cost metrics. Assumption-tree +/// roots (which require the static reference closure from the `Env`) are filled +/// in by the env-aware CLI layer, not here. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardManifest { + /// Number of shards (the partition target == `shards.len()`). + pub num_shards: u32, + pub shards: Vec, + /// Total cross-shard ingress bytes (the km1 objective). + pub total_cross_ingress: u128, +} + +impl ShardManifest { + /// Build the manifest from a profile and a shard assignment (shard id in + /// `0..num_shards` per block id). + pub fn build( + profile: &BlockProfile, + shard_of: &[u32], + num_shards: usize, + ) -> ShardManifest { + let mut members: Vec> = vec![Vec::new(); num_shards]; + for (b, &s) in shard_of.iter().enumerate() { + members[s as usize].push(b as u32); + } + // Distinct foreign blocks per shard. + let mut foreign: Vec> = + vec![FxHashSet::default(); num_shards]; + for b in 0..profile.num_blocks() as u32 { + let s = shard_of[b as usize] as usize; + for &p in profile.producers(b) { + if shard_of[p as usize] as usize != s { + foreign[s].insert(p); + } + } + } + let mut shards = Vec::with_capacity(num_shards); + let mut total_cross: u128 = 0; + for s in 0..num_shards { + let blocks: Vec
= + members[s].iter().map(|&b| profile.block(b).addr.clone()).collect(); + let heartbeats: u64 = + members[s].iter().map(|&b| profile.block(b).heartbeats).sum(); + let own_size: u64 = members[s] + .iter() + .map(|&b| u64::from(profile.block(b).serialized_size)) + .sum(); + let mut fb: Vec = foreign[s].iter().copied().collect(); + fb.sort_unstable(); + let cross_ingress: u64 = + fb.iter().map(|&p| u64::from(profile.block(p).serialized_size)).sum(); + total_cross += u128::from(cross_ingress); + let foreign_blocks: Vec
= + fb.iter().map(|&p| profile.block(p).addr.clone()).collect(); + shards.push(ShardInfo { + id: s as u32, + blocks, + heartbeats, + own_size, + foreign_blocks, + cross_ingress, + assumption_root: None, + }); + } + ShardManifest { + num_shards: num_shards as u32, + shards, + total_cross_ingress: total_cross, + } + } + + /// A human-readable what-if summary line. + pub fn summary(&self) -> String { + let hbs: Vec = self.shards.iter().map(|s| s.heartbeats).collect(); + let nonempty: Vec = self + .shards + .iter() + .filter(|s| !s.blocks.is_empty()) + .map(|s| s.heartbeats) + .collect(); + let max = hbs.iter().copied().max().unwrap_or(0); + let min = nonempty.iter().copied().min().unwrap_or(0); + let total: u128 = hbs.iter().map(|&h| u128::from(h)).sum(); + let mean = if self.shards.is_empty() { + 0 + } else { + (total / self.shards.len() as u128) as u64 + }; + let empty = self.shards.iter().filter(|s| s.blocks.is_empty()).count(); + let max_cross = + self.shards.iter().map(|s| s.cross_ingress).max().unwrap_or(0); + format!( + "shards={} (empty={}) heartbeats[min={} mean={} max={}] imbalance={:.2}x \ + cross_ingress_total={} max_shard_cross={}", + self.shards.len(), + empty, + min, + mean, + max, + if mean == 0 { 0.0 } else { max as f64 / mean as f64 }, + self.total_cross_ingress, + max_cross, + ) + } + + /// Serialize to the `.ixes` binary format. + pub fn to_bytes(&self) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(SHARD_MAGIC); + out.extend_from_slice(&self.total_cross_ingress.to_le_bytes()); + out.extend_from_slice(&(self.shards.len() as u32).to_le_bytes()); + let put_addrs = |out: &mut Vec, addrs: &[Address]| { + out.extend_from_slice(&(addrs.len() as u32).to_le_bytes()); + for a in addrs { + out.extend_from_slice(a.as_bytes()); + } + }; + for sh in &self.shards { + out.extend_from_slice(&sh.id.to_le_bytes()); + out.extend_from_slice(&sh.heartbeats.to_le_bytes()); + out.extend_from_slice(&sh.own_size.to_le_bytes()); + out.extend_from_slice(&sh.cross_ingress.to_le_bytes()); + match &sh.assumption_root { + Some(a) => { + out.push(1); + out.extend_from_slice(a.as_bytes()); + }, + None => out.push(0), + } + put_addrs(&mut out, &sh.blocks); + put_addrs(&mut out, &sh.foreign_blocks); + } + out + } + + /// Deserialize from the `.ixes` binary format. + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut c = Cur { buf: bytes, pos: 0 }; + if c.take(8)? != SHARD_MAGIC { + return Err("not an .ixes file (bad magic)".into()); + } + let total_cross_ingress = c.u128()?; + let num_shards = c.u32()? as usize; + let mut shards = Vec::with_capacity(num_shards); + for _ in 0..num_shards { + let id = c.u32()?; + let heartbeats = c.u64()?; + let own_size = c.u64()?; + let cross_ingress = c.u64()?; + let assumption_root = if c.u8()? == 1 { Some(c.addr()?) } else { None }; + let blocks = c.addrs()?; + let foreign_blocks = c.addrs()?; + shards.push(ShardInfo { + id, + blocks, + heartbeats, + own_size, + foreign_blocks, + cross_ingress, + assumption_root, + }); + } + Ok(ShardManifest { + num_shards: num_shards as u32, + shards, + total_cross_ingress, + }) + } +} + +/// Magic bytes at the head of every `.ixes` file. +const SHARD_MAGIC: &[u8; 8] = b"IXES\0\0\0\0"; + +/// Minimal little-endian cursor for manifest decoding. +struct Cur<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> Cur<'a> { + fn take(&mut self, n: usize) -> Result<&'a [u8], String> { + let end = self.pos.checked_add(n).ok_or("truncated .ixes")?; + if end > self.buf.len() { + return Err("truncated .ixes".into()); + } + let s = &self.buf[self.pos..end]; + self.pos = end; + Ok(s) + } + fn u8(&mut self) -> Result { + Ok(self.take(1)?[0]) + } + fn u32(&mut self) -> Result { + Ok(u32::from_le_bytes(self.take(4)?.try_into().unwrap())) + } + fn u64(&mut self) -> Result { + Ok(u64::from_le_bytes(self.take(8)?.try_into().unwrap())) + } + fn u128(&mut self) -> Result { + Ok(u128::from_le_bytes(self.take(16)?.try_into().unwrap())) + } + fn addr(&mut self) -> Result { + Address::from_slice(self.take(32)?).map_err(|_| "bad address".into()) + } + fn addrs(&mut self) -> Result, String> { + let n = self.u32()? as usize; + let mut v = Vec::with_capacity(n); + for _ in 0..n { + v.push(self.addr()?); + } + Ok(v) + } +} + +/// Read a `.ixesp`, partition into `num_shards` shards, and emit a manifest with +/// per-shard cost metrics, foreign-block sets, and (delta-based) assumption +/// roots. Optionally writes the manifest (`.ixes`). Returns a what-if report. +/// +/// The assumption root here is the Merkle root over the foreign *blocks* whose +/// bodies the shard must ingress (the cost-model-tight set). The fully-sound +/// variant for proof generation uses the static reference closure over member +/// constants and additionally needs the `.ixe`; see `plans/sharding.md` §4. +pub fn shard_esp( + esp_path: &str, + num_shards: usize, + balance: f64, + out_path: Option<&str>, +) -> Result { + let bytes = + std::fs::read(esp_path).map_err(|e| format!("read {esp_path}: {e}"))?; + let profile = BlockProfile::from_bytes(&bytes) + .map_err(|e| format!("parse {esp_path}: {e}"))?; + let h = Hypergraph::from_profile(&profile); + let shard_of = h.partition(num_shards, balance); + let mut manifest = ShardManifest::build(&profile, &shard_of, num_shards); + for shard in &mut manifest.shards { + shard.assumption_root = + crate::ix::ixon::merkle::merkle_root_canonical(&shard.foreign_blocks); + } + if let Some(op) = out_path { + std::fs::write(op, manifest.to_bytes()) + .map_err(|e| format!("write {op}: {e}"))?; + } + // The largest single block's heartbeats is the *floor* on achievable + // per-shard balance: a mutual block is atomic and cannot be split, so no + // partition can drive max-shard heartbeats below it. When the heaviest shard + // is pinned at this floor, adding more shards only worsens imbalance — a + // signal to stop raising the shard count or to split the block. + let max_block_hb = + profile.blocks().iter().map(|b| b.heartbeats).max().unwrap_or(0); + let max_shard_hb = + manifest.shards.iter().map(|s| s.heartbeats).max().unwrap_or(0); + let floored = + num_shards > 1 && max_shard_hb <= max_block_hb.saturating_mul(11) / 10; + let note = if floored { + " [balance floored by largest atomic block — more shards won't help]" + } else { + "" + }; + Ok(format!( + "blocks={} delta_edges={} nets={}\n{}\nlargest_block_hb={}{}", + profile.num_blocks(), + profile.num_edges(), + h.num_nets(), + manifest.summary(), + max_block_hb, + note, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ix::profile::ProfileBuilder; + + fn addr(byte: u8) -> Address { + Address::from_slice(&[byte; 32]).unwrap() + } + + /// A distinct address for each `n` (more than the 256 `addr(u8)` affords), + /// for fixtures large enough to exercise the multilevel coarsening path. + /// Big-endian so that address-sort order (which fixes block ids) matches + /// numeric order, keeping cluster members id-contiguous in the fixtures below. + fn addr_u32(n: u32) -> Address { + let mut b = [0u8; 32]; + b[..4].copy_from_slice(&n.to_be_bytes()); + Address::from_slice(&b).unwrap() + } + + /// Two tight clusters {1,2,3} and {4,5,6} with heavy intra-cluster delta and a + /// single thin cross edge. A good bisection cuts only the thin edge. + fn two_clusters() -> BlockProfile { + let mut b = ProfileBuilder::new(); + for i in 1..=6u8 { + b.block(addr(i), 100, 1000, 1); + } + // intra cluster A + b.delta_edge(addr(1), addr(2)); + b.delta_edge(addr(2), addr(3)); + b.delta_edge(addr(3), addr(1)); + // intra cluster B + b.delta_edge(addr(4), addr(5)); + b.delta_edge(addr(5), addr(6)); + b.delta_edge(addr(6), addr(4)); + // thin cross edge A->B + b.delta_edge(addr(3), addr(4)); + b.finish() + } + + #[test] + fn bisect_separates_clusters() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + // Blocks 0,1,2 (addr 1,2,3) should share a shard; 3,4,5 (addr 4,5,6) the other. + assert_eq!(shard_of[0], shard_of[1]); + assert_eq!(shard_of[1], shard_of[2]); + assert_eq!(shard_of[3], shard_of[4]); + assert_eq!(shard_of[4], shard_of[5]); + assert_ne!(shard_of[0], shard_of[3]); + // Objective: only the cross net (addr 4, size 1000) is cut, λ=2 → 1000. + assert_eq!(h.connectivity_objective(&shard_of), 1000); + } + + #[test] + fn objective_matches_manifest_cross_ingress() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let m = ShardManifest::build(&p, &shard_of, 2); + // Σ per-shard cross-ingress must equal the km1 objective. + assert_eq!(m.total_cross_ingress, h.connectivity_objective(&shard_of)); + } + + #[test] + fn balanced_partition() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let m = ShardManifest::build(&p, &shard_of, 2); + assert_eq!(m.shards.len(), 2); + // Each cluster has 3 blocks × 100 heartbeats = 300; perfectly balanced. + assert_eq!(m.shards[0].heartbeats, 300); + assert_eq!(m.shards[1].heartbeats, 300); + } + + #[test] + fn four_shards_perfect_split() { + // Four clusters of 4 blocks each, heavy intra, no cross: perfect 4-way split. + let mut b = ProfileBuilder::new(); + for c in 0..4u8 { + let base = c * 4 + 1; + for k in 0..4u8 { + b.block(addr(base + k), 100, 500, 1); + } + b.delta_edge(addr(base), addr(base + 1)); + b.delta_edge(addr(base + 1), addr(base + 2)); + b.delta_edge(addr(base + 2), addr(base + 3)); + b.delta_edge(addr(base + 3), addr(base)); + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(4, 0.10); + let m = ShardManifest::build(&p, &shard_of, 4); + assert_eq!(m.shards.len(), 4); + // No cross-cluster edges → zero cross ingress achievable. + assert_eq!(m.total_cross_ingress, 0); + // Each non-empty shard should hold exactly one cluster (4×100). + for s in &m.shards { + assert_eq!(s.heartbeats, 400); + } + } + + #[test] + fn manifest_roundtrip_serialization() { + let p = two_clusters(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + let mut m = ShardManifest::build(&p, &shard_of, 2); + // Simulate the env layer filling in an assumption root on one shard. + m.shards[0].assumption_root = Some(addr(42)); + let bytes = m.to_bytes(); + let q = ShardManifest::from_bytes(&bytes).unwrap(); + assert_eq!(m, q); + assert_eq!(q.shards[0].assumption_root, Some(addr(42))); + assert_eq!(q.shards[1].assumption_root, None); + } + + #[test] + fn cap_keeps_shards_non_empty_under_skew() { + // A heavy atomic block among many light ones. Capping the balance weight at + // total/num_shards (plus leaf-count allocation) must keep every shard + // non-empty (parallelism is the goal), even though heartbeat balance is + // impossible. + let mut b = ProfileBuilder::new(); + b.block(addr(1), 30_000, 100, 1); // ~30x a light block + for i in 2..=65u8 { + b.block(addr(i), 1000, 100, 1); + } + for i in 2..=64u8 { + b.delta_edge(addr(i), addr(i + 1)); + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(8, 0.20); // 8 shards from 65 blocks + let m = ShardManifest::build(&p, &shard_of, 8); + assert_eq!(m.shards.len(), 8); + assert_eq!( + m.shards.iter().filter(|s| s.blocks.is_empty()).count(), + 0, + "capping must keep all shards non-empty" + ); + } + + #[test] + fn shard_esp_file_roundtrip() { + // Exercise the CLI's shard path: write a .ixesp, run shard_esp, read back + // the .ixes manifest. + let p = two_clusters(); + let dir = std::env::temp_dir(); + let pid = std::process::id(); + let prof = dir.join(format!("ix_shard_rt_{pid}.ixesp")); + let shard = dir.join(format!("ix_shard_rt_{pid}.ixes")); + std::fs::write(&prof, p.to_bytes()).unwrap(); + + let report = + shard_esp(prof.to_str().unwrap(), 2, 0.10, Some(shard.to_str().unwrap())) + .unwrap(); + assert!(report.contains("shards=2"), "report was: {report}"); + + let m = ShardManifest::from_bytes(&std::fs::read(&shard).unwrap()).unwrap(); + assert_eq!(m.shards.len(), 2); + assert_eq!(m.total_cross_ingress, 1000); + + let _ = std::fs::remove_file(&prof); + let _ = std::fs::remove_file(&shard); + } + + #[test] + fn contract_sums_weights_and_drops_internal_nets() { + // 4 vertices, 3 nets. Merge {0,1}->super0 and {2,3}->super1. + let global = vec![0u32, 1, 2, 3]; + let vw = vec![10u64, 20, 30, 40]; + let nets = vec![ + (100u64, vec![0u32, 1]), // becomes internal to super0 -> dropped + (200u64, vec![0u32, 2]), // crosses -> kept + (300u64, vec![2u32, 3]), // becomes internal to super1 -> dropped + ]; + let sub = SubHyper::assemble(global, vw, nets, 1_000_000); + let super_id = vec![0u32, 0, 1, 1]; + let cl = contract(sub.level(), &super_id, 2); + + assert_eq!(cl.vw, vec![30, 70], "supervertex weights are member sums"); + assert_eq!(cl.bw, vec![30, 70]); + assert_eq!(cl.match_map, super_id); + // Only the crossing net survives, with its weight preserved. + assert_eq!(cl.nets.len(), 1); + assert_eq!(cl.nets[0].0, 200); + assert_eq!(cl.nets[0].1, vec![0u32, 1]); + // vnets rebuilt to reference the surviving net. + assert_eq!(cl.vnets, vec![vec![0u32], vec![0u32]]); + } + + #[test] + fn match_respects_cluster_weight_cap() { + // 0 and 1 share a heavy net; 2 is isolated. bw is each 10. + let sub = SubHyper::assemble( + vec![0u32, 1, 2], + vec![10u64, 10, 10], + vec![(100u64, vec![0u32, 1])], + 1_000_000, + ); + // Generous cap: 0 and 1 merge, 2 stays singleton -> 2 supervertices. + let (sid, k) = match_vertices(sub.level(), 100); + assert_eq!(sid[0], sid[1]); + assert_ne!(sid[0], sid[2]); + assert_eq!(k, 2); + // Cap below the combined weight (10+10=20): no merge -> 3 supervertices. + let (sid2, k2) = match_vertices(sub.level(), 19); + assert_ne!(sid2[0], sid2[1]); + assert_eq!(k2, 3); + } + + /// Two tight clusters of `m` blocks each (intra-cluster delta cycles) joined by + /// a single thin cross edge — large enough (`2m > COARSEST_TARGET`) to drive + /// the full coarsen → uncoarsen path. + fn two_big_clusters(m: u32) -> BlockProfile { + let mut b = ProfileBuilder::new(); + for i in 0..2 * m { + b.block(addr_u32(i + 1), 100, 1000, 1); + } + for i in 0..m { + // cluster A cycle over addrs 1..=m + b.delta_edge(addr_u32(1 + i), addr_u32(1 + (i + 1) % m)); + // cluster B cycle over addrs m+1..=2m + b.delta_edge(addr_u32(1 + m + i), addr_u32(1 + m + (i + 1) % m)); + } + // single thin cross edge: A_0 unfolds B_0 + b.delta_edge(addr_u32(1), addr_u32(1 + m)); + b.finish() + } + + #[test] + fn multilevel_separates_large_clusters() { + // 1024 blocks: past the cap "dead-zone" (n < 2·COARSEST_TARGET), so heavy- + // edge matching contracts the graph across multiple levels. + let m = 512u32; + let p = two_big_clusters(m); + let h = Hypergraph::from_profile(&p); + let shard_of = h.partition(2, 0.10); + + // Optimal cut is exactly the single cross net (size 1000). Any split that + // also breaks a cluster cycle would cut an intra net (also 1000), so this + // equality certifies clean cluster separation. + assert_eq!(h.connectivity_objective(&shard_of), 1000); + + // Cluster coherence, mapping addresses to (sorted) block ids. + let id_of: std::collections::HashMap = (0..p.num_blocks() + as u32) + .map(|i| (p.block(i).addr.clone(), i)) + .collect(); + let sid = |n: u32| shard_of[id_of[&addr_u32(n)] as usize]; + let shard_a = sid(1); + let shard_b = sid(1 + m); + assert_ne!(shard_a, shard_b); + for i in 0..m { + assert_eq!(sid(1 + i), shard_a, "cluster A must be coherent"); + assert_eq!(sid(1 + m + i), shard_b, "cluster B must be coherent"); + } + } + + #[test] + fn multilevel_partition_is_deterministic() { + // 360 blocks > COARSEST_TARGET, partitioned into 4 (exercises recursion + + // coarsening). The partition must be byte-for-byte reproducible. + let p = two_big_clusters(180); + let h = Hypergraph::from_profile(&p); + let a = h.partition(4, 0.10); + let b = h.partition(4, 0.10); + assert_eq!(a, b); + } + + #[test] + fn multilevel_no_empty_shards_with_dominant_block() { + // One block far heavier than all others, among enough light blocks to drive + // coarsening. During recursion a sub can be dominated by the giant; a + // restart seeded at a light block then sweeps the whole sub onto one side + // (cut 0). The initial-partition selection must reject such degenerate + // candidates, or a shard ends up empty. (Regression guard.) + let mut b = ProfileBuilder::new(); + let m = 800u32; + b.block(addr_u32(1), 5_000_000, 100, 1); // the giant + for i in 2..=m { + b.block(addr_u32(i), 1000, 100, 1); + } + for i in 1..m { + b.delta_edge(addr_u32(i), addr_u32(i + 1)); // a chain (incl. the giant) + } + let p = b.finish(); + let h = Hypergraph::from_profile(&p); + for n in [16usize, 64, 200] { + let shard_of = h.partition(n, 0.05); + let mani = ShardManifest::build(&p, &shard_of, n); + assert_eq!( + mani.shards.iter().filter(|s| s.blocks.is_empty()).count(), + 0, + "n={n}: every shard must be non-empty despite the dominant block" + ); + } + } +}