Merged
15 changes: 14 additions & 1 deletion internal/pkg/pipeline/task/kafka/README.md
@@ -34,6 +34,7 @@ There are two read modes, controlled by whether `group_id` is set:
| `max_records` | int | `0` (unlimited) | Read-mode cap on records forwarded downstream. The reader stops cleanly once this many messages have been sent. In group mode, offsets up to the last forwarded record are committed on shutdown. Must be `>= 0`; negative values are rejected at validation. |
| `group_id` | string | - | Consumer group id. If omitted, standalone mode is used (reads from beginning, no offset commits). |
| `auto_offset_reset` | string | `latest` | Group-mode reset policy when no committed offset exists or the stored offset is out of range. `latest` skips to the tail; `earliest` reads from the beginning of the available log. Ignored in standalone mode. |
| `client_rack` | string | - | Read-mode rack id, passed to librdkafka as `client.rack`. Set it to the `broker.rack` of the brokers in the consumer's zone so that Kafka's rack-aware features can reduce cross-AZ data transfer. Applies to standalone and group consumer modes; ignored for producers. |
| `server_auth_type` | string | `none` | `none` or `tls` — server certificate verification mode |
| `cert` | string | - | CA certificate PEM/CRT content used when `server_auth_type: tls` (alternatively use `cert_path`) |
| `cert_path` | string | - | Path to CA certificate (PEM/CRT) |
@@ -110,6 +111,18 @@ tasks:
timeout: 25s
```

### Rack-aware consumer
```yaml
tasks:
- name: read_in_zone
type: kafka
bootstrap_server: kafka.local:9092
topic: input-topic
group_id: my-consumer-group
client_rack: us-west-2a # match the broker.rack of the broker closest to the consumer's location
timeout: 25s
```

### Using SCRAM (SCRAM-SHA-512)
```yaml
tasks:
@@ -227,4 +240,4 @@ tasks:
--bootstrap-server <host>:<port>
```

**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.**
**Thanks to the [Confluent Kafka Go client](https://github.com/confluentinc/confluent-kafka-go) for the Kafka client implementation.**
9 changes: 9 additions & 0 deletions internal/pkg/pipeline/task/kafka/kafka.go
@@ -46,6 +46,7 @@ type kafka struct {
Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` // connection, read, write, commit timeout
BatchFlushInterval duration.Duration `yaml:"batch_flush_interval,omitempty" json:"batch_flush_interval,omitempty"` // interval to flush incomplete batches
GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional)
ClientRack string `yaml:"client_rack,omitempty" json:"client_rack,omitempty"` // rack id for enabling rack-aware features
AutoOffsetReset string `yaml:"auto_offset_reset,omitempty" json:"auto_offset_reset,omitempty" validate:"omitempty,oneof=earliest latest"` // group-mode reset policy when no committed offset exists or the stored offset is out of range; "latest" (default) or "earliest"
BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // max messages per producer batch (maps to batch.num.messages); defaults to 100
MaxRecords int `yaml:"max_records,omitempty" json:"max_records,omitempty" validate:"omitempty,gte=0"` // stop reading after this many records (0 = unlimited); negative values are rejected at validation
@@ -406,6 +407,10 @@ func (k *kafka) buildConsumerConfig() (*ckafka.ConfigMap, error) {
_ = cfg.SetKey("isolation.level", "read_committed")
_ = cfg.SetKey("group.id", k.GroupID)

if k.ClientRack != "" {
_ = cfg.SetKey("client.rack", k.ClientRack)
}

return cfg, nil
}

@@ -421,6 +426,10 @@ func (k *kafka) buildStandaloneConsumerConfig() (*ckafka.ConfigMap, error) {
_ = cfg.SetKey("auto.offset.reset", "earliest")
_ = cfg.SetKey("isolation.level", "read_committed")

if k.ClientRack != "" {
_ = cfg.SetKey("client.rack", k.ClientRack)
}

return cfg, nil
}
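
Both consumer builders above apply the same guard: `client.rack` is written only when `ClientRack` is non-empty, so an unset field leaves the client library's default untouched. The sketch below isolates that behavior. It is an illustration rather than code from this change: `consumerSettings` and `applyClientRack` are hypothetical names, and a plain `map[string]string` stands in for `ckafka.ConfigMap` so the example has no external dependencies.

```go
package main

import "fmt"

// consumerSettings is a hypothetical stand-in for the kafka task struct;
// only the field relevant to this sketch is included.
type consumerSettings struct {
	ClientRack string
}

// applyClientRack sets client.rack only when a rack id was configured.
// A plain map stands in for ckafka.ConfigMap (an assumption made to keep
// the sketch self-contained).
func applyClientRack(cfg map[string]string, s consumerSettings) {
	if s.ClientRack != "" {
		cfg["client.rack"] = s.ClientRack
	}
}

func main() {
	// Group-mode style config with a rack id configured.
	withRack := map[string]string{"group.id": "my-consumer-group"}
	applyClientRack(withRack, consumerSettings{ClientRack: "us-west-2a"})
	fmt.Println(withRack["client.rack"]) // prints "us-west-2a"

	// No rack id configured: the key is never written.
	withoutRack := map[string]string{}
	applyClientRack(withoutRack, consumerSettings{})
	_, set := withoutRack["client.rack"]
	fmt.Println(set) // prints "false"
}
```

A shared helper along these lines would be one way to avoid repeating the guard in `buildConsumerConfig` and `buildStandaloneConsumerConfig`; the change as written keeps the two blocks inline.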
