kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)#12679
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
@3AceShowHand This PR has conflicts, I have hold it. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
There was a problem hiding this comment.
Code Review
This pull request refactors the DDL sink to close the underlying sink upon failure and simplifies the Kafka DDL sink implementation. However, the changes introduce several unresolved merge conflict markers across multiple files, including kafka_ddl_producer.go, kafka_ddl_sink.go, pulsar_ddl_sink.go, factory.go, and sarama_factory.go, which must be resolved to prevent compilation failures. Additionally, a duplicate import alias cerror was introduced in pkg/sink/kafka/v2/factory.go and should be removed in favor of using the existing errors package directly.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| <<<<<<< HEAD | ||
| err := k.syncProducer.SendMessages(ctx, topic, | ||
| totalPartitionsNum, message.Key, message.Value) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | ||
| ======= | ||
| return k.syncProducer.SendMessages(ctx, topic, totalPartitionsNum, message) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. Since the SyncProducer interface in pkg/sink/kafka/factory.go expects key []byte, value []byte for SendMessages, choosing the incoming branch version (message *common.Message) will break the interface implementation. Please resolve the conflict by keeping the HEAD version.
err := k.syncProducer.SendMessages(ctx, topic,
totalPartitionsNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)| <<<<<<< HEAD | ||
| err := k.syncProducer.SendMessage(ctx, topic, | ||
| partitionNum, message.Key, message.Value) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | ||
| ======= | ||
| return k.syncProducer.SendMessage(ctx, topic, partitionNum, message) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. Since SyncProducer.SendMessage expects key []byte, value []byte, please resolve the conflict by keeping the HEAD version.
err := k.syncProducer.SendMessage(ctx, topic,
partitionNum, message.Key, message.Value)
return cerror.WrapError(cerror.ErrKafkaSendMessage, err)| <<<<<<< HEAD | ||
| s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol) | ||
| ======= | ||
| s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. Based on the signature of newDDLSink in mq_ddl_sink.go, it requires ctx as the first argument and codec.RowEventEncoder (which is obtained via encoderBuilder.Build()). Please resolve the conflict accordingly.
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)| <<<<<<< HEAD | ||
| s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) | ||
| ======= | ||
| s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. Based on the signature of newDDLSink in mq_ddl_sink.go, it requires ctx as the first argument and codec.RowEventEncoder (which is obtained via encoderBuilder.Build()). Please resolve the conflict accordingly.
s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)| <<<<<<< HEAD | ||
| func (p *saramaSyncProducer) SendMessages(ctx context.Context, | ||
| topic string, partitionNum int32, | ||
| key []byte, value []byte, | ||
| ) error { | ||
| ======= | ||
| func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, partitionNum int32, message *common.Message) error { | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. Since the SyncProducer interface expects key []byte, value []byte for SendMessages, please keep the HEAD version of the signature to avoid compilation errors.
func (p *saramaSyncProducer) SendMessages(ctx context.Context,
topic string, partitionNum int32,
key []byte, value []byte,
) error {| <<<<<<< HEAD | ||
|
|
||
| client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config) | ||
| if err != nil { | ||
| return nil, errors.Trace(err) | ||
| } | ||
| p, err := newSaramaSyncProducerFromClientImpl(client) | ||
| ======= | ||
| p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config) | ||
| >>>>>>> 4c631d5951 (kafka(ticdc): ddl sink close the underline sink if send ddl or checkpoint failed and refactor the kafka ddl sink (#12112)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found. To resolve this correctly and avoid leaking the Kafka client, you should choose the incoming branch version (sarama.NewSyncProducer). Additionally, please make sure to remove the closeSaramaClientOnFailure call on line 122 (which is currently outside the diff), as the client variable will no longer be defined.
p, err := sarama.NewSyncProducer(f.option.BrokerEndpoints, config)| "github.com/pingcap/tiflow/pkg/errors" | ||
| cerror "github.com/pingcap/tiflow/pkg/errors" |
There was a problem hiding this comment.
| err := s.w.WriteMessages(ctx, kafka.Message{ | ||
| Topic: topic, | ||
| Partition: int(partitionNum), | ||
| Key: key, | ||
| Value: value, | ||
| }) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) |
There was a problem hiding this comment.
Use the errors package directly instead of the redundant cerror alias.
| err := s.w.WriteMessages(ctx, kafka.Message{ | |
| Topic: topic, | |
| Partition: int(partitionNum), | |
| Key: key, | |
| Value: value, | |
| }) | |
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) | |
| err := s.w.WriteMessages(ctx, kafka.Message{ | |
| Topic: topic, | |
| Partition: int(partitionNum), | |
| Key: key, | |
| Value: value, | |
| }) | |
| return errors.WrapError(errors.ErrKafkaSendMessage, err) |
| err := s.w.WriteMessages(ctx, msgs...) | ||
| return cerror.WrapError(cerror.ErrKafkaSendMessage, err) |
There was a problem hiding this comment.
|
@ti-chi-bot: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
This is an automated cherry-pick of #12112
What problem does this PR solve?
Issue Number: close #12666, close #12096
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note