diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go index a4cfaa8fc6..2f5f7dd005 100644 --- a/downstreamadapter/sink/kafka/sink.go +++ b/downstreamadapter/sink/kafka/sink.go @@ -375,17 +375,12 @@ func (s *sink) sendMessages(ctx context.Context) error { metricSendMessageDuration := metrics.WorkerSendMessageDuration.WithLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) defer metrics.WorkerSendMessageDuration.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - var err error outCh := s.comp.encoderGroup.Output() for { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-ticker.C: - s.dmlProducer.Heartbeat() case future, ok := <-outCh: if !ok { log.Info("kafka sink encoder's output channel closed", @@ -483,9 +478,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error { metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name()) }() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - var ( msg *common.Message partitionNum int32 @@ -495,8 +487,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-ticker.C: - s.ddlProducer.Heartbeat() case ts, ok := <-s.checkpointChan: if !ok { log.Warn("kafka sink checkpoint channel closed", diff --git a/downstreamadapter/sink/kafka/sink_test.go b/downstreamadapter/sink/kafka/sink_test.go index bfbd704901..8e6b979fca 100644 --- a/downstreamadapter/sink/kafka/sink_test.go +++ b/downstreamadapter/sink/kafka/sink_test.go @@ -17,11 +17,9 @@ import ( "context" "fmt" "net/url" - "sync" "testing" "time" - "github.com/IBM/sarama/mocks" "github.com/pingcap/errors" "github.com/pingcap/ticdc/downstreamadapter/sink/helper" "github.com/pingcap/ticdc/pkg/common" @@ -104,93 +102,6 @@ func newKafkaSinkForTest(ctx context.Context) (*sink, error) { return newKafkaSinkForTestWithProducers(ctx, nil, nil) } -// mockSyncProducer is used to count the calls to Heartbeat. -type mockSyncProducer struct { - kafka.MockSaramaSyncProducer - heartbeatCount int - mu sync.Mutex -} - -func (m *mockSyncProducer) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockSyncProducer) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestDDLProducerHeartbeat(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - producer := &mockSyncProducer{} - heartbeatInterval := 5 * time.Second - _, err := newKafkaSinkForTestWithProducers(ctx, nil, producer) - require.NoError(t, err) - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return producer.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := producer.GetHeartbeatCount() - cancel() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - -// mockSyncProducer is used to count the calls to Heartbeat. -type mockAsyncProducer struct { - kafka.MockSaramaAsyncProducer - heartbeatCount int - mu sync.Mutex -} - -func (m *mockAsyncProducer) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockAsyncProducer) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestDMLProducerHeartbeat(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - producer := &mockAsyncProducer{} - producer.AsyncProducer = mocks.NewAsyncProducer(t, nil) - heartbeatInterval := 5 * time.Second - _, err := newKafkaSinkForTestWithProducers(ctx, producer, nil) - require.NoError(t, err) - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return producer.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := producer.GetHeartbeatCount() - cancel() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - func TestKafkaSinkBasicFunctionality(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() diff --git a/downstreamadapter/sink/topicmanager/kafka_topic_manager.go b/downstreamadapter/sink/topicmanager/kafka_topic_manager.go index 5cf7f6049a..fb6653c316 100644 --- a/downstreamadapter/sink/topicmanager/kafka_topic_manager.go +++ b/downstreamadapter/sink/topicmanager/kafka_topic_manager.go @@ -45,9 +45,6 @@ type kafkaTopicManager struct { cfg *kafka.AutoCreateTopicConfig topics sync.Map - - metaRefreshTicker *time.Ticker - // cancel is used to cancel the background goroutine. cancel context.CancelFunc } @@ -80,11 +77,10 @@ func newKafkaTopicManager( cfg *kafka.AutoCreateTopicConfig, ) *kafkaTopicManager { mgr := &kafkaTopicManager{ - defaultTopic: defaultTopic, - changefeedID: changefeedID, - admin: admin, - cfg: cfg, - metaRefreshTicker: time.NewTicker(metaRefreshInterval), + defaultTopic: defaultTopic, + changefeedID: changefeedID, + admin: admin, + cfg: cfg, } ctx, mgr.cancel = context.WithCancel(ctx) @@ -114,7 +110,7 @@ func (m *kafkaTopicManager) GetPartitionNum( } func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(metaRefreshInterval) defer ticker.Stop() for { select { @@ -125,8 +121,6 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { ) return case <-ticker.C: - m.admin.Heartbeat() - case <-m.metaRefreshTicker.C: // We ignore the error here, because the error may be caused by the // network problem, and we can try to get the metadata next time. topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum() diff --git a/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go b/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go index 49231e3116..bf02658b24 100644 --- a/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go +++ b/downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go @@ -15,63 +15,13 @@ package topicmanager import ( "context" - "sync" "testing" - "time" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/sink/kafka" "github.com/stretchr/testify/require" ) -// mockAdminClientForHeartbeat is used to count the calls to Heartbeat. -type mockAdminClientForHeartbeat struct { - kafka.ClusterAdminClientMockImpl - heartbeatCount int - mu sync.Mutex -} - -func (m *mockAdminClientForHeartbeat) Heartbeat() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestKafkaTopicManagerHeartbeat(t *testing.T) { - t.Parallel() - - adminClient := &mockAdminClientForHeartbeat{} - cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false} - changefeedID := common.NewChangefeedID4Test("test", "test") - ctx, cancel := context.WithCancel(context.Background()) - - heartbeatInterval := 5 * time.Second - manager := newKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg) - - // Ensure the context is canceled at the end of the test. - defer cancel() - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 11 seconds to allow for at least two heartbeats. - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return adminClient.GetHeartbeatCount() >= 2 - }, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := adminClient.GetHeartbeatCount() - manager.Close() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(heartbeatInterval * 2) - require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} - func TestCreateTopic(t *testing.T) { t.Parallel() diff --git a/go.mod b/go.mod index 69731c6ee0..06dbcacfa0 100644 --- a/go.mod +++ b/go.mod @@ -394,7 +394,7 @@ require ( ) replace ( - github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x + github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508 // Downgrade grpc to v1.63.2, as well as other related modules. github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82 diff --git a/go.sum b/go.sum index 772a16fb33..16633c83da 100644 --- a/go.sum +++ b/go.sum @@ -759,8 +759,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d h1:5JCgncG9X7tOsqKqbIXpV2VG4mu/hv3RvvZewqFj0U4= github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d/go.mod h1:HMNxmg0/lrn3SPGJ6LTZqP0WwEpcXMu9s/4TWJbzT8w= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= +github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU= +github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= github.com/pingcap/tidb v1.1.0-beta.0.20260604031706-f9faeaf4828f h1:Z+Ez33+LxWbKwM88th19M/v81zwFTjbsKFv1qXQk134= diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 41298460d8..b8cfd1cfc3 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -180,13 +180,6 @@ func (a *saramaAdminClient) CreateTopic(detail *TopicDetail, validateOnly bool) return nil } -func (a *saramaAdminClient) Heartbeat() { - brokers := a.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - func (a *saramaAdminClient) Close() { // For admins created via sarama.NewClusterAdminFromClient, admin.Close() takes care // of closing the underlying client as well. Fall back to closing the client directly diff --git a/pkg/sink/kafka/cluster_admin_client.go b/pkg/sink/kafka/cluster_admin_client.go index ffa70e516d..3c6c331c88 100644 --- a/pkg/sink/kafka/cluster_admin_client.go +++ b/pkg/sink/kafka/cluster_admin_client.go @@ -47,8 +47,6 @@ type ClusterAdminClient interface { // CreateTopic creates a new topic. CreateTopic(detail *TopicDetail, validateOnly bool) error - Heartbeat() - // Close shuts down the admin client. Close() } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 54b9d5b60a..72a458a508 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -48,8 +48,6 @@ type SyncProducer interface { // SendMessages will return an error. SendMessages(topic string, partitionNum int32, message *common.Message) error - Heartbeat() - // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. @@ -69,8 +67,6 @@ type AsyncProducer interface { // wish to send. AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error - Heartbeat() - // AsyncRunCallback process the messages that has sent to kafka, // and run tha attached callback. the caller should call this // method in a background goroutine diff --git a/pkg/sink/kafka/mock_cluster_admin_client.go b/pkg/sink/kafka/mock_cluster_admin_client.go index d7e00780ab..99a12388dd 100644 --- a/pkg/sink/kafka/mock_cluster_admin_client.go +++ b/pkg/sink/kafka/mock_cluster_admin_client.go @@ -94,8 +94,6 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { } } -func (c *ClusterAdminClientMockImpl) Heartbeat() {} - // GetAllBrokers implement the ClusterAdminClient interface func (c *ClusterAdminClientMockImpl) GetAllBrokers() []Broker { return nil diff --git a/pkg/sink/kafka/mock_factory.go b/pkg/sink/kafka/mock_factory.go index ddc0362e61..5a4bc64092 100644 --- a/pkg/sink/kafka/mock_factory.go +++ b/pkg/sink/kafka/mock_factory.go @@ -107,10 +107,6 @@ func (m *MockSaramaSyncProducer) Close() { _ = m.SyncProducer.Close() } -func (m *MockSaramaSyncProducer) Heartbeat() { - return -} - // MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface. type MockSaramaAsyncProducer struct { AsyncProducer *mocks.AsyncProducer @@ -176,10 +172,6 @@ func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, p return nil } -func (p *MockSaramaAsyncProducer) Heartbeat() { - return -} - // Close implement the AsyncProducer interface. func (p *MockSaramaAsyncProducer) Close() { if p.closed { diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index 6a5624427f..c9b992814e 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -38,6 +38,8 @@ import ( const ( // defaultPartitionNum specifies the default number of partitions when we create the topic. defaultPartitionNum = 3 + // defaultMaxRetry is the default retry budget for Kafka producers. + defaultMaxRetry = 5 // the `max-message-bytes` is set equal to topic's `max.message.bytes`, and is used to check // whether the message is larger than the max size limit. It's found some message pass the message @@ -62,10 +64,6 @@ const ( // See: https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas and // https://kafka.apache.org/documentation/#topicconfigs_min.insync.replicas MinInsyncReplicasConfigName = "min.insync.replicas" - // BrokerConnectionsMaxIdleMsConfigName specifies the maximum idle time of a connection to a broker. - // Broker will close the connection if it is idle for this long. - // See: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.idle.ms - BrokerConnectionsMaxIdleMsConfigName = "connections.max.idle.ms" ) const ( @@ -119,6 +117,7 @@ type urlConfig struct { ReplicationFactor *int16 `form:"replication-factor"` KafkaVersion *string `form:"kafka-version"` MaxMessageBytes *int `form:"max-message-bytes"` + MaxRetry *int `form:"max-retry"` Compression *string `form:"compression"` KafkaClientID *string `form:"kafka-client-id"` AutoCreateTopic *bool `form:"auto-create-topic"` @@ -158,6 +157,7 @@ type options struct { IsAssignedVersion bool RequestVersion int16 MaxMessageBytes int + MaxRetry int Compression string ClientID string RequiredAcks RequiredAcks @@ -172,10 +172,9 @@ type options struct { SASL *security.SASL // Timeout for network configurations, default to `10s` - DialTimeout time.Duration - WriteTimeout time.Duration - ReadTimeout time.Duration - KeepConnAliveInterval time.Duration + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration } // NewOptions returns a default Kafka configuration @@ -184,6 +183,7 @@ func NewOptions() *options { Version: "2.4.0", // MaxMessageBytes will be used to initialize producer MaxMessageBytes: config.DefaultMaxMessageBytes, + MaxRetry: defaultMaxRetry, ReplicationFactor: 1, Compression: "none", RequiredAcks: WaitForAll, @@ -261,6 +261,10 @@ func (o *options) Apply(changefeedID common.ChangeFeedID, o.MaxMessageBytes = *urlParameter.MaxMessageBytes } + if urlParameter.MaxRetry != nil && *urlParameter.MaxRetry >= 0 { + o.MaxRetry = *urlParameter.MaxRetry + } + if urlParameter.Compression != nil { o.Compression = *urlParameter.Compression } @@ -593,21 +597,6 @@ func adjustOptions( } } - // adjust keepConnAliveInterval by `connections.max.idle.ms` broker config. - idleMs, err := admin.GetBrokerConfig(BrokerConnectionsMaxIdleMsConfigName) - if err != nil { - log.Warn("GetBrokerConfig failed for connections.max.idle.ms", zap.Error(err)) - } else { - idleMsInt, err := strconv.Atoi(idleMs) - if err != nil || idleMsInt <= 0 { - log.Warn("invalid broker config", - zap.String("configName", BrokerConnectionsMaxIdleMsConfigName), zap.String("configValue", idleMs)) - return errors.Trace(err) - } - options.KeepConnAliveInterval = time.Duration(idleMsInt/3) * time.Millisecond - log.Info("Adjust KeepConnAliveInterval", zap.Duration("KeepConnAliveInterval", options.KeepConnAliveInterval)) - } - info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, // make sure user input parameters are valid. diff --git a/pkg/sink/kafka/options_test.go b/pkg/sink/kafka/options_test.go index 75c690f173..3023ec08bd 100644 --- a/pkg/sink/kafka/options_test.go +++ b/pkg/sink/kafka/options_test.go @@ -50,6 +50,7 @@ func TestCompleteOptions(t *testing.T) { require.Equal(t, "2.6.0", options.Version) require.Equal(t, 4096, options.MaxMessageBytes) require.Equal(t, WaitForLocal, options.RequiredAcks) + require.Equal(t, defaultMaxRetry, options.MaxRetry) // multiple kafka broker endpoints uri = "kafka://127.0.0.1:9092,127.0.0.1:9091,127.0.0.1:9090/kafka-test?" @@ -77,6 +78,14 @@ func TestCompleteOptions(t *testing.T) { err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) + // Illegal max-retry. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-retry=a" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.Regexp(t, ".*invalid syntax.*", errors.Cause(err)) + // Illegal partition-num. uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a" sinkURI, err = url.Parse(uri) @@ -108,6 +117,32 @@ func TestCompleteOptions(t *testing.T) { options = NewOptions() err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) require.True(t, errors.ErrKafkaInvalidClientID.Equal(err)) + + // max-retry accepts non-negative sink-uri values. + uri = "kafka://127.0.0.1:9092/abc?max-retry=7" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, 7, options.MaxRetry) + + uri = "kafka://127.0.0.1:9092/abc?max-retry=0" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, 0, options.MaxRetry) + + // Negative max-retry values are ignored. + uri = "kafka://127.0.0.1:9092/abc?max-retry=-1" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + options = NewOptions() + err = options.Apply(commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), sinkURI, config.GetDefaultReplicaConfig().Sink) + require.NoError(t, err) + require.Equal(t, defaultMaxRetry, options.MaxRetry) } func TestSetPartitionNum(t *testing.T) { @@ -782,76 +817,3 @@ func TestMerge(t *testing.T) { require.Equal(t, "cert.pem", c.Credential.CertPath) require.Equal(t, "key.pem", c.Credential.KeyPath) } - -// mockAdminClientForAdjust mocks the ClusterAdminClient to test adjustOptions. -type mockAdminClientForAdjust struct { - ClusterAdminClientMockImpl // We know there is a Mock implementation from the diff - brokerConfigValue string - shouldError bool -} - -// GetBrokerConfig simulates the behavior of getting configuration from a broker. -func (m *mockAdminClientForAdjust) GetBrokerConfig(configName string) (string, error) { - if m.shouldError { - return "", errors.New("mock error: cannot get broker config") - } - if configName == BrokerConnectionsMaxIdleMsConfigName { - return m.brokerConfigValue, nil - } - return "", errors.Errorf("unexpected config name: %s", configName) -} - -func TestAdjustOptionsKeepAlive(t *testing.T) { - t.Parallel() - ctx := context.Background() - - // Case 1: Successful adjustment. - // The broker returns a valid idle time, KeepConnAliveInterval should be set to 1/3 of it. - t.Run("SuccessfulAdjustment", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "300000", // 300,000 ms = 300 s - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err) - // Expected value is 300000ms / 3 = 100000ms = 100s - require.Equal(t, 100*time.Second, o.KeepConnAliveInterval) - }) - - // Case 2: Broker returns an invalid (non-integer) config value. - t.Run("InvalidNonIntegerConfig", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "not-a-number", - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.Error(t, err) - // The error should be a type conversion error. - var numError *strconv.NumError - ok := errors.As(errors.Cause(err), &numError) - require.True(t, ok, "error should be of type strconv.NumError") - }) - - // Case 3: Broker returns an invalid (zero or negative) config value. - // According to the code in the diff, this case will log a warning and return a nil error, - // and the configuration item will not be updated. - t.Run("InvalidZeroOrNegativeConfig", func(t *testing.T) { - t.Parallel() - for _, val := range []string{"0", "-1000"} { - o := NewOptions() - defaultInterval := o.KeepConnAliveInterval - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: val, - } - err := adjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err, "should not return error for zero or negative idle time") - // KeepConnAliveInterval should remain its default value. - require.Equal(t, defaultInterval, o.KeepConnAliveInterval, "interval should not be changed") - } - }) -} diff --git a/pkg/sink/kafka/sarama_async_producer.go b/pkg/sink/kafka/sarama_async_producer.go index 58b723af16..9c0bd82104 100644 --- a/pkg/sink/kafka/sarama_async_producer.go +++ b/pkg/sink/kafka/sarama_async_producer.go @@ -146,13 +146,6 @@ func (p *saramaAsyncProducer) handleProducerError(err *sarama.ProducerError) err return cerror.WrapError(cerror.ErrKafkaAsyncSendMessage, errWithInfo) } -func (p *saramaAsyncProducer) Heartbeat() { - brokers := p.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - // AsyncSend is the input channel for the user to write messages to that they // wish to send. func (p *saramaAsyncProducer) AsyncSend( diff --git a/pkg/sink/kafka/sarama_config.go b/pkg/sink/kafka/sarama_config.go index 59faa82610..b53dc47b37 100644 --- a/pkg/sink/kafka/sarama_config.go +++ b/pkg/sink/kafka/sarama_config.go @@ -45,22 +45,17 @@ func newSaramaConfig(ctx context.Context, o *options) (*sarama.Config, error) { config.Metadata.Retry.Max = 10 config.Metadata.Retry.Backoff = 200 * time.Millisecond config.Metadata.Timeout = 2 * time.Minute - // The kafka server side connections.max.idle.ms default value is 10 minutes. - // it will close the connection if idle for too long. - // so we need to refresh the metadata frequently to avoid the connection being closed by server, - // and then trigger the `fetching metadata: write broken pipe` error, it's annoying. - config.Metadata.RefreshFrequency = 9 * time.Minute - config.Admin.Retry.Max = 10 config.Admin.Retry.Backoff = 200 * time.Millisecond // This timeout control the request timeout for each admin request. // set it as the read timeout. config.Admin.Timeout = 10 * time.Second - // According to the https://github.com/IBM/sarama/issues/2619, - // sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1, - // when the kafka cluster is unhealthy and trigger the internal retry mechanism. - config.Producer.Retry.Max = 0 + // Keep a bounded producer retry budget to tolerate transient broker-side + // connection failures such as stale connections or broken pipe errors. + // The PingCAP Sarama fork includes the partition-muting ordering fix, while + // Net.MaxOpenRequests=1 below remains an extra ordering guard. + config.Producer.Retry.Max = o.MaxRetry config.Producer.Retry.Backoff = 100 * time.Millisecond // make sure sarama producer flush messages as soon as possible. diff --git a/pkg/sink/kafka/sarama_config_test.go b/pkg/sink/kafka/sarama_config_test.go index e8711a1440..d12595bfe9 100644 --- a/pkg/sink/kafka/sarama_config_test.go +++ b/pkg/sink/kafka/sarama_config_test.go @@ -22,6 +22,7 @@ import ( "github.com/IBM/sarama" "github.com/gin-gonic/gin/binding" "github.com/pingcap/errors" + commonType "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/security" "github.com/stretchr/testify/require" @@ -54,6 +55,9 @@ func TestNewSaramaConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, cc.expected, cfg.Producer.Compression) } + cfg, err := newSaramaConfig(ctx, options) + require.NoError(t, err) + require.Equal(t, defaultMaxRetry, cfg.Producer.Retry.Max) options.EnableTLS = true options.Credential = &security.Credential{ @@ -73,7 +77,7 @@ func TestNewSaramaConfig(t *testing.T) { SASLMechanism: sarama.SASLTypeSCRAMSHA256, } - cfg, err := newSaramaConfig(ctx, saslOptions) + cfg, err = newSaramaConfig(ctx, saslOptions) require.NoError(t, err) require.NotNil(t, cfg) require.Equal(t, "user", cfg.Net.SASL.User) @@ -81,6 +85,60 @@ func TestNewSaramaConfig(t *testing.T) { require.Equal(t, sarama.SASLMechanism("SCRAM-SHA-256"), cfg.Net.SASL.Mechanism) } +func TestNewSaramaConfigMaxRetryFromSinkURI(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sinkURI string + expected int + }{ + { + name: "default max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&kafka-client-id=unit-test", + expected: defaultMaxRetry, + }, + { + name: "set max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=7", + expected: 7, + }, + { + name: "zero max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=0", + expected: 0, + }, + { + name: "negative max retry", + sinkURI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0" + + "&kafka-client-id=unit-test&max-retry=-1", + expected: defaultMaxRetry, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + options := NewOptions() + sinkURI, err := url.Parse(test.sinkURI) + require.NoError(t, err) + err = options.Apply( + commonType.NewChangefeedID4Test(commonType.DefaultKeyspaceName, "test"), + sinkURI, + config.GetDefaultReplicaConfig().Sink, + ) + require.NoError(t, err) + + cfg, err := newSaramaConfig(context.Background(), options) + require.NoError(t, err) + require.Equal(t, test.expected, cfg.Producer.Retry.Max) + }) + } +} + func TestApplySASL(t *testing.T) { t.Parallel() diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 6d6c03aadc..650f346b4c 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -116,7 +116,6 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) return nil, errors.WrapError(errors.ErrKafkaNewProducer, err) } config.MetricRegistry = f.metricRegistry - config.Producer.Retry.Max = 3 client, err := sarama.NewClient(f.option.BrokerEndpoints, config) if err != nil { diff --git a/pkg/sink/kafka/sarama_sync_producer.go b/pkg/sink/kafka/sarama_sync_producer.go index 4410e0aff9..9d5efdfb0b 100644 --- a/pkg/sink/kafka/sarama_sync_producer.go +++ b/pkg/sink/kafka/sarama_sync_producer.go @@ -101,16 +101,6 @@ func (p *saramaSyncProducer) SendMessages(topic string, partitionNum int32, mess return errors.WrapError(errors.ErrKafkaSendMessage, err) } -func (p *saramaSyncProducer) Heartbeat() { - if p.closed.Load() { - return - } - brokers := p.client.Brokers() - for _, b := range brokers { - _, _ = b.ApiVersions(&sarama.ApiVersionsRequest{}) - } -} - func (p *saramaSyncProducer) Close() { if p.closed.Load() { log.Warn("kafka DDL producer already closed",