Skip to content

[Feature] Add Distributed Posting Router for SPANN#448

Open
TerrenceZhangX wants to merge 12 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv
Open

[Feature] Add Distributed Posting Router for SPANN#448
TerrenceZhangX wants to merge 12 commits into
users/qiazh/pre-merge-tikv-bugfixfrom
users/zhangt/merge-distributed-to-tikv

Conversation

@TerrenceZhangX
Copy link
Copy Markdown

@TerrenceZhangX TerrenceZhangX commented May 7, 2026

Limitations

  1. Using buildOnly flags to mitigate no ditributed build limitation. Current flow is using buildOnly to let single node to build and distribute to other nodes. Then using disable buildOnly to let multiple nodes load same head index and run benchmarks.

Scale results

Dataset: SIFT1B bigann_base.u8bin, 128d UInt8, L2. SPANN 2-layer index,
4 search threads, 4 insert threads, top-K=5, 200 queries.

1M base + 1M insert

Metric 1node 2node Scale
Build time (s) 74.2 91.3 0.81×
Pre-insert QPS 429.3 696.3 1.62×
Pre-insert mean latency (ms) 9.26 8.63
Pre-insert p99 (ms) 27.31 29.18
Post-insert QPS 425.0 708.1 1.67×
Insert throughput (vec/s) ~900 1793
Recall@5 (pre-insert) 0.984 0.978
Recall@5 (post-insert) 0.983 0.984

Notes:

  • Build is slower on 2node at 1M scale: the cross-node coordination
    overhead (head-sync RPCs, control plane) dominates when there are only
    ~40k head vectors to build.
  • Search scales well already at 1M; recall is unchanged.

100M base + 1M insert

Metric 1node 2node Scale
Build time (s) 15292 16264 0.94×
Pre-insert QPS 183.3 360.5 1.97×
Pre-insert mean latency (ms) 21.56 21.92
Pre-insert p99 (ms) 32.81 39.55
Post-insert QPS (round 2) 183.2 337.2 1.84×
Insert throughput (vec/s) 738 1285 1.74×
Recall@5 (pre-insert) 0.912 0.904
Recall@5 (post-insert) 0.912 0.904

Notes:

  • Search scales near-linearly (1.97×) at the target scale — the per-node
    query partition + remote KV reads are well-balanced.
  • Insert scales sublinearly (1.74×), expected: every insert that
    promotes/splits a head triggers head-index sync across nodes, which is the
    current bottleneck.
  • Build is essentially flat (0.94×): build is dominated by per-node local
    graph construction; 2node has additional head-sync but no work split, so
    near-no scaling here is expected for the current single-builder design.
  • Recall is stable across configurations (within 0.01).

@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from dfc4c89 to a158014 Compare May 7, 2026 13:55
@TerrenceZhangX TerrenceZhangX marked this pull request as ready for review May 14, 2026 10:54
Comment thread AnnService/inc/Core/Common/FineGrainedLock.h Outdated
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"[Bug 25] Search post-fetch: out-of-range VID:%d (count:%d, deficit:%d > %d, numWorkers:%d); records will be skipped\n",
maxVid, currentCount, deficit, maxAllowed, numWorkers);
} else if (m_versionMap->AddBatch(deficit) != ErrorCode::Success) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why search need to be changed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each node's local m_count tracks only its own AddIndex progress, not peers'. When search reads a TiKV posting whose bytes were written by a peer running ahead of us, those record VIDs can fall above our local m_count. GetVersion short-circuits any VID >= m_count to 0xfe, so without growing m_count first the post-heap filter would drop those records as deleted and recall would silently regress. This loop grows m_count just enough to cover the largest VID we actually fetched

// at the trigger site avoids the round trip entirely.
// Single-node behavior (m_worker disabled) is preserved.
if (m_opt->m_asyncMergeInSearch && realNum <= m_mergeThreshold) {
if (m_worker && m_worker->IsEnabled()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it is not local and not layer 0, it still needs to do mergeasync.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. This is over-simplification. Fixed to mergeAsync trigger based on headID owner.

@TerrenceZhangX TerrenceZhangX requested a review from zqxjjj May 18, 2026 06:59
Branch users/zhangt/merge-onto-qiazh ports our shared remote/local pool +
per-layer routing changes from users/zhangt/merge-distributed-to-tikv on
top of qianxi's TiKV bugfix branch (lock ordering, splitAsync, version
check, etc.). Avoids the 21-block ExtraDynamicSearcher.h merge conflict
on the merged_spfresh side by replaying instead of merging.

Pragmatic approach for heavy files (ExtraDynamicSearcher.h, SPFreshTest.cpp):
take our HEAD versions wholesale (which already contain our distributed +
MultiChunk logic), and patch only the compile-breaking deltas caused by
qianxi's refactors:
  - PostingCountCache moved from ExtraDynamicSearcher.h to ExtraTiKVController.h
  - KeyValueIO grew MultiMerge + LogAsyncWaitStatsAndReset virtuals
    (qianxi version kept; our MultiPut/MultiDelete virtuals re-added on top)
  - Options/ParameterDefinitionList: kept qianxi version (adds m_globalIDPath)
  - ThreadPool: kept our add_high + added addfront alias for qianxi callers

Index.h / IExtraSearcher.h / SPANNIndex.cpp: applied small additive hooks
on top of qianxi (forward-decl WorkerNode, SetWorker/GetSharedSplitPool
accessors, BuildIndexInternalLayer + AddIndex worker loop). qianxi
bugfixes preserved in those files.

Build system:
  - CMakeLists updated for absl_cord + cordz family (kvproto 25.3 uses
    absl 2308, anaconda's grpc bundles 2111; explicit linkage avoids
    DSO-missing-from-command-line)
  - cmake invoked with gRPC_DIR/Protobuf_DIR/absl_DIR pointing at
    /usr/local so generated kvproto + libabsl 2308 versions align

Verified: SPTAGTest links cleanly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@TerrenceZhangX TerrenceZhangX force-pushed the users/zhangt/merge-distributed-to-tikv branch from 90dbbae to 8716007 Compare May 20, 2026 06:52
TerrenceZhangX and others added 11 commits May 20, 2026 07:21
Strip the SPFRESH_SHARD_STRIDE opt-in code path (4 helpers + plumbing
through LoadAndInsertBatch/RunBenchmark/RunWorker). No active config
sets the env var; we always use the contiguous slice partition.

Test/CMakeLists.txt: explicitly link ${TiKV_LIBRARIES} into SPTAGTest
so a clean build (no .o cache) resolves gpr_/grpc_ symbols pulled in
by the kvproto generated stubs.

ThirdParty/kvproto/.gitignore: stop tracking regenerated stubs going
forward — they are environment-specific (must match the protoc/grpc
in the build env); regenerate locally via generate_cpp.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous if/else duplicated the thread launch+join. Restructure to
a single launch with an optional search-during-insert thread:
  - launch insertThreadCount workers
  - if benchmarking, launch one search thread in parallel
  - join all, then compute stats (only when search ran)

Also log a clear note when the bulk router path is used: the user-
supplied InsertThreadNum is unused there (driver runs one launcher
thread and parallelism comes from [BuildSSDIndex] AppendThreadNum
inside ExtraDynamicSearcher's append/split pool).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8716007 removed the (m_layers+1) multiplier in the SPDK BlockController
queue-depth formula. The change was based on an incorrect assumption
that the distributed port collapses all per-layer SPDK pools into the
single shared layer-0 pool. In practice only layer 0 + the RPC receiver
share a pool; every inner layer (m_layer >= 1) still creates its own
SPDKThreadPool in both BuildIndex and LoadIndex.

With Layers=2 (current active configs) we therefore have ~2 independent
pools each running insert + reassign + append worker threads, so the
peak concurrent IO-submitter count remains the qianxi-original
(layers+1)*(insert+reassign+append) plus search threads. Under-sizing
the BlockController queue could stall IO submission under heavy
split/reassign + search load; over-sizing is harmless. Restore the
multiplier to match qianxi behaviour.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
All distributed runs override VersionCacheMaxChunks=0 (set by
run_distributed.sh in build/run/nocache phases), so the LRU cache is
effectively disabled. Using ReadChunkCached inside SetVersionBatch
adds bookkeeping noise (cache hit/miss path, refresh-mutex acquire)
that produces no benefit. Switch to direct ReadChunk; the dirty-byte
gating still saves the WriteChunk RPC when no version byte actually
changes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The distributed port introduced a separate m_highJobs queue + add_high
in ThreadPool plus 'urgent' parameters on AppendAsync/ReassignAsync.
Receiver dispatch already discovered high-priority starved Split jobs
and switched to high=false. The remaining urgent=true callers were:

  - AppendAsync in CollectReAssign's non-TiKV branch (dead under
    Storage::TIKVIO which is the only storage we use)
  - ReassignAsync on head-miss in Append/BatchAppend (same starvation
    risk against Split that motivated the receiver-side revert)

Restore ThreadPool.h to the upstream deque+addfront shape (no semantic
change vs. original) and drop the urgent parameter from AppendAsync/
ReassignAsync, the high flag from JobSubmitter, and the high path from
WireJobSubmitterIfReady.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
run_distributed.sh:
- Remove wait_workers_ready() — dead since the driver-listens-on-30001
  handshake replaced log-grep readiness detection.
- Drop the stale 'Binary already pushed; nothing else to do here' comment
  that sat immediately after the actual binary-push rsync block.

README.md:
- Correct the TiKV deployment model: the cluster is SHARED (all PDs in
  one raft group, all TiKVs registered as stores, max-replicas=1) — not
  one isolated PD+TiKV per node as the old text claimed. Architecture
  diagram, port table, and pre-split helper updated accordingly (one PD
  endpoint, not a per-node loop).
- Fix Step 1 cluster-config path: configs/cluster_2node.conf (an actual
  shipped file), not the non-existent cluster.conf.example.
- Update port defaults to match cluster_2node.conf (23791/23801/20171)
  and call out that the driver's router_port must not collide with the
  dispatcher port 30001 (cluster_2node.conf uses 30011 for this reason).
- List all shipped configs (10m, 100m, insert_dominant, tikv.toml,
  cluster_*.conf) in the file table.
- Document setup-bins subcommand alongside deploy.
- Flag the Build / Distribute / Run split as a workaround for the
  missing distributed SelectHead/BuildHead implementation, so readers
  don't mistake it for the steady-state design.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The previous wording made it sound like the driver was a stateless
coordinator and workers only talked back to it. Reality: node 0 runs as
worker 0 (owns its hash shard like every other worker) and additionally
hosts the dispatcher; workers talk to each other directly through
PostingRouter for remote append, head sync, and merge hints — no
driver-mediated forwarding. Diagram and 'What run does' steps updated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
We never actually ran the pre-split/scatter helper in our benchmark
runs. Keeping it in the doc gives the false impression that it's part
of the recommended setup. Remove the whole section.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants