diff --git a/internal/pkg/pipeline/task/kafka/README.md b/internal/pkg/pipeline/task/kafka/README.md index 24ccf46..dc50ba1 100644 --- a/internal/pkg/pipeline/task/kafka/README.md +++ b/internal/pkg/pipeline/task/kafka/README.md @@ -34,6 +34,7 @@ There are two read modes, controlled by whether `group_id` is set: | `max_records` | int | `0` (unlimited) | Read-mode cap on records forwarded downstream. The reader stops cleanly once this many messages have been sent. In group mode, offsets up to the last forwarded record are committed on shutdown. Must be `>= 0`; negative values are rejected at validation. | | `group_id` | string | - | Consumer group id. If omitted, standalone mode is used (reads from beginning, no offset commits). | | `auto_offset_reset` | string | `latest` | Group-mode reset policy when no committed offset exists or the stored offset is out of range. `latest` skips to the tail; `earliest` reads from the beginning of the available log. Ignored in standalone mode. | +| `client_rack` | string | - | Read-mode rack id, set as librdkafka `client.rack`. Lets the consumer use Kafka's rack-aware features to cut cross-AZ data transfers. Applies to standalone and group consumer modes; ignored for producers. | | `server_auth_type` | string | `none` | `none` or `tls` — server certificate verification mode | | `cert` | string | - | CA certificate PEM/CRT content used when `server_auth_type: tls` (alternatively use `cert_path`) | | `cert_path` | string | - | Path to CA certificate (PEM/CRT) | @@ -110,6 +111,18 @@ tasks: timeout: 25s ``` +### Rack-aware consumer +```yaml +tasks: + - name: read_in_zone + type: kafka + bootstrap_server: kafka.local:9092 + topic: input-topic + group_id: my-consumer-group + client_rack: us-west-2a # match the broker.rack of the broker closest to the consumer's location + timeout: 25s +``` + ### Using SCRAM (SCRAM-SHA-512) ```yaml tasks: @@ -227,4 +240,4 @@ tasks: --bootstrap-server : ``` -**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.** \ No newline at end of file +**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.** diff --git a/internal/pkg/pipeline/task/kafka/kafka.go b/internal/pkg/pipeline/task/kafka/kafka.go index bd7d0c2..a5418d6 100644 --- a/internal/pkg/pipeline/task/kafka/kafka.go +++ b/internal/pkg/pipeline/task/kafka/kafka.go @@ -46,6 +46,7 @@ type kafka struct { Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` // connection, read, write, commit timeout BatchFlushInterval duration.Duration `yaml:"batch_flush_interval,omitempty" json:"batch_flush_interval,omitempty"` // interval to flush incomplete batches GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional) + ClientRack string `yaml:"client_rack,omitempty" json:"client_rack,omitempty"` // rack id for enabling rack-aware features AutoOffsetReset string `yaml:"auto_offset_reset,omitempty" json:"auto_offset_reset,omitempty" validate:"omitempty,oneof=earliest latest"` // group-mode reset policy when stored offset is out of range; "earliest" (default) or "latest" BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // max messages per producer batch (maps to batch.num.messages); defaults to 100 MaxRecords int `yaml:"max_records,omitempty" json:"max_records,omitempty" validate:"omitempty,gte=0"` // stop reading after this many records (0 = unlimited); negative values are rejected at validation @@ -406,6 +407,10 @@ func (k *kafka) buildConsumerConfig() (*ckafka.ConfigMap, error) { _ = cfg.SetKey("isolation.level", "read_committed") _ = cfg.SetKey("group.id", k.GroupID) + if k.ClientRack != "" { + _ = cfg.SetKey("client.rack", k.ClientRack) + } + return cfg, nil } @@ -421,6 +426,10 @@ func (k *kafka) buildStandaloneConsumerConfig() (*ckafka.ConfigMap, error) { _ = cfg.SetKey("auto.offset.reset", "earliest") _ = cfg.SetKey("isolation.level", "read_committed") + if k.ClientRack != "" { + _ = cfg.SetKey("client.rack", k.ClientRack) + } + return cfg, nil }