Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func RegisterMetrics() {
// StmtSummary
prometheus.MustRegister(StmtSummaryWindowRecordCount)
prometheus.MustRegister(StmtSummaryWindowEvictedCount)
prometheus.MustRegister(StmtSummaryEvictedLogCounter)

// Channelz
setupChannelzCollector()
Expand Down
14 changes: 14 additions & 0 deletions pkg/metrics/metrics_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func readGaugeValue(t *testing.T, gauge prometheus.Gauge) float64 {
return m.GetGauge().GetValue()
}

func readCounterValue(t *testing.T, counter prometheus.Counter) float64 {
t.Helper()
m := &dto.Metric{}
require.NoError(t, counter.Write(m))
return m.GetCounter().GetValue()
}

func countCollectedMetrics(collector prometheus.Collector) int {
ch := make(chan prometheus.Metric, 16)
collector.Collect(ch)
Expand All @@ -53,6 +60,7 @@ func TestStmtSummaryMetricLabels(t *testing.T) {
InitStmtSummaryMetrics()
require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowRecordCount))
require.Equal(t, 0, countCollectedMetrics(StmtSummaryWindowEvictedCount))
require.Equal(t, 0, countCollectedMetrics(StmtSummaryEvictedLogCounter))

SetStmtSummaryWindowMetrics(StmtSummaryTypeV1, 3, 1)
require.Equal(t, 1, countCollectedMetrics(StmtSummaryWindowRecordCount))
Expand All @@ -65,6 +73,12 @@ func TestStmtSummaryMetricLabels(t *testing.T) {
require.Equal(t, 2, countCollectedMetrics(StmtSummaryWindowEvictedCount))
require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2)))
require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2)))

StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted).Add(3)
StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped).Inc()
require.Equal(t, 2, countCollectedMetrics(StmtSummaryEvictedLogCounter))
require.Equal(t, 3.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultPersisted)))
require.Equal(t, 1.0, readCounterValue(t, StmtSummaryEvictedLogCounter.WithLabelValues(StmtSummaryTypeV2, StmtSummaryEvictedLogResultDropped)))
}

func TestGrpcChannelzCollectorSingleton(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/metrics/stmtsummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
StmtSummaryTypeV1 = "v1"
// StmtSummaryTypeV2 marks metrics reported by the persistent statement summary implementation.
StmtSummaryTypeV2 = "v2"

// StmtSummaryEvictedLogResultPersisted marks evicted records submitted to the stmt log.
StmtSummaryEvictedLogResultPersisted = "persisted"
// StmtSummaryEvictedLogResultDropped marks evicted records dropped before reaching the stmt log.
StmtSummaryEvictedLogResultDropped = "dropped"
)

var (
Expand All @@ -39,6 +44,9 @@ var (
// This value resets to 0 when the window rotates.
StmtSummaryWindowEvictedCount *prometheus.GaugeVec

// StmtSummaryEvictedLogCounter counts v2 evicted-log persistence outcomes.
StmtSummaryEvictedLogCounter *prometheus.CounterVec

stmtSummaryWindowRecordCountV1 prometheus.Gauge
stmtSummaryWindowRecordCountV2 prometheus.Gauge
stmtSummaryWindowEvictedCountV1 prometheus.Gauge
Expand All @@ -65,6 +73,14 @@ func InitStmtSummaryMetrics() {
Help: "The number of LRU evictions in the current statement summary window.",
}, []string{LblType})

StmtSummaryEvictedLogCounter = metricscommon.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "stmt_summary",
Name: "evicted_log_total",
Help: "The number of v2 statement summary evicted-log records by result.",
}, []string{LblType, LblResult})

