Skip to content

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order#12618

Merged
ti-chi-bot[bot] merged 16 commits into
pingcap:masterfrom
3AceShowHand:sarama-broken-pipe
May 29, 2026
Merged

kafka: bump sarama version and enable the retry to fix the broken pipe and out of order#12618
ti-chi-bot[bot] merged 16 commits into
pingcap:masterfrom
3AceShowHand:sarama-broken-pipe

Conversation

@3AceShowHand

@3AceShowHand 3AceShowHand commented Apr 22, 2026

Copy link
Copy Markdown
Contributor

What problem does this PR solve?

TiCDC disabled Kafka producer retries to avoid the Sarama out-of-order retry issue tracked by #11935. With retries disabled, transient Kafka send errors such as broken pipe fail the sink immediately, create noisy sink-error metrics and alerts, and can trigger costly sink rebuilds.

Issue Number: close #12655, ref #11935

What is changed and how it works?

  • Update the Sarama replacement to github.com/pingcap/sarama v1.41.2-pingcap-20260508, which includes the producer retry ordering fix.
  • Re-enable bounded Kafka producer retry by setting Sarama Producer.Retry.Max to 5 by default while keeping Net.MaxOpenRequests = 1 as an additional ordering guard.
  • Add Kafka sink URI parameter max-retry to configure Sarama Producer.Retry.Max; max-retry=0 disables producer retry and negative values are ignored.
  • Remove the Kafka heartbeat keep-alive workaround and related interfaces/tests now that bounded producer retry handles transient stale-connection failures.
  • Simplify Kafka producer/admin factory code after removing the heartbeat path.

Check List

Tests

  • Unit test: go test ./pkg/sink/kafka

Questions

Will it cause performance regression or break compatibility?

No breaking compatibility is expected. This intentionally changes Kafka sink behavior by enabling bounded producer retries for transient send failures. Users can set max-retry=0 in the sink URI to disable producer retry if needed.

Do you need to update user documentation, design documentation or monitoring documentation?

User documentation should mention the new Kafka sink URI parameter max-retry. No design or monitoring documentation update is required.

Release note

Kafka sink now retries transient producer send failures by default.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Apr 22, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request removes the manual heartbeat and connection keep-alive logic across Kafka sinks and managers, including the removal of the HeartbeatBrokers method and KeepConnAliveInterval configuration. It also updates the sarama dependency and increases the producer's maximum retry count to 5. A review comment suggests updating the documentation in pkg/sink/kafka/sarama.go to reflect whether the previously mentioned ordering issues with retries have been resolved in the new version.

Comment thread pkg/sink/kafka/sarama.go Outdated
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

2 similar comments
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/test pull-cdc-integration-storage-test

1 similar comment
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/test pull-cdc-integration-storage-test

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels May 28, 2026
@ti-chi-bot ti-chi-bot Bot removed the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label May 28, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 28, 2026

Copy link
Copy Markdown
Contributor

[LGTM Timeline notifier]

Timeline:

  • 2026-05-28 07:01:28.991630588 +0000 UTC m=+508358.961795646: ☑️ agreed by wk989898.
  • 2026-05-28 13:14:56.566768747 +0000 UTC m=+530766.536933785: ☑️ agreed by hongyunyan.

@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

@3AceShowHand 3AceShowHand added needs-cherry-pick-release-7.5 Should cherry pick this PR to release-7.5 branch. needs-cherry-pick-release-8.5 Should cherry pick this PR to release-8.5 branch. and removed needs-cherry-pick-release-7.1 Should cherry pick this PR to release-7.1 branch. needs-cherry-pick-release-7.5 Should cherry pick this PR to release-7.5 branch. needs-cherry-pick-release-8.1 Should cherry pick this PR to release-8.1 branch. labels May 29, 2026
@ti-chi-bot ti-chi-bot Bot added needs-cherry-pick-release-8.1 Should cherry pick this PR to release-8.1 branch. needs-cherry-pick-release-7.1 Should cherry pick this PR to release-7.1 branch. labels May 29, 2026
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

1 similar comment
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

@wk989898

Copy link
Copy Markdown
Collaborator

Manual Test:

  1. Start a kafka server
  2. Hack the sarama code like https://github.com/wk989898/sarama/tree/hack
  3. Add more logs for the producer
