kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618) #12659
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@3AceShowHand This PR has conflicts, so I have put it on hold.
Code Review
This pull request introduces a new Pulsar DDL sink and updates the Kafka integration by bumping the Sarama library version and adding a configurable maximum retry limit to address connection issues. However, the changes contain numerous unresolved git merge conflict markers across multiple files that must be cleaned up before merging. Additionally, the new Pulsar DDL sink implementation has a potential resource leak where the Pulsar client is not closed on error paths, a copy-paste error returning a Kafka-specific error, and a log statement that incorrectly reports success before checking for errors.
| <<<<<<< HEAD | ||
| p := producerCreator(ctx, changefeed, syncProducer) | ||
| s := newDDLSink(ctx, changefeed, p, adminClient, topicManager, eventRouter, encoderBuilder, protocol) | ||
| log.Info("DDL sink created", zap.Duration("duration", time.Since(start))) | ||
| ======= | ||
| ddlProducer := producerCreator(ctx, changefeedID, syncProducer) | ||
| s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) | ||
| log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) | ||
| >>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)) |
There are unresolved merge conflicts in this file. Please resolve them by keeping the HEAD version of the parameters, as the target branch uses ctx, changefeed and encoderBuilder instead of changefeedID and encoderBuilder.Build().
p := producerCreator(ctx, changefeed, syncProducer)
s := newDDLSink(ctx, changefeed, p, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
log.Info("DDL sink created", zap.Duration("duration", time.Since(start)))
| <<<<<<< HEAD | ||
| encoder := k.encoderBuilder.Build() | ||
| msg, err := encoder.EncodeCheckpointEvent(ts) | ||
| ======= | ||
| var ( | ||
| err error | ||
| partitionNum int32 | ||
| ) | ||
| msg, err := k.encoder.EncodeCheckpointEvent(ts) | ||
| >>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)) |
| <<<<<<< HEAD | ||
| defaultTopic: defaultTopic, | ||
| changefeedID: changefeedID, | ||
| admin: admin, | ||
| cfg: cfg, | ||
| metaRefreshTicker: time.NewTicker(metaRefreshInterval), | ||
| ======= | ||
| defaultTopic: defaultTopic, | ||
| changefeedID: changefeedID, | ||
| admin: admin, | ||
| cfg: cfg, | ||
| >>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)) |
There are unresolved merge conflicts in this file. Please resolve them by keeping the 031ef7da65 version, which removes metaRefreshTicker from the struct initialization since it is now managed locally in backgroundRefreshMeta.
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
| <<<<<<< HEAD | ||
| ======= | ||
| ticker := time.NewTicker(metaRefreshInterval) | ||
| defer ticker.Stop() | ||
| >>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)) |
| <<<<<<< HEAD | ||
| return p.producer.SendMessages(msgs) | ||
| ======= | ||
| err := p.producer.SendMessages(msgs) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | ||
| >>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)) |
| p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) | ||
| log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) | ||
| if err != nil { | ||
| return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) | ||
| } | ||
| topicManager, err := pulsarTopicManagerCreator(pConfig, client) | ||
| if err != nil { | ||
| return nil, errors.Trace(err) | ||
| } |
If producerCreator or pulsarTopicManagerCreator fails, the successfully created client is not closed, leading to a resource leak. Ensure that client is closed on these error paths.
- p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
- log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
- if err != nil {
-     return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
- }
- topicManager, err := pulsarTopicManagerCreator(pConfig, client)
- if err != nil {
-     return nil, errors.Trace(err)
- }
+ p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
+ if err != nil {
+     _ = client.Close()
+     return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+ }
+ log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
+ topicManager, err := pulsarTopicManagerCreator(pConfig, client)
+ if err != nil {
+     _ = p.Close()
+     _ = client.Close()
+     return nil, errors.Trace(err)
+ }
| encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) | ||
| if err != nil { | ||
| return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) |
This is a Pulsar DDL Sink, but it returns cerror.ErrKafkaInvalidConfig on error. This copy-paste error should be corrected to use errors.Trace(err) to be consistent with other error paths in this file.
- return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+ return nil, errors.Trace(err)
| p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) | ||
| log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) | ||
| if err != nil { | ||
| return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) | ||
| } |
The success log statement is executed before checking if producerCreator returned an error. If creation fails, an incorrect success log will still be emitted. Move the log statement after the error check.
- p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
- log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
- if err != nil {
-     return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
- }
+ p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
+ if err != nil {
+     return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+ }
+ log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
This pull request is closed because its related version has closed automatic cherry-picking. https://prow.tidb.net/command-help#cherrypick
This is an automated cherry-pick of #12618
What problem does this PR solve?
TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.
Issue Number: close #12655, ref #11935
What is changed and how it works?
- Bump Sarama to github.com/pingcap/sarama v1.41.2-pingcap-20260508, which includes the producer retry ordering fix.
- Set Producer.Retry.Max to 5 by default while keeping Net.MaxOpenRequests = 1 as an additional ordering guard.
- Add the sink URI parameter max-retry to configure Sarama Producer.Retry.Max; max-retry=0 disables producer retry and negative values are ignored.
Check List
Tests
go test ./pkg/sink/kafka
Questions
Will it cause performance regression or break compatibility?
No compatibility break is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set max-retry=0 in the sink URI to disable producer retry if needed.
Do you need to update user documentation, design documentation or monitoring documentation?
User documentation should mention the new Kafka sink URI parameter max-retry. No design or monitoring documentation update is required.
Release note
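As an illustration of the max-retry semantics described in this PR (default of 5, max-retry=0 disables retry, negative values are ignored), here is a minimal standard-library sketch. It is not the TiCDC implementation: the producerOptions type, the optionsFromSinkURI function, and the choice to return an error for a non-numeric value are all assumptions made for this example, and the example sink URIs are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// defaultMaxRetry mirrors the default of 5 stated in the PR description.
const defaultMaxRetry = 5

// producerOptions is a hypothetical stand-in for the two Sarama settings
// the PR mentions (Producer.Retry.Max and Net.MaxOpenRequests).
type producerOptions struct {
	RetryMax        int
	MaxOpenRequests int
}

// optionsFromSinkURI reads max-retry from the sink URI query string.
// Assumption: a non-numeric value is reported as an error; the PR text
// only specifies the behavior for zero and negative values.
func optionsFromSinkURI(sinkURI string) (producerOptions, error) {
	opts := producerOptions{RetryMax: defaultMaxRetry, MaxOpenRequests: 1}

	u, err := url.Parse(sinkURI)
	if err != nil {
		return opts, fmt.Errorf("parse sink URI: %w", err)
	}

	raw := u.Query().Get("max-retry")
	if raw == "" {
		return opts, nil // parameter absent: keep the default
	}

	n, err := strconv.Atoi(raw)
	if err != nil {
		return opts, fmt.Errorf("invalid max-retry %q: %w", raw, err)
	}
	if n >= 0 {
		opts.RetryMax = n // 0 disables producer retry
	}
	// Negative values are ignored, so the default stays in place.
	return opts, nil
}

func main() {
	uris := []string{
		"kafka://127.0.0.1:9092/topic-name",
		"kafka://127.0.0.1:9092/topic-name?max-retry=0",
		"kafka://127.0.0.1:9092/topic-name?max-retry=-3",
	}
	for _, uri := range uris {
		opts, err := optionsFromSinkURI(uri)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> retry=%d maxOpenRequests=%d\n", uri, opts.RetryMax, opts.MaxOpenRequests)
	}
}
```

In this sketch MaxOpenRequests stays fixed at 1 regardless of the retry setting, matching the PR's note that it is kept as an additional ordering guard.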