stmtSummaryWindowMetricsMu.Lock()
stmtSummaryWindowRecordCountV1 = nil
stmtSummaryWindowRecordCountV2 = nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/sessionctx/vardef/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,11 @@ const (
// TiDBStmtSummaryMaxSQLLength indicates the max length of displayed normalized sql and sample sql.
TiDBStmtSummaryMaxSQLLength = "tidb_stmt_summary_max_sql_length"

// TiDBStmtSummaryPersistEvicted controls whether per-record LRU evictions
// in the v2 (persistent) statement summary are persisted to the stmt log.
// Off by default because it adds log volume proportional to eviction rate.
TiDBStmtSummaryPersistEvicted = "tidb_stmt_summary_persist_evicted"

// TiDBStmtSummaryGroupByUser, when enabled, adds the executing user to the
// statement summary grouping key so the same digest run by different users
// produces separate rows. Off by default to avoid cardinality growth.
Expand Down Expand Up @@ -1627,6 +1632,7 @@ const (
DefTiDBStmtSummaryHistorySize = 24
DefTiDBStmtSummaryMaxStmtCount = 3000
DefTiDBStmtSummaryMaxSQLLength = 32768
DefTiDBStmtSummaryPersistEvicted = false
DefTiDBStmtSummaryGroupByUser = false
DefTiDBCapturePlanBaseline = Off
DefTiDBIgnoreInlistPlanDigest = true
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,10 @@ var defaultSysVars = []*SysVar{
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
return stmtsummaryv2.SetMaxSQLLength(TidbOptInt(val, vardef.DefTiDBStmtSummaryMaxSQLLength))
}},
{Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryPersistEvicted, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryPersistEvicted), Type: vardef.TypeBool, AllowEmpty: true,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
return stmtsummaryv2.SetPersistEvicted(TiDBOptOn(val))
}},
{Scope: vardef.ScopeGlobal, Name: vardef.TiDBStmtSummaryGroupByUser, Value: BoolToOnOff(vardef.DefTiDBStmtSummaryGroupByUser), Type: vardef.TypeBool, AllowEmpty: true,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
return stmtsummaryv2.SetGroupByUser(TiDBOptOn(val))
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/stmtsummary/v2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ go_test(
],
embed = [":stmtsummary"],
flaky = True,
shard_count = 16,
shard_count = 18,
deps = [
"//pkg/config",
"//pkg/meta/model",
Expand All @@ -65,5 +65,7 @@ go_test(
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
70 changes: 66 additions & 4 deletions pkg/util/stmtsummary/v2/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package stmtsummary
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"go.uber.org/zap/buffer"
Expand Down Expand Up @@ -59,10 +61,10 @@ func (s *stmtLogStorage) persist(w *stmtWindow, end time.Time) {
r.Unlock()
}
w.evicted.Lock()
if w.evicted.other.ExecCount > 0 {
w.evicted.other.Begin = begin
w.evicted.other.End = end.Unix()
s.log(w.evicted.other)
if w.evicted.otherForPersist.ExecCount > 0 {
w.evicted.otherForPersist.Begin = begin
w.evicted.otherForPersist.End = end.Unix()
s.log(w.evicted.otherForPersist)
}
w.evicted.Unlock()
}
Expand All @@ -71,6 +73,42 @@ func (s *stmtLogStorage) sync() error {
return s.logger.Sync()
}

// logEvicted writes evicted records to the stmt log with an `"evicted":true`
// marker so downstream consumers can distinguish per-record eviction events
// from rotated-window records.
func (s *stmtLogStorage) logEvicted(records []*StmtRecord) {
Comment thread
nolouch marked this conversation as resolved.
var builder strings.Builder
persisted := 0
for _, r := range records {
b, err := marshalEvictedStmtRecord(r)
if err != nil {
logutil.BgLogger().Warn("failed to marshal evicted statement summary", zap.Error(err))
continue
}
if builder.Len() > 0 {
builder.WriteByte('\n')
}
_, _ = builder.Write(b)
persisted++
}
if builder.Len() == 0 {
return
}
s.logger.Info(builder.String())
metrics.StmtSummaryEvictedLogCounter.WithLabelValues(
metrics.StmtSummaryTypeV2,
metrics.StmtSummaryEvictedLogResultPersisted,
).Add(float64(persisted))
}

// evictedStmtRecord embeds *StmtRecord and adds an "evicted" JSON tag.
// Keeping the embedded pointer means the JSON field order matches StmtRecord
// and parsers tolerant of the extra field work unchanged.
type evictedStmtRecord struct {
*StmtRecord
Evicted bool `json:"evicted"`
}

func (s *stmtLogStorage) log(r *StmtRecord) {
b, err := marshalStmtRecord(r)
if err != nil {
Expand All @@ -81,10 +119,28 @@ func (s *stmtLogStorage) log(r *StmtRecord) {
}

func marshalStmtRecord(r *StmtRecord) ([]byte, error) {
return marshalStmtRecordWithEvicted(r, false)
}

func marshalEvictedStmtRecord(r *StmtRecord) ([]byte, error) {
return marshalStmtRecordWithEvicted(r, true)
}

func marshalStmtRecordWithEvicted(r *StmtRecord, evicted bool) ([]byte, error) {
fields := config.GetGlobalConfig().GetKeyspaceObservabilityStmtLogFields()
if len(fields) == 0 {
if evicted {
return json.Marshal(evictedStmtRecord{StmtRecord: r, Evicted: true})
}
return json.Marshal(r)
}
if evicted {
return json.Marshal(evictedStmtRecordWithAdditionalFields{
StmtRecord: r,
AdditionalFields: fields,
Evicted: true,
})
}
return json.Marshal(stmtRecordWithAdditionalFields{
StmtRecord: r,
AdditionalFields: fields,
Expand All @@ -96,6 +152,12 @@ type stmtRecordWithAdditionalFields struct {
AdditionalFields map[string]string `json:"additional_fields"`
}

type evictedStmtRecordWithAdditionalFields struct {
*StmtRecord
AdditionalFields map[string]string `json:"additional_fields"`
Evicted bool `json:"evicted"`
}

type stmtLogEncoder struct{}

func (*stmtLogEncoder) EncodeEntry(entry zapcore.Entry, _ []zapcore.Field) (*buffer.Buffer, error) {
Expand Down
21 changes: 16 additions & 5 deletions pkg/util/stmtsummary/v2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ type stmtTinyRecord struct {
End int64 `json:"end"`
}

type stmtPersistedRecord struct {
StmtRecord
Evicted bool `json:"evicted"`
}

type stmtFile struct {
file *os.File
begin int64
Expand Down Expand Up @@ -757,11 +762,14 @@ func (w *stmtParseWorker) handleLines(

rows := make([][]types.Datum, 0, len(lines))
for _, line := range lines {
record, err := w.parse(line)
record, skipped, err := w.parse(line)
if err != nil {
// ignore invalid lines
continue
}
if skipped {
continue
}

if w.needStop(record) {
break
Expand Down Expand Up @@ -790,12 +798,15 @@ func (w *stmtParseWorker) putRows(
}
}

func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, error) {
var record StmtRecord
func (*stmtParseWorker) parse(raw []byte) (*StmtRecord, bool, error) {
var record stmtPersistedRecord
if err := json.Unmarshal(raw, &record); err != nil {
return nil, err
return nil, false, err
}
return &record, nil
if record.Evicted {
return nil, true, nil
}
return &record.StmtRecord, false, nil
}

func (w *stmtParseWorker) needStop(record *StmtRecord) bool {
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/stmtsummary/v2/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ func TestHistoryReader(t *testing.T) {
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"digest2\",\"exec_count\":20}\n")
require.NoError(t, err)
_, err = file.WriteString("{\"begin\":1672129270,\"end\":1672129280,\"digest\":\"evicted_digest\",\"exec_count\":99,\"evicted\":true}\n")
require.NoError(t, err)
require.NoError(t, file.Close())

file, err = os.Create(filename2)
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/stmtsummary/v2/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,12 @@ func TestStmtRecord(t *testing.T) {
require.NoError(t, json.Unmarshal(b, &items))
require.Equal(t, map[string]any{"stmt_meta_a": "value_a"}, items["additional_fields"])
require.Equal(t, record2.Digest, items["digest"])

b, err = marshalEvictedStmtRecord(record2)
require.NoError(t, err)
items = make(map[string]any)
require.NoError(t, json.Unmarshal(b, &items))
require.Equal(t, map[string]any{"stmt_meta_a": "value_a"}, items["additional_fields"])
require.Equal(t, true, items["evicted"])
require.Equal(t, record2.Digest, items["digest"])
}
Loading
Loading