Skip to content

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)#12659

Closed
ti-chi-bot wants to merge 1 commit into
pingcap:release-7.1from
ti-chi-bot:cherry-pick-12618-to-release-7.1
Closed

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)#12659
ti-chi-bot wants to merge 1 commit into
pingcap:release-7.1from
ti-chi-bot:cherry-pick-12618-to-release-7.1

Conversation

@ti-chi-bot

Copy link
Copy Markdown
Member

This is an automated cherry-pick of #12618

What problem does this PR solve?

TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.

Issue Number: close #12655, ref #11935

What is changed and how it works?

  • Update the Sarama replacement to github.com/pingcap/sarama v1.41.2-pingcap-20260508, which includes the producer retry ordering fix.
  • Re-enable bounded Kafka producer retry by setting Sarama Producer.Retry.Max to 5 by default while keeping Net.MaxOpenRequests = 1 as an additional ordering guard.
  • Add Kafka sink URI parameter max-retry to configure Sarama Producer.Retry.Max; max-retry=0 disables producer retry and negative values are ignored.
  • Remove the Kafka heartbeat keep-alive workaround and related interfaces/tests now that bounded producer retry handles transient stale-connection failures.
  • Simplify Kafka producer/admin factory code after removing the heartbeat path.

Check List

Tests

  • Unit test: go test ./pkg/sink/kafka

Questions

Will it cause performance regression or break compatibility?

No breaking compatibility is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set max-retry=0 in the sink URI to disable producer retry if needed.

Do you need to update user documentation, design documentation or monitoring documentation?

User documentation should mention the new Kafka sink URI parameter max-retry. No design or monitoring documentation update is required.

Release note

Kafka sink now retries transient producer send failures by default.

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@ti-chi-bot ti-chi-bot added do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. type/cherry-pick-for-release-7.1 This PR is cherry-picked to release-7.1 from a source PR. labels May 29, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

This cherry pick PR is for a release branch and has not yet been approved by triage owners.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick:

  1. It must be LGTMed and approved by the reviewers firstly.
  2. For pull requests to TiDB-x branches, it must have no failed tests.
  3. AFTER it has lgtm and approved labels, please wait for the cherry-pick merging approval from triage owners.
Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot

Copy link
Copy Markdown
Member Author

@3AceShowHand This PR has conflicts, I have hold it.
Please resolve them or ask others to resolve them, then comment /unhold to remove the hold label.

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign d3hunter for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new Pulsar DDL sink and updates the Kafka integration by bumping the Sarama library version and adding a configurable maximum retry limit to address connection issues. However, the changes contain numerous unresolved git merge conflict markers across multiple files that must be cleaned up before merging. Additionally, the new Pulsar DDL sink implementation has a potential resource leak where the Pulsar client is not closed on error paths, a copy-paste error returning a Kafka-specific error, and a log statement that incorrectly reports success before checking for errors.

Comment on lines +112 to +120
<<<<<<< HEAD
p := producerCreator(ctx, changefeed, syncProducer)
s := newDDLSink(ctx, changefeed, p, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
log.Info("DDL sink created", zap.Duration("duration", time.Since(start)))
=======
ddlProducer := producerCreator(ctx, changefeedID, syncProducer)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved merge conflicts in this file. Please resolve them by keeping the HEAD version of the parameters, as the target branch uses ctx, changefeed and encoderBuilder instead of changefeedID and encoderBuilder.Build().

	p := producerCreator(ctx, changefeed, syncProducer)
	s := newDDLSink(ctx, changefeed, p, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
	log.Info("DDL sink created", zap.Duration("duration", time.Since(start)))

Comment on lines +142 to +151
<<<<<<< HEAD
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
=======
var (
err error
partitionNum int32
)
msg, err := k.encoder.EncodeCheckpointEvent(ts)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved merge conflicts in this file. Please resolve them by keeping the HEAD version of the checkpoint encoding logic, which uses k.encoderBuilder.Build().

	encoder := k.encoderBuilder.Build()
	msg, err := encoder.EncodeCheckpointEvent(ts)

Comment on lines +70 to +81
<<<<<<< HEAD
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
=======
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved merge conflicts in this file. Please resolve them by keeping the 031ef7da65 version, which removes metaRefreshTicker from the struct initialization since it is now managed locally in backgroundRefreshMeta.

		defaultTopic: defaultTopic,
		changefeedID: changefeedID,
		admin:        admin,
		cfg:          cfg,

Comment on lines +111 to +115
<<<<<<< HEAD
=======
ticker := time.NewTicker(metaRefreshInterval)
defer ticker.Stop()
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved merge conflicts in this file. Please resolve them by keeping the local ticker initialization from the 031ef7da65 version.

	ticker := time.NewTicker(metaRefreshInterval)
	defer ticker.Stop()

Comment thread pkg/sink/kafka/factory.go
Comment on lines +132 to +137
<<<<<<< HEAD
return p.producer.SendMessages(msgs)
=======
err := p.producer.SendMessages(msgs)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved merge conflicts here. Please resolve them by explicitly checking the error from SendMessages and wrapping it with cerror.WrapError only if it is non-nil.

	if err := p.producer.SendMessages(msgs); err != nil {
		return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
	}
	return nil

Comment on lines +92 to +101
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}

topicManager, err := pulsarTopicManagerCreator(pConfig, client)
if err != nil {
return nil, errors.Trace(err)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If producerCreator or pulsarTopicManagerCreator fails, the successfully created client is not closed, leading to a resource leak. Ensure that client is closed on these error paths.

Suggested change
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}
topicManager, err := pulsarTopicManagerCreator(pConfig, client)
if err != nil {
return nil, errors.Trace(err)
}
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
if err != nil {
_ = client.Close()
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
topicManager, err := pulsarTopicManagerCreator(pConfig, client)
if err != nil {
_ = p.Close()
_ = client.Close()
return nil, errors.Trace(err)
}


encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This is a Pulsar DDL Sink, but it returns cerror.ErrKafkaInvalidConfig on error. This copy-paste error should be corrected to use errors.Trace(err) to be consistent with other error paths in this file.

Suggested change
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
return nil, errors.Trace(err)

Comment on lines +92 to +96
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The success log statement is executed before checking if producerCreator returned an error. If creation fails, an incorrect success log will still be emitted. Move the log statement after the error check.

Suggested change
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}
p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-verify fe59e88 link true /test pull-verify
pull-cdc-integration-mysql-test fe59e88 link true /test pull-cdc-integration-mysql-test
pull-cdc-integration-kafka-test fe59e88 link true /test pull-cdc-integration-kafka-test
pull-cdc-integration-storage-test fe59e88 link true /test pull-cdc-integration-storage-test
pull-dm-integration-test fe59e88 link true /test pull-dm-integration-test
pull-dm-compatibility-test fe59e88 link true /test pull-dm-compatibility-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@ti-chi-bot ti-chi-bot Bot closed this Jun 1, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

This pull request is closed because its related version has closed automatic cherry-picking.
If it's still needed, you can reopen it or just regenerate it using bot,
see:

https://prow.tidb.net/command-help#cherrypick
https://book.prow.tidb.net/#/plugins/cherrypicker

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/cherry-pick-not-approved do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. type/cherry-pick-for-release-7.1 This PR is cherry-picked to release-7.1 from a source PR.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants