Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) erro
// If NotSync is true, it means the DDL should not be sent to downstream.
// So we just call PassBlockEventToSink to update the table progress and call the postFlush func.
if ddl.NotSync {
log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl))
log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.String("ddl", ddl.GetDDLQuery()))
d.PassBlockEventToSink(event)
return nil
}
Expand Down Expand Up @@ -539,7 +539,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
cost := time.Since(now)
d.sharedInfo.metricHandleDDLHis.Observe(cost.Seconds())
log.Debug("dispatcher handle ddl event finish",
zap.Duration("cost", cost), zap.Any("ddl", ddl))
zap.Duration("cost", cost), zap.String("query", ddl.Query))
})
d.DealWithBlockEvent(ddl)
case commonEvent.TypeSyncPointEvent:
Expand Down
2 changes: 0 additions & 2 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func (d *dispatcherStat) shouldForwardEventByCommitTs(event dispatcher.Dispatche
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Int64("tableID", d.target.GetTableSpan().TableID),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Any("event", event.Event),
zap.Uint64("eventCommitTs", event.GetCommitTs()),
zap.Uint64("sentCommitTs", d.lastEventCommitTs.Load()))
return false
Expand Down Expand Up @@ -411,7 +410,6 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.String("eventType", commonEvent.TypeToString(events[0].GetType())),
zap.Any("event", events[0].Event),
zap.Uint64("eventEpoch", events[0].GetEpoch()),
zap.Uint64("dispatcherEpoch", state.epoch),
zap.Stringer("staleEventService", *from),
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/blackhole/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
e := event.(*commonEvent.DDLEvent)
// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17
log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e))
log.Debug("BlackHoleSink: DDL Event", zap.String("ddl", e.GetDDLQuery()))
ddlType := e.GetDDLType().String()
err := s.statistics.RecordDDLExecution(func() (string, error) {
return ddlType, nil
Expand Down
9 changes: 6 additions & 3 deletions downstreamadapter/sink/cloudstorage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,18 @@ func TestWriterRun(t *testing.T) {
_ = d.run(ctx)
}()

dataFileName := fmt.Sprintf("CDC_%s_000001.json", dispatcherID.String())
indexFileName := fmt.Sprintf("CDC_%s.index", dispatcherID.String())
require.Eventually(t, func() bool {
files, err := os.ReadDir(table1Dir)
return err == nil && len(files) == 2
_, dataErr := os.Stat(path.Join(table1Dir, dataFileName))
_, indexErr := os.Stat(path.Join(table1Dir, "meta", indexFileName))
return dataErr == nil && indexErr == nil
}, 10*time.Second, 100*time.Millisecond)

// check whether files for table1 has been generated
fileNames := getTableFiles(t, table1Dir)
require.Len(t, fileNames, 2)
require.ElementsMatch(t, []string{fmt.Sprintf("CDC_%s_000001.json", dispatcherID.String()), fmt.Sprintf("CDC_%s.index", dispatcherID.String())}, fileNames)
require.ElementsMatch(t, []string{dataFileName, indexFileName}, fileNames)
cancel()
wg.Wait()
}
Expand Down
10 changes: 0 additions & 10 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,12 @@ func (s *sink) sendMessages(ctx context.Context) error {
metricSendMessageDuration := metrics.WorkerSendMessageDuration.WithLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
defer metrics.WorkerSendMessageDuration.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var err error
outCh := s.comp.encoderGroup.Output()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
s.dmlProducer.Heartbeat()
case future, ok := <-outCh:
if !ok {
log.Info("kafka sink encoder's output channel closed",
Expand Down Expand Up @@ -483,9 +478,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
}()

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var (
msg *common.Message
partitionNum int32
Expand All @@ -495,8 +487,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
s.ddlProducer.Heartbeat()
case ts, ok := <-s.checkpointChan:
if !ok {
log.Warn("kafka sink checkpoint channel closed",
Expand Down
89 changes: 0 additions & 89 deletions downstreamadapter/sink/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ import (
"context"
"fmt"
"net/url"
"sync"
"testing"
"time"

"github.com/IBM/sarama/mocks"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -104,93 +102,6 @@ func newKafkaSinkForTest(ctx context.Context) (*sink, error) {
return newKafkaSinkForTestWithProducers(ctx, nil, nil)
}

// mockSyncProducer is used to count the calls to Heartbeat.
type mockSyncProducer struct {
kafka.MockSaramaSyncProducer
heartbeatCount int
mu sync.Mutex
}

func (m *mockSyncProducer) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockSyncProducer) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestDDLProducerHeartbeat(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
producer := &mockSyncProducer{}
heartbeatInterval := 5 * time.Second
_, err := newKafkaSinkForTestWithProducers(ctx, nil, producer)
require.NoError(t, err)

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return producer.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := producer.GetHeartbeatCount()
cancel()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

// mockSyncProducer is used to count the calls to Heartbeat.
type mockAsyncProducer struct {
kafka.MockSaramaAsyncProducer
heartbeatCount int
mu sync.Mutex
}

func (m *mockAsyncProducer) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockAsyncProducer) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestDMLProducerHeartbeat(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
producer := &mockAsyncProducer{}
producer.AsyncProducer = mocks.NewAsyncProducer(t, nil)
heartbeatInterval := 5 * time.Second
_, err := newKafkaSinkForTestWithProducers(ctx, producer, nil)
require.NoError(t, err)

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return producer.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := producer.GetHeartbeatCount()
cancel()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

func TestKafkaSinkBasicFunctionality(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
Expand Down
16 changes: 5 additions & 11 deletions downstreamadapter/sink/topicmanager/kafka_topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ type kafkaTopicManager struct {
cfg *kafka.AutoCreateTopicConfig

topics sync.Map

metaRefreshTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}
Expand Down Expand Up @@ -81,11 +78,10 @@ func newKafkaTopicManager(
cfg *kafka.AutoCreateTopicConfig,
) *kafkaTopicManager {
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
}

ctx, mgr.cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -115,7 +111,7 @@ func (m *kafkaTopicManager) GetPartitionNum(
}

func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(metaRefreshInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -126,8 +122,6 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
)
return
case <-ticker.C:
m.admin.Heartbeat()
case <-m.metaRefreshTicker.C:
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx)
Expand Down
50 changes: 0 additions & 50 deletions downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,13 @@ package topicmanager

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

// mockAdminClientForHeartbeat is used to count the calls to Heartbeat.
type mockAdminClientForHeartbeat struct {
kafka.ClusterAdminClientMockImpl
heartbeatCount int
mu sync.Mutex
}

func (m *mockAdminClientForHeartbeat) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestKafkaTopicManagerHeartbeat(t *testing.T) {
t.Parallel()

adminClient := &mockAdminClientForHeartbeat{}
cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false}
changefeedID := common.NewChangefeedID4Test("test", "test")
ctx, cancel := context.WithCancel(context.Background())

heartbeatInterval := 5 * time.Second
manager := newKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg)

// Ensure the context is canceled at the end of the test.
defer cancel()

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return adminClient.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := adminClient.GetHeartbeatCount()
manager.Close()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

func TestCreateTopic(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/pingcap/tidb v1.1.0-beta.0.20260611081847-e7463a21c08f
github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3
github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c
github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab
github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd
github.com/prometheus/client_golang v1.23.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
Expand Down Expand Up @@ -401,4 +401,4 @@ replace github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20260605083900

replace github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82

replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x
replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,8 @@ github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuR
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU=
github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tidb v1.1.0-beta.0.20260611081847-e7463a21c08f h1:7elbZEXrLr23AYzie0Pzp8djmTGS4cUlkAg/Of2MIF8=
Expand All @@ -781,8 +781,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3 h1:q7K62HEN
github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3/go.mod h1:dZXZ7smJd5ZkwDMHnPfrhnzUFNbL5kMU5MpFxRfKr0U=
github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c h1:fZ3Hv66/w2zPhIqZ6+gtkx2e0GbGbhgXXGkPKnD+wu0=
github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c/go.mod h1:49aXiDQhHi2UF15fq7zqtJZlKm+y+oIu9BVgWh9++Lk=
github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab h1:hMcP9VDvH3kosqdk/HO0284KxUDf0SK+U42T+5+fHfA=
github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab/go.mod h1:RBWcseGMX5bSsduiDcile5Q7aveYMJONAcg2STve8aY=
github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd h1:kQXYaSow3246cDF6CULbBrV20WxV5ouabyPT7TmSMGM=
github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd/go.mod h1:RBWcseGMX5bSsduiDcile5Q7aveYMJONAcg2STve8aY=
github.com/pingcap/tipb v0.0.0-20260605083900-f9f651ef5fbc h1:wxolKysltFSu8gxWJBdUdWuTBoSuY3MjNIIZI5S9JLY=
github.com/pingcap/tipb v0.0.0-20260605083900-f9f651ef5fbc/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
Expand Down
10 changes: 6 additions & 4 deletions pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,14 @@ func (ra *RedoApplier) applyDDL(
timodel.ActionAlterTablePartitionPlacement, timodel.ActionRecoverSchema:
tableDDLTs = ra.getTableDDLTs(commonType.DDLSpanTableID)
default:
log.Warn("ignore unsupport DDL", zap.Any("ddl", ddl), zap.Any("type", ddl.Type))
log.Warn("ignore unsupport DDL", zap.String("ddl", ddl.DDL.Query))
return true
}
}
if tableDDLTs.ts >= int64(ddl.DDL.CommitTs) {
log.Warn("ignore DDL which commit ts is less than current ts", zap.Any("ddl", ddl), zap.Any("startTs", tableDDLTs.ts))
log.Warn("ignore DDL which commit ts is less than current ts",
zap.Uint64("commitTs", ddl.DDL.CommitTs), zap.Int64("startTs", tableDDLTs.ts),
zap.String("ddl", ddl.DDL.Query))
// Ignore the previous dml events, because the drop ddl has replicated the downstream
// DML + Drop Table: If the drop table ddl is ignored and the previous dmls should be replicated the downstream in the past.
if ddl.DDL.NeedDroppedTables != nil {
Expand All @@ -323,7 +325,7 @@ func (ra *RedoApplier) applyDDL(
// compatible with old arch
if ra.needRecoveryInfo && ddl.DDL.CommitTs == checkpointTs {
if _, ok := unsupportedDDL[timodel.ActionType(ddl.Type)]; ok {
log.Error("ignore unsupported DDL", zap.Any("ddl", ddl))
log.Warn("ignore unsupported DDL", zap.String("ddl", ddl.DDL.Query))
return true
}
}
Expand All @@ -332,7 +334,7 @@ func (ra *RedoApplier) applyDDL(
if shouldSkip() {
return nil
}
log.Warn("apply DDL", zap.Any("ddl", ddl))
log.Warn("apply DDL", zap.String("ddl", ddl.DDL.Query))
// Wait block tables to flush data before applying DDL.
tableIDs := ra.getBlockTableIDs(ddl.DDL.BlockedTables)
for tableID := range tableIDs {
Expand Down
Loading
Loading