Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/apply-load-benchmark-sac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ APPLY_LOAD_TIME_WRITES = true
# eventually, it is useful to disable these when optimizing anything besides
# the metrics.
DISABLE_SOROBAN_METRICS_FOR_TESTING = true
# Disable transaction metadata collection (BUILD_TESTS forces it otherwise)
DISABLE_TX_META_FOR_TESTING = true
# Disable metadata output
METADATA_OUTPUT_STREAM = ""
# Disable metadata debug
Expand Down
2 changes: 2 additions & 0 deletions docs/apply-load-benchmark-token.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ APPLY_LOAD_TIME_WRITES = true
# eventually, it is useful to disable these when optimizing anything besides
# the metrics.
DISABLE_SOROBAN_METRICS_FOR_TESTING = true
# Disable transaction metadata collection (BUILD_TESTS forces it otherwise)
DISABLE_TX_META_FOR_TESTING = true
# Disable metadata output
METADATA_OUTPUT_STREAM = ""
# Disable metadata debug
Expand Down
2 changes: 2 additions & 0 deletions scripts/apply_load/aws/apply-load-aws-benchmark.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ APPLY_LOAD_TIME_WRITES = true
# eventually, it is useful to disable these when optimizing anything besides
# the metrics.
DISABLE_SOROBAN_METRICS_FOR_TESTING = true
# Disable transaction metadata collection (BUILD_TESTS forces it otherwise)
DISABLE_TX_META_FOR_TESTING = true
# Disable metadata output
METADATA_OUTPUT_STREAM = ""
# Disable metadata debug
Expand Down
2 changes: 2 additions & 0 deletions scripts/apply_load/aws/apply-load-aws-ledger-limits.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ LOG_FILE_PATH="{log_file_path}"
# eventually, it is useful to disable these when optimizing anything besides
# the metrics.
DISABLE_SOROBAN_METRICS_FOR_TESTING = false
# Disable transaction metadata collection (BUILD_TESTS forces it otherwise)
DISABLE_TX_META_FOR_TESTING = true
# Disable metadata output
METADATA_OUTPUT_STREAM = ""
# Disable metadata debug
Expand Down
2 changes: 2 additions & 0 deletions scripts/apply_load/aws/apply-load-aws-max-sac-tps.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ APPLY_LOAD_TIME_WRITES = true
# eventually, it is useful to disable these when optimizing anything besides
# the metrics.
DISABLE_SOROBAN_METRICS_FOR_TESTING = true
# Disable transaction metadata collection (BUILD_TESTS forces it otherwise)
DISABLE_TX_META_FOR_TESTING = true
# Disable metadata output
METADATA_OUTPUT_STREAM = ""
# Disable metadata debug
Expand Down
9 changes: 7 additions & 2 deletions src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ template <typename BucketT>
std::shared_ptr<BucketT>
BucketOutputIterator<BucketT>::getBucket(
BucketManager& bucketManager, MergeKey* mergeKey,
std::unique_ptr<std::vector<typename BucketT::EntryT>> inMemoryState)
std::unique_ptr<std::vector<typename BucketT::EntryT>> inMemoryState,
std::shared_ptr<typename BucketT::IndexT const> preBuiltIndex)
{
ZoneScoped;
if (mBuf)
Expand Down Expand Up @@ -219,7 +220,11 @@ BucketOutputIterator<BucketT>::getBucket(

if (!index)
{
if constexpr (std::is_same_v<BucketT, LiveBucket>)
if (preBuiltIndex)
{
index = std::move(preBuiltIndex);
}
else if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
if (inMemoryState)
{
Expand Down
2 changes: 2 additions & 0 deletions src/bucket/BucketOutputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ template <typename BucketT> class BucketOutputIterator
std::shared_ptr<BucketT> getBucket(
BucketManager& bucketManager, MergeKey* mergeKey = nullptr,
std::unique_ptr<std::vector<typename BucketT::EntryT>> inMemoryState =
nullptr,
std::shared_ptr<typename BucketT::IndexT const> preBuiltIndex =
nullptr);
};
}
32 changes: 27 additions & 5 deletions src/bucket/LiveBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "bucket/BucketOutputIterator.h"
#include "bucket/BucketUtils.h"
#include "bucket/LedgerCmp.h"
#include <future>
#include <medida/counter.h>

namespace stellar
Expand Down Expand Up @@ -587,29 +588,50 @@ LiveBucket::mergeInMemory(BucketManager& bucketManager,
mergedEntries.emplace_back(entry);
};

mergeInternal(bucketManager, inputSource, putFunc, maxProtocolVersion, mc,
shadowIterators, keepShadowedLifecycleEntries);
{
ZoneNamedN(zoneMerge, "mergeInMemory merge", true);
mergeInternal(bucketManager, inputSource, putFunc, maxProtocolVersion,
mc, shadowIterators, keepShadowedLifecycleEntries);
}

if (countMergeEvents)
{
bucketManager.incrMergeCounters<LiveBucket>(mc);
}

// Start index construction on worker thread — reads mergedEntries (const),
// completely independent of the put loop's serialize/hash/write work.
auto indexFuture = std::async(std::launch::async, [&]() {
return std::make_shared<LiveBucketIndex>(bucketManager, mergedEntries,
meta);
});

// Write merge output to a bucket and save to disk
LiveBucketOutputIterator out(bucketManager.getTmpDir(),
/*keepTombstoneEntries=*/true, meta, mc, ctx,
doFsync);

for (auto const& e : mergedEntries)
{
out.put(e);
ZoneNamedN(zonePut, "mergeInMemory put loop", true);
for (auto const& e : mergedEntries)
{
out.put(e);
}
}

// Collect the pre-built index
std::shared_ptr<LiveBucketIndex const> preBuiltIndex;
{
ZoneNamedN(zoneWait, "mergeInMemory index future wait", true);
preBuiltIndex = indexFuture.get();
}

// Store the merged entries in memory in the new bucket in case this
// bucket sees another incoming merge as level 0 curr.
return out.getBucket(
bucketManager, nullptr,
std::make_unique<std::vector<BucketEntry>>(std::move(mergedEntries)));
std::make_unique<std::vector<BucketEntry>>(std::move(mergedEntries)),
std::move(preBuiltIndex));
}