// Add HandleKey field for message
HandleKey    *string
  1. Send message with follower code
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync/atomic"
    "time"

    "github.com/IBM/sarama"
    "github.com/pingcap/log"
    "github.com/pingcap/tiflow/cdc/model"
    "github.com/pingcap/tiflow/pkg/config"
    "github.com/pingcap/tiflow/pkg/sink/codec/common"
    "github.com/pingcap/tiflow/pkg/sink/kafka"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

var (
    PartitionNum      int32 = 3
    ReplicationFactor int16 = 1
    partition         int32 = 0
    schema                  = ""
    table                   = ""
)

func initlog() {
    logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), zapcore.InfoLevel)
    if err != nil {
        fmt.Println("err", err)
    }
    sarama.Logger = logger
}
func main() {
    initlog()
    topic := fmt.Sprintf("test-%d", rand.Int())
    ctx := context.Background()
    o := kafka.NewOptions()
    o.BrokerEndpoints = []string{"127.0.0.1:9092"}
    o.ClientID = "sarama-test"

    f, err := kafka.NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test"))
    if err != nil {
        fmt.Println("err", err)
    }
    admin, err := f.AdminClient(ctx)
    if err != nil {
        fmt.Println("err", err)
    }
    err = admin.CreateTopic(ctx, &kafka.TopicDetail{Name: topic,
        NumPartitions:     PartitionNum,
        ReplicationFactor: ReplicationFactor,
    }, false)
    if err != nil {
        fmt.Println("err", err)
    }
    async, err := f.AsyncProducer(ctx, make(chan error, 1))
    if err != nil {
        fmt.Println("err", err)
    }
    go func() {
        err := async.AsyncRunCallback(ctx)
        if err != nil {
            fmt.Println("err", err)
        }
    }()
    var count int32
    for i := 0; i < 200; i++ { 
        val := fmt.Sprintf("val: %d", i)
        msg := common.NewMsg(config.ProtocolCanalJSON, nil, []byte(val), 0, model.MessageTypeRow, &schema, &table)
        msg.Callback = func() {
            atomic.AddInt32(&count, 1)
        }
        msg.HandleKey = []string{val}
        err := async.AsyncSend(ctx, topic, partition, msg)
        if err != nil {
            fmt.Println("AsyncSend err", err)
        }
        time.Sleep(time.Microsecond * 1000) 
    }
    for atomic.LoadInt32(&count) < 200 {

    }
    fmt.Println("topic", topic)
}
  1. Consume the message from kafka server
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test-xxx
  1. Check the out-of-order from the logs

@3AceShowHand 3AceShowHand removed needs-cherry-pick-release-7.1 Should cherry pick this PR to release-7.1 branch. needs-cherry-pick-release-8.1 Should cherry pick this PR to release-8.1 branch. labels May 29, 2026
@ti-chi-bot ti-chi-bot Bot added needs-cherry-pick-release-8.1 Should cherry pick this PR to release-8.1 branch. needs-cherry-pick-release-7.1 Should cherry pick this PR to release-7.1 branch. labels May 29, 2026
@3AceShowHand

Copy link
Copy Markdown
Contributor Author

/retest

2 similar comments
@joechenrh

Copy link
Copy Markdown
Contributor

/retest

@wuhuizuo

Copy link
Copy Markdown
Contributor

/retest

@ti-chi-bot ti-chi-bot Bot merged commit 031ef7d into pingcap:master May 29, 2026
34 of 35 checks passed
@ti-chi-bot

Copy link
Copy Markdown
Member

In response to a cherrypick label: new pull request created to branch release-7.1: #12659.
But this PR has conflicts, please resolve them!

@ti-chi-bot

Copy link
Copy Markdown
Member

In response to a cherrypick label: new pull request created to branch release-8.1: #12660.
But this PR has conflicts, please resolve them!

@ti-chi-bot

Copy link
Copy Markdown
Member

In response to a cherrypick label: new pull request created to branch release-8.5: #12661.
But this PR has conflicts, please resolve them!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm needs-cherry-pick-release-7.1 Should cherry pick this PR to release-7.1 branch. needs-cherry-pick-release-8.1 Should cherry pick this PR to release-8.1 branch. needs-cherry-pick-release-8.5 Should cherry pick this PR to release-8.5 branch. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

kafka: Consider reintroducing retry (or at least make it configurable)

7 participants