diff --git a/src/bucket/BucketListSnapshot.cpp b/src/bucket/BucketListSnapshot.cpp index 99a44a8dfc..5c36290248 100644 --- a/src/bucket/BucketListSnapshot.cpp +++ b/src/bucket/BucketListSnapshot.cpp @@ -708,6 +708,203 @@ SearchableLiveBucketListSnapshot::scanForEntriesOfType( loopAllBuckets(scanBucket); } +namespace +{ +// Iterator for `BucketEntry`s of a given type in a bucket. Expects the stream +// to be positioned at the start of the type range. This is basically the same +// as SearchableLiveBucketListSnapshot::scanForEntriesOfType's scanBucket except +// with more control over when iteration happens. +class BucketEntryIterator +{ + BucketEntry mEntry; + LedgerKey mKey; + XDRInputFileStream& mStream; + LedgerEntryType const mType; + + public: + BucketEntryIterator(XDRInputFileStream& stream, LedgerEntryType type) + : mStream(stream), mType(type) + { + } + + BucketEntry const& + getEntry() const + { + return mEntry; + } + LedgerKey const& + getKey() const + { + return mKey; + } + bool advance(); +}; + +bool +BucketEntryIterator::advance() +{ + while (mStream.readOne(mEntry)) + { + if (isBucketMetaEntry(mEntry)) + { + continue; + } + mKey = getBucketLedgerKey(mEntry); + if (mKey.type() > mType) + { + break; + } + + if (mKey.type() == mType) + { + return true; + } + } + return false; +} +} // namespace + +void +SearchableLiveBucketListSnapshot::scanForLiveEntriesOfType( + LedgerEntryType type, + std::function callback) const +{ + ZoneScoped; + // We implement this as a k-way merge over all buckets. We use a loser tree + // for this. The benefit over a heap is ~2x fewer comparisons. A loser tree + // is like a single-elimination tournament. The leaves of the tree are the + // iterators, and the internal nodes represent the loser of the comparison + // between the two children. This implementation represents the binary tree + // in an array, where the tournament tree is from indices [1, 2n) (leaves + // are [n, 2n)). Index 0 is used for keeping track of the overall winner. To + // update, we just need to advance the iterator for the winning node and + // then do the log(k) comparisons upward along the path to the root to + // update the losers. While loser trees often store the whole node value at + // intermediate nodes, we just store an index, since copying the XDR types + // is probably more expensive than the extra indirection. + + std::vector iterators; + loopAllBuckets([&iterators, type, + this](std::shared_ptr const& bucket) { + if (bucket->isEmpty()) + { + return Loop::INCOMPLETE; + } + + auto range = bucket->getRangeForType(type); + if (!range) + { + return Loop::INCOMPLETE; + } + + auto& stream = getStream(bucket); + stream.seek(range->first); + + iterators.emplace_back(stream, type); + return Loop::INCOMPLETE; + }); + + if (iterators.empty()) + { + return; + } + + size_t const numIterators = iterators.size(); + + constexpr int exhausted = -1; + std::vector tree; + tree.resize(numIterators * 2); + for (size_t i = 0; i < numIterators; ++i) + { + if (iterators[i].advance()) + { + tree[numIterators + i] = i; + } + else + { + tree[numIterators + i] = exhausted; + } + } + + auto leftWins = [&iterators](int leftIndex, int rightIndex) -> bool { + if (leftIndex == exhausted) + { + return false; + } + if (rightIndex == exhausted) + { + return true; + } + if (auto cmp = compareLedgerKeys(iterators[leftIndex].getKey(), + iterators[rightIndex].getKey()); + cmp != std::partial_ordering::equivalent) + { + releaseAssert(cmp != std::partial_ordering::unordered); + return cmp == std::partial_ordering::less; + } + return leftIndex < rightIndex; + }; + + // Play the match at index i; store the loser, return the winner + auto play = [&tree, &leftWins](auto& play, size_t index) -> int { + if (2 * index >= tree.size()) + { + return tree[index]; + } + int left = play(play, 2 * index); + int right = play(play, 2 * index + 1); + if (leftWins(left, right)) + { + tree[index] = right; + return left; + } + else + { + tree[index] = left; + return right; + } + }; + tree[0] = play(play, 1); + + bool first = true; + LedgerKey last; + while (tree[0] != exhausted) + { + int index = tree[0]; + auto& iter = iterators[index]; + // Deduplicate entries with the same key across buckets + if (auto& key = iter.getKey(); first || key != last) + { + last = key; + auto& entry = iter.getEntry(); + if (entry.type() == LIVEENTRY || entry.type() == INITENTRY) + { + callback(entry.liveEntry(), key); + } + } + first = false; + + if (!iter.advance()) + { + tree[index + numIterators] = exhausted; + } + int winner = tree[index + numIterators]; + + // Update tournament up the tree to the root + int i = (index + numIterators) / 2; + while (i > 0) + { + if (leftWins(tree[i], winner)) + { + std::swap(tree[i], winner); + } + i /= 2; + } + + tree[0] = winner; + } +} + // Helper function to handle scan logic in a single bucket. Loop SearchableLiveBucketListSnapshot::scanForEvictionInBucket( diff --git a/src/bucket/BucketListSnapshot.h b/src/bucket/BucketListSnapshot.h index 0670c622dd..1ea1f48b68 100644 --- a/src/bucket/BucketListSnapshot.h +++ b/src/bucket/BucketListSnapshot.h @@ -234,6 +234,13 @@ class SearchableLiveBucketListSnapshot LedgerEntryType type, std::function callback) const; + // Iterate over all live entries of a given type. Note that this handles + // shadowing and only returns the latest live entry for each key. + void scanForLiveEntriesOfType( + LedgerEntryType type, + std::function callback) + const; + friend class ImmutableLedgerData; friend class ImmutableLedgerView; }; diff --git a/src/bucket/BucketManager.cpp b/src/bucket/BucketManager.cpp index 1b48789108..232f9a874f 100644 --- a/src/bucket/BucketManager.cpp +++ b/src/bucket/BucketManager.cpp @@ -1393,17 +1393,26 @@ BucketManager::assumeState(Application& app, HistoryArchiveState const& has, if (restartMerges) { - mLiveBucketList->restartMerges(app, maxProtocolVersion, - has.currentLedger); - if (has.hasHotArchiveBuckets()) - { - mHotArchiveBucketList->restartMerges(app, maxProtocolVersion, - has.currentLedger); - } + this->restartMerges(app, has, maxProtocolVersion); } cleanupStaleFiles(has); } +void +BucketManager::restartMerges(Application& app, HistoryArchiveState const& has, + uint32_t maxProtocolVersion) +{ + ZoneScoped; + releaseAssert(threadIsMain()); + + mLiveBucketList->restartMerges(app, maxProtocolVersion, has.currentLedger); + if (has.hasHotArchiveBuckets()) + { + mHotArchiveBucketList->restartMerges(app, maxProtocolVersion, + has.currentLedger); + } +} + void BucketManager::shutdown() { diff --git a/src/bucket/BucketManager.h b/src/bucket/BucketManager.h index d67a682fa8..35d64cde1a 100644 --- a/src/bucket/BucketManager.h +++ b/src/bucket/BucketManager.h @@ -398,6 +398,11 @@ class BucketManager : NonMovableOrCopyable void assumeState(Application& app, HistoryArchiveState const& has, uint32_t maxProtocolVersion, bool restartMerges); + // Restart any in-progress merges captured in `has`. Safe to call only + // after `assumeState` has populated the BucketList from the same `has`. + void restartMerges(Application& app, HistoryArchiveState const& has, + uint32_t maxProtocolVersion); + void shutdown(); bool isShutdown() const; diff --git a/src/bucket/LedgerCmp.cpp b/src/bucket/LedgerCmp.cpp new file mode 100644 index 0000000000..463d8bdee6 --- /dev/null +++ b/src/bucket/LedgerCmp.cpp @@ -0,0 +1,82 @@ +// Copyright 2026 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "bucket/LedgerCmp.h" + +namespace +{ +template +std::partial_ordering +lexComparePartial(T&& lhs1, T&& rhs1) +{ + return lhs1 <=> rhs1; +} + +template +std::partial_ordering +lexComparePartial(T&& lhs1, T&& rhs1, U&&... args) +{ + if (auto cmp = lhs1 <=> rhs1; cmp != std::partial_ordering::equivalent) + { + return cmp; + } + return lexComparePartial(std::forward(args)...); +} +} // namespace + +namespace stellar +{ +std::partial_ordering +compareLedgerKeys(LedgerKey const& a, LedgerKey const& b) +{ + LedgerEntryType aty = a.type(); + LedgerEntryType bty = b.type(); + + if (aty < bty) + return std::partial_ordering::less; + + if (aty > bty) + return std::partial_ordering::greater; + + switch (aty) + { + case ACCOUNT: + return a.account().accountID <=> b.account().accountID; + case TRUSTLINE: + return lexComparePartial(a.trustLine().accountID, + b.trustLine().accountID, a.trustLine().asset, + b.trustLine().asset); + case OFFER: + return lexComparePartial(a.offer().sellerID, b.offer().sellerID, + a.offer().offerID, b.offer().offerID); + case DATA: + return lexComparePartial(a.data().accountID, b.data().accountID, + a.data().dataName, b.data().dataName); + case CLAIMABLE_BALANCE: + return a.claimableBalance().balanceID <=> + b.claimableBalance().balanceID; + case LIQUIDITY_POOL: + return a.liquidityPool().liquidityPoolID <=> + b.liquidityPool().liquidityPoolID; + case CONTRACT_DATA: + { + return lexComparePartial( + a.contractData().contract, b.contractData().contract, + a.contractData().key, b.contractData().key, + a.contractData().durability, b.contractData().durability); + } + case CONTRACT_CODE: + return lexComparePartial(a.contractCode().hash, b.contractCode().hash); + case CONFIG_SETTING: + { + return a.configSetting().configSettingID <=> + b.configSetting().configSettingID; + } + case TTL: + return lexComparePartial(a.ttl().keyHash, b.ttl().keyHash); + } + + return std::partial_ordering::unordered; +} +} // namespace stellar diff --git a/src/bucket/LedgerCmp.h b/src/bucket/LedgerCmp.h index 1cab91a98c..1a7299f5e2 100644 --- a/src/bucket/LedgerCmp.h +++ b/src/bucket/LedgerCmp.h @@ -4,6 +4,7 @@ #pragma once +#include #include #include "bucket/BucketUtils.h" @@ -123,6 +124,10 @@ struct LedgerEntryIdCmp } }; +// Like LedgerEntryIdCmp, but only compares LedgerKeys, and does a 3-way +// comparison instead of a less-than. +std::partial_ordering compareLedgerKeys(LedgerKey const& a, LedgerKey const& b); + /** * Compare two BucketEntries for identity by comparing their respective * LedgerEntries (ignoring their hashes, as the LedgerEntryIdCmp ignores their diff --git a/src/ledger/ImmutableLedgerView.cpp b/src/ledger/ImmutableLedgerView.cpp index c4a7a3d8ea..50a4989e43 100644 --- a/src/ledger/ImmutableLedgerView.cpp +++ b/src/ledger/ImmutableLedgerView.cpp @@ -462,6 +462,14 @@ ImmutableLedgerView::scanLiveEntriesOfType( mLiveSnapshot.scanForEntriesOfType(type, std::move(callback)); } +void +ImmutableLedgerView::scanCurrentLiveEntriesOfType( + LedgerEntryType type, + std::function callback) const +{ + mLiveSnapshot.scanForLiveEntriesOfType(type, std::move(callback)); +} + // === Hot Archive BucketList wrapper methods === std::shared_ptr diff --git a/src/ledger/ImmutableLedgerView.h b/src/ledger/ImmutableLedgerView.h index 98ccf4dfe1..4a20ee5bde 100644 --- a/src/ledger/ImmutableLedgerView.h +++ b/src/ledger/ImmutableLedgerView.h @@ -169,10 +169,20 @@ class ImmutableLedgerView : public virtual AbstractLedgerView uint32_t ledgerSeq, EvictionMetrics& metrics, EvictionIterator iter, std::shared_ptr stats, StateArchivalSettings const& sas, uint32_t ledgerVers) const; + + // Scan the live bucket list for entries of a given type. Note this iterates + // over all BucketEntry, so some may be shadowed and outdated. void scanLiveEntriesOfType( LedgerEntryType type, std::function callback) const; + // Scan the live bucket list for entries of a given type. Calls callback + // with the latest live version for each entry. + void scanCurrentLiveEntriesOfType( + LedgerEntryType type, + std::function callback) + const; + // === Hot Archive BucketList methods === std::shared_ptr loadArchiveEntry(LedgerKey const& k) const; @@ -211,6 +221,7 @@ class ApplyLedgerView : private ImmutableLedgerView, using ImmutableLedgerView::loadLiveKeysFromLedger; using ImmutableLedgerView::loadPoolShareTrustLinesByAccountAndAsset; using ImmutableLedgerView::scanAllArchiveEntries; + using ImmutableLedgerView::scanCurrentLiveEntriesOfType; using ImmutableLedgerView::scanForEviction; using ImmutableLedgerView::scanLiveEntriesOfType; }; diff --git a/src/ledger/InMemorySorobanState.cpp b/src/ledger/InMemorySorobanState.cpp index 9a71cd52f5..cf0e1e690e 100644 --- a/src/ledger/InMemorySorobanState.cpp +++ b/src/ledger/InMemorySorobanState.cpp @@ -456,75 +456,26 @@ InMemorySorobanState::initializeStateFromSnapshot( if (protocolVersionStartsFrom(ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { auto sorobanConfig = SorobanNetworkConfig::loadFromLedger(applyView); - // Check if entry is a DEADENTRY and add it to deletedKeys. Otherwise, - // check if the entry is shadowed by a DEADENTRY. - std::unordered_set deletedKeys; - auto shouldAddToMap = [&deletedKeys](BucketEntry const& be, - LedgerEntryType expectedType) { - if (be.type() == DEADENTRY) - { - deletedKeys.insert(be.deadEntry()); - return false; - } - - releaseAssertOrThrow(be.type() == LIVEENTRY || - be.type() == INITENTRY); - auto lk = LedgerEntryKey(be.liveEntry()); - releaseAssertOrThrow(lk.type() == expectedType); - return deletedKeys.find(lk) == deletedKeys.end(); + auto contractDataHandler = [this](LedgerEntry const& le, + LedgerKey const&) { + createContractDataEntry(le); }; - auto contractDataHandler = [this, - &shouldAddToMap](BucketEntry const& be) { - if (!shouldAddToMap(be, CONTRACT_DATA)) - { - return Loop::INCOMPLETE; - } - - auto lk = LedgerEntryKey(be.liveEntry()); - if (!get(lk)) - { - createContractDataEntry(be.liveEntry()); - } - - return Loop::INCOMPLETE; + auto ttlHandler = [this](LedgerEntry const& le, LedgerKey const&) { + createTTL(le); }; - auto ttlHandler = [this, &shouldAddToMap](BucketEntry const& be) { - if (!shouldAddToMap(be, TTL)) - { - return Loop::INCOMPLETE; - } - - auto lk = LedgerEntryKey(be.liveEntry()); - if (!hasTTL(lk)) - { - createTTL(be.liveEntry()); - } - - return Loop::INCOMPLETE; - }; - - auto contractCodeHandler = [this, &shouldAddToMap, &sorobanConfig, - ledgerVersion](BucketEntry const& be) { - if (!shouldAddToMap(be, CONTRACT_CODE)) - { - return Loop::INCOMPLETE; - } - - auto lk = LedgerEntryKey(be.liveEntry()); - if (!get(lk)) - { - createContractCodeEntry(be.liveEntry(), sorobanConfig, - ledgerVersion); - } - - return Loop::INCOMPLETE; + auto contractCodeHandler = [this, &sorobanConfig, + ledgerVersion](LedgerEntry const& le, + LedgerKey const&) { + createContractCodeEntry(le, sorobanConfig, ledgerVersion); }; - applyView.scanLiveEntriesOfType(CONTRACT_DATA, contractDataHandler); - applyView.scanLiveEntriesOfType(TTL, ttlHandler); - applyView.scanLiveEntriesOfType(CONTRACT_CODE, contractCodeHandler); + applyView.scanCurrentLiveEntriesOfType(CONTRACT_DATA, + contractDataHandler); + applyView.scanCurrentLiveEntriesOfType(TTL, ttlHandler); + applyView.scanCurrentLiveEntriesOfType(CONTRACT_CODE, + contractCodeHandler); } mLastClosedLedgerSeq = lclHeader.ledgerSeq; diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index 53bbea4e27..e0aebaaa50 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -581,13 +581,11 @@ LedgerManagerImpl::loadLastKnownLedgerInternal(bool skipBuildingFullState) throw std::runtime_error("Bucket directory is corrupt"); } - // Only restart merges in full startup mode. Many modes in core - // (standalone offline commands, in-memory setup) do not need to - // spin up expensive merge processes. auto assumeStart = mApp.getClock().now(); + // We don't restart merges here so that in-memory state population is + // faster. auto assumeStateWork = mApp.getWorkScheduler().executeWork( - has, latestLedgerHeader->ledgerVersion, - /* restartMerges */ !skipBuildingFullState); + has, latestLedgerHeader->ledgerVersion, /* restartMerges */ false); if (assumeStateWork->getState() == BasicWork::State::WORK_SUCCESS) { std::chrono::duration assumeSecs = @@ -630,6 +628,12 @@ LedgerManagerImpl::loadLastKnownLedgerInternal(bool skipBuildingFullState) maybeRunSnapshotInvariantFromLedgerState(copyApplyLedgerView(), /* runInParallel */ false); + + // Only restart merges in full startup mode. Many modes in core + // (standalone offline commands, in-memory setup) do not need to spin up + // expensive merge processes. + mApp.getBucketManager().restartMerges( + mApp, has, latestLedgerHeader->ledgerVersion); } mApplyState.markEndOfSetupPhase();