diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index d70efaed03..cc2006c320 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -311,7 +311,7 @@ func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) erro // If NotSync is true, it means the DDL should not be sent to downstream. // So we just call PassBlockEventToSink to update the table progress and call the postFlush func. if ddl.NotSync { - log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl)) + log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.String("ddl", ddl.GetDDLQuery())) d.PassBlockEventToSink(event) return nil } @@ -539,7 +539,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC cost := time.Since(now) d.sharedInfo.metricHandleDDLHis.Observe(cost.Seconds()) log.Debug("dispatcher handle ddl event finish", - zap.Duration("cost", cost), zap.Any("ddl", ddl)) + zap.Duration("cost", cost), zap.String("query", ddl.Query)) }) d.DealWithBlockEvent(ddl) case commonEvent.TypeSyncPointEvent: diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 794493f170..0153b3dc16 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -261,7 +261,6 @@ func (d *dispatcherStat) shouldForwardEventByCommitTs(event dispatcher.Dispatche zap.Stringer("changefeedID", d.target.GetChangefeedID()), zap.Int64("tableID", d.target.GetTableSpan().TableID), zap.Stringer("dispatcher", d.getDispatcherID()), - zap.Any("event", event.Event), zap.Uint64("eventCommitTs", event.GetCommitTs()), zap.Uint64("sentCommitTs", d.lastEventCommitTs.Load())) return false @@ -411,7 +410,6 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv zap.Stringer("changefeedID", d.target.GetChangefeedID()), zap.Stringer("dispatcher", d.getDispatcherID()), zap.String("eventType", commonEvent.TypeToString(events[0].GetType())), - zap.Any("event", events[0].Event), zap.Uint64("eventEpoch", events[0].GetEpoch()), zap.Uint64("dispatcherEpoch", state.epoch), zap.Stringer("staleEventService", *from), diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index 1b05dcac03..984b2d3079 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -67,7 +67,7 @@ func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { e := event.(*commonEvent.DDLEvent) // NOTE: don't change the log, integration test `lossy_ddl` depends on it. // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17 - log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e)) + log.Debug("BlackHoleSink: DDL Event", zap.String("ddl", e.GetDDLQuery())) ddlType := e.GetDDLType().String() err := s.statistics.RecordDDLExecution(func() (string, error) { return ddlType, nil diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 4afb979a0b..c85e275bd5 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -151,15 +151,18 @@ func TestWriterRun(t *testing.T) { _ = d.run(ctx) }() + dataFileName := fmt.Sprintf("CDC_%s_000001.json", dispatcherID.String()) + indexFileName := fmt.Sprintf("CDC_%s.index", dispatcherID.String()) require.Eventually(t, func() bool { - files, err := os.ReadDir(table1Dir) - return err == nil && len(files) == 2 + _, dataErr := os.Stat(path.Join(table1Dir, dataFileName)) + _, indexErr := os.Stat(path.Join(table1Dir, "meta", indexFileName)) + return dataErr == nil && indexErr == nil }, 10*time.Second, 100*time.Millisecond) // check whether files for table1 has been generated fileNames := getTableFiles(t, table1Dir) require.Len(t, fileNames, 2) - require.ElementsMatch(t, []string{fmt.Sprintf("CDC_%s_000001.json", dispatcherID.String()), fmt.Sprintf("CDC_%s.index", dispatcherID.String())}, fileNames) + require.ElementsMatch(t, []string{dataFileName, indexFileName}, fileNames) cancel() wg.Wait() } diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go index 51f040e3c3..e6d1038b26 100644 --- a/downstreamadapter/sink/kafka/sink.go +++ b/downstreamadapter/sink/kafka/sink.go @@ -375,17 +375,12 @@ func (s *sink) sendMessages(ctx context.Context) error { metricSendMessageDuration := metrics.WorkerSendMessageDuration.WithLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) defer metrics.WorkerSendMessageDuration.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - var err error outCh := s.comp.encoderGroup.Output() for { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-ticker.C: - s.dmlProducer.Heartbeat() case future, ok := <-outCh: if !ok { log.Info("kafka sink encoder's output channel closed", @@ -483,9 +478,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error { metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) }() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - var ( msg *common.Message partitionNum int32 @@ -495,8 +487,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-ticker.C: - s.ddlProducer.Heartbeat() case ts, ok := <-s.checkpointChan: if !ok { log.Warn("kafka sink checkpoint channel closed", diff --git a/downstreamadapter/sink/kafka/sink_test.go b/downstreamadapter/sink/kafka/sink_test.go index 2e6c5089f7..0bb4708f58 100644 --- a/downstreamadapter/sink/kafka/sink_test.go +++ b/downstreamadapter/sink/kafka/sink_test.go @@ -17,11 +17,9 @@ import ( "context" "fmt" "net/url" - "sync" "testing" "time" - "github.com/IBM/sarama/mocks" "github.com/pingcap/errors" "github.com/pingcap/ticdc/downstreamadapter/sink/helper" "github.com/pingcap/ticdc/pkg/common" @@ -104,93 +102,6 @@ func newKafkaSinkForTest(ctx context.Context) (*sink, error) { return newKafkaSinkForTestWithProducers(ctx, nil, nil) } -// mockSyncProducer is used to count the calls to Heartbeat. -type mockSyncProducer struct { - kafka.MockSaramaSyncProducer - heartbeatCount int - mu sync.Mutex -} - -func (m *mockSyncProducer) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockSyncProducer) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestDDLProducerHeartbeat(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - producer := &mockSyncProducer{} - heartbeatInterval := 5 * time.Second - _, err := newKafkaSinkForTestWithProducers(ctx, nil, producer) - require.NoError(t, err) - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return producer.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := producer.GetHeartbeatCount() - cancel() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - -// mockSyncProducer is used to count the calls to Heartbeat. -type mockAsyncProducer struct { - kafka.MockSaramaAsyncProducer - heartbeatCount int - mu sync.Mutex -} - -func (m *mockAsyncProducer) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockAsyncProducer) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestDMLProducerHeartbeat(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - producer := &mockAsyncProducer{} - producer.AsyncProducer = mocks.NewAsyncProducer(t, nil) - heartbeatInterval := 5 * time.Second - _, err := newKafkaSinkForTestWithProducers(ctx, producer, nil) - require.NoError(t, err) - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return producer.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := producer.GetHeartbeatCount() - cancel() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - func TestKafkaSinkBasicFunctionality(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() diff --git a/downstreamadapter/sink/topicmanager/kafka_topic_manager.go b/downstreamadapter/sink/topicmanager/kafka_topic_manager.go index c6963573d8..8e92167327 100644 --- a/downstreamadapter/sink/topicmanager/kafka_topic_manager.go +++ b/downstreamadapter/sink/topicmanager/kafka_topic_manager.go @@ -46,9 +46,6 @@ type kafkaTopicManager struct { cfg *kafka.AutoCreateTopicConfig topics sync.Map - - metaRefreshTicker *time.Ticker - // cancel is used to cancel the background goroutine. cancel context.CancelFunc } @@ -81,11 +78,10 @@ func newKafkaTopicManager( cfg *kafka.AutoCreateTopicConfig, ) *kafkaTopicManager { mgr := &kafkaTopicManager{ - defaultTopic: defaultTopic, - changefeedID: changefeedID, - admin: admin, - cfg: cfg, - metaRefreshTicker: time.NewTicker(metaRefreshInterval), + defaultTopic: defaultTopic, + changefeedID: changefeedID, + admin: admin, + cfg: cfg, } ctx, mgr.cancel = context.WithCancel(ctx) @@ -115,7 +111,7 @@ func (m *kafkaTopicManager) GetPartitionNum( } func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(metaRefreshInterval) defer ticker.Stop() for { select { @@ -126,8 +122,6 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { ) return case <-ticker.C: - m.admin.Heartbeat() - case <-m.metaRefreshTicker.C: // We ignore the error here, because the error may be caused by the // network problem, and we can try to get the metadata next time. topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx) diff --git a/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go b/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go index 49231e3116..bf02658b24 100644 --- a/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go +++ b/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go @@ -15,63 +15,13 @@ package topicmanager import ( "context" - "sync" "testing" - "time" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/sink/kafka" "github.com/stretchr/testify/require" ) -// mockAdminClientForHeartbeat is used to count the calls to Heartbeat. -type mockAdminClientForHeartbeat struct { - kafka.ClusterAdminClientMockImpl - heartbeatCount int - mu sync.Mutex -} - -func (m *mockAdminClientForHeartbeat) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestKafkaTopicManagerHeartbeat(t *testing.T) { - t.Parallel() - - adminClient := &mockAdminClientForHeartbeat{} - cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false} - changefeedID := common.NewChangefeedID4Test("test", "test") - ctx, cancel := context.WithCancel(context.Background()) - - heartbeatInterval := 5 * time.Second - manager := newKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg) - - // Ensure the context is canceled at the end of the test. - defer cancel() - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return adminClient.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := adminClient.GetHeartbeatCount() - manager.Close() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - func TestCreateTopic(t *testing.T) { t.Parallel() diff --git a/go.mod b/go.mod index fa56afbcaa..c978877cbb 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/pingcap/tidb v1.1.0-beta.0.20260611081847-e7463a21c08f github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3 github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c - github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab + github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd github.com/prometheus/client_golang v1.23.0 github.com/r3labs/diff v1.1.0 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 @@ -401,4 +401,4 @@ replace github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20260605083900 replace github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82 -replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x +replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508 diff --git a/go.sum b/go.sum index 1cd11e423d..c28da88e10 100644 --- a/go.sum +++ b/go.sum @@ -771,8 +771,8 @@ github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuR github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs= github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= +github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU= +github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= github.com/pingcap/tidb v1.1.0-beta.0.20260611081847-e7463a21c08f h1:7elbZEXrLr23AYzie0Pzp8djmTGS4cUlkAg/Of2MIF8= @@ -781,8 +781,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3 h1:q7K62HEN github.com/pingcap/tidb-dashboard v0.0.0-20260603054940-9e92195886c3/go.mod h1:dZXZ7smJd5ZkwDMHnPfrhnzUFNbL5kMU5MpFxRfKr0U= github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c h1:fZ3Hv66/w2zPhIqZ6+gtkx2e0GbGbhgXXGkPKnD+wu0= github.com/pingcap/tidb/pkg/parser v0.0.0-20260611093756-46989375079c/go.mod h1:49aXiDQhHi2UF15fq7zqtJZlKm+y+oIu9BVgWh9++Lk= -github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab h1:hMcP9VDvH3kosqdk/HO0284KxUDf0SK+U42T+5+fHfA= -github.com/pingcap/tiflow v0.0.0-20260612064548-0206c5481cab/go.mod h1:RBWcseGMX5bSsduiDcile5Q7aveYMJONAcg2STve8aY= +github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd h1:kQXYaSow3246cDF6CULbBrV20WxV5ouabyPT7TmSMGM= +github.com/pingcap/tiflow v0.0.0-20260615112939-12921db8ebdd/go.mod h1:RBWcseGMX5bSsduiDcile5Q7aveYMJONAcg2STve8aY= github.com/pingcap/tipb v0.0.0-20260605083900-f9f651ef5fbc h1:wxolKysltFSu8gxWJBdUdWuTBoSuY3MjNIIZI5S9JLY= github.com/pingcap/tipb v0.0.0-20260605083900-f9f651ef5fbc/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index f07c88f8c2..e43d9e4c44 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -293,12 +293,14 @@ func (ra *RedoApplier) applyDDL( timodel.ActionAlterTablePartitionPlacement, timodel.ActionRecoverSchema: tableDDLTs = ra.getTableDDLTs(commonType.DDLSpanTableID) default: - log.Warn("ignore unsupport DDL", zap.Any("ddl", ddl), zap.Any("type", ddl.Type)) + log.Warn("ignore unsupport DDL", zap.String("ddl", ddl.DDL.Query)) return true } } if tableDDLTs.ts >= int64(ddl.DDL.CommitTs) { - log.Warn("ignore DDL which commit ts is less than current ts", zap.Any("ddl", ddl), zap.Any("startTs", tableDDLTs.ts)) + log.Warn("ignore DDL which commit ts is less than current ts", + zap.Uint64("commitTs", ddl.DDL.CommitTs), zap.Int64("startTs", tableDDLTs.ts), + zap.String("ddl", ddl.DDL.Query)) // Ignore the previous dml events, because the drop ddl has replicated the downstream // DML + Drop Table: If the drop table ddl is ignored and the previous dmls should be replicated the downstream in the past. if ddl.DDL.NeedDroppedTables != nil { @@ -323,7 +325,7 @@ func (ra *RedoApplier) applyDDL( // compatible with old arch if ra.needRecoveryInfo && ddl.DDL.CommitTs == checkpointTs { if _, ok := unsupportedDDL[timodel.ActionType(ddl.Type)]; ok { - log.Error("ignore unsupported DDL", zap.Any("ddl", ddl)) + log.Warn("ignore unsupported DDL", zap.String("ddl", ddl.DDL.Query)) return true } } @@ -332,7 +334,7 @@ func (ra *RedoApplier) applyDDL( if shouldSkip() { return nil } - log.Warn("apply DDL", zap.Any("ddl", ddl)) + log.Warn("apply DDL", zap.String("ddl", ddl.DDL.Query)) // Wait block tables to flush data before applying DDL. tableIDs := ra.getBlockTableIDs(ddl.DDL.BlockedTables) for tableID := range tableIDs { diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index 2ab20bed94..5463382229 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -113,6 +113,25 @@ type TableInfo struct { } `json:"-"` } +func (ti *TableInfo) String() string { + if ti == nil { + return "" + } + return fmt.Sprintf( + "TableInfo{schema:%s, table:%s, tableID:%d, isPartition:%t, targetSchema:%s, targetTable:%s, charset:%s, collate:%s, hasPKOrNotNullUK:%t, updateTS:%d}", + ti.TableName.Schema, + ti.TableName.Table, + ti.TableName.TableID, + ti.TableName.IsPartition, + ti.TableName.GetTargetSchema(), + ti.TableName.GetTargetTable(), + ti.Charset, + ti.Collate, + ti.HasPKOrNotNullUK, + ti.UpdateTS, + ) +} + func (ti *TableInfo) initPreSQLs() { if ti == nil || ti.columnSchema == nil { return diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 41298460d8..b8cfd1cfc3 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -180,13 +180,6 @@ func (a *saramaAdminClient) CreateTopic(detail *TopicDetail, validateOnly bool) return nil } -func (a *saramaAdminClient) Heartbeat() { - brokers := a.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - func (a *saramaAdminClient) Close() { // For admins created via sarama.NewClusterAdminFromClient, admin.Close() takes care // of closing the underlying client as well. Fall back to closing the client directly diff --git a/pkg/sink/kafka/cluster_admin_client.go b/pkg/sink/kafka/cluster_admin_client.go index ffa70e516d..3c6c331c88 100644 --- a/pkg/sink/kafka/cluster_admin_client.go +++ b/pkg/sink/kafka/cluster_admin_client.go @@ -47,8 +47,6 @@ type ClusterAdminClient interface { // CreateTopic creates a new topic. CreateTopic(detail *TopicDetail, validateOnly bool) error - Heartbeat() - // Close shuts down the admin client. Close() } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 54b9d5b60a..72a458a508 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -48,8 +48,6 @@ type SyncProducer interface { // SendMessages will return an error. SendMessages(topic string, partitionNum int32, message *common.Message) error - Heartbeat() - // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. @@ -69,8 +67,6 @@ type AsyncProducer interface { // wish to send. AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error - Heartbeat() - // AsyncRunCallback process the messages that has sent to kafka, // and run tha attached callback. the caller should call this // method in a background goroutine diff --git a/pkg/sink/kafka/mock_cluster_admin_client.go b/pkg/sink/kafka/mock_cluster_admin_client.go index d7e00780ab..99a12388dd 100644 --- a/pkg/sink/kafka/mock_cluster_admin_client.go +++ b/pkg/sink/kafka/mock_cluster_admin_client.go @@ -94,8 +94,6 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { } } -func (c *ClusterAdminClientMockImpl) Heartbeat() {} - // GetAllBrokers implement the ClusterAdminClient interface func (c *ClusterAdminClientMockImpl) GetAllBrokers() []Broker { return nil diff --git a/pkg/sink/kafka/mock_factory.go b/pkg/sink/kafka/mock_factory.go index ddc0362e61..5a4bc64092 100644 --- a/pkg/sink/kafka/mock_factory.go +++ b/pkg/sink/kafka/mock_factory.go @@ -107,10 +107,6 @@ func (m *MockSaramaSyncProducer) Close() { _ = m.SyncProducer.Close() } -func (m *MockSaramaSyncProducer) Heartbeat() { - return -} - // MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface. type MockSaramaAsyncProducer struct { AsyncProducer *mocks.AsyncProducer @@ -176,10 +172,6 @@ func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, p return nil } -func (p *MockSaramaAsyncProducer) Heartbeat() { - return -} - // Close implement the AsyncProducer interface. func (p *MockSaramaAsyncProducer) Close() { if p.closed { diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index 6a5624427f..c9b992814e 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -38,6 +38,8 @@ import ( const ( // defaultPartitionNum specifies the default number of partitions when we create the topic. defaultPartitionNum = 3 + // defaultMaxRetry is the default retry budget for Kafka producers. + defaultMaxRetry = 5 // the `max-message-bytes` is set equal to topic's `max.message.bytes`, and is used to check // whether the message is larger than the max size limit. It's found some message pass the message @@ -62,10 +64,6 @@ const ( // See: https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas and // https://kafka.apache.org/documentation/#topicconfigs_min.insync.replicas MinInsyncReplicasConfigName = "min.insync.replicas" - // BrokerConnectionsMaxIdleMsConfigName specifies the maximum idle time of a connection to a broker. - // Broker will close the connection if it is idle for this long. - // See: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.idle.ms - BrokerConnectionsMaxIdleMsConfigName = "connections.max.idle.ms" ) const ( @@ -119,6 +117,7 @@ type urlConfig struct { ReplicationFactor *int16 `form:"replication-factor"` KafkaVersion *string `form:"kafka-version"` MaxMessageBytes *int `form:"max-message-bytes"` + MaxRetry *int `form:"max-retry"` Compression *string `form:"compression"` KafkaClientID *string `form:"kafka-client-id"` AutoCreateTopic *bool `form:"auto-create-topic"` @@ -158,6 +157,7 @@ type options struct { IsAssignedVersion bool RequestVersion int16 MaxMessageBytes int + MaxRetry int Compression string ClientID string RequiredAcks RequiredAcks @@ -172,10 +172,9 @@ type options struct { SASL *security.SASL // Timeout for network configurations, default to `10s` - DialTimeout time.Duration - WriteTimeout time.Duration - ReadTimeout time.Duration - KeepConnAliveInterval time.Duration + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration } // NewOptions returns a default Kafka configuration @@ -184,6 +183,7 @@ func NewOptions() *options { Version: "2.4.0", // MaxMessageBytes will be used to initialize producer MaxMessageBytes: config.DefaultMaxMessageBytes, + MaxRetry: defaultMaxRetry, ReplicationFactor: 1, Compression: "none", RequiredAcks: WaitForAll, @@ -261,6 +261,10 @@ func (o *options) Apply(changefeedID common.ChangeFeedID, o.MaxMessageBytes = *urlParameter.MaxMessageBytes } + if urlParameter.MaxRetry != nil && *urlParameter.MaxRetry >= 0 { + o.MaxRetry = *urlParameter.MaxRetry + } + if urlParameter.Compression != nil { o.Compression = *urlParameter.Compression } @@ -593,21 +597,6 @@ func adjustOptions( } } - // adjust keepConnAliveInterval by `connections.max.idle.ms` broker config. - idleMs, err := admin.GetBrokerConfig(BrokerConnectionsMaxIdleMsConfigName) - if err != nil { - log.Warn("GetBrokerConfig failed for connections.max.idle.ms", zap.Error(err)) - } else { - idleMsInt, err := strconv.Atoi(idleMs) - if err != nil || idleMsInt <= 0 { - log.Warn("invalid broker config", - zap.String("configName", BrokerConnectionsMaxIdleMsConfigName), zap.String("configValue", idleMs)) - return errors.Trace(err) - } - options.KeepConnAliveInterval = time.Duration(idleMsInt/3) * time.Millisecond - log.Info("Adjust KeepConnAliveInterval", zap.Duration("KeepConnAliveInterval", options.KeepConnAliveInterval)) - } - info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, // make sure user input parameters are valid. diff --git a/pkg/sink/kafka/options_test.go b/pkg/sink/kafka/options_test.go index 36c2f4e669..8f64d49762 100644 --- a/pkg/sink/kafka/options_test.go +++ b/pkg/sink/kafka/options_test.go @@ -24,10 +24,9 @@ import ( "github.com/IBM/sarama" "github.com/aws/aws-sdk-go/aws" - "github.com/pingcap/errors" commonType "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/stretchr/testify/require" ) @@ -51,6 +50,7 @@ func TestCompleteOptions(t *testing.T) { require.Equal(t, "2.6.0", options.Version) require.Equal(t, 4096, options.MaxMessageBytes) require.Equal(t, WaitForLocal, options.RequiredAcks) + require.Equal(t, defaultMaxRetry, options.MaxRetry) // multiple kafka broker endpoints uri = "kafka://127.0.0.1:9092,127.0.0.1:9091,127.0.0.1:9090/kafka-test?" @@ -78,6 +78,14 @@ func TestCompleteOptions(t *testing.T) { err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) + // Illegal max-retry. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-retry=a" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) + // Illegal partition-num. uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a" sinkURI, err = url.Parse(uri) @@ -108,7 +116,33 @@ func TestCompleteOptions(t *testing.T) { require.NoError(t, err) options = NewOptions() err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) - require.True(t, cerror.ErrKafkaInvalidClientID.Equal(err)) + require.True(t, errors.ErrKafkaInvalidClientID.Equal(err)) + + // max-retry accepts non-negative sink-uri values. + uri = "kafka://127.0.0.1:9092/abc?max-retry=7" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, 7, options.MaxRetry) + + uri = "kafka://127.0.0.1:9092/abc?max-retry=0" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, 0, options.MaxRetry) + + // Negative max-retry values are ignored. + uri = "kafka://127.0.0.1:9092/abc?max-retry=-1" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, defaultMaxRetry, options.MaxRetry) } func TestSetPartitionNum(t *testing.T) { @@ -124,7 +158,7 @@ func TestSetPartitionNum(t *testing.T) { options.PartitionNum = 3 err = options.setPartitionNum(2) - require.True(t, cerror.ErrKafkaInvalidPartitionNum.Equal(err)) + require.True(t, errors.ErrKafkaInvalidPartitionNum.Equal(err)) } func TestClientID(t *testing.T) { @@ -783,75 +817,3 @@ func TestMerge(t *testing.T) { require.Equal(t, "cert.pem", c.Credential.CertPath) require.Equal(t, "key.pem", c.Credential.KeyPath) } - -// mockAdminClientForAdjust mocks the ClusterAdminClient to test adjustOptions. -type mockAdminClientForAdjust struct { - ClusterAdminClientMockImpl // We know there is a Mock implementation from the diff - brokerConfigValue string - shouldError bool -} - -// GetBrokerConfig simulates the behavior of getting configuration from a broker. -func (m *mockAdminClientForAdjust) GetBrokerConfig(configName string) (string, error) { - if m.shouldError { - return "", errors.New("mock error: cannot get broker config") - } - if configName == BrokerConnectionsMaxIdleMsConfigName { - return m.brokerConfigValue, nil - } - return "", errors.Errorf("unexpected config name: %s", configName) -} - -func TestAdjustOptionsKeepAlive(t *testing.T) { - t.Parallel() - ctx := context.Background() - - // Case 1: Successful adjustment. - // The broker returns a valid idle time, KeepConnAliveInterval should be set to 1/3 of it. - t.Run("SuccessfulAdjustment", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "300000", // 300,000 ms = 300 s - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err) - // Expected value is 300000ms / 3 = 100000ms = 100s - require.Equal(t, 100*time.Second, o.KeepConnAliveInterval) - }) - - // Case 2: Broker returns an invalid (non-integer) config value. - t.Run("InvalidNonIntegerConfig", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "not-a-number", - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.Error(t, err) - // The error should be a type conversion error. - _, ok := errors.Cause(err).(*strconv.NumError) - require.True(t, ok, "error should be of type strconv.NumError") - }) - - // Case 3: Broker returns an invalid (zero or negative) config value. - // According to the code in the diff, this case will log a warning and return a nil error, - // and the configuration item will not be updated. - t.Run("InvalidZeroOrNegativeConfig", func(t *testing.T) { - t.Parallel() - for _, val := range []string{"0", "-1000"} { - o := NewOptions() - defaultInterval := o.KeepConnAliveInterval - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: val, - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err, "should not return error for zero or negative idle time") - // KeepConnAliveInterval should remain its default value. - require.Equal(t, defaultInterval, o.KeepConnAliveInterval, "interval should not be changed") - } - }) -} diff --git a/pkg/sink/kafka/sarama_async_producer.go b/pkg/sink/kafka/sarama_async_producer.go index 58b723af16..9c0bd82104 100644 --- a/pkg/sink/kafka/sarama_async_producer.go +++ b/pkg/sink/kafka/sarama_async_producer.go @@ -146,13 +146,6 @@ func (p *saramaAsyncProducer) handleProducerError(err *sarama.ProducerError) err return cerror.WrapError(cerror.ErrKafkaAsyncSendMessage, errWithInfo) } -func (p *saramaAsyncProducer) Heartbeat() { - brokers := p.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - // AsyncSend is the input channel for the user to write messages to that they // wish to send. func (p *saramaAsyncProducer) AsyncSend( diff --git a/pkg/sink/kafka/sarama_config.go b/pkg/sink/kafka/sarama_config.go index 59faa82610..b53dc47b37 100644 --- a/pkg/sink/kafka/sarama_config.go +++ b/pkg/sink/kafka/sarama_config.go @@ -45,22 +45,17 @@ func newSaramaConfig(ctx context.Context, o *options) (*sarama.Config, error) { config.Metadata.Retry.Max = 10 config.Metadata.Retry.Backoff = 200 * time.Millisecond config.Metadata.Timeout = 2 * time.Minute - // The kafka server side connections.max.idle.ms default value is 10 minutes. - // it will close the connection if idle for too long. - // so we need to refresh the metadata frequently to avoid the connection being closed by server, - // and then trigger the `fetching metadata: write broken pipe` error, it's annoying. - config.Metadata.RefreshFrequency = 9 * time.Minute - config.Admin.Retry.Max = 10 config.Admin.Retry.Backoff = 200 * time.Millisecond // This timeout control the request timeout for each admin request. // set it as the read timeout. config.Admin.Timeout = 10 * time.Second - // According to the https://github.com/IBM/sarama/issues/2619, - // sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1, - // when the kafka cluster is unhealthy and trigger the internal retry mechanism. - config.Producer.Retry.Max = 0 + // Keep a bounded producer retry budget to tolerate transient broker-side + // connection failures such as stale connections or broken pipe errors. + // The PingCAP Sarama fork includes the partition-muting ordering fix, while + // Net.MaxOpenRequests=1 below remains an extra ordering guard. + config.Producer.Retry.Max = o.MaxRetry config.Producer.Retry.Backoff = 100 * time.Millisecond // make sure sarama producer flush messages as soon as possible. diff --git a/pkg/sink/kafka/sarama_config_test.go b/pkg/sink/kafka/sarama_config_test.go index 6c85146117..bfb0147a1c 100644 --- a/pkg/sink/kafka/sarama_config_test.go +++ b/pkg/sink/kafka/sarama_config_test.go @@ -22,6 +22,7 @@ import ( "github.com/IBM/sarama" "github.com/gin-gonic/gin/binding" "github.com/pingcap/errors" + commonType "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/security" "github.com/stretchr/testify/require" @@ -54,6 +55,9 @@ func TestNewSaramaConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, cc.expected, cfg.Producer.Compression) } + cfg, err := newSaramaConfig(ctx, options) + require.NoError(t, err) + require.Equal(t, defaultMaxRetry, cfg.Producer.Retry.Max) options.EnableTLS = true options.Credential = &security.Credential{ @@ -73,7 +77,7 @@ func TestNewSaramaConfig(t *testing.T) { SASLMechanism: sarama.SASLTypeSCRAMSHA256, } - cfg, err := newSaramaConfig(ctx, saslOptions) + cfg, err = newSaramaConfig(ctx, saslOptions) require.NoError(t, err) require.NotNil(t, cfg) require.Equal(t, "user", cfg.Net.SASL.User) @@ -81,6 +85,60 @@ func TestNewSaramaConfig(t *testing.T) { require.Equal(t, sarama.SASLMechanism("SCRAM-SHA-256"), cfg.Net.SASL.Mechanism) } +func TestNewSaramaConfigMaxRetryFromSinkURI(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sinkURI string + expected int + }{ + { + name: "default max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&kafka-client-id=unit-test", + expected: defaultMaxRetry, + }, + { + name: "set max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=7", + expected: 7, + }, + { + name: "zero max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=0", + expected: 0, + }, + { + name: "negative max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=-1", + expected: defaultMaxRetry, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + options := NewOptions() + sinkURI, err := url.Parse(test.sinkURI) + require.NoError(t, err) + err = options.Apply( + commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), + sinkURI, + config.GetDefaultReplicaConfig().Sink, + ) + require.NoError(t, err) + + cfg, err := newSaramaConfig(context.Background(), options) + require.NoError(t, err) + require.Equal(t, test.expected, cfg.Producer.Retry.Max) + }) + } +} + func TestApplySASL(t *testing.T) { t.Parallel() diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 6d6c03aadc..650f346b4c 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -116,7 +116,6 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) return nil, errors.WrapError(errors.ErrKafkaNewProducer, err) } config.MetricRegistry = f.metricRegistry - config.Producer.Retry.Max = 3 client, err := sarama.NewClient(f.option.BrokerEndpoints, config) if err != nil { diff --git a/pkg/sink/kafka/sarama_sync_producer.go b/pkg/sink/kafka/sarama_sync_producer.go index 4410e0aff9..9d5efdfb0b 100644 --- a/pkg/sink/kafka/sarama_sync_producer.go +++ b/pkg/sink/kafka/sarama_sync_producer.go @@ -101,16 +101,6 @@ func (p *saramaSyncProducer) SendMessages(topic string, partitionNum int32, mess return errors.WrapError(errors.ErrKafkaSendMessage, err) } -func (p *saramaSyncProducer) Heartbeat() { - if p.closed.Load() { - return - } - brokers := p.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - func (p *saramaSyncProducer) Close() { if p.closed.Load() { log.Warn("kafka DDL producer already closed", diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 48009fcf5f..7c49cb3ef6 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -222,28 +222,27 @@ func (w *Writer) execDDLWithMaxRetries(event *commonEvent.DDLEvent) error { if errors.IsIgnorableMySQLDDLError(err) { // NOTE: don't change the log, some tests depend on it. log.Info("Execute DDL failed, but error can be ignored", - zap.String("ddl", event.Query), zap.Uint64("startTs", event.GetStartTs()), zap.Uint64("commitTs", event.GetCommitTs()), - zap.Error(err)) + zap.String("ddl", event.Query), zap.Error(err)) // If the error is ignorable, we will ignore the error directly. return nil } if w.cfg.IsTiDB && ddlCreateTime != "" && errors.Cause(err) == mysql.ErrInvalidConn { - log.Warn("Wait the asynchronous ddl to synchronize", zap.String("ddl", event.Query), zap.String("ddlCreateTime", ddlCreateTime), + log.Warn("Wait the asynchronous ddl to synchronize", zap.Uint64("startTs", event.GetStartTs()), zap.Uint64("commitTs", event.GetCommitTs()), + zap.String("ddlCreateTime", ddlCreateTime), zap.String("ddl", event.Query), zap.String("readTimeout", w.cfg.ReadTimeout), zap.Error(err)) return w.waitDDLDone(w.ctx, event, ddlCreateTime) } log.Warn("Execute DDL with error, retry later", - zap.String("ddl", event.Query), zap.Uint64("startTs", event.GetStartTs()), zap.Uint64("commitTs", event.GetCommitTs()), - zap.Error(err)) + zap.String("ddl", event.Query), zap.Error(err)) return errors.WrapError(errors.ErrExecDDLFailed, errors.WithMessage(err, fmt.Sprintf("Execute DDL failed, Query info: %s; ", event.GetDDLQuery()))) } log.Info("Execute DDL succeeded", - zap.String("changefeed", w.ChangefeedID.String()), zap.String("query", event.GetDDLQuery()), + zap.String("changefeed", w.ChangefeedID.String()), zap.Uint64("startTs", event.GetStartTs()), zap.Uint64("commitTs", event.GetCommitTs()), - zap.Any("ddl", event)) + zap.String("query", event.GetDDLQuery())) return nil }, retry.WithBackoffBaseDelay(BackoffBaseDelay.Milliseconds()), retry.WithBackoffMaxDelay(BackoffMaxDelay.Milliseconds()), diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh index 4d0a0b6cbd..a92ef09316 100755 --- a/tests/integration_tests/ddl_reentrant/run.sh +++ b/tests/integration_tests/ddl_reentrant/run.sh @@ -88,7 +88,12 @@ function ddl_test() { echo $restored_sql >${WORK_DIR}/ddl_temp.sql ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true - ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log | tail -n 1 | grep -oE 'FinishedTs: [0-9]+' | awk '{print $2}') + ddl_finished_ts=$(grep "Execute DDL succeeded" "${WORK_DIR}/cdc.log" | tail -n 1 | grep -oE 'commitTs=[0-9]+' | awk -F= '{print $2}') + if [[ -z "$ddl_finished_ts" ]]; then + echo "failed to parse ddl finished ts from cdc log" + grep "Execute DDL succeeded" "${WORK_DIR}/cdc.log" | tail -n 1 || true + exit 1 + fi cdc_cli_changefeed pause --changefeed-id=${changefeedid} cdc_cli_changefeed resume --no-confirm --changefeed-id=${changefeedid} --overwrite-checkpoint-ts=$((ddl_finished_ts - 1)) echo "resume changefeed ${changefeedid} from ${ddl_finished_ts}"