kafka: bump sarama version and enable the retry to fix the broken pipe and out of order#12618
Conversation
There was a problem hiding this comment.
Code Review
This pull request removes the manual heartbeat and connection keep-alive logic across Kafka sinks and managers, including the removal of the HeartbeatBrokers method and KeepConnAliveInterval configuration. It also updates the sarama dependency and increases the producer's maximum retry count to 5. A review comment suggests updating the documentation in pkg/sink/kafka/sarama.go to reflect whether the previously mentioned ordering issues with retries have been resolved in the new version.
|
/retest |
2 similar comments
|
/retest |
|
/retest |
|
/test pull-cdc-integration-storage-test |
1 similar comment
|
/test pull-cdc-integration-storage-test |
[LGTM Timeline notifier]Timeline:
|
|
/retest |
|
/retest |
1 similar comment
|
/retest |
|
Manual Test:
// Add HandleKey field for message
HandleKey *string
package main
import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"
"github.com/IBM/sarama"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
PartitionNum int32 = 3
ReplicationFactor int16 = 1
partition int32 = 0
schema = ""
table = ""
)
func initlog() {
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), zapcore.InfoLevel)
if err != nil {
fmt.Println("err", err)
}
sarama.Logger = logger
}
func main() {
initlog()
topic := fmt.Sprintf("test-%d", rand.Int())
ctx := context.Background()
o := kafka.NewOptions()
o.BrokerEndpoints = []string{"127.0.0.1:9092"}
o.ClientID = "sarama-test"
f, err := kafka.NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test"))
if err != nil {
fmt.Println("err", err)
}
admin, err := f.AdminClient(ctx)
if err != nil {
fmt.Println("err", err)
}
err = admin.CreateTopic(ctx, &kafka.TopicDetail{Name: topic,
NumPartitions: PartitionNum,
ReplicationFactor: ReplicationFactor,
}, false)
if err != nil {
fmt.Println("err", err)
}
async, err := f.AsyncProducer(ctx, make(chan error, 1))
if err != nil {
fmt.Println("err", err)
}
go func() {
err := async.AsyncRunCallback(ctx)
if err != nil {
fmt.Println("err", err)
}
}()
var count int32
for i := 0; i < 200; i++ {
val := fmt.Sprintf("val: %d", i)
msg := common.NewMsg(config.ProtocolCanalJSON, nil, []byte(val), 0, model.MessageTypeRow, &schema, &table)
msg.Callback = func() {
atomic.AddInt32(&count, 1)
}
msg.HandleKey = []string{val}
err := async.AsyncSend(ctx, topic, partition, msg)
if err != nil {
fmt.Println("AsyncSend err", err)
}
time.Sleep(time.Microsecond * 1000)
}
for atomic.LoadInt32(&count) < 200 {
}
fmt.Println("topic", topic)
}
|
|
/retest |
2 similar comments
|
/retest |
|
/retest |
|
In response to a cherrypick label: new pull request created to branch |
|
In response to a cherrypick label: new pull request created to branch |
|
In response to a cherrypick label: new pull request created to branch |
What problem does this PR solve?
TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.
Issue Number: close #12655, ref #11935
What is changed and how it works?
github.com/pingcap/sarama v1.41.2-pingcap-20260508, which includes the producer retry ordering fix.Producer.Retry.Maxto5by default while keepingNet.MaxOpenRequests = 1as an additional ordering guard.max-retryto configure SaramaProducer.Retry.Max;max-retry=0disables producer retry and negative values are ignored.Check List
Tests
go test ./pkg/sink/kafkaQuestions
Will it cause performance regression or break compatibility?
No breaking compatibility is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set
max-retry=0in the sink URI to disable producer retry if needed.Do you need to update user documentation, design documentation or monitoring documentation?
User documentation should mention the new Kafka sink URI parameter
max-retry. No design or monitoring documentation update is required.Release note