From 62d8a0de4c95f2aaf48bd557df87e246b98e13b8 Mon Sep 17 00:00:00 2001 From: Shivang Nagta Date: Fri, 12 Jun 2026 00:42:16 +0530 Subject: [PATCH 1/2] FEAT: Add client_rack field to kafka task --- internal/pkg/pipeline/task/kafka/README.md | 15 ++++++++++++++- internal/pkg/pipeline/task/kafka/kafka.go | 9 +++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/internal/pkg/pipeline/task/kafka/README.md b/internal/pkg/pipeline/task/kafka/README.md index 24ccf46..efe2fd5 100644 --- a/internal/pkg/pipeline/task/kafka/README.md +++ b/internal/pkg/pipeline/task/kafka/README.md @@ -34,6 +34,7 @@ There are two read modes, controlled by whether `group_id` is set: | `max_records` | int | `0` (unlimited) | Read-mode cap on records forwarded downstream. The reader stops cleanly once this many messages have been sent. In group mode, offsets up to the last forwarded record are committed on shutdown. Must be `>= 0`; negative values are rejected at validation. | | `group_id` | string | - | Consumer group id. If omitted, standalone mode is used (reads from beginning, no offset commits). | | `auto_offset_reset` | string | `latest` | Group-mode reset policy when no committed offset exists or the stored offset is out of range. `latest` skips to the tail; `earliest` reads from the beginning of the available log. Ignored in standalone mode. | +| `client_rack` | string | - | Read-mode rack id, set as librdkafka `client.rack`. Lets the consumer use Kafka's rack-aware features to cut cross-AZ data transfers. Applies to standalone and group consumer modes; ignored for producers. | | `server_auth_type` | string | `none` | `none` or `tls` — server certificate verification mode | | `cert` | string | - | CA certificate PEM/CRT content used when `server_auth_type: tls` (alternatively use `cert_path`) | | `cert_path` | string | - | Path to CA certificate (PEM/CRT) | @@ -110,6 +111,18 @@ tasks: timeout: 25s ``` +### Rack-aware consumer +```yaml +tasks: + - name: read_in_zone + type: kafka + bootstrap_server: kafka.local:9092 + topic: input-topic + group_id: my-consumer-group + client_rack: us-west-2a # match the broker.rack of broker closest to consumer's location + timeout: 25s +``` + ### Using SCRAM (SCRAM-SHA-512) ```yaml tasks: @@ -227,4 +240,4 @@ tasks: --bootstrap-server : ``` -**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.** \ No newline at end of file +**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.** diff --git a/internal/pkg/pipeline/task/kafka/kafka.go b/internal/pkg/pipeline/task/kafka/kafka.go index bd7d0c2..a5418d6 100644 --- a/internal/pkg/pipeline/task/kafka/kafka.go +++ b/internal/pkg/pipeline/task/kafka/kafka.go @@ -46,6 +46,7 @@ type kafka struct { Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` // connection, read, write, commit timeout BatchFlushInterval duration.Duration `yaml:"batch_flush_interval,omitempty" json:"batch_flush_interval,omitempty"` // interval to flush incomplete batches GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional) + ClientRack string `yaml:"client_rack,omitempty" json:"client_rack,omitempty"` // rack id for enabling rack-aware features AutoOffsetReset string `yaml:"auto_offset_reset,omitempty" json:"auto_offset_reset,omitempty" validate:"omitempty,oneof=earliest latest"` // group-mode reset policy when stored offset is out of range; "earliest" (default) or "latest" BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // max messages per producer batch (maps to batch.num.messages); defaults to 100 MaxRecords int `yaml:"max_records,omitempty" json:"max_records,omitempty" validate:"omitempty,gte=0"` // stop reading after this many records (0 = unlimited); negative values are rejected at validation @@ -406,6 +407,10 @@ func (k *kafka) buildConsumerConfig() (*ckafka.ConfigMap, error) { _ = cfg.SetKey("isolation.level", "read_committed") _ = cfg.SetKey("group.id", k.GroupID) + if k.ClientRack != "" { + _ = cfg.SetKey("client.rack", k.ClientRack) + } + return cfg, nil } @@ -421,6 +426,10 @@ func (k *kafka) buildStandaloneConsumerConfig() (*ckafka.ConfigMap, error) { _ = cfg.SetKey("auto.offset.reset", "earliest") _ = cfg.SetKey("isolation.level", "read_committed") + if k.ClientRack != "" { + _ = cfg.SetKey("client.rack", k.ClientRack) + } + return cfg, nil } From 35fc3e8c82df40f53f4a113a5b2be804ddec92fd Mon Sep 17 00:00:00 2001 From: Shivang Nagta <141389245+ShivangNagta@users.noreply.github.com> Date: Fri, 12 Jun 2026 01:14:52 +0530 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- internal/pkg/pipeline/task/kafka/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/pipeline/task/kafka/README.md b/internal/pkg/pipeline/task/kafka/README.md index efe2fd5..dc50ba1 100644 --- a/internal/pkg/pipeline/task/kafka/README.md +++ b/internal/pkg/pipeline/task/kafka/README.md @@ -119,7 +119,7 @@ tasks: bootstrap_server: kafka.local:9092 topic: input-topic group_id: my-consumer-group - client_rack: us-west-2a # match the broker.rack of broker closest to consumer's location + client_rack: us-west-2a # match the broker.rack of the broker closest to the consumer's location timeout: 25s ```