Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,12 @@ func (s *sink) sendMessages(ctx context.Context) error {
metricSendMessageDuration := metrics.WorkerSendMessageDuration.WithLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
defer metrics.WorkerSendMessageDuration.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var err error
outCh := s.comp.encoderGroup.Output()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
s.dmlProducer.Heartbeat()
case future, ok := <-outCh:
if !ok {
log.Info("kafka sink encoder's output channel closed",
Expand Down Expand Up @@ -483,9 +478,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
}()

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var (
msg *common.Message
partitionNum int32
Expand All @@ -495,8 +487,6 @@ func (s *sink) sendCheckpoint(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
s.ddlProducer.Heartbeat()
case ts, ok := <-s.checkpointChan:
if !ok {
log.Warn("kafka sink checkpoint channel closed",
Expand Down
89 changes: 0 additions & 89 deletions downstreamadapter/sink/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ import (
"context"
"fmt"
"net/url"
"sync"
"testing"
"time"

"github.com/IBM/sarama/mocks"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -104,93 +102,6 @@ func newKafkaSinkForTest(ctx context.Context) (*sink, error) {
return newKafkaSinkForTestWithProducers(ctx, nil, nil)
}

// mockSyncProducer is used to count the calls to Heartbeat.
type mockSyncProducer struct {
kafka.MockSaramaSyncProducer
heartbeatCount int
mu sync.Mutex
}

func (m *mockSyncProducer) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockSyncProducer) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestDDLProducerHeartbeat(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
producer := &mockSyncProducer{}
heartbeatInterval := 5 * time.Second
_, err := newKafkaSinkForTestWithProducers(ctx, nil, producer)
require.NoError(t, err)

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return producer.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := producer.GetHeartbeatCount()
cancel()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

// mockSyncProducer is used to count the calls to Heartbeat.
type mockAsyncProducer struct {
kafka.MockSaramaAsyncProducer
heartbeatCount int
mu sync.Mutex
}

func (m *mockAsyncProducer) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockAsyncProducer) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestDMLProducerHeartbeat(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
producer := &mockAsyncProducer{}
producer.AsyncProducer = mocks.NewAsyncProducer(t, nil)
heartbeatInterval := 5 * time.Second
_, err := newKafkaSinkForTestWithProducers(ctx, producer, nil)
require.NoError(t, err)

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return producer.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := producer.GetHeartbeatCount()
cancel()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, producer.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

func TestKafkaSinkBasicFunctionality(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
Expand Down
16 changes: 5 additions & 11 deletions downstreamadapter/sink/topicmanager/kafka_topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type kafkaTopicManager struct {
cfg *kafka.AutoCreateTopicConfig

topics sync.Map

metaRefreshTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}
Expand Down Expand Up @@ -80,11 +77,10 @@ func newKafkaTopicManager(
cfg *kafka.AutoCreateTopicConfig,
) *kafkaTopicManager {
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
}

ctx, mgr.cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -114,7 +110,7 @@ func (m *kafkaTopicManager) GetPartitionNum(
}

func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(metaRefreshInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -125,8 +121,6 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
)
return
case <-ticker.C:
m.admin.Heartbeat()
case <-m.metaRefreshTicker.C:
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum()
Expand Down
50 changes: 0 additions & 50 deletions downstreamadapter/sink/topicmanager/kafka_topic_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,13 @@ package topicmanager

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

// mockAdminClientForHeartbeat is used to count the calls to Heartbeat.
type mockAdminClientForHeartbeat struct {
kafka.ClusterAdminClientMockImpl
heartbeatCount int
mu sync.Mutex
}

func (m *mockAdminClientForHeartbeat) Heartbeat() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestKafkaTopicManagerHeartbeat(t *testing.T) {
t.Parallel()

adminClient := &mockAdminClientForHeartbeat{}
cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false}
changefeedID := common.NewChangefeedID4Test("test", "test")
ctx, cancel := context.WithCancel(context.Background())

heartbeatInterval := 5 * time.Second
manager := newKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg)

// Ensure the context is canceled at the end of the test.
defer cancel()

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 11 seconds to allow for at least two heartbeats.
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return adminClient.GetHeartbeatCount() >= 2
}, 11*time.Second, 150*time.Millisecond, "Heartbeat should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := adminClient.GetHeartbeatCount()
manager.Close()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(heartbeatInterval * 2)
require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}

func TestCreateTopic(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ require (
)

replace (
github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x
github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508

// Downgrade grpc to v1.63.2, as well as other related modules.
github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d h1:5JCgncG9X7tOsqKqbIXpV2VG4mu/hv3RvvZewqFj0U4=
github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d/go.mod h1:HMNxmg0/lrn3SPGJ6LTZqP0WwEpcXMu9s/4TWJbzT8w=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU=
github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tidb v1.1.0-beta.0.20260604031706-f9faeaf4828f h1:Z+Ez33+LxWbKwM88th19M/v81zwFTjbsKFv1qXQk134=
Expand Down
7 changes: 0 additions & 7 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,6 @@ func (a *saramaAdminClient) CreateTopic(detail *TopicDetail, validateOnly bool)
return nil
}

func (a *saramaAdminClient) Heartbeat() {
brokers := a.client.Brokers()
for _, b := range brokers {
_, _ = b.ApiVersions(&sarama.ApiVersionsRequest{})
}
}

func (a *saramaAdminClient) Close() {
// For admins created via sarama.NewClusterAdminFromClient, admin.Close() takes care
// of closing the underlying client as well. Fall back to closing the client directly
Expand Down
2 changes: 0 additions & 2 deletions pkg/sink/kafka/cluster_admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ type ClusterAdminClient interface {
// CreateTopic creates a new topic.
CreateTopic(detail *TopicDetail, validateOnly bool) error

Heartbeat()

// Close shuts down the admin client.
Close()
}
4 changes: 0 additions & 4 deletions pkg/sink/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ type SyncProducer interface {
// SendMessages will return an error.
SendMessages(topic string, partitionNum int32, message *common.Message) error

Heartbeat()

// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
// You must call this before calling Close on the underlying client.
Expand All @@ -69,8 +67,6 @@ type AsyncProducer interface {
// wish to send.
AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error

Heartbeat()

// AsyncRunCallback process the messages that has sent to kafka,
// and run tha attached callback. the caller should call this
// method in a background goroutine
Expand Down
2 changes: 0 additions & 2 deletions pkg/sink/kafka/mock_cluster_admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl {
}
}

func (c *ClusterAdminClientMockImpl) Heartbeat() {}

// GetAllBrokers implement the ClusterAdminClient interface
func (c *ClusterAdminClientMockImpl) GetAllBrokers() []Broker {
return nil
Expand Down
8 changes: 0 additions & 8 deletions pkg/sink/kafka/mock_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ func (m *MockSaramaSyncProducer) Close() {
_ = m.SyncProducer.Close()
}

func (m *MockSaramaSyncProducer) Heartbeat() {
return
}

// MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface.
type MockSaramaAsyncProducer struct {
AsyncProducer *mocks.AsyncProducer
Expand Down Expand Up @@ -176,10 +172,6 @@ func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, p
return nil
}

func (p *MockSaramaAsyncProducer) Heartbeat() {
return
}

// Close implement the AsyncProducer interface.
func (p *MockSaramaAsyncProducer) Close() {
if p.closed {
Expand Down
Loading
Loading