diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 70fe7be3209f6..d98c13bf2bdbd 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -403,6 +403,7 @@ func RegisterMetrics() { // StmtSummary prometheus.MustRegister(StmtSummaryWindowRecordCount) prometheus.MustRegister(StmtSummaryWindowEvictedCount) + prometheus.MustRegister(StmtSummaryEvictedLogCounter) // Channelz setupChannelzCollector() diff --git a/pkg/metrics/metrics_internal_test.go b/pkg/metrics/metrics_internal_test.go index 533ab770e3954..616aabe50074f 100644 --- a/pkg/metrics/metrics_internal_test.go +++ b/pkg/metrics/metrics_internal_test.go @@ -37,6 +37,13 @@ func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { return m.GetGauge().GetValue() } +func readCounterValue(t *testing.T, counter prometheus.Counter) float64 { + t.Helper() + m := &dto.Metric{} + require.NoError(t, counter.Write(m)) + return m.GetCounter().GetValue() +} + func countCollectedMetrics(collector prometheus.Collector) int { ch := make(chan prometheus.Metric, 16) collector.Collect(ch) @@ -53,6 +60,7 @@ func TestStmtSummaryMetricLabels(t *testing.T) { InitStmtSummaryMetrics() require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowRecordCount)) require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowEvictedCount)) + require.Equal(t, 0, countCollectedMetrics(StmtSummaryEvictedLogCounter)) SetStmtSummaryWindowMetrics(StmtSummaryTypeV1, 3, 1) require.Equal(t, 1, countCollectedMetrics(StmtSummaryWindowRecordCount)) @@ -65,6 +73,12 @@ func TestStmtSummaryMetricLabels(t *testing.T) { require.Equal(t, 2, countCollectedMetrics(StmtSummaryWindowEvictedCount)) require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2))) require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2))) + + StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted).Add(3) + 
StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped).Inc() + require.Equal(t, 2, countCollectedMetrics(StmtSummaryEvictedLogCounter)) + require.Equal(t, 3.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted))) + require.Equal(t, 1.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped))) } func TestGrpcChannelzCollectorSingleton(t *testing.T) { diff --git a/pkg/metrics/stmtsummary.go b/pkg/metrics/stmtsummary.go index cd5306dac1760..28edda4c91041 100644 --- a/pkg/metrics/stmtsummary.go +++ b/pkg/metrics/stmtsummary.go @@ -27,6 +27,11 @@ const ( StmtSummaryTypeV1 = "v1" // StmtSummaryTypeV2 marks metrics reported by the persistent statement summary implementation. StmtSummaryTypeV2 = "v2" + + // StmtSummaryEvictedLogResultPersisted marks evicted records submitted to the stmt log. + StmtSummaryEvictedLogResultPersisted = "persisted" + // StmtSummaryEvictedLogResultDropped marks evicted records dropped before reaching the stmt log. + StmtSummaryEvictedLogResultDropped = "dropped" ) var ( @@ -39,6 +44,9 @@ var ( // This value resets to 0 when the window rotates. StmtSummaryWindowEvictedCount *prometheus.GaugeVec + // StmtSummaryEvictedLogCounter counts v2 evicted-log persistence outcomes. 
+ StmtSummaryEvictedLogCounter *prometheus.CounterVec + stmtSummaryWindowRecordCountV1 prometheus.Gauge stmtSummaryWindowRecordCountV2 prometheus.Gauge stmtSummaryWindowEvictedCountV1 prometheus.Gauge @@ -65,6 +73,14 @@ func InitStmtSummaryMetrics() { Help: "The number of LRU evictions in the current statement summary window.", }, []string{LblType}) + StmtSummaryEvictedLogCounter = metricscommon.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "stmt_summary", + Name: "evicted_log_total", + Help: "The number of v2 statement summary evicted-log records by result.", + }, []string{LblType, LblResult}) + stmtSummaryWindowMetricsMu.Lock() stmtSummaryWindowRecordCountV1 = nil stmtSummaryWindowRecordCountV2 = nil diff --git a/pkg/sessionctx/vardef/tidb_vars.go b/pkg/sessionctx/vardef/tidb_vars.go index 32e3adcbd59f1..20d0444e07265 100644 --- a/pkg/sessionctx/vardef/tidb_vars.go +++ b/pkg/sessionctx/vardef/tidb_vars.go @@ -721,6 +721,11 @@ const ( // TiDBStmtSummaryMaxSQLLength indicates the max length of displayed normalized sql and sample sql. TiDBStmtSummaryMaxSQLLength = "tidb_stmt_summary_max_sql_length" + // TiDBStmtSummaryPersistEvicted controls whether per-record LRU evictions + // in the v2 (persistent) statement summary are persisted to the stmt log. + // Off by default because it adds log volume proportional to eviction rate. + TiDBStmtSummaryPersistEvicted = "tidb_stmt_summary_persist_evicted" + // TiDBStmtSummaryGroupByUser, when enabled, adds the executing user to the // statement summary grouping key so the same digest run by different users // produces separate rows. Off by default to avoid cardinality growth. 
@@ -1627,6 +1632,7 @@ const ( DefTiDBStmtSummaryHistorySize = 24 DefTiDBStmtSummaryMaxStmtCount = 3000 DefTiDBStmtSummaryMaxSQLLength = 32768 + DefTiDBStmtSummaryPersistEvicted = false DefTiDBStmtSummaryGroupByUser = false DefTiDBCapturePlanBaseline = Off DefTiDBIgnoreInlistPlanDigest = true diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 4480c9827a573..cc4c97f600e67 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -953,6 +953,10 @@ var defaultSysVars = []*SysVar{ SetGlobal: func(_ context.Context, s *SessionVars, val string) error { return stmtsummaryv2.SetMaxSQLLength(TidbOptInt(val, vardef.DefTiDBStmtSummaryMaxSQLLength)) }}, + {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryPersistEvicted, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryPersistEvicted), Type: vardef.TypeBool, AllowEmpty: true, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + return stmtsummaryv2.SetPersistEvicted(TiDBOptOn(val)) + }}, {Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryGroupByUser, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryGroupByUser), Type: vardef.TypeBool, AllowEmpty: true, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { return stmtsummaryv2.SetGroupByUser(TiDBOptOn(val)) diff --git a/pkg/util/stmtsummary/v2/BUILD.bazel b/pkg/util/stmtsummary/v2/BUILD.bazel index 0e66ad00b0ab7..2ad5781b7b0b9 100644 --- a/pkg/util/stmtsummary/v2/BUILD.bazel +++ b/pkg/util/stmtsummary/v2/BUILD.bazel @@ -49,7 +49,7 @@ go_test( ], embed = [":stmtsummary"], flaky = True, - shard_count = 16, + shard_count = 18, deps = [ "//pkg/config", "//pkg/meta/model", @@ -65,5 +65,7 @@ go_test( "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) diff --git a/pkg/util/stmtsummary/v2/logger.go b/pkg/util/stmtsummary/v2/logger.go index 
629828d8e0307..028b20ed11fc9 100644 --- a/pkg/util/stmtsummary/v2/logger.go +++ b/pkg/util/stmtsummary/v2/logger.go @@ -17,10 +17,12 @@ package stmtsummary import ( "encoding/json" "fmt" + "strings" "time" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" "go.uber.org/zap/buffer" @@ -59,10 +61,10 @@ func (s *stmtLogStorage) persist(w *stmtWindow, end time.Time) { r.Unlock() } w.evicted.Lock() - if w.evicted.other.ExecCount > 0 { - w.evicted.other.Begin = begin - w.evicted.other.End = end.Unix() - s.log(w.evicted.other) + if w.evicted.otherForPersist.ExecCount > 0 { + w.evicted.otherForPersist.Begin = begin + w.evicted.otherForPersist.End = end.Unix() + s.log(w.evicted.otherForPersist) } w.evicted.Unlock() } @@ -71,6 +73,42 @@ func (s *stmtLogStorage) sync() error { return s.logger.Sync() } +// logEvicted writes evicted records to the stmt log with an `"evicted":true` +// marker so downstream consumers can distinguish per-record eviction events +// from rotated-window records. +func (s *stmtLogStorage) logEvicted(records []*StmtRecord) { + var builder strings.Builder + persisted := 0 + for _, r := range records { + b, err := marshalEvictedStmtRecord(r) + if err != nil { + logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err)) + continue + } + if builder.Len() > 0 { + builder.WriteByte('\n') + } + _, _ = builder.Write(b) + persisted++ + } + if builder.Len() == 0 { + return + } + s.logger.Info(builder.String()) + metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + ).Add(float64(persisted)) +} + +// evictedStmtRecord embeds *StmtRecord and adds an "evicted" JSON tag. +// Keeping the embedded pointer means the JSON field order matches StmtRecord +// and parsers tolerant of the extra field work unchanged. 
+type evictedStmtRecord struct { + *StmtRecord + Evicted bool `json:"evicted"` +} + func (s *stmtLogStorage) log(r *StmtRecord) { b, err := marshalStmtRecord(r) if err != nil { @@ -81,10 +119,28 @@ func (s *stmtLogStorage) log(r *StmtRecord) { } func marshalStmtRecord(r *StmtRecord) ([]byte, error) { + return marshalStmtRecordWithEvicted(r, false) +} + +func marshalEvictedStmtRecord(r *StmtRecord) ([]byte, error) { + return marshalStmtRecordWithEvicted(r, true) +} + +func marshalStmtRecordWithEvicted(r *StmtRecord, evicted bool) ([]byte, error) { fields := config.GetGlobalConfig().GetKeyspaceObservabilityStmtLogFields() if len(fields) == 0 { + if evicted { + return json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true}) + } return json.Marshal(r) } + if evicted { + return json.Marshal(evictedStmtRecordWithAdditionalFields{ + StmtRecord: r, + AdditionalFields: fields, + Evicted: true, + }) + } return json.Marshal(stmtRecordWithAdditionalFields{ StmtRecord: r, AdditionalFields: fields, @@ -96,6 +152,12 @@ type stmtRecordWithAdditionalFields struct { AdditionalFields map[string]string `json:"additional_fields"` } +type evictedStmtRecordWithAdditionalFields struct { + *StmtRecord + AdditionalFields map[string]string `json:"additional_fields"` + Evicted bool `json:"evicted"` +} + type stmtLogEncoder struct{} func (*stmtLogEncoder) EncodeEntry(entry zapcore.Entry, _ []zapcore.Field) (*buffer.Buffer, error) { diff --git a/pkg/util/stmtsummary/v2/reader.go b/pkg/util/stmtsummary/v2/reader.go index 498c9c7437238..589fc4aa0a59f 100644 --- a/pkg/util/stmtsummary/v2/reader.go +++ b/pkg/util/stmtsummary/v2/reader.go @@ -448,6 +448,11 @@ type stmtTinyRecord struct { End int64 `json:"end"` } +type stmtPersistedRecord struct { + StmtRecord + Evicted bool `json:"evicted"` +} + type stmtFile struct { file *os.File begin int64 @@ -757,11 +762,14 @@ func (w *stmtParseWorker) handleLines( rows := make([][]types.Datum, 0, len(lines)) for _, line := range lines { - record, err := 
w.parse(line) + record, skipped, err := w.parse(line) if err != nil { // ignore invalid lines continue } + if skipped { + continue + } if w.needStop(record) { break @@ -790,12 +798,15 @@ func (w *stmtParseWorker) putRows( } } -func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, error) { - var record StmtRecord +func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, bool, error) { + var record stmtPersistedRecord if err := json.Unmarshal(raw, &record); err != nil { - return nil, err + return nil, false, err } - return &record, nil + if record.Evicted { + return nil, true, nil + } + return &record.StmtRecord, false, nil } func (w *stmtParseWorker) needStop(record *StmtRecord) bool { diff --git a/pkg/util/stmtsummary/v2/reader_test.go b/pkg/util/stmtsummary/v2/reader_test.go index 906fc69d2d2a0..6f24b1c3422fe 100644 --- a/pkg/util/stmtsummary/v2/reader_test.go +++ b/pkg/util/stmtsummary/v2/reader_test.go @@ -266,6 +266,8 @@ func TestHistoryReader(t *testing.T) { require.NoError(t, err) _, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"digest2\",\"exec_count\":20}\n") require.NoError(t, err) + _, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"evicted_digest\",\"exec_count\":99,\"evicted\":true}\n") + require.NoError(t, err) require.NoError(t, file.Close()) file, err = os.Create(filename2) diff --git a/pkg/util/stmtsummary/v2/record_test.go b/pkg/util/stmtsummary/v2/record_test.go index 8fcfc88f179d2..d35d3807a970f 100644 --- a/pkg/util/stmtsummary/v2/record_test.go +++ b/pkg/util/stmtsummary/v2/record_test.go @@ -103,4 +103,12 @@ func TestStmtRecord(t *testing.T) { require.NoError(t, json.Unmarshal(b, &items)) require.Equal(t, map[string]any{"stmt_meta_a": "value_a"}, items["additional_fields"]) require.Equal(t, record2.Digest, items["digest"]) + + b, err = marshalEvictedStmtRecord(record2) + require.NoError(t, err) + items = make(map[string]any) + require.NoError(t, json.Unmarshal(b, &items)) + 
require.Equal(t, map[string]any{"stmt_meta_a": "value_a"}, items["additional_fields"]) + require.Equal(t, true, items["evicted"]) + require.Equal(t, record2.Digest, items["digest"]) } diff --git a/pkg/util/stmtsummary/v2/stmtsummary.go b/pkg/util/stmtsummary/v2/stmtsummary.go index 8eec704ea7d91..2eab73de1ca2f 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary.go +++ b/pkg/util/stmtsummary/v2/stmtsummary.go @@ -41,6 +41,17 @@ const ( defaultMaxSQLLength = 32768 defaultRefreshInterval = 30 * 60 // 30 min defaultRotateCheckInterval = 1 // s + + // evictedLogChanCap bounds the buffer of per-record evicted entries waiting + // to be logged. When full, new evictions are dropped so Add() never blocks. + evictedLogChanCap = 1024 + + // evictedLogBatchSize and evictedLogFlushInterval bound the async logger's + // batching. They reduce write frequency under eviction bursts while keeping + // single-record latency low. + evictedLogBatchSize = 64 + evictedLogFlushInterval = 100 * time.Millisecond + evictedDropReportInterval = 30 * time.Second ) var ( @@ -86,6 +97,7 @@ type StmtSummary struct { optMaxStmtCount *atomic2.Uint32 optMaxSQLLength *atomic2.Uint32 optRefreshInterval *atomic2.Uint32 + optPersistEvicted *atomic2.Bool optGroupByUser *atomic2.Bool window *stmtWindow @@ -93,6 +105,11 @@ type StmtSummary struct { storage stmtStorage closeWg sync.WaitGroup closed atomic.Bool + + // evictedCh carries per-record evictions to the async logger. + // Eviction persistence is controlled by optPersistEvicted; sends are non-blocking. + evictedCh chan *StmtRecord + evictedDropped atomic.Uint64 } // NewStmtSummary creates a new StmtSummary from Config. 
@@ -114,8 +131,8 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(defaultRefreshInterval), + optPersistEvicted: atomic2.NewBool(false), optGroupByUser: atomic2.NewBool(false), - window: newStmtWindow(timeNow(), uint(defaultMaxStmtCount)), storage: newStmtLogStorage(&log.Config{ File: log.FileLogConfig{ Filename: cfg.Filename, @@ -124,13 +141,20 @@ func NewStmtSummary(cfg *Config) (*StmtSummary, error) { MaxBackups: cfg.FileMaxBackups, }, }), + evictedCh: make(chan *StmtRecord, evictedLogChanCap), } + s.window = newStmtWindow(timeNow(), uint(defaultMaxStmtCount), s.onEvict) s.closeWg.Add(1) go func() { defer s.closeWg.Done() s.rotateLoop() }() + s.closeWg.Add(1) + go func() { + defer s.closeWg.Done() + s.evictedLogLoop() + }() return s, nil } @@ -148,10 +172,18 @@ func NewStmtSummary4Test(maxStmtCount uint) *StmtSummary { optMaxStmtCount: atomic2.NewUint32(defaultMaxStmtCount), optMaxSQLLength: atomic2.NewUint32(defaultMaxSQLLength), optRefreshInterval: atomic2.NewUint32(60 * 60 * 24 * 365), // 1 year + optPersistEvicted: atomic2.NewBool(false), optGroupByUser: atomic2.NewBool(false), - window: newStmtWindow(timeNow(), maxStmtCount), storage: &mockStmtStorage{}, + evictedCh: make(chan *StmtRecord, evictedLogChanCap), } + ss.window = newStmtWindow(timeNow(), maxStmtCount, ss.onEvict) + + ss.closeWg.Add(1) + go func() { + defer ss.closeWg.Done() + ss.evictedLogLoop() + }() return ss } @@ -238,6 +270,17 @@ func (s *StmtSummary) SetRefreshInterval(v uint32) error { return nil } +// PersistEvicted reports whether per-record evictions are persisted. +func (s *StmtSummary) PersistEvicted() bool { + return s.optPersistEvicted.Load() +} + +// SetPersistEvicted enables or disables per-record eviction persistence. 
+func (s *StmtSummary) SetPersistEvicted(v bool) error { + s.optPersistEvicted.Store(v) + return nil +} + // GroupByUser reports whether statement summaries are grouped by the // executing user in addition to the usual digest/schema/plan tuple. func (s *StmtSummary) GroupByUser() bool { @@ -274,6 +317,11 @@ func (s *StmtSummary) Add(info *stmtsummary.StmtExecInfo) { // Add info to the current statistics window. s.windowLock.Lock() + if s.closed.Load() { + s.windowLock.Unlock() + stmtsummary.StmtDigestKeyPool.Put(k) + return + } // Decide userForKey under windowLock so SetGroupByUser's flag flip + clear // is atomic w.r.t. Add; otherwise a post-clear insert could land under the // wrong grouping mode. @@ -338,11 +386,17 @@ func (s *StmtSummary) ClearInternal() { // Close closes the work of StmtSummary. func (s *StmtSummary) Close() { + s.windowLock.Lock() + if !s.closed.CompareAndSwap(false, true) { + s.windowLock.Unlock() + return + } + s.windowLock.Unlock() + if s.cancel != nil { s.cancel() s.closeWg.Wait() } - s.closed.Store(true) s.flush() } @@ -351,7 +405,7 @@ func (s *StmtSummary) flush() { s.windowLock.Lock() window := s.window - s.window = newStmtWindow(now, uint(s.MaxStmtCount())) + s.window = newStmtWindow(now, uint(s.MaxStmtCount()), s.onEvict) s.windowLock.Unlock() if window.lru.Size() > 0 { @@ -396,7 +450,7 @@ func (s *StmtSummary) updateMetrics() { func (s *StmtSummary) rotate(now time.Time) { w := s.window - s.window = newStmtWindow(now, uint(s.MaxStmtCount())) + s.window = newStmtWindow(now, uint(s.MaxStmtCount()), s.onEvict) size := w.lru.Size() if size > 0 { // Persist window asynchronously. @@ -408,6 +462,126 @@ func (s *StmtSummary) rotate(now time.Time) { } } +// onEvict is the LRU eviction hook installed on every stmtWindow. +// Called while the record's lock is held (see newStmtWindow). We copy the +// fields we need and hand the clone off to the async log goroutine. 
A +// non-blocking send is used so the hot Add() path never stalls on log I/O. +func (s *StmtSummary) onEvict(_ *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) bool { + if !s.optPersistEvicted.Load() { + return false + } + if s.evictedCh == nil { + return false + } + clone := cloneRecordForLog(r) + clone.Begin = begin.Unix() + clone.End = end.Unix() + select { + case s.evictedCh <- clone: + return true + default: + s.evictedDropped.Add(1) + metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultDropped, + ).Inc() + return false + } +} + +// evictedLogLoop drains evictedCh and writes each record to the stmt log. +// When group_by_user is also enabled, each logged record represents exactly +// one (digest, user) group that fell out of the LRU. +func (s *StmtSummary) evictedLogLoop() { + reportTicker := time.NewTicker(evictedDropReportInterval) + defer reportTicker.Stop() + + flushTimer := time.NewTimer(evictedLogFlushInterval) + if !flushTimer.Stop() { + <-flushTimer.C + } + defer flushTimer.Stop() + + var lastDropReport uint64 + report := func() { + cur := s.evictedDropped.Load() + if cur > lastDropReport { + logutil.BgLogger().Warn("stmt summary evicted log dropped records", + zap.Uint64("dropped_total", cur), + zap.Uint64("since_last_report", cur-lastDropReport), + ) + lastDropReport = cur + } + } + + stopFlushTimer := func() { + if !flushTimer.Stop() { + select { + case <-flushTimer.C: + default: + } + } + } + + batch := make([]*StmtRecord, 0, evictedLogBatchSize) + flush := func() { + if len(batch) == 0 { + return + } + s.storage.logEvicted(batch) + for i := range batch { + batch[i] = nil + } + batch = batch[:0] + stopFlushTimer() + } + appendRecord := func(r *StmtRecord) { + batch = append(batch, r) + if len(batch) == 1 { + flushTimer.Reset(evictedLogFlushInterval) + } + if len(batch) >= evictedLogBatchSize { + flush() + } + } + drainAvailable := func() { + for len(batch) > 0 && 
len(batch) < evictedLogBatchSize { + select { + case r := <-s.evictedCh: + appendRecord(r) + default: + return + } + } + } + + for { + select { + case <-s.ctx.Done(): + // Close sets closed while holding windowLock before canceling this + // context, and Add rechecks closed under the same lock. At this + // point no Add can enqueue more evicted records. + for { + select { + case r := <-s.evictedCh: + appendRecord(r) + default: + flush() + report() + return + } + } + case r := <-s.evictedCh: + appendRecord(r) + drainAvailable() + case <-flushTimer.C: + flush() + case <-reportTicker.C: + report() + } + } +} + // stmtWindow represents a single statistical window, which has a begin // time and an end time. Data within a single window is eliminated // according to the LRU strategy. All evicted data will be aggregated @@ -419,7 +593,14 @@ type stmtWindow struct { evictedCount atomic.Int64 // total number of LRU evictions in this window } -func newStmtWindow(begin time.Time, capacity uint) *stmtWindow { +// onEvictFn is invoked for every LRU eviction. The callback receives the +// locked record (caller holds r.Lock) so it can copy fields cheaply. It +// returns true when the record has been handed off for per-record persistence, +// in which case the caller can skip adding it to the persisted aggregate. +// Must not block. 
+type onEvictFn func(key *stmtsummary.StmtDigestKey, r *StmtRecord, begin, end time.Time) bool + +func newStmtWindow(begin time.Time, capacity uint, onEvict onEvictFn) *stmtWindow { w := &stmtWindow{ begin: begin, lru: kvcache.NewSimpleLRUCache(capacity, 0, 0), @@ -430,7 +611,12 @@ func newStmtWindow(begin time.Time, capacity uint) *stmtWindow { r := v.(*lockedStmtRecord) r.Lock() defer r.Unlock() - w.evicted.add(k.(*stmtsummary.StmtDigestKey), r.StmtRecord) + key := k.(*stmtsummary.StmtDigestKey) + queuedForEvictedLog := false + if onEvict != nil { + queuedForEvictedLog = onEvict(key, r.StmtRecord, w.begin, timeNow()) + } + w.evicted.add(key, r.StmtRecord, queuedForEvictedLog) }) return w } @@ -443,29 +629,32 @@ func (w *stmtWindow) clear() { type stmtStorage interface { persist(w *stmtWindow, end time.Time) + // logEvicted writes evicted records to durable storage. It may be + // called concurrently with persist; implementations must be safe to call + // from the evictedLogLoop goroutine. + logEvicted(records []*StmtRecord) sync() error } type stmtEvicted struct { sync.Mutex - keys map[string]struct{} + keys map[string]struct{} + // other contains all evicted records in the current window. other *StmtRecord + // otherForPersist contains records not covered by per-record evicted logs. + // When per-record evicted logging is disabled, it is equivalent to other. 
+ otherForPersist *StmtRecord } func newStmtEvicted() *stmtEvicted { return &stmtEvicted{ - keys: make(map[string]struct{}), - other: &StmtRecord{ - AuthUsers: make(map[string]struct{}), - MinLatency: time.Duration(math.MaxInt64), - BackoffTypes: make(map[string]int), - FirstSeen: time.Now(), - LastSeen: time.Now(), - }, + keys: make(map[string]struct{}), + other: newEvictedAggregateRecord(), + otherForPersist: newEvictedAggregateRecord(), } } -func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord) { +func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord, queuedForEvictedLog bool) { if key == nil || record == nil { return } @@ -473,6 +662,9 @@ func (e *stmtEvicted) add(key *stmtsummary.StmtDigestKey, record *StmtRecord) { defer e.Unlock() e.keys[string(key.Hash())] = struct{}{} e.other.Merge(record) + if !queuedForEvictedLog { + e.otherForPersist.Merge(record) + } } func (e *stmtEvicted) count() int { @@ -481,6 +673,16 @@ func (e *stmtEvicted) count() int { return len(e.keys) } +func newEvictedAggregateRecord() *StmtRecord { + return &StmtRecord{ + AuthUsers: make(map[string]struct{}), + MinLatency: time.Duration(math.MaxInt64), + BackoffTypes: make(map[string]int), + FirstSeen: time.Now(), + LastSeen: time.Now(), + } +} + type lockedStmtRecord struct { sync.Mutex *StmtRecord @@ -489,6 +691,7 @@ type lockedStmtRecord struct { type mockStmtStorage struct { sync.Mutex windows []*stmtWindow + evicted []*StmtRecord } func (s *mockStmtStorage) persist(w *stmtWindow, _ time.Time) { @@ -497,10 +700,38 @@ func (s *mockStmtStorage) persist(w *stmtWindow, _ time.Time) { s.Unlock() } +func (s *mockStmtStorage) logEvicted(records []*StmtRecord) { + s.Lock() + s.evicted = append(s.evicted, records...) 
+	s.Unlock()
+}
+
 func (*mockStmtStorage) sync() error {
 	return nil
 }
 
+// cloneRecordForLog returns a copy of r whose AuthUsers and BackoffTypes maps
+// are detached from r (nil stays nil; a non-nil map, even an empty one, gets
+// its own copy), so the async logger never shares a map with a record that
+// may still be updated. Called with r's lock held (see onEvict).
+func cloneRecordForLog(r *StmtRecord) *StmtRecord {
+	c := *r
+	if r.AuthUsers != nil {
+		c.AuthUsers = make(map[string]struct{}, len(r.AuthUsers))
+		for u := range r.AuthUsers {
+			c.AuthUsers[u] = struct{}{}
+		}
+	}
+	if r.BackoffTypes != nil {
+		c.BackoffTypes = make(map[string]int, len(r.BackoffTypes))
+		for k, v := range r.BackoffTypes {
+			c.BackoffTypes[k] = v
+		}
+	}
+	// IndexNames is a slice; shallow copy is fine because it is append-only.
+	return &c
+}
+
 /* Public proxy functions between v1 and v2 */
 
 // Add wraps GlobalStmtSummary.Add and stmtsummary.StmtSummaryByDigestMap.AddStatement.
@@ -577,6 +808,16 @@ func SetMaxSQLLength(v int) error {
 	return stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(v)
 }
 
+// SetPersistEvicted toggles per-record eviction persistence. Only v2
+// (persistent) honors this flag; v1 has no log sink, so the call is a no-op
+// for it.
+func SetPersistEvicted(v bool) error {
+	if GlobalStmtSummary != nil {
+		return GlobalStmtSummary.SetPersistEvicted(v)
+	}
+	return nil
+}
+
 // SetGroupByUser toggles the user dimension on both v1 and v2 so the sysvar
 // setter can call one entry point regardless of which backend is active.
func SetGroupByUser(v bool) error { diff --git a/pkg/util/stmtsummary/v2/stmtsummary_test.go b/pkg/util/stmtsummary/v2/stmtsummary_test.go index 4d48dd30474d6..c395045e7b387 100644 --- a/pkg/util/stmtsummary/v2/stmtsummary_test.go +++ b/pkg/util/stmtsummary/v2/stmtsummary_test.go @@ -15,15 +15,20 @@ package stmtsummary import ( + "bytes" "encoding/json" "path/filepath" + "strings" "testing" + "time" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util/stmtsummary" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { @@ -33,6 +38,13 @@ func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 { return m.GetGauge().GetValue() } +func readCounterValue(t *testing.T, counter prometheus.Counter) float64 { + t.Helper() + m := &dto.Metric{} + require.NoError(t, counter.Write(m)) + return m.GetCounter().GetValue() +} + func TestStmtWindow(t *testing.T) { ss := NewStmtSummary4Test(5) defer ss.Close() @@ -83,6 +95,106 @@ func TestStmtSummary(t *testing.T) { require.Equal(t, 0, w.lru.Size()) } +func TestStmtSummaryPersistEvicted(t *testing.T) { + begin := time.Date(2026, 5, 25, 10, 0, 0, 0, time.UTC) + evictAt := begin.Add(42 * time.Second) + now := begin + oldTimeNow := timeNow + timeNow = func() time.Time { + return now + } + t.Cleanup(func() { + timeNow = oldTimeNow + }) + + storage := &mockStmtStorage{} + ss := NewStmtSummary4Test(2) + ss.storage = storage + defer ss.Close() + require.NoError(t, ss.SetPersistEvicted(true)) + + // With capacity 2, the 3rd and later distinct digests evict older entries + // and should each land in storage.evicted. 
+ ss.Add(GenerateStmtExecInfo4Test("digest1")) + ss.Add(GenerateStmtExecInfo4Test("digest2")) + now = evictAt + ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 + ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 + + // The log is async; wait briefly for drain. + require.Eventually(t, func() bool { + storage.Lock() + defer storage.Unlock() + return len(storage.evicted) == 2 + }, time.Second, 10*time.Millisecond, "expected 2 evicted records to be logged") + + storage.Lock() + digests := []string{storage.evicted[0].Digest, storage.evicted[1].Digest} + for _, record := range storage.evicted { + require.Equal(t, begin.Unix(), record.Begin) + require.Equal(t, evictAt.Unix(), record.End) + } + storage.Unlock() + require.ElementsMatch(t, []string{"digest1", "digest2"}, digests) + + // Disable and verify no further log writes. + require.NoError(t, ss.SetPersistEvicted(false)) + ss.Add(GenerateStmtExecInfo4Test("digest5")) // evicts digest3 + require.Never(t, func() bool { + storage.Lock() + defer storage.Unlock() + return len(storage.evicted) != 2 + }, 100*time.Millisecond, 10*time.Millisecond, "evicted count should remain 2 after disabling") +} + +func TestStmtSummaryPersistEvictedDoesNotPersistLoggedRecordsAsAggregate(t *testing.T) { + var logBuf bytes.Buffer + storage := &stmtLogStorage{ + logger: zap.New(zapcore.NewCore(&stmtLogEncoder{}, zapcore.AddSync(&logBuf), zapcore.InfoLevel)), + } + + ss := NewStmtSummary4Test(2) + ss.storage = storage + require.NoError(t, ss.SetPersistEvicted(true)) + + ss.Add(GenerateStmtExecInfo4Test("digest1")) + ss.Add(GenerateStmtExecInfo4Test("digest2")) + ss.Add(GenerateStmtExecInfo4Test("digest3")) // evicts digest1 + ss.Add(GenerateStmtExecInfo4Test("digest4")) // evicts digest2 + persistedBefore := readCounterValue(t, metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + )) + ss.Close() + persistedAfter := readCounterValue(t, 
metrics.StmtSummaryEvictedLogCounter.WithLabelValues( + metrics.StmtSummaryTypeV2, + metrics.StmtSummaryEvictedLogResultPersisted, + )) + require.Equal(t, 2.0, persistedAfter-persistedBefore) + + type loggedRecord struct { + Digest string `json:"digest"` + ExecCount int64 `json:"exec_count"` + Evicted bool `json:"evicted"` + } + + var totalExecCount int64 + evictedDigests := make([]string, 0, 2) + for _, line := range strings.Split(strings.TrimSpace(logBuf.String()), "\n") { + var record loggedRecord + require.NoError(t, json.Unmarshal([]byte(line), &record)) + totalExecCount += record.ExecCount + if record.Evicted { + evictedDigests = append(evictedDigests, record.Digest) + continue + } + require.NotEmpty(t, record.Digest, "logged evicted records should not also be persisted as the aggregate row") + } + + require.ElementsMatch(t, []string{"digest1", "digest2"}, evictedDigests) + require.Equal(t, int64(4), totalExecCount) +} + func TestStmtSummaryGroupByUser(t *testing.T) { ss := NewStmtSummary4Test(100) defer ss.Close()