Skip to content

kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12679

Open
ti-chi-bot wants to merge 1 commit into
pingcap:release-8.1from
ti-chi-bot:cherry-pick-12112-to-release-8.1
Open

kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12679
ti-chi-bot wants to merge 1 commit into
pingcap:release-8.1from
ti-chi-bot:cherry-pick-12112-to-release-8.1

Conversation

@ti-chi-bot

Copy link
Copy Markdown
Member

This is an automated cherry-pick of #12112

What problem does this PR solve?

Issue Number: close #12666, close #12096

What is changed and how it works?

  • owner/ddl_sink should close if meet any error, and rebuilt by the retry.
  • kafka factory implementations should wrap error immediately, and caller should just return error, no need to trace the error.
  • kafka ddl sink use encoder directly, instead of encoderBuilder
  • syncProducer no need to close in a goroutine, since it only deliver one message at a time, won't be blocked if the downstream kafka cluster unavailable.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

`None`

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@ti-chi-bot ti-chi-bot added do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. type/cherry-pick-for-release-8.1 This PR is cherry-picked to release-8.1 from a source PR. labels Jun 5, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

This cherry pick PR is for a release branch and has not yet been approved by triage owners.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick:

  1. It must be LGTMed and approved by the reviewers firstly.
  2. For pull requests to TiDB-x branches, it must have no failed tests.
  3. AFTER it has lgtm and approved labels, please wait for the cherry-pick merging approval from triage owners.
Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot

Copy link
Copy Markdown
Member Author

@3AceShowHand This PR has conflicts, I have hold it.
Please resolve them or ask others to resolve them, then comment /unhold to remove the hold label.

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign tammyxia for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the DDL sink to close the underlying sink upon failure and simplifies the Kafka DDL sink implementation. However, the changes introduce several unresolved merge conflict markers across multiple files, including kafka_ddl_producer.go, kafka_ddl_sink.go, pulsar_ddl_sink.go, factory.go, and sarama_factory.go, which must be resolved to prevent compilation failures. Additionally, a duplicate import alias cerror was introduced in pkg/sink/kafka/v2/factory.go and should be removed in favor of using the existing errors package directly.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +72 to +78
<<<<<<< HEAD
err := k.syncProducer.SendMessages(ctx, topic,
totalPartitionsNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since the SyncProducer interface in pkg/sink/kafka/factory.go expects key []byte, value []byte for SendMessages, choosing the incoming branch version (message *common.Message) will break the interface implementation. Please resolve the conflict by keeping the HEAD version.

		err := k.syncProducer.SendMessages(ctx, topic,
			totalPartitionsNum, message.Key, message.Value)
		return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

Comment on lines +96 to +102
<<<<<<< HEAD
err := k.syncProducer.SendMessage(ctx, topic,
partitionNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
=======
return k.syncProducer.SendMessage(ctx, topic, partitionNum, message)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since SyncProducer.SendMessage expects key []byte, value []byte, please resolve the conflict by keeping the HEAD version.

		err := k.syncProducer.SendMessage(ctx, topic,
			partitionNum, message.Key, message.Value)
		return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

Comment on lines +115 to +119
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Based on the signature of newDDLSink in mq_ddl_sink.go, it requires ctx as the first argument and codec.RowEventEncoder (which is obtained via encoderBuilder.Build()). Please resolve the conflict accordingly.

	s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment on lines +103 to +107
<<<<<<< HEAD
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
=======
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Based on the signature of newDDLSink in mq_ddl_sink.go, it requires ctx as the first argument and codec.RowEventEncoder (which is obtained via encoderBuilder.Build()). Please resolve the conflict accordingly.

	s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)

Comment thread pkg/sink/kafka/factory.go
Comment on lines +106 to +113
<<<<<<< HEAD
func (p *saramaSyncProducer) SendMessages(ctx context.Context,
topic string, partitionNum int32,
key []byte, value []byte,
) error {
=======
func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error {
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. Since the SyncProducer interface expects key []byte, value []byte for SendMessages, please keep the HEAD version of the signature to avoid compilation errors.

func (p *saramaSyncProducer) SendMessages(ctx context.Context,
	topic string, partitionNum int32,
	key []byte, value []byte,
) error {

Comment on lines +111 to +120
<<<<<<< HEAD

client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}
p, err := newSaramaSyncProducerFromClientImpl(client)
=======
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)
>>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Unresolved merge conflict markers found. To resolve this correctly and avoid leaking the Kafka client, you should choose the incoming branch version (sarama.NewSyncProducer). Additionally, please make sure to remove the closeSaramaClientOnFailure call on line 122 (which is currently outside the diff), as the client variable will no longer be defined.

	p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)

Comment on lines 27 to +28
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Duplicate import of the same package github.com/pingcap/tiflow/pkg/errors. Please remove the redundant cerror alias and use errors consistently.

Suggested change
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"

Comment on lines +281 to +287
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the errors package directly instead of the redundant cerror alias.

Suggested change
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
err := s.w.WriteMessages(ctx, kafka.Message{
Topic: topic,
Partition: int(partitionNum),
Key: key,
Value: value,
})
return errors.WrapError(errors.ErrKafkaSendMessage, err)

Comment on lines +308 to +309
err := s.w.WriteMessages(ctx, msgs...)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the errors package directly instead of the redundant cerror alias.

Suggested change
err := s.w.WriteMessages(ctx, msgs...)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
err := s.w.WriteMessages(ctx, msgs...)
return errors.WrapError(errors.ErrKafkaSendMessage, err)

@ti-chi-bot

ti-chi-bot Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

@ti-chi-bot: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-verify 8034e98 link true /test pull-verify
pull-cdc-integration-pulsar-test 8034e98 link true /test pull-cdc-integration-pulsar-test
pull-cdc-integration-storage-test 8034e98 link true /test pull-cdc-integration-storage-test
pull-cdc-integration-mysql-test 8034e98 link true /test pull-cdc-integration-mysql-test
pull-cdc-integration-kafka-test 8034e98 link true /test pull-cdc-integration-kafka-test
pull-dm-integration-test 8034e98 link true /test pull-dm-integration-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/cherry-pick-not-approved do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. lgtm release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. type/cherry-pick-for-release-8.1 This PR is cherry-picked to release-8.1 from a source PR.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants