Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,12 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf,
CLOG_TRACE(Herder, "recv transaction {} for {}",
hexAbbrev(tx->getFullHash()),
KeyUtils::toShortString(tx->getSourceID()));
#ifdef BUILD_TESTS
if (submittedFromSelf)
{
mLedgerManager.recordTxSubmission(tx->getContentsHash());
}
#endif

auto const& env = tx->getEnvelope();
mApp.getOverlayManager().broadcastTransaction(env, tx->getFullFee(),
Expand Down
10 changes: 10 additions & 0 deletions src/ledger/LedgerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ class LedgerManager
virtual ::rust::Box<rust_bridge::SorobanModuleCache>
getModuleCacheForTesting() = 0;
virtual uint64_t getSorobanInMemoryStateSizeForTesting() = 0;

// Records the submission time of a self-submitted transaction for the
// tx-latency metrics. No-op unless
// Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
virtual void recordTxSubmission(Hash const& contentsHash) = 0;

// Begins/ends a load-generation latency measurement window. No-op unless
// Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
virtual void beginTxLatencyMeasurement(uint32_t expectedTxCount) = 0;
virtual void finalizeTxLatencyMeasurement() = 0;
#endif

// Return the (changing) number of seconds since the LCL closed.
Expand Down
158 changes: 158 additions & 0 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
#include <Tracy.hpp>

#include "LedgerManagerImpl.h"
#include <algorithm>
#include <chrono>
#include <limits>
#include <memory>
#include <optional>
#include <regex>
Expand Down Expand Up @@ -226,6 +228,22 @@ LedgerManagerImpl::LedgerApplyMetrics::LedgerApplyMetrics(
{
}

#ifdef BUILD_TESTS
LedgerManagerImpl::TxLatencyMetrics::TxLatencyMetrics(MetricsRegistry& registry)
: mTxsSubmitted(registry.NewCounter({"loadgen", "tx-latency", "submitted"}))
, mTxsExternalized(
registry.NewCounter({"loadgen", "tx-latency", "externalized"}))
, mLatencyTimer(registry.NewTimer({"loadgen", "tx-latency", "duration"}))
, mRunMin(registry.NewCounter({"loadgen", "tx-latency-run", "min-ms"}))
, mRunMax(registry.NewCounter({"loadgen", "tx-latency-run", "max-ms"}))
, mRunMean(registry.NewCounter({"loadgen", "tx-latency-run", "mean-ms"}))
, mRunP50(registry.NewCounter({"loadgen", "tx-latency-run", "p50-ms"}))
, mRunP75(registry.NewCounter({"loadgen", "tx-latency-run", "p75-ms"}))
, mRunP99(registry.NewCounter({"loadgen", "tx-latency-run", "p99-ms"}))
{
}
#endif

LedgerManagerImpl::ApplyState::ApplyState(Application& app)
: mMetrics(app.getMetrics())
, mAppConnector(app.getAppConnector())
Expand Down Expand Up @@ -349,6 +367,9 @@ LedgerManagerImpl::LedgerManagerImpl(Application& app)
, mCatchupDuration(
app.getMetrics().NewTimer({"ledger", "catchup", "duration"}))
, mState(LM_BOOTING_STATE)
#ifdef BUILD_TESTS
, mTxLatencyMetrics(app.getMetrics())
#endif
{
// At this point, we haven't called assumeState yet, so the BucketLists are
// empty. We will create an "empty" snapshot that is not null, but
Expand Down Expand Up @@ -1308,6 +1329,142 @@ LedgerManagerImpl::emitNextMeta()
mNextMetaToEmit.reset();
}

#ifdef BUILD_TESTS
void
LedgerManagerImpl::recordTxSubmission(Hash const& contentsHash)
{
if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
{
return;
}
MutexLocker guard(mTxLatencyMetrics.mMutex);
if (mTxLatencyMetrics.mTxSubmitTimes
.try_emplace(contentsHash, mApp.getClock().now())
.second)
{
mTxLatencyMetrics.mTxsSubmitted.inc();
}
else
{
CLOG_WARNING(Ledger, "Duplicate tx submission recorded for hash {}.",
binToHex(contentsHash));
}
}

void
LedgerManagerImpl::recordTxMetaEmissionLatency(LedgerCloseMeta const& lcm)
{
if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
{
return;
}
VirtualClock::time_point const emitTime = mApp.getClock().now();
MutexLocker guard(mTxLatencyMetrics.mMutex);
auto probe = [&](auto const& txProcessing) {
for (auto const& txp : txProcessing)
{
if (auto submitted = mTxLatencyMetrics.mTxSubmitTimes.find(
txp.result.transactionHash);
submitted != mTxLatencyMetrics.mTxSubmitTimes.end())
{
auto const latency = emitTime - submitted->second;
mTxLatencyMetrics.mTxsExternalized.inc();
mTxLatencyMetrics.mLatencyTimer.Update(latency);
int64_t const ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
latency)
.count();
mTxLatencyMetrics.mSamples.push_back(std::clamp<int64_t>(
ms, 0, std::numeric_limits<uint32_t>::max()));
mTxLatencyMetrics.mTxSubmitTimes.erase(submitted);
}
}
};
switch (lcm.v())
{
case 0:
probe(lcm.v0().txProcessing);
break;
case 1:
probe(lcm.v1().txProcessing);
break;
case 2:
probe(lcm.v2().txProcessing);
break;
default:
releaseAssert(false);
}
}

void
LedgerManagerImpl::beginTxLatencyMeasurement(uint32_t expectedTxCount)
{
if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
{
return;
}
MutexLocker guard(mTxLatencyMetrics.mMutex);
if (!mTxLatencyMetrics.mTxSubmitTimes.empty())
{
CLOG_WARNING(Ledger,
"Starting tx latency measurement with {} unmatched "
"submissions from prior run",
mTxLatencyMetrics.mTxSubmitTimes.size());
mTxLatencyMetrics.mTxSubmitTimes.clear();
}
mTxLatencyMetrics.mSamples.clear();
mTxLatencyMetrics.mSamples.reserve(expectedTxCount);

// Clear the per-run latency metrics.
mTxLatencyMetrics.mRunMin.clear();
mTxLatencyMetrics.mRunMax.clear();
mTxLatencyMetrics.mRunMean.clear();
mTxLatencyMetrics.mRunP50.clear();
mTxLatencyMetrics.mRunP75.clear();
mTxLatencyMetrics.mRunP99.clear();
}

void
LedgerManagerImpl::finalizeTxLatencyMeasurement()
{
if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
{
return;
}
MutexLocker guard(mTxLatencyMetrics.mMutex);
auto& samples = mTxLatencyMetrics.mSamples;
if (samples.empty())
{
CLOG_WARNING(Ledger,
"Finalizing tx latency measurement with no samples");
return;
}
std::sort(samples.begin(), samples.end());
size_t const n = samples.size();

auto percentile = [&](size_t p) -> uint32_t {
// Calculate ceiling of rank
size_t rank = (p * n + 99) / 100;
rank = std::clamp<size_t>(rank, 1, n);
return samples[rank - 1];
};

uint64_t sum = 0;
for (uint32_t s : samples)
{
sum += s;
}

mTxLatencyMetrics.mRunMin.set_count(samples.front());
mTxLatencyMetrics.mRunMax.set_count(samples.back());
// Mean rounded to the nearest millisecond.
mTxLatencyMetrics.mRunMean.set_count((sum + n / 2) / n);
mTxLatencyMetrics.mRunP50.set_count(percentile(50));
mTxLatencyMetrics.mRunP75.set_count(percentile(75));
mTxLatencyMetrics.mRunP99.set_count(percentile(99));
}
#endif

namespace
{
#ifdef BUILD_TESTS
Expand Down Expand Up @@ -1773,6 +1930,7 @@ LedgerManagerImpl::applyLedger(LedgerCloseData const& ledgerData,
appliedLedgerState->getLastClosedLedgerHeader();
// Copy this before we move it into mNextMetaToEmit below
mLastLedgerCloseMeta = *ledgerCloseMeta;
recordTxMetaEmissionLatency(ledgerCloseMeta->getXDR());
}
#endif

Expand Down
33 changes: 33 additions & 0 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "transactions/ParallelApplyUtils.h"
#include "transactions/TransactionFrame.h"
#include "util/Math.h"
#include "util/UnorderedMap.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger.h"
#include <atomic>
Expand Down Expand Up @@ -431,6 +432,35 @@ class LedgerManagerImpl : public LedgerManager
std::optional<LedgerCloseMetaFrame> mLastLedgerCloseMeta;
// Local prng for OP_APPLY_SLEEP_TIME_*_FOR_TESTING.
stellar_default_random_engine mApplySleepRng;

// Metrics for measuring tx e2e latency. Active only when
// Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
struct TxLatencyMetrics
{
// Lifetime totals (cumulative; not reset between runs).
medida::Counter& mTxsSubmitted;
medida::Counter& mTxsExternalized;
medida::Timer& mLatencyTimer;
// Per-run "loadgen.tx-latency-run.*" statistics (ms), reset by
// beginTxLatencyMeasurement.
medida::Counter& mRunMin;
medida::Counter& mRunMax;
medida::Counter& mRunMean;
medida::Counter& mRunP50;
medida::Counter& mRunP75;
medida::Counter& mRunP99;
ANNOTATED_MUTEX(mMutex);
UnorderedMap<Hash, VirtualClock::time_point>
mTxSubmitTimes GUARDED_BY(mMutex);
// Each recorded submission -> meta-emission latency in ms
std::vector<uint32_t> mSamples GUARDED_BY(mMutex);

TxLatencyMetrics(MetricsRegistry& registry);
} mTxLatencyMetrics;

// End point of the tx-latency metric: matches the ledger-close meta's
// txProcessing entries against mTxSubmitTimes and records each latency.
void recordTxMetaEmissionLatency(LedgerCloseMeta const& lcm);
#endif

void setState(State s);
Expand Down Expand Up @@ -533,6 +563,9 @@ class LedgerManagerImpl : public LedgerManager
getModuleCacheForTesting() override;
void rebuildInMemorySorobanStateForTesting(uint32_t ledgerVersion) override;
uint64_t getSorobanInMemoryStateSizeForTesting() override;
void recordTxSubmission(Hash const& contentsHash) override;
void beginTxLatencyMeasurement(uint32_t expectedTxCount) override;
void finalizeTxLatencyMeasurement() override;
#endif

uint64_t secondsSinceLastLedgerClose() const override;
Expand Down
6 changes: 6 additions & 0 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ static std::unordered_set<std::string> const TESTING_ONLY_OPTIONS = {
"LOADGEN_TX_SIZE_BYTES_DISTRIBUTION_FOR_TESTING",
"LOADGEN_INSTRUCTIONS_FOR_TESTING",
"LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING",
"LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING",
"CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING",
"ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING",
"ARTIFICIALLY_DELAY_BUCKET_APPLICATION_FOR_TESTING",
Expand Down Expand Up @@ -141,6 +142,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
LOADGEN_TX_SIZE_BYTES_DISTRIBUTION_FOR_TESTING = {};
LOADGEN_INSTRUCTIONS_FOR_TESTING = {};
LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING = {};
LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING = false;
CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING = false;
ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING =
std::chrono::minutes::zero();
Expand Down Expand Up @@ -1676,6 +1678,10 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING =
readIntArray<uint32_t>(item);
}},
{"LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING",
[&]() {
LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING = readBool(item);
}},
#ifdef BUILD_TESTS
{"OP_APPLY_SLEEP_TIME_DURATION_FOR_TESTING",
[&]() {
Expand Down
4 changes: 4 additions & 0 deletions src/main/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ class Config : public std::enable_shared_from_this<Config>
std::vector<uint32_t> LOADGEN_INSTRUCTIONS_FOR_TESTING;
std::vector<uint32_t> LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING;

// Defaults to false. When set, measure the latency of self-submitted
// transactions from submission to meta generation.
bool LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING;

#ifdef BUILD_TESTS
// Config parameters that force transaction application during ledger
// close to sleep for a certain amount of time.
Expand Down
3 changes: 3 additions & 0 deletions src/simulation/LoadGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ LoadGenerator::start(GeneratedLoadConfig& cfg)
0));
}
}
mApp.getLedgerManager().beginTxLatencyMeasurement(cfg.nTxs);
mStarted = true;
}

Expand Down Expand Up @@ -1342,6 +1343,7 @@ LoadGenerator::waitTillComplete(GeneratedLoadConfig cfg)
if (checkMinimumSorobanSuccess(cfg))
{
CLOG_INFO(LoadGen, "Load generation complete.");
mApp.getLedgerManager().finalizeTxLatencyMeasurement();
mLoadgenComplete.Mark();
reset();
}
Expand Down Expand Up @@ -1423,6 +1425,7 @@ LoadGenerator::waitTillCompleteWithoutChecks()
"for high traffic due to tx queue limiter evictions.",
inconsistencies.size());
}
mApp.getLedgerManager().finalizeTxLatencyMeasurement();
mLoadgenComplete.Mark();
reset();
return;
Expand Down
Loading