BucketEntryCounters const&
Expand Down
99 changes: 50 additions & 49 deletions src/crypto/SecretKey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "util/Math.h"
#include "util/RandomEvictionCache.h"
#include <Tracy.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
Expand All @@ -41,16 +43,29 @@ namespace stellar
// to the state of the process; caching its results centrally
// makes all signature-verification in the program faster and
// has no effect on correctness.
//
// The cache is sharded across NUM_VERIFY_CACHE_SHARDS shards to
// reduce mutex contention when multiple threads verify signatures
// in parallel. Each shard has its own mutex and cache partition.

constexpr size_t VERIFY_SIG_CACHE_SIZE = 250'000;
static std::mutex gVerifySigCacheMutex;
static RandomEvictionCache<Hash, bool> gVerifySigCache(VERIFY_SIG_CACHE_SIZE);
static uint64_t gVerifyCacheHit = 0;
static uint64_t gVerifyCacheMiss = 0;
constexpr size_t NUM_VERIFY_CACHE_SHARDS = 16;
constexpr size_t VERIFY_SIG_CACHE_SHARD_SIZE =
VERIFY_SIG_CACHE_SIZE / NUM_VERIFY_CACHE_SHARDS;

struct VerifySigCacheShard
{
std::mutex mMutex;
RandomEvictionCache<Hash, bool> mCache;
VerifySigCacheShard() : mCache(VERIFY_SIG_CACHE_SHARD_SIZE)
{
}
};

// Global flag to use Rust ed25519-dalek for signature verification
// Protected by gVerifySigCacheMutex
static bool gUseRustDalekVerify = false;
static std::array<VerifySigCacheShard, NUM_VERIFY_CACHE_SHARDS>
gVerifySigCacheShards;
static std::atomic<uint64_t> gVerifyCacheHit{0};
static std::atomic<uint64_t> gVerifyCacheMiss{0};

static Hash
verifySigCacheKey(PublicKey const& key, Signature const& signature,
Expand Down Expand Up @@ -322,32 +337,29 @@ SecretKey::fromStrKeySeed(std::string const& strKeySeed)
void
PubKeyUtils::clearVerifySigCache()
{
std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
gVerifySigCache.clear();
}

void
PubKeyUtils::enableRustDalekVerify()
{
std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
gUseRustDalekVerify = true;
for (auto& shard : gVerifySigCacheShards)
{
std::lock_guard<std::mutex> guard(shard.mMutex);
shard.mCache.clear();
}
}

void
PubKeyUtils::seedVerifySigCache(unsigned int seed)
{
std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
gVerifySigCache.seed(seed);
for (size_t i = 0; i < NUM_VERIFY_CACHE_SHARDS; ++i)
{
std::lock_guard<std::mutex> guard(gVerifySigCacheShards[i].mMutex);
gVerifySigCacheShards[i].mCache.seed(seed +
static_cast<unsigned int>(i));
}
}

void
PubKeyUtils::flushVerifySigCacheCounts(uint64_t& hits, uint64_t& misses)
{
std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
hits = gVerifyCacheHit;
misses = gVerifyCacheMiss;
gVerifyCacheHit = 0;
gVerifyCacheMiss = 0;
hits = gVerifyCacheHit.exchange(0, std::memory_order_relaxed);
misses = gVerifyCacheMiss.exchange(0, std::memory_order_relaxed);
}

std::string
Expand Down Expand Up @@ -456,41 +468,30 @@ PubKeyUtils::verifySig(PublicKey const& key, Signature const& signature,
}

auto cacheKey = verifySigCacheKey(key, signature, bin);
bool shouldUseRustDalekVerify;

// Select shard based on cache key hash to distribute lock contention
auto shardIdx = std::hash<Hash>{}(cacheKey) % NUM_VERIFY_CACHE_SHARDS;
auto& shard = gVerifySigCacheShards[shardIdx];

{
std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
if (gVerifySigCache.exists(cacheKey))
std::lock_guard<std::mutex> guard(shard.mMutex);
if (auto* cached = shard.mCache.maybeGet(cacheKey))
{
++gVerifyCacheHit;
std::string hitStr("hit");
ZoneText(hitStr.c_str(), hitStr.size());
return {gVerifySigCache.get(cacheKey),
VerifySigCacheLookupResult::HIT};
gVerifyCacheHit.fetch_add(1, std::memory_order_relaxed);
ZoneText("hit", 3);
return {*cached, VerifySigCacheLookupResult::HIT};
}

shouldUseRustDalekVerify = gUseRustDalekVerify;
}

std::string missStr("miss");
ZoneText(missStr.c_str(), missStr.size());
ZoneText("miss", 4);

bool ok;
if (shouldUseRustDalekVerify)
{
ok = stellar::rust_bridge::verify_ed25519_signature_dalek(
key.ed25519().data(), signature.data(), bin.data(), bin.size());
}
else
bool ok = stellar::rust_bridge::verify_ed25519_signature_dalek(
key.ed25519().data(), signature.data(), bin.data(), bin.size());
{
ok = (crypto_sign_verify_detached(signature.data(), bin.data(),
bin.size(),
key.ed25519().data()) == 0);
std::lock_guard<std::mutex> guard(shard.mMutex);
gVerifyCacheMiss.fetch_add(1, std::memory_order_relaxed);
shard.mCache.put(cacheKey, ok);
}

std::lock_guard<std::mutex> guard(gVerifySigCacheMutex);
++gVerifyCacheMiss;
gVerifySigCache.put(cacheKey, ok);
return {ok, VerifySigCacheLookupResult::MISS};
}

