Skip to content

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)#12660

Open
ti-chi-bot wants to merge 1 commit into
pingcap:release-8.1from
ti-chi-bot:cherry-pick-12618-to-release-8.1
Open

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618)#12660
ti-chi-bot wants to merge 1 commit into
pingcap:release-8.1from
ti-chi-bot:cherry-pick-12618-to-release-8.1

Conversation

@ti-chi-bot

Copy link
Copy Markdown
Member

This is an automated cherry-pick of #12618

What problem does this PR solve?

TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.

Issue Number: close #12655, ref #11935

What is changed and how it works?

  • Update the Sarama replacement to github.com/pingcap/sarama v1.41.2-pingcap-20260508, which includes the producer retry ordering fix.
  • Re-enable bounded Kafka producer retry by setting Sarama Producer.Retry.Max to 5 by default while keeping Net.MaxOpenRequests = 1 as an additional ordering guard.
  • Add Kafka sink URI parameter max-retry to configure Sarama Producer.Retry.Max; max-retry=0 disables producer retry and negative values are ignored.
  • Remove the Kafka heartbeat keep-alive workaround and related interfaces/tests now that bounded producer retry handles transient stale-connection failures.
  • Simplify Kafka producer/admin factory code after removing the heartbeat path.

Check List

Tests

  • Unit test: go test ./pkg/sink/kafka

Questions

Will it cause performance regression or break compatibility?

No breaking compatibility is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set max-retry=0 in the sink URI to disable producer retry if needed.

Do you need to update user documentation, design documentation or monitoring documentation?

User documentation should mention the new Kafka sink URI parameter max-retry. No design or monitoring documentation update is required.

Release note

Kafka sink now retries transient producer send failures by default.

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@ti-chi-bot ti-chi-bot added do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. type/cherry-pick-for-release-8.1 This PR is cherry-picked to release-8.1 from a source PR. labels May 29, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

This cherry pick PR is for a release branch and has not yet been approved by triage owners.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick:

  1. It must be LGTMed and approved by the reviewers firstly.
  2. For pull requests to TiDB-x branches, it must have no failed tests.
  3. AFTER it has lgtm and approved labels, please wait for the cherry-pick merging approval from triage owners.
Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot

Copy link
Copy Markdown
Member Author

@3AceShowHand This PR has conflicts, I have hold it.
Please resolve them or ask others to resolve them, then comment /unhold to remove the hold label.

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign july2993 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot ti-chi-bot Bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels May 29, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request bumps the Sarama version and introduces a configurable MaxRetry option for the Kafka producer to resolve broken pipe and out-of-order issues. However, the PR cannot be merged in its current state due to numerous unresolved git conflict markers across several files, including Go source files, tests, and go.mod. Additionally, there is an unused variable partitionNum in mq_ddl_sink.go that will cause a compilation failure and must be removed.

Comment on lines +115 to +119
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

	s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment on lines +159 to +168
<<<<<<< HEAD
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
=======
var (
err error
partitionNum int32
)
msg, err := k.encoder.EncodeCheckpointEvent(ts)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Additionally, the variable partitionNum is declared but not used, which will cause a compilation error in Go. Please resolve the conflict and remove the unused variable declaration.

	msg, err := k.encoder.EncodeCheckpointEvent(ts)

Comment on lines +293 to +296
<<<<<<< HEAD
=======
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium))
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

	require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium))

Comment on lines +103 to +107
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

	s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment on lines +51 to +55
<<<<<<< HEAD
metaRefreshTicker *time.Ticker

=======
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by removing the metaRefreshTicker field from the struct as it is now managed locally within the background refresh goroutine.

Comment thread pkg/sink/kafka/factory.go
Comment on lines +93 to +99
<<<<<<< HEAD
client sarama.Client
producer sarama.SyncProducer
=======
producer sarama.SyncProducer
client sarama.Client
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

	producer sarama.SyncProducer
	client   sarama.Client

Comment thread pkg/sink/kafka/factory.go
Comment on lines +129 to +134
<<<<<<< HEAD
return p.producer.SendMessages(msgs)
=======
err := p.producer.SendMessages(msgs)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

	err := p.producer.SendMessages(msgs)
	return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

Comment thread pkg/sink/kafka/options.go
Comment on lines +191 to +194
<<<<<<< HEAD
=======
MaxRetry: defaultProducerMaxRetry,
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

		MaxRetry:           defaultProducerMaxRetry,

Comment on lines +123 to +129
<<<<<<< HEAD
client: client,
producer: p,
=======
producer: p,
client: client,
>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

		producer: p,
		client:   client,

Comment on lines +98 to +114
<<<<<<< HEAD
=======
func setProducerMockHandlers(t *testing.T, broker *sarama.MockBroker) {
broker.SetHandlerByMap(map[string]sarama.MockResponse{
"ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t).SetApiKeys([]sarama.ApiVersionsResponseKey{
{ApiKey: 0, MinVersion: 0, MaxVersion: 8}, // Produce
{ApiKey: 1, MinVersion: 0, MaxVersion: 11}, // Fetch
{ApiKey: 2, MinVersion: 0, MaxVersion: 5}, // ListOffsets
{ApiKey: 3, MinVersion: 0, MaxVersion: 10}, // Metadata
{ApiKey: 18, MinVersion: 0, MaxVersion: 3}, // ApiVersions
}),
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()),
})
}

>>>>>>> 031ef7da65 (kafka: bump sarama version and enable the retry to fix the broken pipe and out of order (#12618))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved git conflict markers found in this file. Please resolve the conflict by keeping the cherry-picked version.

func setProducerMockHandlers(t *testing.T, broker *sarama.MockBroker) {
	broker.SetHandlerByMap(map[string]sarama.MockResponse{
		"ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t).SetApiKeys([]sarama.ApiVersionsResponseKey{
			{ApiKey: 0, MinVersion: 0, MaxVersion: 8},  // Produce
			{ApiKey: 1, MinVersion: 0, MaxVersion: 11}, // Fetch
			{ApiKey: 2, MinVersion: 0, MaxVersion: 5},  // ListOffsets
			{ApiKey: 3, MinVersion: 0, MaxVersion: 10}, // Metadata
			{ApiKey: 18, MinVersion: 0, MaxVersion: 3}, // ApiVersions
		}),
		"MetadataRequest": sarama.NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()),
	})
}

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-verify 27f385a link true /test pull-verify
pull-dm-integration-test 27f385a link true /test pull-dm-integration-test
pull-cdc-integration-storage-test 27f385a link true /test pull-cdc-integration-storage-test
pull-cdc-integration-pulsar-test 27f385a link true /test pull-cdc-integration-pulsar-test
pull-cdc-integration-mysql-test 27f385a link true /test pull-cdc-integration-mysql-test
pull-cdc-integration-kafka-test 27f385a link true /test pull-cdc-integration-kafka-test
pull-dm-compatibility-test 27f385a link true /test pull-dm-compatibility-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/cherry-pick-not-approved do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. type/cherry-pick-for-release-8.1 This PR is cherry-picked to release-8.1 from a source PR.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants