Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions src/bucket/BucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,203 @@ SearchableLiveBucketListSnapshot::scanForEntriesOfType(
loopAllBuckets(scanBucket);
}

namespace
{
// Iterator for `BucketEntry`s of a given type in a bucket. Expects the stream
// to be positioned at the start of the type range. This is basically the same
// as SearchableLiveBucketListSnapshot::scanForEntriesOfType's scanBucket except
// with more control over when iteration happens.
class BucketEntryIterator
{
BucketEntry mEntry;
LedgerKey mKey;
XDRInputFileStream& mStream;
LedgerEntryType const mType;

public:
BucketEntryIterator(XDRInputFileStream& stream, LedgerEntryType type)
: mStream(stream), mType(type)
{
}

BucketEntry const&
getEntry() const
{
return mEntry;
}
LedgerKey const&
getKey() const
{
return mKey;
}
bool advance();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Stylistically this is a little weird. Can we either inline the advance or just move the declaration to .h?

};

bool
BucketEntryIterator::advance()
{
while (mStream.readOne(mEntry))
{
if (isBucketMetaEntry<LiveBucket>(mEntry))
{
continue;
}
mKey = getBucketLedgerKey(mEntry);
if (mKey.type() > mType)
{
break;
}

if (mKey.type() == mType)
{
return true;
}
}
return false;
}
} // namespace

