diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp
index 984b59f6c..2564668a5 100644
--- a/src/herder/HerderImpl.cpp
+++ b/src/herder/HerderImpl.cpp
@@ -651,6 +651,12 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf,
     CLOG_TRACE(Herder, "recv transaction {} for {}",
                hexAbbrev(tx->getFullHash()),
                KeyUtils::toShortString(tx->getSourceID()));
+#ifdef BUILD_TESTS
+    if (submittedFromSelf)
+    {
+        mLedgerManager.recordTxSubmission(tx->getContentsHash());
+    }
+#endif
     auto const& env = tx->getEnvelope();
     mApp.getOverlayManager().broadcastTransaction(env, tx->getFullFee(),
diff --git a/src/ledger/LedgerManager.h b/src/ledger/LedgerManager.h
index 56f6b25a7..80e5a7339 100644
--- a/src/ledger/LedgerManager.h
+++ b/src/ledger/LedgerManager.h
@@ -299,6 +299,16 @@ class LedgerManager
     virtual ::rust::Box
     getModuleCacheForTesting() = 0;
     virtual uint64_t getSorobanInMemoryStateSizeForTesting() = 0;
+
+    // Records the current clock time as the submission time of a
+    // self-submitted transaction, keyed by its contents hash. No-op unless
+    // Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
+    virtual void recordTxSubmission(Hash const& contentsHash) = 0;
+
+    // Begin resets per-run state; finalize publishes the per-run statistics.
+    // No-ops unless Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
+    virtual void beginTxLatencyMeasurement(uint32_t expectedTxCount) = 0;
+    virtual void finalizeTxLatencyMeasurement() = 0;
 #endif
     // Return the (changing) number of seconds since the LCL closed.
diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp
index 7ece69cee..ce47a91a5 100644
--- a/src/ledger/LedgerManagerImpl.cpp
+++ b/src/ledger/LedgerManagerImpl.cpp
@@ -74,7 +74,9 @@
 #include
 #include "LedgerManagerImpl.h"
+#include
 #include
+#include
 #include
 #include
@@ -226,6 +228,22 @@ LedgerManagerImpl::LedgerApplyMetrics::LedgerApplyMetrics(
 {
 }
+#ifdef BUILD_TESTS
+LedgerManagerImpl::TxLatencyMetrics::TxLatencyMetrics(MetricsRegistry& registry)
+    : mTxsSubmitted(registry.NewCounter({"loadgen", "tx-latency", "submitted"}))
+    , mTxsExternalized(
+          registry.NewCounter({"loadgen", "tx-latency", "externalized"}))
+    , mLatencyTimer(registry.NewTimer({"loadgen", "tx-latency", "duration"}))
+    , mRunMin(registry.NewCounter({"loadgen", "tx-latency-run", "min-ms"}))
+    , mRunMax(registry.NewCounter({"loadgen", "tx-latency-run", "max-ms"}))
+    , mRunMean(registry.NewCounter({"loadgen", "tx-latency-run", "mean-ms"}))
+    , mRunP50(registry.NewCounter({"loadgen", "tx-latency-run", "p50-ms"}))
+    , mRunP75(registry.NewCounter({"loadgen", "tx-latency-run", "p75-ms"}))
+    , mRunP99(registry.NewCounter({"loadgen", "tx-latency-run", "p99-ms"}))
+{
+}
+#endif
+
 LedgerManagerImpl::ApplyState::ApplyState(Application& app)
     : mMetrics(app.getMetrics())
     , mAppConnector(app.getAppConnector())
@@ -349,6 +367,9 @@ LedgerManagerImpl::LedgerManagerImpl(Application& app)
     , mCatchupDuration(
           app.getMetrics().NewTimer({"ledger", "catchup", "duration"}))
     , mState(LM_BOOTING_STATE)
+#ifdef BUILD_TESTS
+    , mTxLatencyMetrics(app.getMetrics())
+#endif
 {
     // At this point, we haven't called assumeState yet, so the BucketLists are
     // empty. We will create an "empty" snapshot that is not null, but
@@ -1308,6 +1329,142 @@ LedgerManagerImpl::emitNextMeta()
     mNextMetaToEmit.reset();
 }
+#ifdef BUILD_TESTS
+void
+LedgerManagerImpl::recordTxSubmission(Hash const& contentsHash)
+{
+    if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
+    {
+        return;
+    }
+    MutexLocker guard(mTxLatencyMetrics.mMutex);
+    if (mTxLatencyMetrics.mTxSubmitTimes
+            .try_emplace(contentsHash, mApp.getClock().now())
+            .second)
+    {
+        mTxLatencyMetrics.mTxsSubmitted.inc();
+    }
+    else
+    {
+        CLOG_WARNING(Ledger, "Duplicate tx submission recorded for hash {}.",
+                     binToHex(contentsHash));
+    }
+}
+
+void
+LedgerManagerImpl::recordTxMetaEmissionLatency(LedgerCloseMeta const& lcm)
+{
+    if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
+    {
+        return;
+    }
+    VirtualClock::time_point const emitTime = mApp.getClock().now();
+    MutexLocker guard(mTxLatencyMetrics.mMutex);
+    auto probe = [&](auto const& txProcessing) {
+        for (auto const& txp : txProcessing)
+        {
+            if (auto submitted = mTxLatencyMetrics.mTxSubmitTimes.find(
+                    txp.result.transactionHash);
+                submitted != mTxLatencyMetrics.mTxSubmitTimes.end())
+            {
+                auto const latency = emitTime - submitted->second;
+                mTxLatencyMetrics.mTxsExternalized.inc();
+                mTxLatencyMetrics.mLatencyTimer.Update(latency);
+                int64_t const ms =
+                    std::chrono::duration_cast(
+                        latency)
+                        .count();
+                mTxLatencyMetrics.mSamples.push_back(std::clamp(
+                    ms, 0, std::numeric_limits::max()));
+                mTxLatencyMetrics.mTxSubmitTimes.erase(submitted);
+            }
+        }
+    };
+    switch (lcm.v())
+    {
+    case 0:
+        probe(lcm.v0().txProcessing);
+        break;
+    case 1:
+        probe(lcm.v1().txProcessing);
+        break;
+    case 2:
+        probe(lcm.v2().txProcessing);
+        break;
+    default:
+        releaseAssert(false);
+    }
+}
+
+void
+LedgerManagerImpl::beginTxLatencyMeasurement(uint32_t expectedTxCount)
+{
+    if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
+    {
+        return;
+    }
+    MutexLocker guard(mTxLatencyMetrics.mMutex);
+    if (!mTxLatencyMetrics.mTxSubmitTimes.empty())
+    {
+        CLOG_WARNING(Ledger,
+                     "Starting tx latency measurement with {} unmatched "
+                     "submissions from prior run",
+                     mTxLatencyMetrics.mTxSubmitTimes.size());
+        mTxLatencyMetrics.mTxSubmitTimes.clear();
+    }
+    mTxLatencyMetrics.mSamples.clear();
+    mTxLatencyMetrics.mSamples.reserve(expectedTxCount);
+
+    // Clear the per-run min/max/mean/percentile counters.
+    mTxLatencyMetrics.mRunMin.clear();
+    mTxLatencyMetrics.mRunMax.clear();
+    mTxLatencyMetrics.mRunMean.clear();
+    mTxLatencyMetrics.mRunP50.clear();
+    mTxLatencyMetrics.mRunP75.clear();
+    mTxLatencyMetrics.mRunP99.clear();
+}
+
+void
+LedgerManagerImpl::finalizeTxLatencyMeasurement()
+{
+    if (!mApp.getConfig().LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING)
+    {
+        return;
+    }
+    MutexLocker guard(mTxLatencyMetrics.mMutex);
+    auto& samples = mTxLatencyMetrics.mSamples;
+    if (samples.empty())
+    {
+        CLOG_WARNING(Ledger,
+                     "Finalizing tx latency measurement with no samples");
+        return;
+    }
+    std::sort(samples.begin(), samples.end());
+    size_t const n = samples.size();
+
+    auto percentile = [&](size_t p) -> uint32_t {
+        // Nearest-rank method: 1-based rank = ceil(p * n / 100)
+        size_t rank = (p * n + 99) / 100;
+        rank = std::clamp(rank, 1, n);
+        return samples[rank - 1];
+    };
+
+    uint64_t sum = 0;
+    for (uint32_t s : samples)
+    {
+        sum += s;
+    }
+
+    mTxLatencyMetrics.mRunMin.set_count(samples.front());
+    mTxLatencyMetrics.mRunMax.set_count(samples.back());
+    // Mean rounded to the nearest millisecond.
+    mTxLatencyMetrics.mRunMean.set_count((sum + n / 2) / n);
+    mTxLatencyMetrics.mRunP50.set_count(percentile(50));
+    mTxLatencyMetrics.mRunP75.set_count(percentile(75));
+    mTxLatencyMetrics.mRunP99.set_count(percentile(99));
+}
+#endif
+
 namespace
 {
 #ifdef BUILD_TESTS
@@ -1773,6 +1930,7 @@ LedgerManagerImpl::applyLedger(LedgerCloseData const& ledgerData,
             appliedLedgerState->getLastClosedLedgerHeader();
         // Copy this before we move it into mNextMetaToEmit below
         mLastLedgerCloseMeta = *ledgerCloseMeta;
+        recordTxMetaEmissionLatency(ledgerCloseMeta->getXDR());
     }
 #endif
diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h
index 5171b3a78..84fabefa4 100644
--- a/src/ledger/LedgerManagerImpl.h
+++ b/src/ledger/LedgerManagerImpl.h
@@ -18,6 +18,7 @@
 #include "transactions/ParallelApplyUtils.h"
 #include "transactions/TransactionFrame.h"
 #include "util/Math.h"
+#include "util/UnorderedMap.h"
 #include "util/XDRStream.h"
 #include "xdr/Stellar-ledger.h"
 #include
@@ -431,6 +432,35 @@ class LedgerManagerImpl : public LedgerManager
     std::optional mLastLedgerCloseMeta;
     // Local prng for OP_APPLY_SLEEP_TIME_*_FOR_TESTING.
     stellar_default_random_engine mApplySleepRng;
+
+    // Metrics and state for end-to-end tx latency. Active only when
+    // Config::LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING is set.
+    struct TxLatencyMetrics
+    {
+        // Lifetime totals (cumulative; not reset between runs).
+        medida::Counter& mTxsSubmitted;
+        medida::Counter& mTxsExternalized;
+        medida::Timer& mLatencyTimer;
+        // Per-run "loadgen.tx-latency-run.*" statistics (ms), reset by
+        // beginTxLatencyMeasurement.
+        medida::Counter& mRunMin;
+        medida::Counter& mRunMax;
+        medida::Counter& mRunMean;
+        medida::Counter& mRunP50;
+        medida::Counter& mRunP75;
+        medida::Counter& mRunP99;
+        ANNOTATED_MUTEX(mMutex);
+        UnorderedMap
+            mTxSubmitTimes GUARDED_BY(mMutex);
+        // One latency sample (ms) per matched tx; cleared when a run begins.
+        std::vector mSamples GUARDED_BY(mMutex);
+
+        TxLatencyMetrics(MetricsRegistry& registry);
+    } mTxLatencyMetrics;
+
+    // End point of the tx-latency metric: matches the ledger-close meta's
+    // txProcessing entries against mTxSubmitTimes and records each latency.
+    void recordTxMetaEmissionLatency(LedgerCloseMeta const& lcm);
 #endif
     void setState(State s);
@@ -533,6 +563,9 @@ class LedgerManagerImpl : public LedgerManager
     getModuleCacheForTesting() override;
     void rebuildInMemorySorobanStateForTesting(uint32_t ledgerVersion) override;
     uint64_t getSorobanInMemoryStateSizeForTesting() override;
+    void recordTxSubmission(Hash const& contentsHash) override;
+    void beginTxLatencyMeasurement(uint32_t expectedTxCount) override;
+    void finalizeTxLatencyMeasurement() override;
 #endif
     uint64_t secondsSinceLastLedgerClose() const override;
diff --git a/src/main/Config.cpp b/src/main/Config.cpp
index 2dfd5beb6..4c1f75805 100644
--- a/src/main/Config.cpp
+++ b/src/main/Config.cpp
@@ -65,6 +65,7 @@ static std::unordered_set const TESTING_ONLY_OPTIONS = {
     "LOADGEN_TX_SIZE_BYTES_DISTRIBUTION_FOR_TESTING",
     "LOADGEN_INSTRUCTIONS_FOR_TESTING",
     "LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING",
+    "LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING",
     "CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING",
     "ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING",
     "ARTIFICIALLY_DELAY_BUCKET_APPLICATION_FOR_TESTING",
@@ -141,6 +142,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
     LOADGEN_TX_SIZE_BYTES_DISTRIBUTION_FOR_TESTING = {};
     LOADGEN_INSTRUCTIONS_FOR_TESTING = {};
     LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING = {};
+    LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING = false;
     CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING = false;
     ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING =
         std::chrono::minutes::zero();
@@ -1676,6 +1678,10 @@ Config::processConfig(std::shared_ptr t)
                          LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING =
                              readIntArray(item);
                      }},
+                    {"LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING",
+                     [&]() {
+                         LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING = readBool(item);
+                     }},
 #ifdef BUILD_TESTS
                     {"OP_APPLY_SLEEP_TIME_DURATION_FOR_TESTING",
                      [&]() {
diff --git a/src/main/Config.h b/src/main/Config.h
index 398540385..63dcac91a 100644
--- a/src/main/Config.h
+++ b/src/main/Config.h
@@ -330,6 +330,10 @@ class Config : public std::enable_shared_from_this
     std::vector LOADGEN_INSTRUCTIONS_FOR_TESTING;
     std::vector LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING;
+    // Defaults to false. When set, measure the latency of self-submitted
+    // transactions from submission to meta generation.
+    bool LOADGEN_MEASURE_TX_LATENCY_FOR_TESTING;
+
 #ifdef BUILD_TESTS
     // Config parameters that force transaction application during ledger
     // close to sleep for a certain amount of time.
diff --git a/src/simulation/LoadGenerator.cpp b/src/simulation/LoadGenerator.cpp
index 9b1bab8a3..bf798d795 100644
--- a/src/simulation/LoadGenerator.cpp
+++ b/src/simulation/LoadGenerator.cpp
@@ -478,6 +478,7 @@ LoadGenerator::start(GeneratedLoadConfig& cfg)
                     0));
         }
     }
+    mApp.getLedgerManager().beginTxLatencyMeasurement(cfg.nTxs);
     mStarted = true;
 }
@@ -1342,6 +1343,7 @@ LoadGenerator::waitTillComplete(GeneratedLoadConfig cfg)
         if (checkMinimumSorobanSuccess(cfg))
         {
             CLOG_INFO(LoadGen, "Load generation complete.");
+            mApp.getLedgerManager().finalizeTxLatencyMeasurement();
             mLoadgenComplete.Mark();
             reset();
         }
@@ -1423,6 +1425,7 @@ LoadGenerator::waitTillCompleteWithoutChecks()
                 "for high traffic due to tx queue limiter evictions.",
                 inconsistencies.size());
         }
+        mApp.getLedgerManager().finalizeTxLatencyMeasurement();
         mLoadgenComplete.Mark();
         reset();
         return;