From 43e173e24878955d38f1e3302b288f6b2085bc38 Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 19 May 2026 17:40:03 -0700 Subject: [PATCH 01/11] util/stmtsummary: add tidb_stmt_summary_log_evicted Introduce a new global system variable tidb_stmt_summary_log_evicted: when ON, the v2 (persistent) backend writes a JSON entry (with "evicted":true) to the stmt log for each LRU eviction, allowing downstream consumers to track which records were dropped from memory. Uses a buffered async channel so the Add() hot path never blocks; when the channel is full, evictions are dropped and a periodic warn log reports the dropped count. Off by default because it adds log volume proportional to eviction rate. Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/sessionctx/vardef/tidb_vars.go | 6 + pkg/sessionctx/variable/sysvar.go | 4 + pkg/util/stmtsummary/v2/BUILD.bazel | 2 +- pkg/util/stmtsummary/v2/logger.go | 20 +++ pkg/util/stmtsummary/v2/stmtsummary.go | 159 +++++++++++++++++++- pkg/util/stmtsummary/v2/stmtsummary_test.go | 36 +++++ 6 files changed, 220 insertions(+), 7 deletions(-) diff --git a/pkg/sessionctx/vardef/tidb_vars.go b/pkg/sessionctx/vardef/tidb_vars.go index 478b27f0d6e99..ebf444bc9c0fe 100644 --- a/pkg/sessionctx/vardef/tidb_vars.go +++ b/pkg/sessionctx/vardef/tidb_vars.go @@ -704,6 +704,11 @@ const ( // TiDBStmtSummaryMaxSQLLength indicates the max length of displayed normalized sql and sample sql. TiDBStmtSummaryMaxSQLLength = "tidb_stmt_summary_max_sql_length" + // TiDBStmtSummaryLogEvicted controls whether per-record LRU evictions in + // the v2 (persistent) statement summary are written to the stmt log. Off + // by default because it adds log volume proportional to eviction rate. + TiDBStmtSummaryLogEvicted = "tidb_stmt_summary_log_evicted" + // TiDBIgnoreInlistPlanDigest enables TiDB to generate the same plan digest with SQL using different in-list arguments. TiDBIgnoreInlistPlanDigest = "tidb_ignore_inlist_plan_digest" @@ -1588,6 +1593,7 @@ const ( DefTiDBStmtSummaryHistorySize = 24 DefTiDBStmtSummaryMaxStmtCount = 3000 DefTiDBStmtSummaryMaxSQLLength = 32768 + DefTiDBStmtSummaryLogEvicted = false DefTiDBCapturePlanBaseline = Off DefTiDBIgnoreInlistPlanDigest = true DefTiDBEnableIndexMerge = true diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index aff7f44916781..1a70968e7ef0e 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -942,6 +942,10 @@ var defaultSysVars = []*SysVar{ SetGlobal: func(_ context.Context, s *SessionVars, val string) error { return stmtsummaryv2.SetMaxSQLLength(TidbOptInt(val, vardef.DefTiDBStmtSummaryMaxSQLLength)) }}, + {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryLogEvicted, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryLogEvicted), Type: vardef.TypeBool, AllowEmpty: true, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + return stmtsummaryv2.SetLogEvicted(TiDBOptOn(val)) + }}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBCapturePlanBaseline, Value: vardef.DefTiDBCapturePlanBaseline, Type: vardef.TypeBool, AllowEmptyAll: true}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBEvolvePlanTaskMaxTime, Value: strconv.Itoa(vardef.DefTiDBEvolvePlanTaskMaxTime), Type: vardef.TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBEvolvePlanTaskStartTime, Value: vardef.DefTiDBEvolvePlanTaskStartTime, Type: vardef.TypeTime}, diff --git a/pkg/util/stmtsummary/v2/BUILD.bazel b/pkg/util/stmtsummary/v2/BUILD.bazel index 8bc5284bfe3bf..131ef9cbcee76 100644 --- a/pkg/util/stmtsummary/v2/BUILD.bazel +++ b/pkg/util/stmtsummary/v2/BUILD.bazel @@ -48,7 +48,7 @@ go_test( ], embed = [":stmtsummary"], flaky = True, - shard_count = 13, + shard_count = 14, deps = [ "//pkg/meta/model", "//pkg/parser/ast", diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 64c3499c6ba28..9774f679e0c5b 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -70,6 +70,26 @@ func (s *stmtLogStorage) sync() error { return s.logger.Sync() } +// logEvicted writes a single evicted record to the stmt log with an +// `"evicted":true` marker so downstream consumers can distinguish per-record +// eviction events from rotated-window records. +func (s *stmtLogStorage) logEvicted(r *StmtRecord) { + b, err := json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true}) + if err != nil { + logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err)) + return + } + s.logger.Info(string(b)) +} + +// evictedStmtRecord embeds *StmtRecord and adds an "evicted" JSON tag. +// Keeping the embedded pointer means the JSON field order matches StmtRecord +// and parsers tolerant of the extra field work unchanged. +type evictedStmtRecord struct { + *StmtRecord + Evicted bool `json:"evicted"` +} + func (s *stmtLogStorage) log(r *StmtRecord) { b, err := json.Marshal(r) if err != nil { diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 654aa6b53a997..eb223ddf6a675 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -40,6 +40,10 @@ const ( defaultMaxSQLLength = 32768 defaultRefreshInterval = 30 * 60 // 30 min defaultRotateCheckInterval = 1 // s + + // evictedLogChanCap bounds the buffer of per-record evicted entries waiting + // to be logged. When full, new evictions are dropped so Add() never blocks. + evictedLogChanCap = 1024 ) var ( @@ -85,12 +89,18 @@ type StmtSummary struct { optMaxStmtCount *atomic2.Uint32 optMaxSQLLength *atomic2.Uint32 optRefreshInterval *atomic2.Uint32 + optLogEvicted *atomic2.Bool window *stmtWindow windowLock sync.Mutex storage stmtStorage closeWg sync.WaitGroup closed atomic.Bool + + // evictedCh carries per-record evictions to the async logger. + // A nil channel means evicted-logging is disabled. Sends are non-blocking. + evictedCh chan *StmtRecord + evictedDropped atomic.Uint64 } // NewStmtSummary creates a new StmtSummary from Config. @@ -112,7 +122,7 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(defaultRefreshInterval), - window: newStmtWindow(timeNow(), uint(defaultMaxStmtCount)), + optLogEvicted: atomic2.NewBool(false), storage: newStmtLogStorage(&log.Config{ File: log.FileLogConfig{ Filename: cfg.Filename, @@ -121,13 +131,20 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { MaxBackups: cfg.FileMaxBackups, }, }), + evictedCh: make(chan *StmtRecord, evictedLogChanCap), } + s.window = newStmtWindow(timeNow(), uint(defaultMaxStmtCount), s.onEvict) s.closeWg.Add(1) go func() { defer s.closeWg.Done() s.rotateLoop() }() + s.closeWg.Add(1) + go func() { + defer s.closeWg.Done() + s.evictedLogLoop() + }() return s, nil } @@ -145,9 +162,17 @@ func NewStmtSummary4Test(maxStmtCount uint) *StmtSummary { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(60 * 60 * 24 * 365), // 1 year - window: newStmtWindow(timeNow(), maxStmtCount), + optLogEvicted: atomic2.NewBool(false), storage: &mockStmtStorage{}, + evictedCh: make(chan *StmtRecord, evictedLogChanCap), } + ss.window = newStmtWindow(timeNow(), maxStmtCount, ss.onEvict) + + ss.closeWg.Add(1) + go func() { + defer ss.closeWg.Done() + ss.evictedLogLoop() + }() return ss } @@ -234,6 +259,17 @@ func (s *StmtSummary) SetRefreshInterval(v uint32) error { return nil } +// LogEvicted reports whether per-record evictions are logged. +func (s *StmtSummary) LogEvicted() bool { + return s.optLogEvicted.Load() +} + +// SetLogEvicted enables or disables per-record eviction logging. +func (s *StmtSummary) SetLogEvicted(v bool) error { + s.optLogEvicted.Store(v) + return nil +} + // Add adds a single stmtsummary.StmtExecInfo to the current statistics window // of StmtSummary. Before adding, it will check whether the current window has // expired, and if it has expired, the window will be persisted asynchronously @@ -318,7 +354,7 @@ func (s *StmtSummary) flush() { s.windowLock.Lock() window := s.window - s.window = newStmtWindow(now, uint(s.MaxStmtCount())) + s.window = newStmtWindow(now, uint(s.MaxStmtCount()), s.onEvict) s.windowLock.Unlock() if window.lru.Size() > 0 { @@ -352,7 +388,7 @@ func (s *StmtSummary) rotateLoop() { func (s *StmtSummary) rotate(now time.Time) { w := s.window - s.window = newStmtWindow(now, uint(s.MaxStmtCount())) + s.window = newStmtWindow(now, uint(s.MaxStmtCount()), s.onEvict) size := w.lru.Size() if size > 0 { // Persist window asynchronously. @@ -364,6 +400,66 @@ func (s *StmtSummary) rotate(now time.Time) { } } +// onEvict is the LRU eviction hook installed on every stmtWindow. +// Called while the record's lock is held (see newStmtWindow). We copy the +// fields we need and hand the clone off to the async log goroutine. A +// non-blocking send is used so the hot Add() path never stalls on log I/O. +func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord) { + if !s.optLogEvicted.Load() { + return + } + if s.evictedCh == nil { + return + } + clone := cloneRecordForLog(r) + select { + case s.evictedCh <- clone: + default: + s.evictedDropped.Add(1) + } +} + +// evictedLogLoop drains evictedCh and writes each record to the stmt log. +// When group_by_user is also enabled, each logged record represents exactly +// one (digest, user) group that fell out of the LRU. +func (s *StmtSummary) evictedLogLoop() { + const dropReportInterval = 30 * time.Second + ticker := time.NewTicker(dropReportInterval) + defer ticker.Stop() + + var lastDropReport uint64 + report := func() { + cur := s.evictedDropped.Load() + if cur > lastDropReport { + logutil.BgLogger().Warn("stmt summary evicted log dropped records", + zap.Uint64("dropped_total", cur), + zap.Uint64("since_last_report", cur-lastDropReport), + ) + lastDropReport = cur + } + } + + for { + select { + case <-s.ctx.Done(): + // Drain remaining buffered records synchronously, then exit. + for { + select { + case r := <-s.evictedCh: + s.storage.logEvicted(r) + default: + report() + return + } + } + case r := <-s.evictedCh: + s.storage.logEvicted(r) + case <-ticker.C: + report() + } + } +} + // stmtWindow represents a single statistical window, which has a begin // time and an end time. Data within a single window is eliminated // according to the LRU strategy. All evicted data will be aggregated @@ -374,7 +470,12 @@ type stmtWindow struct { evicted *stmtEvicted } -func newStmtWindow(begin time.Time, capacity uint) *stmtWindow { +// onEvictFn is invoked for every LRU eviction after the evicted stats have +// been aggregated into stmtWindow.evicted. The callback receives the locked +// record (caller holds r.Lock) so it can copy fields cheaply. Must not block. +type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord) + +func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindow { w := &stmtWindow{ begin: begin, lru: kvcache.NewSimpleLRUCache(capacity, 0, 0), @@ -384,7 +485,11 @@ func newStmtWindow(begin time.Time, capacity uint) *stmtWindow { r := v.(*lockedStmtRecord) r.Lock() defer r.Unlock() - w.evicted.add(k.(*stmtsummary.StmtDigestKey), r.StmtRecord) + key := k.(*stmtsummary.StmtDigestKey) + w.evicted.add(key, r.StmtRecord) + if onEvict != nil { + onEvict(key, r.StmtRecord) + } }) return w } @@ -396,6 +501,10 @@ func (w *stmtWindow) clear() { type stmtStorage interface { persist(w *stmtWindow, end time.Time) + // logEvicted writes a single evicted record to durable storage. It may be + // called concurrently with persist; implementations must be safe to call + // from the evictedLogLoop goroutine. + logEvicted(r *StmtRecord) sync() error } @@ -442,6 +551,7 @@ type lockedStmtRecord struct { type mockStmtStorage struct { sync.Mutex windows []*stmtWindow + evicted []*StmtRecord } func (s *mockStmtStorage) persist(w *stmtWindow, _ time.Time) { @@ -450,10 +560,38 @@ func (s *mockStmtStorage) persist(w *stmtWindow, _ time.Time) { s.Unlock() } +func (s *mockStmtStorage) logEvicted(r *StmtRecord) { + s.Lock() + s.evicted = append(s.evicted, r) + s.Unlock() +} + func (*mockStmtStorage) sync() error { return nil } +// cloneRecordForLog returns a shallow copy of r with its two mutable maps +// (AuthUsers, BackoffTypes) cloned, so the async logger can marshal the +// snapshot without racing with further updates on the retained StmtRecord. +// Called with r's lock held (see onEvict). +func cloneRecordForLog(r *StmtRecord) *StmtRecord { + c := *r + if len(r.AuthUsers) > 0 { + c.AuthUsers = make(map[string]struct{}, len(r.AuthUsers)) + for u := range r.AuthUsers { + c.AuthUsers[u] = struct{}{} + } + } + if len(r.BackoffTypes) > 0 { + c.BackoffTypes = make(map[string]int, len(r.BackoffTypes)) + for k, v := range r.BackoffTypes { + c.BackoffTypes[k] = v + } + } + // IndexNames is a slice; shallow copy is fine because it is append-only. + return &c +} + /* Public proxy functions between v1 and v2 */ // Add wraps GlobalStmtSummary.Add and stmtsummary.StmtSummaryByDigestMap.AddStatement. @@ -529,3 +667,12 @@ func SetMaxSQLLength(v int) error { } return stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(v) } + +// SetLogEvicted toggles per-record eviction logging. Only v2 (persistent) +// honors this flag; v1 has no log sink, so the call is a no-op for it. +func SetLogEvicted(v bool) error { + if GlobalStmtSummary != nil { + return GlobalStmtSummary.SetLogEvicted(v) + } + return nil +} diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 3256e6d0af960..e561a7bfa7f44 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -17,6 +17,7 @@ package stmtsummary import ( "encoding/json" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -69,6 +70,41 @@ func TestStmtSummary(t *testing.T) { require.Equal(t, 0, w.lru.Size()) } +func TestStmtSummaryLogEvicted(t *testing.T) { + storage := &mockStmtStorage{} + ss := NewStmtSummary4Test(2) + ss.storage = storage + defer ss.Close() + require.NoError(t, ss.SetLogEvicted(true)) + + // With capacity 2, the 3rd and later distinct digests evict older entries + // and should each land in storage.evicted. + ss.Add(GenerateStmtExecInfo4Test("digest1")) + ss.Add(GenerateStmtExecInfo4Test("digest2")) + ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 + ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 + + // The log is async; wait briefly for drain. + require.Eventually(t, func() bool { + storage.Lock() + defer storage.Unlock() + return len(storage.evicted) == 2 + }, time.Second, 10*time.Millisecond, "expected 2 evicted records to be logged") + + storage.Lock() + digests := []string{storage.evicted[0].Digest, storage.evicted[1].Digest} + storage.Unlock() + require.ElementsMatch(t, []string{"digest1", "digest2"}, digests) + + // Disable and verify no further log writes. + require.NoError(t, ss.SetLogEvicted(false)) + ss.Add(GenerateStmtExecInfo4Test("digest5")) // evicts digest3 + time.Sleep(50 * time.Millisecond) + storage.Lock() + require.Equal(t, 2, len(storage.evicted)) + storage.Unlock() +} + func TestStmtSummaryFlush(t *testing.T) { storage := &mockStmtStorage{} ss := NewStmtSummary4Test(1000) From b18c4030b2c24f1f9ecf84da2b2c8bb42c108810 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 22 May 2026 01:10:30 -0700 Subject: [PATCH 02/11] util/stmtsummary: batch persist evicted summaries --- pkg/sessionctx/vardef/tidb_vars.go | 10 +- pkg/sessionctx/variable/sysvar.go | 4 +- pkg/util/stmtsummary/v2/logger.go | 27 +++-- pkg/util/stmtsummary/v2/stmtsummary.go | 106 +++++++++++++++----- pkg/util/stmtsummary/v2/stmtsummary_test.go | 6 +- 5 files changed, 111 insertions(+), 42 deletions(-) diff --git a/pkg/sessionctx/vardef/tidb_vars.go b/pkg/sessionctx/vardef/tidb_vars.go index 3269bbd859ab5..324f8fe37f0c1 100644 --- a/pkg/sessionctx/vardef/tidb_vars.go +++ b/pkg/sessionctx/vardef/tidb_vars.go @@ -721,10 +721,10 @@ const ( // TiDBStmtSummaryMaxSQLLength indicates the max length of displayed normalized sql and sample sql. TiDBStmtSummaryMaxSQLLength = "tidb_stmt_summary_max_sql_length" - // TiDBStmtSummaryLogEvicted controls whether per-record LRU evictions in - // the v2 (persistent) statement summary are written to the stmt log. Off - // by default because it adds log volume proportional to eviction rate. - TiDBStmtSummaryLogEvicted = "tidb_stmt_summary_log_evicted" + // TiDBStmtSummaryPersistEvicted controls whether per-record LRU evictions + // in the v2 (persistent) statement summary are persisted to the stmt log. + // Off by default because it adds log volume proportional to eviction rate. + TiDBStmtSummaryPersistEvicted = "tidb_stmt_summary_persist_evicted" // TiDBIgnoreInlistPlanDigest enables TiDB to generate the same plan digest with SQL using different in-list arguments. TiDBIgnoreInlistPlanDigest = "tidb_ignore_inlist_plan_digest" @@ -1627,7 +1627,7 @@ const ( DefTiDBStmtSummaryHistorySize = 24 DefTiDBStmtSummaryMaxStmtCount = 3000 DefTiDBStmtSummaryMaxSQLLength = 32768 - DefTiDBStmtSummaryLogEvicted = false + DefTiDBStmtSummaryPersistEvicted = false DefTiDBCapturePlanBaseline = Off DefTiDBIgnoreInlistPlanDigest = true DefTiDBEnableIndexMerge = true diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index ec0e4f2cf9512..7280d2aacc4ed 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -953,9 +953,9 @@ var defaultSysVars = []*SysVar{ SetGlobal: func(_ context.Context, s *SessionVars, val string) error { return stmtsummaryv2.SetMaxSQLLength(TidbOptInt(val, vardef.DefTiDBStmtSummaryMaxSQLLength)) }}, - {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryLogEvicted, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryLogEvicted), Type: vardef.TypeBool, AllowEmpty: true, + {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryPersistEvicted, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryPersistEvicted), Type: vardef.TypeBool, AllowEmpty: true, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { - return stmtsummaryv2.SetLogEvicted(TiDBOptOn(val)) + return stmtsummaryv2.SetPersistEvicted(TiDBOptOn(val)) }}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBCapturePlanBaseline, Value: vardef.DefTiDBCapturePlanBaseline, Type: vardef.TypeBool, AllowEmptyAll: true}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBEvolvePlanTaskMaxTime, Value: strconv.Itoa(vardef.DefTiDBEvolvePlanTaskMaxTime), Type: vardef.TypeInt, MinValue: -1, MaxValue: math.MaxInt64}, diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 9774f679e0c5b..0355d18294123 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -17,6 +17,7 @@ package stmtsummary import ( "encoding/json" "fmt" + "strings" "time" "github.com/pingcap/log" @@ -70,16 +71,26 @@ func (s *stmtLogStorage) sync() error { return s.logger.Sync() } -// logEvicted writes a single evicted record to the stmt log with an -// `"evicted":true` marker so downstream consumers can distinguish per-record -// eviction events from rotated-window records. -func (s *stmtLogStorage) logEvicted(r *StmtRecord) { - b, err := json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true}) - if err != nil { - logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err)) +// logEvicted writes evicted records to the stmt log with an `"evicted":true` +// marker so downstream consumers can distinguish per-record eviction events +// from rotated-window records. +func (s *stmtLogStorage) logEvicted(records []*StmtRecord) { + var builder strings.Builder + for _, r := range records { + b, err := json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true}) + if err != nil { + logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err)) + continue + } + if builder.Len() > 0 { + builder.WriteByte('\n') + } + _, _ = builder.Write(b) + } + if builder.Len() == 0 { return } - s.logger.Info(string(b)) + s.logger.Info(builder.String()) } // evictedStmtRecord embeds *StmtRecord and adds an "evicted" JSON tag. diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 0fb84efcc0c95..3dbf62555621a 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -45,6 +45,13 @@ const ( // evictedLogChanCap bounds the buffer of per-record evicted entries waiting // to be logged. When full, new evictions are dropped so Add() never blocks. evictedLogChanCap = 1024 + + // evictedLogBatchSize and evictedLogFlushInterval bound the async logger's + // batching. They reduce write frequency under eviction bursts while keeping + // single-record latency low. + evictedLogBatchSize = 64 + evictedLogFlushInterval = 100 * time.Millisecond + evictedDropReportInterval = 30 * time.Second ) var ( @@ -90,7 +97,7 @@ type StmtSummary struct { optMaxStmtCount *atomic2.Uint32 optMaxSQLLength *atomic2.Uint32 optRefreshInterval *atomic2.Uint32 - optLogEvicted *atomic2.Bool + optPersistEvicted *atomic2.Bool window *stmtWindow windowLock sync.Mutex @@ -123,7 +130,7 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(defaultRefreshInterval), - optLogEvicted: atomic2.NewBool(false), + optPersistEvicted: atomic2.NewBool(false), storage: newStmtLogStorage(&log.Config{ File: log.FileLogConfig{ Filename: cfg.Filename, @@ -163,7 +170,7 @@ func NewStmtSummary4Test(maxStmtCount uint) *StmtSummary { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(60 * 60 * 24 * 365), // 1 year - optLogEvicted: atomic2.NewBool(false), + optPersistEvicted: atomic2.NewBool(false), storage: &mockStmtStorage{}, evictedCh: make(chan *StmtRecord, evictedLogChanCap), } @@ -260,14 +267,14 @@ func (s *StmtSummary) SetRefreshInterval(v uint32) error { return nil } -// LogEvicted reports whether per-record evictions are logged. -func (s *StmtSummary) LogEvicted() bool { - return s.optLogEvicted.Load() +// PersistEvicted reports whether per-record evictions are persisted. +func (s *StmtSummary) PersistEvicted() bool { + return s.optPersistEvicted.Load() } -// SetLogEvicted enables or disables per-record eviction logging. -func (s *StmtSummary) SetLogEvicted(v bool) error { - s.optLogEvicted.Store(v) +// SetPersistEvicted enables or disables per-record eviction persistence. +func (s *StmtSummary) SetPersistEvicted(v bool) error { + s.optPersistEvicted.Store(v) return nil } @@ -428,7 +435,7 @@ func (s *StmtSummary) rotate(now time.Time) { // fields we need and hand the clone off to the async log goroutine. A // non-blocking send is used so the hot Add() path never stalls on log I/O. func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord) { - if !s.optLogEvicted.Load() { + if !s.optPersistEvicted.Load() { return } if s.evictedCh == nil { @@ -446,9 +453,14 @@ func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord) { // When group_by_user is also enabled, each logged record represents exactly // one (digest, user) group that fell out of the LRU. func (s *StmtSummary) evictedLogLoop() { - const dropReportInterval = 30 * time.Second - ticker := time.NewTicker(dropReportInterval) - defer ticker.Stop() + reportTicker := time.NewTicker(evictedDropReportInterval) + defer reportTicker.Stop() + + flushTimer := time.NewTimer(evictedLogFlushInterval) + if !flushTimer.Stop() { + <-flushTimer.C + } + defer flushTimer.Stop() var lastDropReport uint64 report := func() { @@ -462,6 +474,47 @@ func (s *StmtSummary) evictedLogLoop() { } } + stopFlushTimer := func() { + if !flushTimer.Stop() { + select { + case <-flushTimer.C: + default: + } + } + } + + batch := make([]*StmtRecord, 0, evictedLogBatchSize) + flush := func() { + if len(batch) == 0 { + return + } + s.storage.logEvicted(batch) + for i := range batch { + batch[i] = nil + } + batch = batch[:0] + stopFlushTimer() + } + appendRecord := func(r *StmtRecord) { + batch = append(batch, r) + if len(batch) == 1 { + flushTimer.Reset(evictedLogFlushInterval) + } + if len(batch) >= evictedLogBatchSize { + flush() + } + } + drainAvailable := func() { + for len(batch) > 0 && len(batch) < evictedLogBatchSize { + select { + case r := <-s.evictedCh: + appendRecord(r) + default: + return + } + } + } + for { select { case <-s.ctx.Done(): @@ -471,15 +524,19 @@ func (s *StmtSummary) evictedLogLoop() { for { select { case r := <-s.evictedCh: - s.storage.logEvicted(r) + appendRecord(r) default: + flush() report() return } } case r := <-s.evictedCh: - s.storage.logEvicted(r) - case <-ticker.C: + appendRecord(r) + drainAvailable() + case <-flushTimer.C: + flush() + case <-reportTicker.C: report() } } @@ -529,10 +586,10 @@ func (w *stmtWindow) clear() { type stmtStorage interface { persist(w *stmtWindow, end time.Time) - // logEvicted writes a single evicted record to durable storage. It may be + // logEvicted writes evicted records to durable storage. It may be // called concurrently with persist; implementations must be safe to call // from the evictedLogLoop goroutine. - logEvicted(r *StmtRecord) + logEvicted(records []*StmtRecord) sync() error } @@ -588,9 +645,9 @@ func (s *mockStmtStorage) persist(w *stmtWindow, _ time.Time) { s.Unlock() } -func (s *mockStmtStorage) logEvicted(r *StmtRecord) { +func (s *mockStmtStorage) logEvicted(records []*StmtRecord) { s.Lock() - s.evicted = append(s.evicted, r) + s.evicted = append(s.evicted, records...) s.Unlock() } @@ -696,11 +753,12 @@ func SetMaxSQLLength(v int) error { return stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(v) } -// SetLogEvicted toggles per-record eviction logging. Only v2 (persistent) -// honors this flag; v1 has no log sink, so the call is a no-op for it. -func SetLogEvicted(v bool) error { +// SetPersistEvicted toggles per-record eviction persistence. Only v2 +// (persistent) honors this flag; v1 has no log sink, so the call is a no-op +// for it. +func SetPersistEvicted(v bool) error { if GlobalStmtSummary != nil { - return GlobalStmtSummary.SetLogEvicted(v) + return GlobalStmtSummary.SetPersistEvicted(v) } return nil } diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 87732f0945fb5..955138829e889 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -83,12 +83,12 @@ func TestStmtSummary(t *testing.T) { require.Equal(t, 0, w.lru.Size()) } -func TestStmtSummaryLogEvicted(t *testing.T) { +func TestStmtSummaryPersistEvicted(t *testing.T) { storage := &mockStmtStorage{} ss := NewStmtSummary4Test(2) ss.storage = storage defer ss.Close() - require.NoError(t, ss.SetLogEvicted(true)) + require.NoError(t, ss.SetPersistEvicted(true)) // With capacity 2, the 3rd and later distinct digests evict older entries // and should each land in storage.evicted. @@ -110,7 +110,7 @@ func TestStmtSummaryLogEvicted(t *testing.T) { require.ElementsMatch(t, []string{"digest1", "digest2"}, digests) // Disable and verify no further log writes. - require.NoError(t, ss.SetLogEvicted(false)) + require.NoError(t, ss.SetPersistEvicted(false)) ss.Add(GenerateStmtExecInfo4Test("digest5")) // evicts digest3 require.Never(t, func() bool { storage.Lock() From 7c9cf5885f60aae90993f559f3048e7c1a27b4f2 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 03:22:34 -0700 Subject: [PATCH 03/11] util/stmtsummary: set time range for evicted records --- pkg/util/stmtsummary/v2/stmtsummary.go | 8 +++++--- pkg/util/stmtsummary/v2/stmtsummary_test.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 3dbf62555621a..f488723d168aa 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -434,7 +434,7 @@ func (s *StmtSummary) rotate(now time.Time) { // Called while the record's lock is held (see newStmtWindow). We copy the // fields we need and hand the clone off to the async log goroutine. A // non-blocking send is used so the hot Add() path never stalls on log I/O. -func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord) { +func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) { if !s.optPersistEvicted.Load() { return } @@ -442,6 +442,8 @@ func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord) { return } clone := cloneRecordForLog(r) + clone.Begin = begin.Unix() + clone.End = end.Unix() select { case s.evictedCh <- clone: default: @@ -556,7 +558,7 @@ type stmtWindow struct { // onEvictFn is invoked for every LRU eviction after the evicted stats have // been aggregated into stmtWindow.evicted. The callback receives the locked // record (caller holds r.Lock) so it can copy fields cheaply. Must not block. -type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord) +type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindow { w := &stmtWindow{ @@ -572,7 +574,7 @@ func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindo key := k.(*stmtsummary.StmtDigestKey) w.evicted.add(key, r.StmtRecord) if onEvict != nil { - onEvict(key, r.StmtRecord) + onEvict(key, r.StmtRecord, w.begin, timeNow()) } }) return w diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 955138829e889..b9a0d22192420 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -84,6 +84,17 @@ func TestStmtSummary(t *testing.T) { } func TestStmtSummaryPersistEvicted(t *testing.T) { + begin := time.Date(2026, 5, 25, 10, 0, 0, 0, time.UTC) + evictAt := begin.Add(42 * time.Second) + now := begin + oldTimeNow := timeNow + timeNow = func() time.Time { + return now + } + t.Cleanup(func() { + timeNow = oldTimeNow + }) + storage := &mockStmtStorage{} ss := NewStmtSummary4Test(2) ss.storage = storage @@ -94,6 +105,7 @@ func TestStmtSummaryPersistEvicted(t *testing.T) { // and should each land in storage.evicted. ss.Add(GenerateStmtExecInfo4Test("digest1")) ss.Add(GenerateStmtExecInfo4Test("digest2")) + now = evictAt ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 @@ -106,6 +118,10 @@ func TestStmtSummaryPersistEvicted(t *testing.T) { storage.Lock() digests := []string{storage.evicted[0].Digest, storage.evicted[1].Digest} + for _, record := range storage.evicted { + require.Equal(t, begin.Unix(), record.Begin) + require.Equal(t, evictAt.Unix(), record.End) + } storage.Unlock() require.ElementsMatch(t, []string{"digest1", "digest2"}, digests) From 7332d726e3fdf6515a8c506ca775df78b90f7bd4 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 17:10:38 -0700 Subject: [PATCH 04/11] util/stmtsummary: avoid duplicated evicted persistence --- pkg/util/stmtsummary/v2/BUILD.bazel | 4 +- pkg/util/stmtsummary/v2/logger.go | 8 ++-- pkg/util/stmtsummary/v2/stmtsummary.go | 46 +++++++++++++-------- pkg/util/stmtsummary/v2/stmtsummary_test.go | 43 +++++++++++++++++++ 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/pkg/util/stmtsummary/v2/BUILD.bazel b/pkg/util/stmtsummary/v2/BUILD.bazel index 8e1c5a2554118..f71cd225ec121 100644 --- a/pkg/util/stmtsummary/v2/BUILD.bazel +++ b/pkg/util/stmtsummary/v2/BUILD.bazel @@ -49,7 +49,7 @@ go_test( ], embed = [":stmtsummary"], flaky = True, - shard_count = 16, + shard_count = 17, deps = [ "//pkg/meta/model", "//pkg/metrics", @@ -64,5 +64,7 @@ go_test( "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 0355d18294123..204e63479527b 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -59,10 +59,10 @@ func (s *stmtLogStorage) persist(w *stmtWindow, end time.Time) { r.Unlock() } w.evicted.Lock() - if w.evicted.other.ExecCount > 0 { - w.evicted.other.Begin = begin - w.evicted.other.End = end.Unix() - s.log(w.evicted.other) + if w.evicted.unlogged.ExecCount > 0 { + w.evicted.unlogged.Begin = begin + w.evicted.unlogged.End = end.Unix() + s.log(w.evicted.unlogged) } w.evicted.Unlock() } diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index f488723d168aa..4a194478fdaf0 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -434,20 +434,22 @@ func (s *StmtSummary) rotate(now time.Time) { // Called while the record's lock is held (see newStmtWindow). We copy the // fields we need and hand the clone off to the async log goroutine. A // non-blocking send is used so the hot Add() path never stalls on log I/O. -func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) { +func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) bool { if !s.optPersistEvicted.Load() { - return + return false } if s.evictedCh == nil { - return + return false } clone := cloneRecordForLog(r) clone.Begin = begin.Unix() clone.End = end.Unix() select { case s.evictedCh <- clone: + return true default: s.evictedDropped.Add(1) + return false } } @@ -558,7 +560,7 @@ type stmtWindow struct { // onEvictFn is invoked for every LRU eviction after the evicted stats have // been aggregated into stmtWindow.evicted. The callback receives the locked // record (caller holds r.Lock) so it can copy fields cheaply. Must not block. -type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) +type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) bool func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindow { w := &stmtWindow{ @@ -572,10 +574,11 @@ func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindo r.Lock() defer r.Unlock() key := k.(*stmtsummary.StmtDigestKey) - w.evicted.add(key, r.StmtRecord) + logged := false if onEvict != nil { - onEvict(key, r.StmtRecord, w.begin, timeNow()) + logged = onEvict(key, r.StmtRecord, w.begin, timeNow()) } + w.evicted.add(key, r.StmtRecord, !logged) }) return w } @@ -597,24 +600,20 @@ type stmtStorage interface { type stmtEvicted struct { sync.Mutex - keys map[string]struct{} - other *StmtRecord + keys map[string]struct{} + other *StmtRecord + unlogged *StmtRecord } func newStmtEvicted() *stmtEvicted { return &stmtEvicted{ - keys: make(map[string]struct{}), - other: &StmtRecord{ - AuthUsers: make(map[string]struct{}), - MinLatency: time.Duration(math.MaxInt64), - BackoffTypes: make(map[string]int), - FirstSeen: time.Now(), - LastSeen: time.Now(), - }, + keys: make(map[string]struct{}), + other: newEvictedAggregateRecord(), + unlogged: newEvictedAggregateRecord(), } } -func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord) { +func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord, persistAsAggregate bool) { if key == nil || record == nil { return } @@ -622,6 +621,9 @@ func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord) { defer e.Unlock() e.keys[string(key.Hash())] = struct{}{} e.other.Merge(record) + if persistAsAggregate { + e.unlogged.Merge(record) + } } func (e *stmtEvicted) count() int { @@ -630,6 +632,16 @@ func (e *stmtEvicted) count() int { return len(e.keys) } +func newEvictedAggregateRecord() *StmtRecord { + return &StmtRecord{ + AuthUsers: make(map[string]struct{}), + MinLatency: time.Duration(math.MaxInt64), + BackoffTypes: make(map[string]int), + FirstSeen: time.Now(), + LastSeen: time.Now(), + } +} + type lockedStmtRecord struct { sync.Mutex *StmtRecord diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index b9a0d22192420..149f45bdf805a 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -15,8 +15,10 @@ package stmtsummary import ( + "bytes" "encoding/json" "path/filepath" + "strings" "testing" "time" @@ -24,6 +26,8 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { @@ -135,6 +139,45 @@ func TestStmtSummaryPersistEvicted(t *testing.T) { }, 100*time.Millisecond, 10*time.Millisecond, "evicted count should remain 2 after disabling") } +func TestStmtSummaryPersistEvictedDoesNotPersistLoggedRecordsAsAggregate(t *testing.T) { + var logBuf bytes.Buffer + storage := &stmtLogStorage{ + logger: zap.New(zapcore.NewCore(&stmtLogEncoder{}, zapcore.AddSync(&logBuf), zapcore.InfoLevel)), + } + + ss := NewStmtSummary4Test(2) + ss.storage = storage + require.NoError(t, ss.SetPersistEvicted(true)) + + ss.Add(GenerateStmtExecInfo4Test("digest1")) + ss.Add(GenerateStmtExecInfo4Test("digest2")) + ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 + ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 + ss.Close() + + type loggedRecord struct { + Digest string `json:"digest"` + ExecCount int64 `json:"exec_count"` + Evicted bool `json:"evicted"` + } + + var totalExecCount int64 + evictedDigests := make([]string, 0, 2) + for _, line := range strings.Split(strings.TrimSpace(logBuf.String()), "\n") { + var record loggedRecord + require.NoError(t, json.Unmarshal([]byte(line), &record)) + totalExecCount += record.ExecCount + if record.Evicted { + evictedDigests = append(evictedDigests, record.Digest) + continue + } + require.NotEmpty(t, record.Digest, "logged evicted records should not also be persisted as the aggregate row") + } + + require.ElementsMatch(t, []string{"digest1", "digest2"}, evictedDigests) + require.Equal(t, int64(4), totalExecCount) +} + func TestWindowEvictedCountResetOnRotate(t *testing.T) { ss := NewStmtSummary4Test(2) defer ss.Close() From 3aec5f79df8183eb881445cbe15a8b255b23d447 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 17:13:37 -0700 Subject: [PATCH 05/11] util/stmtsummary: clarify evicted persistence callback --- pkg/util/stmtsummary/v2/stmtsummary.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 4a194478fdaf0..5fd48e3a32d37 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -557,9 +557,11 @@ type stmtWindow struct { evictedCount atomic.Int64 // total number of LRU evictions in this window } -// onEvictFn is invoked for every LRU eviction after the evicted stats have -// been aggregated into stmtWindow.evicted. The callback receives the locked -// record (caller holds r.Lock) so it can copy fields cheaply. Must not block. +// onEvictFn is invoked for every LRU eviction. The callback receives the +// locked record (caller holds r.Lock) so it can copy fields cheaply. It +// returns true when the record has been handed off for per-record persistence, +// in which case the caller can skip adding it to the persisted aggregate. +// Must not block. type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) bool func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindow { From 7f7df2282e56c5356b3a6ecc84732eff6653591a Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 17:36:28 -0700 Subject: [PATCH 06/11] util/stmtsummary: clarify evicted channel comment --- pkg/util/stmtsummary/v2/stmtsummary.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 5fd48e3a32d37..683e9e5d694de 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -106,7 +106,7 @@ type StmtSummary struct { closed atomic.Bool // evictedCh carries per-record evictions to the async logger. - // A nil channel means evicted-logging is disabled. Sends are non-blocking. + // Eviction persistence is controlled by optPersistEvicted; sends are non-blocking. evictedCh chan *StmtRecord evictedDropped atomic.Uint64 } From 61529aea4aeda8fce19c2032c78ea5915b7be552 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 18:28:51 -0700 Subject: [PATCH 07/11] util/stmtsummary: add evicted log metrics --- pkg/metrics/metrics.go | 1 + pkg/metrics/metrics_internal_test.go | 14 ++++++++++++++ pkg/metrics/stmtsummary.go | 16 ++++++++++++++++ pkg/util/stmtsummary/v2/logger.go | 7 +++++++ pkg/util/stmtsummary/v2/stmtsummary.go | 4 ++++ pkg/util/stmtsummary/v2/stmtsummary_test.go | 16 ++++++++++++++++ 6 files changed, 58 insertions(+) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 70fe7be3209f6..d98c13bf2bdbd 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -403,6 +403,7 @@ func RegisterMetrics() { // StmtSummary prometheus.MustRegister(StmtSummaryWindowRecordCount) prometheus.MustRegister(StmtSummaryWindowEvictedCount) + prometheus.MustRegister(StmtSummaryEvictedLogCounter) // Channelz setupChannelzCollector() diff --git a/pkg/metrics/metrics_internal_test.go b/pkg/metrics/metrics_internal_test.go index 533ab770e3954..616aabe50074f 100644 --- a/pkg/metrics/metrics_internal_test.go +++ b/pkg/metrics/metrics_internal_test.go @@ -37,6 +37,13 @@ func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { return m.GetGauge().GetValue() } +func readCounterValue(t *testing.T, counter prometheus.Counter) float64 { + t.Helper() + m := &dto.Metric{} + require.NoError(t, counter.Write(m)) + return m.GetCounter().GetValue() +} + func countCollectedMetrics(collector prometheus.Collector) int { ch := make(chan prometheus.Metric, 16) collector.Collect(ch) @@ -53,6 +60,7 @@ func TestStmtSummaryMetricLabels(t *testing.T) { InitStmtSummaryMetrics() require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowRecordCount)) require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowEvictedCount)) + require.Equal(t, 0, countCollectedMetrics(StmtSummaryEvictedLogCounter)) SetStmtSummaryWindowMetrics(StmtSummaryTypeV1, 3, 1) require.Equal(t, 1, countCollectedMetrics(StmtSummaryWindowRecordCount)) @@ -65,6 +73,12 @@ func TestStmtSummaryMetricLabels(t *testing.T) { require.Equal(t, 2, countCollectedMetrics(StmtSummaryWindowEvictedCount)) require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2))) require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2))) + + StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted).Add(3) + StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped).Inc() + require.Equal(t, 2, countCollectedMetrics(StmtSummaryEvictedLogCounter)) + require.Equal(t, 3.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted))) + require.Equal(t, 1.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped))) } func TestGrpcChannelzCollectorSingleton(t *testing.T) { diff --git a/pkg/metrics/stmtsummary.go b/pkg/metrics/stmtsummary.go index cd5306dac1760..28edda4c91041 100644 --- a/pkg/metrics/stmtsummary.go +++ b/pkg/metrics/stmtsummary.go @@ -27,6 +27,11 @@ const ( StmtSummaryTypeV1 = "v1" // StmtSummaryTypeV2 marks metrics reported by the persistent statement summary implementation. StmtSummaryTypeV2 = "v2" + + // StmtSummaryEvictedLogResultPersisted marks evicted records submitted to the stmt log. + StmtSummaryEvictedLogResultPersisted = "persisted" + // StmtSummaryEvictedLogResultDropped marks evicted records dropped before reaching the stmt log. + StmtSummaryEvictedLogResultDropped = "dropped" ) var ( @@ -39,6 +44,9 @@ var ( // This value resets to 0 when the window rotates. StmtSummaryWindowEvictedCount *prometheus.GaugeVec + // StmtSummaryEvictedLogCounter counts v2 evicted-log persistence outcomes. + StmtSummaryEvictedLogCounter *prometheus.CounterVec + stmtSummaryWindowRecordCountV1 prometheus.Gauge stmtSummaryWindowRecordCountV2 prometheus.Gauge stmtSummaryWindowEvictedCountV1 prometheus.Gauge @@ -65,6 +73,14 @@ func InitStmtSummaryMetrics() { Help: "The number of LRU evictions in the current statement summary window.", }, []string{LblType}) + StmtSummaryEvictedLogCounter = metricscommon.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "stmt_summary", + Name: "evicted_log_total", + Help: "The number of v2 statement summary evicted-log records by result.", + }, []string{LblType, LblResult}) + stmtSummaryWindowMetricsMu.Lock() stmtSummaryWindowRecordCountV1 = nil stmtSummaryWindowRecordCountV2 = nil diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 204e63479527b..8e17da07dcdc4 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" "go.uber.org/zap/buffer" @@ -76,6 +77,7 @@ func (s *stmtLogStorage) sync() error { // from rotated-window records. func (s *stmtLogStorage) logEvicted(records []*StmtRecord) { var builder strings.Builder + persisted := 0 for _, r := range records { b, err := json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true}) if err != nil { @@ -86,11 +88,16 @@ func (s *stmtLogStorage) logEvicted(records []*StmtRecord) { builder.WriteByte('\n') } _, _ = builder.Write(b) + persisted++ } if builder.Len() == 0 { return } s.logger.Info(builder.String()) + metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + ).Add(float64(persisted)) } // evictedStmtRecord embeds *StmtRecord and adds an "evicted" JSON tag. diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 683e9e5d694de..b7efe7b8e5ff7 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -449,6 +449,10 @@ func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord, begin return true default: s.evictedDropped.Add(1) + metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultDropped, + ).Inc() return false } } diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 149f45bdf805a..29279f972b2aa 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -37,6 +37,13 @@ func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { return m.GetGauge().GetValue() } +func readCounterValue(t *testing.T, counter prometheus.Counter) float64 { + t.Helper() + m := &dto.Metric{} + require.NoError(t, counter.Write(m)) + return m.GetCounter().GetValue() +} + func TestStmtWindow(t *testing.T) { ss := NewStmtSummary4Test(5) defer ss.Close() @@ -153,7 +160,16 @@ func TestStmtSummaryPersistEvictedDoesNotPersistLoggedRecordsAsAggregate(t *test ss.Add(GenerateStmtExecInfo4Test("digest2")) ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 + persistedBefore := readCounterValue(t, metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + )) ss.Close() + persistedAfter := readCounterValue(t, metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + )) + require.Equal(t, 2.0, persistedAfter-persistedBefore) type loggedRecord struct { Digest string `json:"digest"` From ad92e061dfba22642caf39ef7ecf1f50bbe0fc13 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 25 May 2026 23:16:57 -0700 Subject: [PATCH 08/11] util/stmtsummary: clarify evicted aggregate naming --- pkg/util/stmtsummary/v2/logger.go | 8 +++--- pkg/util/stmtsummary/v2/reader.go | 10 ++++---- pkg/util/stmtsummary/v2/stmtsummary.go | 28 +++++++++++---------- pkg/util/stmtsummary/v2/stmtsummary_test.go | 6 ++--- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 8e17da07dcdc4..3c30a7fd6da94 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -60,10 +60,10 @@ func (s *stmtLogStorage) persist(w *stmtWindow, end time.Time) { r.Unlock() } w.evicted.Lock() - if w.evicted.unlogged.ExecCount > 0 { - w.evicted.unlogged.Begin = begin - w.evicted.unlogged.End = end.Unix() - s.log(w.evicted.unlogged) + if w.evicted.persistFallback.ExecCount > 0 { + w.evicted.persistFallback.Begin = begin + w.evicted.persistFallback.End = end.Unix() + s.log(w.evicted.persistFallback) } w.evicted.Unlock() } diff --git a/pkg/util/stmtsummary/v2/reader.go b/pkg/util/stmtsummary/v2/reader.go index 498c9c7437238..d0caef20ea933 100644 --- a/pkg/util/stmtsummary/v2/reader.go +++ b/pkg/util/stmtsummary/v2/reader.go @@ -129,17 +129,17 @@ func (r *MemReader) Rows() [][]types.Datum { func() { evicted.Lock() defer evicted.Unlock() - if evicted.other.ExecCount == 0 { + if evicted.inMemoryAggregate.ExecCount == 0 { return } - if !r.checker.hasPrivilege(evicted.other.AuthUsers) { + if !r.checker.hasPrivilege(evicted.inMemoryAggregate.AuthUsers) { return } - evicted.other.Begin = w.begin.Unix() - evicted.other.End = end + evicted.inMemoryAggregate.Begin = w.begin.Unix() + evicted.inMemoryAggregate.End = end row := make([]types.Datum, len(r.columnFactories)) for i, factory := range r.columnFactories { - row[i] = types.NewDatum(factory(r, evicted.other)) + row[i] = types.NewDatum(factory(r, evicted.inMemoryAggregate)) } rows = append(rows, row) }() diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index b7efe7b8e5ff7..3a08554000f37 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -580,11 +580,11 @@ func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindo r.Lock() defer r.Unlock() key := k.(*stmtsummary.StmtDigestKey) - logged := false + queuedForEvictedLog := false if onEvict != nil { - logged = onEvict(key, r.StmtRecord, w.begin, timeNow()) + queuedForEvictedLog = onEvict(key, r.StmtRecord, w.begin, timeNow()) } - w.evicted.add(key, r.StmtRecord, !logged) + w.evicted.add(key, r.StmtRecord, queuedForEvictedLog) }) return w } @@ -606,29 +606,31 @@ type stmtStorage interface { type stmtEvicted struct { sync.Mutex - keys map[string]struct{} - other *StmtRecord - unlogged *StmtRecord + keys map[string]struct{} + // inMemoryAggregate contains all evicted records in the current in-memory window. + inMemoryAggregate *StmtRecord + // persistFallback contains only records not already queued to the per-record evicted log. + persistFallback *StmtRecord } func newStmtEvicted() *stmtEvicted { return &stmtEvicted{ - keys: make(map[string]struct{}), - other: newEvictedAggregateRecord(), - unlogged: newEvictedAggregateRecord(), + keys: make(map[string]struct{}), + inMemoryAggregate: newEvictedAggregateRecord(), + persistFallback: newEvictedAggregateRecord(), } } -func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord, persistAsAggregate bool) { +func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord, queuedForEvictedLog bool) { if key == nil || record == nil { return } e.Lock() defer e.Unlock() e.keys[string(key.Hash())] = struct{}{} - e.other.Merge(record) - if persistAsAggregate { - e.unlogged.Merge(record) + e.inMemoryAggregate.Merge(record) + if !queuedForEvictedLog { + e.persistFallback.Merge(record) } } diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 29279f972b2aa..d810ad0bfcf96 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -58,14 +58,14 @@ func TestStmtWindow(t *testing.T) { ss.Add(GenerateStmtExecInfo4Test("digest7")) require.Equal(t, 5, ss.window.lru.Size()) require.Equal(t, 2, ss.window.evicted.count()) - require.Equal(t, int64(4), ss.window.evicted.other.ExecCount) // digest1 digest1 digest2 digest2 + require.Equal(t, int64(4), ss.window.evicted.inMemoryAggregate.ExecCount) // digest1 digest1 digest2 digest2 require.Equal(t, int64(2), ss.window.evictedCount.Load()) - _, err := json.Marshal(ss.window.evicted.other) + _, err := json.Marshal(ss.window.evicted.inMemoryAggregate) require.NoError(t, err) ss.Clear() require.Equal(t, 0, ss.window.lru.Size()) require.Equal(t, 0, ss.window.evicted.count()) - require.Equal(t, int64(0), ss.window.evicted.other.ExecCount) + require.Equal(t, int64(0), ss.window.evicted.inMemoryAggregate.ExecCount) require.Equal(t, int64(0), ss.window.evictedCount.Load()) } From fd2caabfaaf0022d2e10af2f7060967147789721 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 28 May 2026 01:32:23 -0700 Subject: [PATCH 09/11] util/stmtsummary: clarify persisted evicted aggregate --- pkg/util/stmtsummary/v2/logger.go | 8 ++++---- pkg/util/stmtsummary/v2/reader.go | 10 +++++----- pkg/util/stmtsummary/v2/stmtsummary.go | 19 ++++++++++--------- pkg/util/stmtsummary/v2/stmtsummary_test.go | 6 +++--- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 3c30a7fd6da94..8a4f013f2ba3f 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -60,10 +60,10 @@ func (s *stmtLogStorage) persist(w *stmtWindow, end time.Time) { r.Unlock() } w.evicted.Lock() - if w.evicted.persistFallback.ExecCount > 0 { - w.evicted.persistFallback.Begin = begin - w.evicted.persistFallback.End = end.Unix() - s.log(w.evicted.persistFallback) + if w.evicted.otherForPersist.ExecCount > 0 { + w.evicted.otherForPersist.Begin = begin + w.evicted.otherForPersist.End = end.Unix() + s.log(w.evicted.otherForPersist) } w.evicted.Unlock() } diff --git a/pkg/util/stmtsummary/v2/reader.go b/pkg/util/stmtsummary/v2/reader.go index d0caef20ea933..498c9c7437238 100644 --- a/pkg/util/stmtsummary/v2/reader.go +++ b/pkg/util/stmtsummary/v2/reader.go @@ -129,17 +129,17 @@ func (r *MemReader) Rows() [][]types.Datum { func() { evicted.Lock() defer evicted.Unlock() - if evicted.inMemoryAggregate.ExecCount == 0 { + if evicted.other.ExecCount == 0 { return } - if !r.checker.hasPrivilege(evicted.inMemoryAggregate.AuthUsers) { + if !r.checker.hasPrivilege(evicted.other.AuthUsers) { return } - evicted.inMemoryAggregate.Begin = w.begin.Unix() - evicted.inMemoryAggregate.End = end + evicted.other.Begin = w.begin.Unix() + evicted.other.End = end row := make([]types.Datum, len(r.columnFactories)) for i, factory := range r.columnFactories { - row[i] = types.NewDatum(factory(r, evicted.inMemoryAggregate)) + row[i] = types.NewDatum(factory(r, evicted.other)) } rows = append(rows, row) }() diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 3a08554000f37..3c207750cfa5c 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -607,17 +607,18 @@ type stmtStorage interface { type stmtEvicted struct { sync.Mutex keys map[string]struct{} - // inMemoryAggregate contains all evicted records in the current in-memory window. - inMemoryAggregate *StmtRecord - // persistFallback contains only records not already queued to the per-record evicted log. - persistFallback *StmtRecord + // other contains all evicted records in the current window. + other *StmtRecord + // otherForPersist contains records not covered by per-record evicted logs. + // When per-record evicted logging is disabled, it is equivalent to other. + otherForPersist *StmtRecord } func newStmtEvicted() *stmtEvicted { return &stmtEvicted{ - keys: make(map[string]struct{}), - inMemoryAggregate: newEvictedAggregateRecord(), - persistFallback: newEvictedAggregateRecord(), + keys: make(map[string]struct{}), + other: newEvictedAggregateRecord(), + otherForPersist: newEvictedAggregateRecord(), } } @@ -628,9 +629,9 @@ func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord, qu e.Lock() defer e.Unlock() e.keys[string(key.Hash())] = struct{}{} - e.inMemoryAggregate.Merge(record) + e.other.Merge(record) if !queuedForEvictedLog { - e.persistFallback.Merge(record) + e.otherForPersist.Merge(record) } } diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index d810ad0bfcf96..29279f972b2aa 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -58,14 +58,14 @@ func TestStmtWindow(t *testing.T) { ss.Add(GenerateStmtExecInfo4Test("digest7")) require.Equal(t, 5, ss.window.lru.Size()) require.Equal(t, 2, ss.window.evicted.count()) - require.Equal(t, int64(4), ss.window.evicted.inMemoryAggregate.ExecCount) // digest1 digest1 digest2 digest2 + require.Equal(t, int64(4), ss.window.evicted.other.ExecCount) // digest1 digest1 digest2 digest2 require.Equal(t, int64(2), ss.window.evictedCount.Load()) - _, err := json.Marshal(ss.window.evicted.inMemoryAggregate) + _, err := json.Marshal(ss.window.evicted.other) require.NoError(t, err) ss.Clear() require.Equal(t, 0, ss.window.lru.Size()) require.Equal(t, 0, ss.window.evicted.count()) - require.Equal(t, int64(0), ss.window.evicted.inMemoryAggregate.ExecCount) + require.Equal(t, int64(0), ss.window.evicted.other.ExecCount) require.Equal(t, int64(0), ss.window.evictedCount.Load()) } From 9d34ad11d259a646a8ad7c36db97ccd716c0faae Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 2 Jun 2026 00:56:42 -0700 Subject: [PATCH 10/11] util/stmtsummary: filter evicted records from history reader --- pkg/util/stmtsummary/v2/reader.go | 21 ++++++++++++++++----- pkg/util/stmtsummary/v2/reader_test.go | 2 ++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/util/stmtsummary/v2/reader.go b/pkg/util/stmtsummary/v2/reader.go index 498c9c7437238..589fc4aa0a59f 100644 --- a/pkg/util/stmtsummary/v2/reader.go +++ b/pkg/util/stmtsummary/v2/reader.go @@ -448,6 +448,11 @@ type stmtTinyRecord struct { End int64 `json:"end"` } +type stmtPersistedRecord struct { + StmtRecord + Evicted bool `json:"evicted"` +} + type stmtFile struct { file *os.File begin int64 @@ -757,11 +762,14 @@ func (w *stmtParseWorker) handleLines( rows := make([][]types.Datum, 0, len(lines)) for _, line := range lines { - record, err := w.parse(line) + record, skipped, err := w.parse(line) if err != nil { // ignore invalid lines continue } + if skipped { + continue + } if w.needStop(record) { break @@ -790,12 +798,15 @@ func (w *stmtParseWorker) putRows( } } -func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, error) { - var record StmtRecord +func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, bool, error) { + var record stmtPersistedRecord if err := json.Unmarshal(raw, &record); err != nil { - return nil, err + return nil, false, err } - return &record, nil + if record.Evicted { + return nil, true, nil + } + return &record.StmtRecord, false, nil } func (w *stmtParseWorker) needStop(record *StmtRecord) bool { diff --git a/pkg/util/stmtsummary/v2/reader_test.go b/pkg/util/stmtsummary/v2/reader_test.go index 906fc69d2d2a0..6f24b1c3422fe 100644 --- a/pkg/util/stmtsummary/v2/reader_test.go +++ b/pkg/util/stmtsummary/v2/reader_test.go @@ -266,6 +266,8 @@ func TestHistoryReader(t *testing.T) { require.NoError(t, err) _, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"digest2\",\"exec_count\":20}\n") require.NoError(t, err) + _, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"evicted_digest\",\"exec_count\":99,\"evicted\":true}\n") + require.NoError(t, err) require.NoError(t, file.Close()) file, err = os.Create(filename2) From cc36a907158d20e06123cf360a3dc40fc764362e Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 2 Jun 2026 01:34:08 -0700 Subject: [PATCH 11/11] util/stmtsummary: clarify stmt record marshal helpers --- pkg/util/stmtsummary/v2/logger.go | 14 +++++++++++--- pkg/util/stmtsummary/v2/record_test.go | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 4d3ea15220c01..028b20ed11fc9 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -80,7 +80,7 @@ func (s *stmtLogStorage) logEvicted(records []*StmtRecord) { var builder strings.Builder persisted := 0 for _, r := range records { - b, err := marshalStmtRecord(r, true) + b, err := marshalEvictedStmtRecord(r) if err != nil { logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err)) continue @@ -110,7 +110,7 @@ type evictedStmtRecord struct { } func (s *stmtLogStorage) log(r *StmtRecord) { - b, err := marshalStmtRecord(r, false) + b, err := marshalStmtRecord(r) if err != nil { logutil.BgLogger().Warn("failed to marshal statement summary", zap.Error(err)) return @@ -118,7 +118,15 @@ func (s *stmtLogStorage) log(r *StmtRecord) { s.logger.Info(string(b)) } -func marshalStmtRecord(r *StmtRecord, evicted bool) ([]byte, error) { +func marshalStmtRecord(r *StmtRecord) ([]byte, error) { + return marshalStmtRecordWithEvicted(r, false) +} + +func marshalEvictedStmtRecord(r *StmtRecord) ([]byte, error) { + return marshalStmtRecordWithEvicted(r, true) +} + +func marshalStmtRecordWithEvicted(r *StmtRecord, evicted bool) ([]byte, error) { fields := config.GetGlobalConfig().GetKeyspaceObservabilityStmtLogFields() if len(fields) == 0 { if evicted { diff --git a/pkg/util/stmtsummary/v2/record_test.go b/pkg/util/stmtsummary/v2/record_test.go index d96605a840274..d35d3807a970f 100644 --- a/pkg/util/stmtsummary/v2/record_test.go +++ b/pkg/util/stmtsummary/v2/record_test.go @@ -97,14 +97,14 @@ func TestStmtRecord(t *testing.T) { } require.NoError(t, conf.ResolveKeyspaceObservability(map[string]string{"meta_a": "value_a"})) }) - b, err := marshalStmtRecord(record2, false) + b, err := marshalStmtRecord(record2) require.NoError(t, err) items := make(map[string]any) require.NoError(t, json.Unmarshal(b, &items)) require.Equal(t, map[string]any{"stmt_meta_a": "value_a"}, items["additional_fields"]) require.Equal(t, record2.Digest, items["digest"]) - b, err = marshalStmtRecord(record2, true) + b, err = marshalEvictedStmtRecord(record2) require.NoError(t, err) items = make(map[string]any) require.NoError(t, json.Unmarshal(b, &items))