kafka: bump sarama version and enable the retry to fix the broken pipe and out of order#5359
Conversation
📝 WalkthroughWalkthroughRemoves Heartbeat() methods and ticker-driven Heartbeat calls, reworks topic manager background refresh to use a cancellable context and longer interval, adds MaxRetry option and related Sarama config/test changes, and updates the pinned pingcap/sarama pseudo-version in go.mod. ChangesSarama Dependency Pin
Kafka heartbeat, options, and sarama changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request updates the replacement dependency for github.com/IBM/sarama to github.com/pingcap/sarama v1.41.2-pingcap-20260508 in go.mod and updates the corresponding checksums in go.sum. There are no review comments, and I have no feedback to provide.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/sink/topicmanager/kafka_topic_manager.go (1)
112-129:⚠️ Potential issue | 🟠 MajorFix
TestKafkaTopicManagerHeartbeatto match the new metadata refresh loop (and make it deterministic).
TestKafkaTopicManagerHeartbeatstill mocks and waits forHeartbeat()calls (~11s), butkafkaTopicManager.backgroundRefreshMetanever callsHeartbeat(); it only callsfetchAllTopicsPartitionsNum()→admin.GetTopicsPartitionsNum(...)on the fixedmetaRefreshInterval(10 minutes). This means the test is asserting behavior the implementation can’t produce.Update the test to validate cancellation/close behavior via
GetTopicsPartitionsNumcall counting, but you’ll need to inject/override the refresh interval or ticker (so the test doesn’t have to wait 10 minutes) before asserting thatClose()stops further refresh calls.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/sink/topicmanager/kafka_topic_manager.go` around lines 112 - 129, The test is asserting Heartbeat calls but backgroundRefreshMeta only calls fetchAllTopicsPartitionsNum -> admin.GetTopicsPartitionsNum on a fixed metaRefreshInterval; make the refresh interval/ticker injectable (e.g., add a configurable metaRefreshInterval or ticker to kafkaTopicManager and use it in backgroundRefreshMeta) so tests can set a short interval, update TestKafkaTopicManagerHeartbeat to count/mock admin.GetTopicsPartitionsNum invocations (via fetchAllTopicsPartitionsNum) instead of Heartbeat, start the manager with the injected short interval, call Close/Cancel, and assert that GetTopicsPartitionsNum call count stops increasing after Close to deterministically verify cancellation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@downstreamadapter/sink/topicmanager/kafka_topic_manager.go`:
- Around line 112-129: The test is asserting Heartbeat calls but
backgroundRefreshMeta only calls fetchAllTopicsPartitionsNum ->
admin.GetTopicsPartitionsNum on a fixed metaRefreshInterval; make the refresh
interval/ticker injectable (e.g., add a configurable metaRefreshInterval or
ticker to kafkaTopicManager and use it in backgroundRefreshMeta) so tests can
set a short interval, update TestKafkaTopicManagerHeartbeat to count/mock
admin.GetTopicsPartitionsNum invocations (via fetchAllTopicsPartitionsNum)
instead of Heartbeat, start the manager with the injected short interval, call
Close/Cancel, and assert that GetTopicsPartitionsNum call count stops increasing
after Close to deterministically verify cancellation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3b3c78db-932e-433c-ad6a-a38a5ca23928
📒 Files selected for processing (10)
downstreamadapter/sink/kafka/sink.godownstreamadapter/sink/kafka/sink_test.godownstreamadapter/sink/topicmanager/kafka_topic_manager.gopkg/sink/kafka/admin.gopkg/sink/kafka/cluster_admin_client.gopkg/sink/kafka/factory.gopkg/sink/kafka/mock_cluster_admin_client.gopkg/sink/kafka/mock_factory.gopkg/sink/kafka/sarama_async_producer.gopkg/sink/kafka/sarama_sync_producer.go
💤 Files with no reviewable changes (8)
- pkg/sink/kafka/cluster_admin_client.go
- pkg/sink/kafka/sarama_async_producer.go
- pkg/sink/kafka/sarama_sync_producer.go
- pkg/sink/kafka/mock_factory.go
- downstreamadapter/sink/kafka/sink.go
- pkg/sink/kafka/mock_cluster_admin_client.go
- pkg/sink/kafka/factory.go
- pkg/sink/kafka/admin.go
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/sink/kafka/options.go`:
- Around line 264-266: The code currently accepts non-negative max-retry and
sets o.MaxRetry from urlParameter.MaxRetry (in options.go) and then wires that
to Sarama via config.Producer.Retry.Max (in sarama_config.go), but Sarama treats
0 as “no automatic retries” which disables broken-pipe retry; update the code
and docs to make this explicit by adding a comment/docstring and an operational
warning: when urlParameter.MaxRetry is present and set to 0 (o.MaxRetry == 0)
emit a clear WARN via the sink’s logger (or add to the config documentation for
the Kafka sink) stating “max-retry=0 disables Sarama automatic producer retries
(broken-pipe will not be retried)”; reference the fields o.MaxRetry,
urlParameter.MaxRetry and config.Producer.Retry.Max so maintainers can locate
and update options.go and sarama_config.go accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: abd16ec8-ce07-48ff-bdad-5da177f4854a
📒 Files selected for processing (7)
downstreamadapter/sink/kafka/sink_test.gopkg/sink/kafka/mock_factory.gopkg/sink/kafka/options.gopkg/sink/kafka/options_test.gopkg/sink/kafka/sarama_config.gopkg/sink/kafka/sarama_config_test.gopkg/sink/kafka/sarama_factory.go
💤 Files with no reviewable changes (2)
- pkg/sink/kafka/sarama_factory.go
- downstreamadapter/sink/kafka/sink_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/sink/kafka/mock_factory.go
|
/test all |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: tenfyzhong, wk989898 The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/cherry-pick release-8.5 |
|
@3AceShowHand: new pull request created to branch DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
What problem does this PR solve?
Issue Number: close #1920
ref pingcap/tiflow#12618
Kafka sink can hit stale broker connections and return errors such as
broken pipe. The old TiCDC-side Kafka heartbeat sentApiVersionsperiodically and ignored errors, so it added background traffic but did not repair a bad connection. Producer retry was also disabled or overridden in different places because older Sarama retry behavior could reorder messages.What is changed and how it works?
This PR migrates the relevant Kafka sink changes from pingcap/tiflow#12618:
v1.41.2-pingcap-20260508, which includes the partition-muting ordering fix.config.Producer.Retry.Maxfrom Kafka sink options for all Kafka producers.max-retrysink URI parameter. Non-negative values are accepted; negative values are ignored and keep the default.5.Net.MaxOpenRequests = 1as an extra ordering guard.Check List
Tests
go test --tags=intest ./pkg/sink/kafka ./downstreamadapter/sink/kafka ./downstreamadapter/sink/topicmanagerQuestions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note