void
SearchableLiveBucketListSnapshot::scanForLiveEntriesOfType(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks generally correct, but it's adding a good amount of complexity, I'd like to see a unit test specifically for this scanning function. Before it was pretty straight forward and indirectly tested, but given the k-way merge I think a more explicit test is warranted. Maybe we can test some of the loser tree edge cases, like a degenerate merge with just 1 bucket, 2 buckets, and some non powers of two. It might also be a good idea to hook this into the randomized bucket testing infra LedgerStateSnapshotTests,cpp or BucketIndexTests.cpp, where we just make sure we hit all the entries properly.

LedgerEntryType type,
std::function<void(LedgerEntry const&, LedgerKey const&)> callback) const
{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably have a ZoneScoped

ZoneScoped;
// We implement this as a k-way merge over all buckets. We use a loser tree
// for this. The benefit over a heap is ~2x fewer comparisons. A loser tree
// is like a single-elimination tournament. The leaves of the tree are the
// iterators, and the internal nodes represent the loser of the comparison
// between the two children. This implementation represents the binary tree
// in an array, where the tournament tree is from indices [1, 2n) (leaves
// are [n, 2n)). Index 0 is used for keeping track of the overall winner. To
// update, we just need to advance the iterator for the winning node and
// then do the log(k) comparisons upward along the path to the root to
// update the losers. While loser trees often store the whole node value at
// intermediate nodes, we just store an index, since copying the XDR types
// is probably more expensive than the extra indirection.

std::vector<BucketEntryIterator> iterators;
loopAllBuckets([&iterators, type,
this](std::shared_ptr<LiveBucket const> const& bucket) {
if (bucket->isEmpty())
{
return Loop::INCOMPLETE;
}

auto range = bucket->getRangeForType(type);
if (!range)
{
return Loop::INCOMPLETE;
}

auto& stream = getStream(bucket);
stream.seek(range->first);

iterators.emplace_back(stream, type);
return Loop::INCOMPLETE;
});

if (iterators.empty())
{
return;
}

size_t const numIterators = iterators.size();

constexpr int exhausted = -1;
std::vector<int> tree;
tree.resize(numIterators * 2);
for (size_t i = 0; i < numIterators; ++i)
{
if (iterators[i].advance())
{
tree[numIterators + i] = i;
}
else
{
tree[numIterators + i] = exhausted;
}
}

auto leftWins = [&iterators](int leftIndex, int rightIndex) -> bool {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More comments here would be helpful, this indicates that the smaller, newer version index wins, right?

if (leftIndex == exhausted)
{
return false;
}
if (rightIndex == exhausted)
{
return true;
}
if (auto cmp = compareLedgerKeys(iterators[leftIndex].getKey(),
iterators[rightIndex].getKey());
cmp != std::partial_ordering::equivalent)
{
releaseAssert(cmp != std::partial_ordering::unordered);
return cmp == std::partial_ordering::less;
}
return leftIndex < rightIndex;
};

// Play the match at index i; store the loser, return the winner
auto play = [&tree, &leftWins](auto& play, size_t index) -> int {
if (2 * index >= tree.size())
{
return tree[index];
}
int left = play(play, 2 * index);
int right = play(play, 2 * index + 1);
if (leftWins(left, right))
{
tree[index] = right;
return left;
}
else
{
tree[index] = left;
return right;
}
};
tree[0] = play(play, 1);

bool first = true;
LedgerKey last;
while (tree[0] != exhausted)
{
int index = tree[0];
auto& iter = iterators[index];
// Deduplicate entries with the same key across buckets
if (auto& key = iter.getKey(); first || key != last)
{
last = key;
auto& entry = iter.getEntry();
if (entry.type() == LIVEENTRY || entry.type() == INITENTRY)
{
callback(entry.liveEntry(), key);
}
}
first = false;

if (!iter.advance())
{
tree[index + numIterators] = exhausted;
}
int winner = tree[index + numIterators];

// Update tournament up the tree to the root
int i = (index + numIterators) / 2;
while (i > 0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This while could be a for loop, which to me reads a little cleaner.

{
if (leftWins(tree[i], winner))
{
std::swap(tree[i], winner);
}
i /= 2;
}

tree[0] = winner;
Comment on lines +869 to +904

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add some comments to this? It's a little hard to figure out what's going on here. This is the actual k-way merge, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it's still a little opaque.

}
}

// Helper function to handle scan logic in a single bucket.
Loop
SearchableLiveBucketListSnapshot::scanForEvictionInBucket(
Expand Down
7 changes: 7 additions & 0 deletions src/bucket/BucketListSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ class SearchableLiveBucketListSnapshot
LedgerEntryType type,
std::function<Loop(BucketEntry const&)> callback) const;

// Iterate over all live entries of a given type. Note that this handles
// shadowing and only returns the latest live entry for each key.
void scanForLiveEntriesOfType(
LedgerEntryType type,
std::function<void(LedgerEntry const&, LedgerKey const&)> callback)
const;

friend class ImmutableLedgerData;
friend class ImmutableLedgerView;
};
Expand Down
23 changes: 16 additions & 7 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1393,17 +1393,26 @@ BucketManager::assumeState(Application& app, HistoryArchiveState const& has,

if (restartMerges)
{
mLiveBucketList->restartMerges(app, maxProtocolVersion,
has.currentLedger);
if (has.hasHotArchiveBuckets())
{
mHotArchiveBucketList->restartMerges(app, maxProtocolVersion,
has.currentLedger);
}
this->restartMerges(app, has, maxProtocolVersion);
}
cleanupStaleFiles(has);
}

void
BucketManager::restartMerges(Application& app, HistoryArchiveState const& has,
uint32_t maxProtocolVersion)
{
ZoneScoped;
releaseAssert(threadIsMain());

mLiveBucketList->restartMerges(app, maxProtocolVersion, has.currentLedger);
if (has.hasHotArchiveBuckets())
{
mHotArchiveBucketList->restartMerges(app, maxProtocolVersion,
has.currentLedger);
}
}

void
BucketManager::shutdown()
{
Expand Down
5 changes: 5 additions & 0 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ class BucketManager : NonMovableOrCopyable
void assumeState(Application& app, HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges);

// Restart any in-progress merges captured in `has`. Safe to call only
// after `assumeState` has populated the BucketList from the same `has`.
void restartMerges(Application& app, HistoryArchiveState const& has,
uint32_t maxProtocolVersion);

void shutdown();

bool isShutdown() const;
Expand Down
82 changes: 82 additions & 0 deletions src/bucket/LedgerCmp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2026 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/LedgerCmp.h"

namespace
{
template <typename T>
std::partial_ordering
lexComparePartial(T&& lhs1, T&& rhs1)
{
return lhs1 <=> rhs1;
}

template <typename T, typename... U>
std::partial_ordering
lexComparePartial(T&& lhs1, T&& rhs1, U&&... args)
{
if (auto cmp = lhs1 <=> rhs1; cmp != std::partial_ordering::equivalent)
{
return cmp;
}
return lexComparePartial(std::forward<U>(args)...);
}
} // namespace

namespace stellar
{
std::partial_ordering
compareLedgerKeys(LedgerKey const& a, LedgerKey const& b)
{
LedgerEntryType aty = a.type();
LedgerEntryType bty = b.type();

if (aty < bty)
return std::partial_ordering::less;

if (aty > bty)
return std::partial_ordering::greater;

switch (aty)
{
case ACCOUNT:
return a.account().accountID <=> b.account().accountID;
case TRUSTLINE:
return lexComparePartial(a.trustLine().accountID,
b.trustLine().accountID, a.trustLine().asset,
b.trustLine().asset);
case OFFER:
return lexComparePartial(a.offer().sellerID, b.offer().sellerID,
a.offer().offerID, b.offer().offerID);
case DATA:
return lexComparePartial(a.data().accountID, b.data().accountID,
a.data().dataName, b.data().dataName);
case CLAIMABLE_BALANCE:
return a.claimableBalance().balanceID <=>
b.claimableBalance().balanceID;
case LIQUIDITY_POOL:
return a.liquidityPool().liquidityPoolID <=>
b.liquidityPool().liquidityPoolID;
case CONTRACT_DATA:
{
return lexComparePartial(
a.contractData().contract, b.contractData().contract,
a.contractData().key, b.contractData().key,
a.contractData().durability, b.contractData().durability);
}
case CONTRACT_CODE:
return lexComparePartial(a.contractCode().hash, b.contractCode().hash);
case CONFIG_SETTING:
{
return a.configSetting().configSettingID <=>
b.configSetting().configSettingID;
}
case TTL:
return lexComparePartial(a.ttl().keyHash, b.ttl().keyHash);
}

return std::partial_ordering::unordered;
}
} // namespace stellar
5 changes: 5 additions & 0 deletions src/bucket/LedgerCmp.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <compare>
#include <type_traits>

#include "bucket/BucketUtils.h"
Expand Down Expand Up @@ -123,6 +124,10 @@ struct LedgerEntryIdCmp
}
};

// Like LedgerEntryIdCmp, but only compares LedgerKeys, and does a 3-way
// comparison instead of a less-than.
std::partial_ordering compareLedgerKeys(LedgerKey const& a, LedgerKey const& b);
Comment on lines +127 to +129

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a partial ordering? Does a total ordering not exist for ledger keys?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compare delegates to the operator <=> from xdrpp (which is important since we do need to match how LedgerEntryIdCmp does the ordering (and since, e.g., the value type in ScVal for CONTRACT_DATA is nested pretty deeply). I'll open a PR in xdrpp to fix it.


/**
* Compare two BucketEntries for identity by comparing their respective
* LedgerEntries (ignoring their hashes, as the LedgerEntryIdCmp ignores their
Expand Down
8 changes: 8 additions & 0 deletions src/ledger/ImmutableLedgerView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,14 @@ ImmutableLedgerView::scanLiveEntriesOfType(
mLiveSnapshot.scanForEntriesOfType(type, std::move(callback));
}

void
ImmutableLedgerView::scanCurrentLiveEntriesOfType(
LedgerEntryType type,
std::function<void(LedgerEntry const&, LedgerKey const&)> callback) const
{
mLiveSnapshot.scanForLiveEntriesOfType(type, std::move(callback));
}

// === Hot Archive BucketList wrapper methods ===

std::shared_ptr<HotArchiveBucketEntry const>
Expand Down
Loading
Loading