Expand Down
7 changes: 0 additions & 7 deletions src/crypto/SecretKey.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,6 @@ void clearVerifySigCache();
void seedVerifySigCache(unsigned int seed);
void flushVerifySigCacheCounts(uint64_t& hits, uint64_t& misses);

// Enable Rust ed25519-dalek for signature verification
// Once enabled, it cannot be disabled. It should be enabled at the protocol 24
// boundary.
// Note: This should be removed following the protocol 24 upgrade, rust ed25519
// can be used unconditionally after upgrade, even for replay.
void enableRustDalekVerify();

PublicKey random();
#ifdef BUILD_TESTS
PublicKey pseudoRandomForTesting();
Expand Down
1 change: 0 additions & 1 deletion src/crypto/test/CryptoTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,6 @@ ZcashTestVector const ZCASH_TEST_VECTORS[196] = {

TEST_CASE("Ed25519 test vectors from Zcash", "[crypto]")
{
PubKeyUtils::enableRustDalekVerify();
for (auto const& tv : ZCASH_TEST_VECTORS)
{
PublicKey pk;
Expand Down
1 change: 0 additions & 1 deletion src/herder/Upgrades.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,6 @@ Upgrades::applyVersionUpgrade(Application& app, AbstractLedgerTxn& ltx,
}
if (needUpgradeToVersion(ProtocolVersion::V_25, prevVersion, newVersion))
{
PubKeyUtils::enableRustDalekVerify();
SorobanNetworkConfig::createCostTypesForV25(ltx, app);
}
if (needUpgradeToVersion(ProtocolVersion::V_26, prevVersion, newVersion))
Expand Down
13 changes: 9 additions & 4 deletions src/invariant/test/InvariantTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ TEST_CASE("BucketList state consistency invariant", "[invariant]")
auto ttlData = entryData.ttlData;
modifiedState.mContractDataEntries.erase(it);
modifiedState.mContractDataEntries.emplace(
InternalContractDataMapEntry(modifiedEntry, ttlData));
InternalContractDataMapEntry(
modifiedEntry, ttlData,
static_cast<uint32_t>(xdr::xdr_size(modifiedEntry))));
}

auto result =
Expand Down Expand Up @@ -711,7 +713,9 @@ TEST_CASE("BucketList state consistency invariant", "[invariant]")
createContractDataWithTTL(PERSISTENT, 1000);
TTLData ttlData(extraTTL.data.ttl().liveUntilLedgerSeq, 1);
modifiedState.mContractDataEntries.emplace(
InternalContractDataMapEntry(extraEntry, ttlData));
InternalContractDataMapEntry(
extraEntry, ttlData,
static_cast<uint32_t>(xdr::xdr_size(extraEntry))));
}

auto result =
Expand Down Expand Up @@ -741,8 +745,9 @@ TEST_CASE("BucketList state consistency invariant", "[invariant]")

TTLData wrongTTL(42, 1);
modifiedState.mContractDataEntries.erase(it);
modifiedState.mContractDataEntries.emplace(
InternalContractDataMapEntry(entryCopy, wrongTTL));
modifiedState.mContractDataEntries.emplace(InternalContractDataMapEntry(
entryCopy, wrongTTL,
static_cast<uint32_t>(xdr::xdr_size(entryCopy))));

auto result =
invariant.checkSnapshot(makeSnap(), modifiedState, noopIsStopping);
Expand Down
Loading
Loading