kafka: bump sarama version and enable the retry to fix the broken pipe and out of order #5359

Merged
ti-chi-bot[bot] merged 5 commits into pingcap:master from 3AceShowHand:out-of-order-kafka
Jun 12, 2026

Conversation

@3AceShowHand

@3AceShowHand 3AceShowHand commented Jun 12, 2026

Collaborator

What problem does this PR solve?

Issue Number: close #1920

ref pingcap/tiflow#12618

Kafka sink can hit stale broker connections and return errors such as broken pipe. The old TiCDC-side Kafka heartbeat sent ApiVersions periodically and ignored errors, so it added background traffic but did not repair a bad connection. Producer retry was also disabled or overridden in different places because older Sarama retry behavior could reorder messages.

What is changed and how it works?

This PR migrates the relevant Kafka sink changes from pingcap/tiflow#12618:

  • Bump the PingCAP Sarama fork to v1.41.2-pingcap-20260508, which includes the partition-muting ordering fix.
  • Remove TiCDC's Kafka application-level heartbeat from the DML, DDL/checkpoint, admin, and topic manager paths.
  • Initialize config.Producer.Retry.Max from Kafka sink options for all Kafka producers.
  • Add a max-retry sink URI parameter. Non-negative values are accepted; negative values are ignored and keep the default.
  • Set the default Kafka producer retry budget to 5.
  • Keep Net.MaxOpenRequests = 1 as an extra ordering guard.
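The `max-retry` handling described in the list above can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the code from `pkg/sink/kafka/options.go`: `producerSettings` and `settingsFromSinkURI` are hypothetical names, the struct only stands in for the Sarama fields named in the description, and returning an error for a non-numeric value is an assumption.

```go
package main

import (
	"net/url"
	"strconv"
)

// defaultMaxRetry mirrors the default retry budget of 5 stated in the PR description.
const defaultMaxRetry = 5

// producerSettings is a hypothetical stand-in for the two Sarama fields the
// description mentions (Producer.Retry.Max and Net.MaxOpenRequests).
// It is not a Sarama type.
type producerSettings struct {
	RetryMax        int
	MaxOpenRequests int
}

// settingsFromSinkURI reads max-retry from the sink URI query string.
// Non-negative values are accepted; negative values are ignored so the
// default stays in place. Returning an error for a non-numeric value is an
// assumption of this sketch.
func settingsFromSinkURI(sinkURI string) (producerSettings, error) {
	s := producerSettings{RetryMax: defaultMaxRetry, MaxOpenRequests: 1}
	u, err := url.Parse(sinkURI)
	if err != nil {
		return s, err
	}
	raw := u.Query().Get("max-retry")
	if raw == "" {
		return s, nil // parameter absent: keep the default
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return s, err
	}
	if n >= 0 {
		s.RetryMax = n
	}
	return s, nil
}
```

With this sketch, a URI ending in `?max-retry=3` yields a retry budget of 3, while `?max-retry=-1` or no parameter at all leaves it at 5.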

Check List

Tests

  • Unit test:
    • go test --tags=intest ./pkg/sink/kafka ./downstreamadapter/sink/kafka ./downstreamadapter/sink/topicmanager

Questions

Will it cause performance regression or break compatibility?
  • No performance regression
  • No compatibility break, since this is an internal mechanism change.
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Kafka sink now retries transient producer send failures by default.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jun 12, 2026
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Contributor

Walkthrough

Removes Heartbeat() methods and ticker-driven Heartbeat calls, reworks topic manager background refresh to use a cancellable context and longer interval, adds MaxRetry option and related Sarama config/test changes, and updates the pinned pingcap/sarama pseudo-version in go.mod.

Changes

Sarama Dependency Pin

| Layer | File(s) | Summary |
| --- | --- | --- |
| Sarama dependency version update | go.mod | Replace directive for github.com/IBM/sarama v1.41.2 updated to github.com/pingcap/sarama v1.41.2-pingcap-20260508. |

Kafka heartbeat, options, and sarama changes

| Layer | File(s) | Summary |
| --- | --- | --- |
| Remove Heartbeat from producer/admin interfaces and implementations | pkg/sink/kafka/factory.go, pkg/sink/kafka/cluster_admin_client.go, pkg/sink/kafka/admin.go, pkg/sink/kafka/sarama_async_producer.go, pkg/sink/kafka/sarama_sync_producer.go | Deletes Heartbeat() from SyncProducer, AsyncProducer, and ClusterAdminClient interfaces and removes their Heartbeat() implementations. |
| Remove mock Heartbeat stubs and update sink tests | pkg/sink/kafka/mock_cluster_admin_client.go, pkg/sink/kafka/mock_factory.go, downstreamadapter/sink/kafka/sink_test.go | Removes no-op Heartbeat() methods from mocks and adjusts sink test imports/struct layout for heartbeat-related tests. |
| Sink loops: stop periodic Heartbeat calls | downstreamadapter/sink/kafka/sink.go | Removes 5s ticker and periodic Heartbeat() calls from DML and checkpoint send loops; loops now react to ctx.Done() and channel inputs only. |
| Topic manager: background refresh uses cancellable context and longer interval | downstreamadapter/sink/topicmanager/kafka_topic_manager.go | Replaces manager-held ticker with context.CancelFunc, starts backgroundRefreshMeta with a cancellable context, and uses metaRefreshInterval ticks for metadata updates (removes admin.Heartbeat branch). |
| Add MaxRetry option and remove KeepConnAlive handling | pkg/sink/kafka/options.go, pkg/sink/kafka/options_test.go | Adds defaultMaxRetry and MaxRetry option, parses max-retry from sink URI, removes KeepConnAliveInterval and connections.max.idle.ms handling, and updates tests to cover valid/invalid max-retry cases. |
| Sarama config: use MaxRetry and admin timeouts | pkg/sink/kafka/sarama_config.go, pkg/sink/kafka/sarama_config_test.go | Set admin retry/backoff and Admin.Timeout, and configure cfg.Producer.Retry.Max = o.MaxRetry; tests verify default and URI-driven MaxRetry behavior. |
| Factory: wire MetricRegistry | pkg/sink/kafka/sarama_factory.go | Sets config.MetricRegistry = f.metricRegistry before creating Sarama client; removes local hardcoded Producer.Retry.Max in factory. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nibbled code where heartbeats slept,
Tickers hushed and loops ran clean and light,
Topics breathe on calmer, cancellable nights,
Retries tuned, metrics wired just right,
Hop—go.mod pinned, and tests passed bright.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly summarizes the main changes: bumping sarama version and enabling retry functionality to address broken pipe and out-of-order issues, which aligns with the code changes. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Description check | ✅ Passed | The PR description is well-structured and addresses all required sections with substantial detail about the problem, changes, testing, and release notes. |


@ti-chi-bot ti-chi-bot Bot added the size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. label Jun 12, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request updates the replacement dependency for github.com/IBM/sarama to github.com/pingcap/sarama v1.41.2-pingcap-20260508 in go.mod and updates the corresponding checksums in go.sum. There are no review comments, and I have no feedback to provide.


@ti-chi-bot ti-chi-bot Bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. labels Jun 12, 2026

@coderabbitai coderabbitai Bot left a comment

Contributor

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/sink/topicmanager/kafka_topic_manager.go (1)

112-129: ⚠️ Potential issue | 🟠 Major

Fix TestKafkaTopicManagerHeartbeat to match the new metadata refresh loop (and make it deterministic).

TestKafkaTopicManagerHeartbeat still mocks and waits for Heartbeat() calls (~11s), but kafkaTopicManager.backgroundRefreshMeta never calls Heartbeat(); it only calls fetchAllTopicsPartitionsNum() -> admin.GetTopicsPartitionsNum(...) on the fixed metaRefreshInterval (10 minutes). This means the test is asserting behavior the implementation can’t produce.

Update the test to validate cancellation/close behavior via GetTopicsPartitionsNum call counting, but you’ll need to inject/override the refresh interval or ticker (so the test doesn’t have to wait 10 minutes) before asserting that Close() stops further refresh calls.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/sink/topicmanager/kafka_topic_manager.go` around lines 112
- 129, The test is asserting Heartbeat calls but backgroundRefreshMeta only
calls fetchAllTopicsPartitionsNum -> admin.GetTopicsPartitionsNum on a fixed
metaRefreshInterval; make the refresh interval/ticker injectable (e.g., add a
configurable metaRefreshInterval or ticker to kafkaTopicManager and use it in
backgroundRefreshMeta) so tests can set a short interval, update
TestKafkaTopicManagerHeartbeat to count/mock admin.GetTopicsPartitionsNum
invocations (via fetchAllTopicsPartitionsNum) instead of Heartbeat, start the
manager with the injected short interval, call Close/Cancel, and assert that
GetTopicsPartitionsNum call count stops increasing after Close to
deterministically verify cancellation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3b3c78db-932e-433c-ad6a-a38a5ca23928

📥 Commits

Reviewing files that changed from the base of the PR and between 26eb3bf and 63022e2.

📒 Files selected for processing (10)
  • downstreamadapter/sink/kafka/sink.go
  • downstreamadapter/sink/kafka/sink_test.go
  • downstreamadapter/sink/topicmanager/kafka_topic_manager.go
  • pkg/sink/kafka/admin.go
  • pkg/sink/kafka/cluster_admin_client.go
  • pkg/sink/kafka/factory.go
  • pkg/sink/kafka/mock_cluster_admin_client.go
  • pkg/sink/kafka/mock_factory.go
  • pkg/sink/kafka/sarama_async_producer.go
  • pkg/sink/kafka/sarama_sync_producer.go
💤 Files with no reviewable changes (8)
  • pkg/sink/kafka/cluster_admin_client.go
  • pkg/sink/kafka/sarama_async_producer.go
  • pkg/sink/kafka/sarama_sync_producer.go
  • pkg/sink/kafka/mock_factory.go
  • downstreamadapter/sink/kafka/sink.go
  • pkg/sink/kafka/mock_cluster_admin_client.go
  • pkg/sink/kafka/factory.go
  • pkg/sink/kafka/admin.go

@ti-chi-bot ti-chi-bot Bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jun 12, 2026

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pkg/sink/kafka/options.go`:
- Around line 264-266: The code currently accepts non-negative max-retry and
sets o.MaxRetry from urlParameter.MaxRetry (in options.go) and then wires that
to Sarama via config.Producer.Retry.Max (in sarama_config.go), but Sarama treats
0 as “no automatic retries” which disables broken-pipe retry; update the code
and docs to make this explicit by adding a comment/docstring and an operational
warning: when urlParameter.MaxRetry is present and set to 0 (o.MaxRetry == 0)
emit a clear WARN via the sink’s logger (or add to the config documentation for
the Kafka sink) stating “max-retry=0 disables Sarama automatic producer retries
(broken-pipe will not be retried)”; reference the fields o.MaxRetry,
urlParameter.MaxRetry and config.Producer.Retry.Max so maintainers can locate
and update options.go and sarama_config.go accordingly.
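The warning suggested in this comment could be factored as a small check like the one below. This is only a sketch: `retryWarning` is a hypothetical helper, and how the real sink tracks whether `max-retry` was supplied, or which logger it uses, is not shown in this conversation.

```go
package main

// retryWarning returns a warning message when max-retry was explicitly set
// to 0, which (per the review comment) turns off Sarama's automatic producer
// retries. It returns "" when no warning is needed.
// explicit reports whether the parameter was present in the sink URI.
func retryWarning(maxRetry int, explicit bool) string {
	if explicit && maxRetry == 0 {
		return "max-retry=0 disables Sarama automatic producer retries (broken-pipe will not be retried)"
	}
	return ""
}
```

The caller would pass any non-empty result to its own logger at WARN level; returning the message instead of logging it directly keeps the check easy to unit test.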

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: abd16ec8-ce07-48ff-bdad-5da177f4854a

📥 Commits

Reviewing files that changed from the base of the PR and between 63022e2 and b57b006.

📒 Files selected for processing (7)
  • downstreamadapter/sink/kafka/sink_test.go
  • pkg/sink/kafka/mock_factory.go
  • pkg/sink/kafka/options.go
  • pkg/sink/kafka/options_test.go
  • pkg/sink/kafka/sarama_config.go
  • pkg/sink/kafka/sarama_config_test.go
  • pkg/sink/kafka/sarama_factory.go
💤 Files with no reviewable changes (2)
  • pkg/sink/kafka/sarama_factory.go
  • downstreamadapter/sink/kafka/sink_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/sink/kafka/mock_factory.go

Comment thread pkg/sink/kafka/options.go
@3AceShowHand

Collaborator Author

/test all

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Jun 12, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 12, 2026


[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: tenfyzhong, wk989898

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:
  • OWNERS [tenfyzhong,wk989898]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot Bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Jun 12, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 12, 2026


[LGTM Timeline notifier]

Timeline:

  • 2026-06-12 04:00:42.067997006 +0000 UTC m=+1105343.138314395: ☑️ agreed by wk989898.
  • 2026-06-12 05:27:25.726855713 +0000 UTC m=+1110546.797173103: ☑️ agreed by tenfyzhong.

@ti-chi-bot ti-chi-bot Bot merged commit 15a8703 into pingcap:master Jun 12, 2026
26 checks passed
@3AceShowHand

Collaborator Author

/cherry-pick release-8.5

@ti-chi-bot

Member

@3AceShowHand: new pull request created to branch release-8.5: #5370.
But this PR has conflicts, please resolve them!

Details

In response to this:

/cherry-pick release-8.5



Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

kafka: Consider reintroducing retry (or at least make it configurable)

4 participants