kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618) #12660
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
This cherry-pick PR is for a release branch and has not yet been approved by triage owners.
@3AceShowHand This PR has conflicts, so I have put it on hold.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Code Review
This pull request bumps the Sarama version and introduces a configurable MaxRetry option for the Kafka producer to resolve broken pipe and out-of-order issues. However, the PR cannot be merged in its current state due to numerous unresolved git conflict markers across several files, including Go source files, tests, and go.mod. Additionally, there is an unused variable partitionNum in mq_ddl_sink.go that will cause a compilation failure and must be removed.
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
=======
var (
	err error
	partitionNum int32
)
msg, err := k.encoder.EncodeCheckpointEvent(ts)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))
<<<<<<< HEAD
=======
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium))
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
metaRefreshTicker *time.Ticker

=======
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
client sarama.Client
producer sarama.SyncProducer
=======
producer sarama.SyncProducer
client sarama.Client
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
return p.producer.SendMessages(msgs)
=======
err := p.producer.SendMessages(msgs)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
=======
MaxRetry: defaultProducerMaxRetry,
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
client: client,
producer: p,
=======
producer: p,
client: client,
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

<<<<<<< HEAD
=======
func setProducerMockHandlers(t *testing.T, broker *sarama.MockBroker) {
	broker.SetHandlerByMap(map[string]sarama.MockResponse{
		"ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t).SetApiKeys([]sarama.ApiVersionsResponseKey{
			{ApiKey: 0, MinVersion: 0, MaxVersion: 8}, // Produce
			{ApiKey: 1, MinVersion: 0, MaxVersion: 11}, // Fetch
			{ApiKey: 2, MinVersion: 0, MaxVersion: 5}, // ListOffsets
			{ApiKey: 3, MinVersion: 0, MaxVersion: 10}, // Metadata
			{ApiKey: 18, MinVersion: 0, MaxVersion: 3}, // ApiVersions
		}),
		"MetadataRequest": sarama.NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()),
	})
}

>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))
Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.
func setProducerMockHandlers(t *testing.T, broker *sarama.MockBroker) {
	broker.SetHandlerByMap(map[string]sarama.MockResponse{
		"ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t).SetApiKeys([]sarama.ApiVersionsResponseKey{
			{ApiKey: 0, MinVersion: 0, MaxVersion: 8}, // Produce
			{ApiKey: 1, MinVersion: 0, MaxVersion: 11}, // Fetch
			{ApiKey: 2, MinVersion: 0, MaxVersion: 5}, // ListOffsets
			{ApiKey: 3, MinVersion: 0, MaxVersion: 10}, // Metadata
			{ApiKey: 18, MinVersion: 0, MaxVersion: 3}, // ApiVersions
		}),
		"MetadataRequest": sarama.NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()),
	})
}
This is an automated cherry-pick of #12618
What problem does this PR solve?
TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.
Issue Number: close #12655, ref #11935
What is changed and how it works?
- Bump Sarama to `github.com/pingcap/sarama v1.41.2-pingcap-20260508`, which includes the producer retry ordering fix.
- Set `Producer.Retry.Max` to `5` by default while keeping `Net.MaxOpenRequests = 1` as an additional ordering guard.
- Add the sink URI parameter `max-retry` to configure Sarama `Producer.Retry.Max`; `max-retry=0` disables producer retry and negative values are ignored.
Check List
Tests
- `go test ./pkg/sink/kafka`
Questions
Will it cause performance regression or break compatibility?
No compatibility break is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set `max-retry=0` in the sink URI to disable producer retry if needed.
Do you need to update user documentation, design documentation or monitoring documentation?
User documentation should mention the new Kafka sink URI parameter `max-retry`. No design or monitoring documentation update is required.
Release note
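
For readers who want to see the `max-retry` semantics from the change list in concrete form, here is a minimal sketch using only the Go standard library. It is not the code from this PR: the helper name `maxRetryFromSinkURI` and the example URIs are hypothetical, and treating "negative values are ignored" as "fall back to the default of 5" is an assumption drawn from the description above. The resulting value is what would be assigned to Sarama's `Producer.Retry.Max`.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// defaultProducerMaxRetry mirrors the default of 5 stated in the PR description.
const defaultProducerMaxRetry = 5

// maxRetryFromSinkURI is a hypothetical helper (not TiCDC's actual function).
// It reads the max-retry query parameter from a sink URI and returns the value
// intended for Sarama's Producer.Retry.Max.
//   - parameter absent: use the default
//   - max-retry=0:      producer retry disabled
//   - negative value:   ignored, default kept (assumed interpretation)
func maxRetryFromSinkURI(sinkURI string) (int, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return 0, err
	}
	raw := u.Query().Get("max-retry")
	if raw == "" {
		return defaultProducerMaxRetry, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid max-retry %q: %w", raw, err)
	}
	if n < 0 {
		// Negative values are ignored, so the default stays in effect.
		return defaultProducerMaxRetry, nil
	}
	return n, nil
}

func main() {
	// Example URIs are illustrative only.
	uris := []string{
		"kafka://127.0.0.1:9092/topic-name",
		"kafka://127.0.0.1:9092/topic-name?max-retry=0",
		"kafka://127.0.0.1:9092/topic-name?max-retry=-1",
	}
	for _, uri := range uris {
		n, err := maxRetryFromSinkURI(uri)
		fmt.Printf("uri=%s maxRetry=%d err=%v\n", uri, n, err)
	}
}
```

Keeping the parsing separate from the Sarama configuration makes the three cases (absent, zero, negative) easy to unit-test without a broker.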