From addf4009de61afb68de809ce4a1171fbe517b905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 11:23:42 +0200 Subject: [PATCH 1/7] feat(deploy): add kafscale-broker-standalone Helm chart (smoke testing) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimal chart that deploys a single kafscale-broker Pod pointing at external etcd + S3 (MinIO or cloud). Intended for quick smoke tests and blueprint convergence on KIND — NOT a replacement for the full operator-based chart at deploy/helm/kafscale/. Used by scalytics-all-in-one bp-001 Ops Foundation smoke suite: COMP-kafscale-01 (pod Ready) + COMP-kafscale-02 (Kafka TCP reachable). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../kafscale-broker-standalone/Chart.yaml | 6 +++ .../templates/broker.yaml | 52 +++++++++++++++++++ .../kafscale-broker-standalone/values.yaml | 17 ++++++ 3 files changed, 75 insertions(+) create mode 100644 deploy/helm/kafscale-broker-standalone/Chart.yaml create mode 100644 deploy/helm/kafscale-broker-standalone/templates/broker.yaml create mode 100644 deploy/helm/kafscale-broker-standalone/values.yaml diff --git a/deploy/helm/kafscale-broker-standalone/Chart.yaml b/deploy/helm/kafscale-broker-standalone/Chart.yaml new file mode 100644 index 0000000..b5dc1b5 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: kafscale-broker-standalone +description: Minimal standalone KafScale broker (etcd + MinIO external). Smoke chart only. +type: application +version: 0.1.0 +appVersion: "v1.5.0" diff --git a/deploy/helm/kafscale-broker-standalone/templates/broker.yaml b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml new file mode 100644 index 0000000..9aa0f5e --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml @@ -0,0 +1,52 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation +spec: + replicas: 1 + selector: { matchLabels: { app.kubernetes.io/name: kafscale-broker } } + template: + metadata: + labels: + app.kubernetes.io/name: kafscale-broker + app.kubernetes.io/part-of: ops-foundation + spec: + enableServiceLinks: false + containers: + - name: broker + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + env: + - { name: KAFSCALE_BROKER_ID, value: "0" } + - { name: KAFSCALE_BROKER_ADDR, value: ":9092" } + - { name: KAFSCALE_BROKER_HOST, value: "kafscale-broker" } + - { name: KAFSCALE_BROKER_PORT, value: "9092" } + - { name: KAFSCALE_BROKER_ETCD_ENDPOINTS, value: "{{ .Values.etcdEndpoints }}" } + - { name: KAFSCALE_BROKER_DATA_DIR, value: "/tmp/data" } + - { name: KAFSCALE_BROKER_LOG_LEVEL, value: "info" } + - { name: KAFSCALE_S3_BUCKET, value: "{{ .Values.s3Bucket }}" } + - { name: KAFSCALE_S3_REGION, value: "us-east-1" } + - { name: KAFSCALE_S3_ENDPOINT, value: "{{ .Values.s3Endpoint }}" } + - { name: KAFSCALE_S3_ACCESS_KEY, value: "{{ .Values.s3AccessKey }}" } + - { name: KAFSCALE_S3_SECRET_KEY, value: "{{ .Values.s3SecretKey }}" } + - { name: KAFSCALE_S3_PATH_STYLE, value: "true" } + ports: + - { name: kafka, containerPort: 9092 } +--- +apiVersion: v1 +kind: Service +metadata: + name: kafscale-broker + namespace: {{ .Release.Namespace }} + labels: + app.kubernetes.io/name: kafscale-broker +spec: + type: {{ .Values.service.type }} + ports: + - { name: kafka, port: {{ .Values.service.port }}, targetPort: 9092 } + selector: + app.kubernetes.io/name: kafscale-broker diff --git a/deploy/helm/kafscale-broker-standalone/values.yaml b/deploy/helm/kafscale-broker-standalone/values.yaml new file mode 100644 index 0000000..635f164 --- /dev/null +++ b/deploy/helm/kafscale-broker-standalone/values.yaml @@ -0,0 +1,17 @@ +image: + repository: ghcr.io/kafscale/kafscale-broker + tag: dev + pullPolicy: IfNotPresent + +etcdEndpoints: "http://etcd:2379" +s3Endpoint: "http://minio:9000" +s3Bucket: "kafscale" +s3AccessKey: "scalytics" +s3SecretKey: "scalytics" + +service: + type: ClusterIP + port: 9092 + +labels: + app.kubernetes.io/part-of: ops-foundation From 914da96d0d198306b543d4ab3b37393927de0916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 14:19:05 +0200 Subject: [PATCH 2/7] fix(broker): advertise LIST_GROUPS versions 0-5 (was 5-5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves OPS-005 item #2. The Java admin client (kafka-consumer-groups, AdminClient.listConsumerGroups, Schema Registry) negotiates LIST_GROUPS in range [0,4]. Advertising a narrow 5-5 window caused: UnsupportedVersionException: Error listing groups ... The broker does not support LIST_GROUPS with version in range [0,4]. The supported range is [5,5]. The underlying h.coordinator.ListGroups handler is version-agnostic; the encoder handles v0 just as well as v5. The fix is one line — widen the advertised range. Verified: kafka-consumer-groups --bootstrap-server kafscale-broker:9092 --list now exits 0 cleanly (from UnsupportedVersionException pre-fix). The remaining OPS-005 items (INIT_PRODUCER_ID, transactional APIs, Schema Registry NPE on verifySchemaTopic) are substantive broker-engineering work and are not addressed here. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/broker/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index c1ba108..070a0bb 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -2711,7 +2711,10 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { {key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5}, {key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5}, - {key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5}, + // OPS-005: Java admin-client negotiates LIST_GROUPS in range [0,4]. + // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 + // — the underlying coordinator handler is version-agnostic. + {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, From 4f5666afbf3a62e5306a93dab20d31d05eb0db2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Sun, 19 Apr 2026 15:11:42 +0200 Subject: [PATCH 3/7] feat(broker): INIT_PRODUCER_ID stub handler (OPS-005 #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a minimum-viable implementation of the Kafka INIT_PRODUCER_ID API (API key 22). Allocates a monotonically-increasing producer ID with epoch 0; does not yet track sequence numbers or deduplicate on replay. Sufficient to unblock Java AdminClient default producers, franz-go idempotent producers, and Schema Registry's producer-init probe. Changes: - pkg/protocol/api.go: add APIKeyInitProducerID = 22 - cmd/broker/main.go: * handler gains nextProducerID int64 (atomic allocator) * dispatch case for *kmsg.InitProducerIDRequest returns pid + epoch=0 * apiVersions: InitProducerID moves from unsupported to {0, 4} * import sync/atomic Verified: - `kafka-console-producer --producer-property enable.idempotence=true` now succeeds (was: UnsupportedVersionException). - kaf-mirror (franz-go) replicates primary→standby end-to-end: PRIMARY offsets = STANDBY offsets, measured lag <1s. - SCEN-bp002-06_Replication scenario test flipped SKIP → PASS. Known limitations (production correctness gap, tracked in OPS-005): - no sequence-number tracking: duplicate-on-retry semantics not enforced - no epoch management: fencing of stale producers on rebalance not implemented - PID allocator is process-local, not persisted across broker restart Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/broker/main.go | 27 ++++++++++++++++++++++++++- pkg/protocol/api.go | 1 + 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 070a0bb..a58aaf5 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -60,6 +61,11 @@ const ( type handler struct { apiVersions []kmsg.ApiVersionsResponseApiKey + // nextProducerID is a monotonically-increasing allocator for + // InitProducerID responses (OPS-005 #1 stub). Accessed via sync/atomic. + // Replace with a proper persistent allocator when full idempotent-producer + // dedup lands. + nextProducerID int64 store metadata.Store s3 storage.S3Client cache *cache.SegmentCache @@ -206,6 +212,20 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.ProduceRequest: return h.handleProduce(ctx, header, req.(*kmsg.ProduceRequest)) + case *kmsg.InitProducerIDRequest: + // OPS-005 #1: minimum-viable INIT_PRODUCER_ID stub. Allocates a + // monotonically-increasing producer ID with epoch 0. Does NOT track + // sequence numbers or deduplicate (that work is in scope for a later + // broker-engineering sprint). This is sufficient to unblock idempotent + // producers for development + bp-002 BDR convergence. For production + // correctness, full dedup-on-replay is still required. + pid := atomic.AddInt64(&h.nextProducerID, 1) + resp := kmsg.NewPtrInitProducerIDResponse() + resp.ErrorCode = protocol.NONE + resp.ProducerID = pid + resp.ProducerEpoch = 0 + resp.ThrottleMillis = 0 + return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.FetchRequest: return h.handleFetch(ctx, header, req.(*kmsg.FetchRequest)) case *kmsg.FindCoordinatorRequest: @@ -2715,6 +2735,11 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 // — the underlying coordinator handler is version-agnostic. {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, + // OPS-005 #1: idempotent-producer init. Handler is a stub that + // allocates a monotonically-increasing producer ID; sequence-number + // dedup is NOT yet implemented. Sufficient to unblock franz-go and + // Java default producers; production correctness gap tracked. + {key: protocol.APIKeyInitProducerID, minVersion: 0, maxVersion: 4}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, @@ -2725,7 +2750,7 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { } unsupported := []int16{ 4, 5, 6, 7, - 21, 22, + 21, // 22 (InitProducerID) moved to supported — OPS-005 #1 stub handler 24, 25, 26, } diff --git a/pkg/protocol/api.go b/pkg/protocol/api.go index b237578..f4d81af 100644 --- a/pkg/protocol/api.go +++ b/pkg/protocol/api.go @@ -33,6 +33,7 @@ const ( APIKeyApiVersion int16 = 18 APIKeyCreateTopics int16 = 19 APIKeyDeleteTopics int16 = 20 + APIKeyInitProducerID int16 = 22 // OPS-005: idempotent-producer init (stub handler) APIKeyOffsetForLeaderEpoch int16 = 23 APIKeyDescribeConfigs int16 = 32 APIKeyAlterConfigs int16 = 33 From 971106f672b1c988769a4b8927f32543eae777c5 Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Tue, 2 Jun 2026 08:50:48 +0200 Subject: [PATCH 4/7] Revert "feat(broker): INIT_PRODUCER_ID stub handler (OPS-005 #1)" This reverts commit 4f5666afbf3a62e5306a93dab20d31d05eb0db2d. --- cmd/broker/main.go | 27 +-------------------------- pkg/protocol/api.go | 1 - 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index a58aaf5..070a0bb 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -28,7 +28,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "syscall" "time" @@ -61,11 +60,6 @@ const ( type handler struct { apiVersions []kmsg.ApiVersionsResponseApiKey - // nextProducerID is a monotonically-increasing allocator for - // InitProducerID responses (OPS-005 #1 stub). Accessed via sync/atomic. - // Replace with a proper persistent allocator when full idempotent-producer - // dedup lands. - nextProducerID int64 store metadata.Store s3 storage.S3Client cache *cache.SegmentCache @@ -212,20 +206,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.ProduceRequest: return h.handleProduce(ctx, header, req.(*kmsg.ProduceRequest)) - case *kmsg.InitProducerIDRequest: - // OPS-005 #1: minimum-viable INIT_PRODUCER_ID stub. Allocates a - // monotonically-increasing producer ID with epoch 0. Does NOT track - // sequence numbers or deduplicate (that work is in scope for a later - // broker-engineering sprint). This is sufficient to unblock idempotent - // producers for development + bp-002 BDR convergence. For production - // correctness, full dedup-on-replay is still required. - pid := atomic.AddInt64(&h.nextProducerID, 1) - resp := kmsg.NewPtrInitProducerIDResponse() - resp.ErrorCode = protocol.NONE - resp.ProducerID = pid - resp.ProducerEpoch = 0 - resp.ThrottleMillis = 0 - return protocol.EncodeResponse(header.CorrelationID, header.APIVersion, resp), nil case *kmsg.FetchRequest: return h.handleFetch(ctx, header, req.(*kmsg.FetchRequest)) case *kmsg.FindCoordinatorRequest: @@ -2735,11 +2715,6 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 // — the underlying coordinator handler is version-agnostic. {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, - // OPS-005 #1: idempotent-producer init. Handler is a stub that - // allocates a monotonically-increasing producer ID; sequence-number - // dedup is NOT yet implemented. Sufficient to unblock franz-go and - // Java default producers; production correctness gap tracked. - {key: protocol.APIKeyInitProducerID, minVersion: 0, maxVersion: 4}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, @@ -2750,7 +2725,7 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { } unsupported := []int16{ 4, 5, 6, 7, - 21, // 22 (InitProducerID) moved to supported — OPS-005 #1 stub handler + 21, 22, 24, 25, 26, } diff --git a/pkg/protocol/api.go b/pkg/protocol/api.go index f4d81af..b237578 100644 --- a/pkg/protocol/api.go +++ b/pkg/protocol/api.go @@ -33,7 +33,6 @@ const ( APIKeyApiVersion int16 = 18 APIKeyCreateTopics int16 = 19 APIKeyDeleteTopics int16 = 20 - APIKeyInitProducerID int16 = 22 // OPS-005: idempotent-producer init (stub handler) APIKeyOffsetForLeaderEpoch int16 = 23 APIKeyDescribeConfigs int16 = 32 APIKeyAlterConfigs int16 = 33 From 61c5ce491ca4e35cc72d189ad54c73ea75a450b0 Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Tue, 2 Jun 2026 08:50:50 +0200 Subject: [PATCH 5/7] Revert "feat(deploy): add kafscale-broker-standalone Helm chart (smoke testing)" This reverts commit addf4009de61afb68de809ce4a1171fbe517b905. --- .../kafscale-broker-standalone/Chart.yaml | 6 --- .../templates/broker.yaml | 52 ------------------- .../kafscale-broker-standalone/values.yaml | 17 ------ 3 files changed, 75 deletions(-) delete mode 100644 deploy/helm/kafscale-broker-standalone/Chart.yaml delete mode 100644 deploy/helm/kafscale-broker-standalone/templates/broker.yaml delete mode 100644 deploy/helm/kafscale-broker-standalone/values.yaml diff --git a/deploy/helm/kafscale-broker-standalone/Chart.yaml b/deploy/helm/kafscale-broker-standalone/Chart.yaml deleted file mode 100644 index b5dc1b5..0000000 --- a/deploy/helm/kafscale-broker-standalone/Chart.yaml +++ /dev/null @@ -1,6 +0,0 @@ -apiVersion: v2 -name: kafscale-broker-standalone -description: Minimal standalone KafScale broker (etcd + MinIO external). Smoke chart only. -type: application -version: 0.1.0 -appVersion: "v1.5.0" diff --git a/deploy/helm/kafscale-broker-standalone/templates/broker.yaml b/deploy/helm/kafscale-broker-standalone/templates/broker.yaml deleted file mode 100644 index 9aa0f5e..0000000 --- a/deploy/helm/kafscale-broker-standalone/templates/broker.yaml +++ /dev/null @@ -1,52 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafscale-broker - namespace: {{ .Release.Namespace }} - labels: - app.kubernetes.io/name: kafscale-broker - app.kubernetes.io/part-of: ops-foundation -spec: - replicas: 1 - selector: { matchLabels: { app.kubernetes.io/name: kafscale-broker } } - template: - metadata: - labels: - app.kubernetes.io/name: kafscale-broker - app.kubernetes.io/part-of: ops-foundation - spec: - enableServiceLinks: false - containers: - - name: broker - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - env: - - { name: KAFSCALE_BROKER_ID, value: "0" } - - { name: KAFSCALE_BROKER_ADDR, value: ":9092" } - - { name: KAFSCALE_BROKER_HOST, value: "kafscale-broker" } - - { name: KAFSCALE_BROKER_PORT, value: "9092" } - - { name: KAFSCALE_BROKER_ETCD_ENDPOINTS, value: "{{ .Values.etcdEndpoints }}" } - - { name: KAFSCALE_BROKER_DATA_DIR, value: "/tmp/data" } - - { name: KAFSCALE_BROKER_LOG_LEVEL, value: "info" } - - { name: KAFSCALE_S3_BUCKET, value: "{{ .Values.s3Bucket }}" } - - { name: KAFSCALE_S3_REGION, value: "us-east-1" } - - { name: KAFSCALE_S3_ENDPOINT, value: "{{ .Values.s3Endpoint }}" } - - { name: KAFSCALE_S3_ACCESS_KEY, value: "{{ .Values.s3AccessKey }}" } - - { name: KAFSCALE_S3_SECRET_KEY, value: "{{ .Values.s3SecretKey }}" } - - { name: KAFSCALE_S3_PATH_STYLE, value: "true" } - ports: - - { name: kafka, containerPort: 9092 } ---- -apiVersion: v1 -kind: Service -metadata: - name: kafscale-broker - namespace: {{ .Release.Namespace }} - labels: - app.kubernetes.io/name: kafscale-broker -spec: - type: {{ .Values.service.type }} - ports: - - { name: kafka, port: {{ .Values.service.port }}, targetPort: 9092 } - selector: - app.kubernetes.io/name: kafscale-broker diff --git a/deploy/helm/kafscale-broker-standalone/values.yaml b/deploy/helm/kafscale-broker-standalone/values.yaml deleted file mode 100644 index 635f164..0000000 --- a/deploy/helm/kafscale-broker-standalone/values.yaml +++ /dev/null @@ -1,17 +0,0 @@ -image: - repository: ghcr.io/kafscale/kafscale-broker - tag: dev - pullPolicy: IfNotPresent - -etcdEndpoints: "http://etcd:2379" -s3Endpoint: "http://minio:9000" -s3Bucket: "kafscale" -s3AccessKey: "scalytics" -s3SecretKey: "scalytics" - -service: - type: ClusterIP - port: 9092 - -labels: - app.kubernetes.io/part-of: ops-foundation From 3473e3aa17d6f0ca7049d605a7bf11acb72b99fb Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Tue, 2 Jun 2026 08:51:58 +0200 Subject: [PATCH 6/7] test(broker): lock list groups range --- cmd/broker/main.go | 6 +++--- cmd/broker/main_test.go | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 070a0bb..d999ea0 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -2711,9 +2711,9 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { {key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5}, {key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5}, - // OPS-005: Java admin-client negotiates LIST_GROUPS in range [0,4]. - // Narrow 5-5 range breaks `kafka-consumer-groups --list`. Widen to 0-5 - // — the underlying coordinator handler is version-agnostic. + // Older Kafka admin clients negotiate LIST_GROUPS in range [0,4]. + // Narrow 5-5 advertisement breaks `kafka-consumer-groups --list` even + // though the coordinator handler is version-agnostic. {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, diff --git a/cmd/broker/main_test.go b/cmd/broker/main_test.go index fd87ee8..f8af8b3 100644 --- a/cmd/broker/main_test.go +++ b/cmd/broker/main_test.go @@ -182,6 +182,19 @@ func TestHandlerApiVersionsUnsupported(t *testing.T) { } } +func TestGenerateApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) { + for _, entry := range generateApiVersions() { + if entry.ApiKey != protocol.APIKeyListGroups { + continue + } + if entry.MinVersion != 0 || entry.MaxVersion != 5 { + t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion) + } + return + } + t.Fatal("LIST_GROUPS api version entry not found") +} + func TestHandleFetch(t *testing.T) { store := metadata.NewInMemoryStore(defaultMetadata()) handler := newTestHandler(store) From ac661459408456728407593b3249b516aa101ae5 Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Tue, 2 Jun 2026 09:00:47 +0200 Subject: [PATCH 7/7] fix restore and extend coverage --- cmd/broker/acl_test.go | 22 +++++++++ cmd/broker/main_test.go | 99 ++++++++++++++++++++++++++++++++++++++++ cmd/kafscale-cli/main.go | 16 ++++++- 3 files changed, 136 insertions(+), 1 deletion(-) diff --git a/cmd/broker/acl_test.go b/cmd/broker/acl_test.go index 503689b..262e9ba 100644 --- a/cmd/broker/acl_test.go +++ b/cmd/broker/acl_test.go @@ -116,6 +116,28 @@ func TestACLListOffsetsDenied(t *testing.T) { } } +func TestACLListGroupsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + CorrelationID: 18, + APIVersion: 5, + ClientID: &clientID, + }, kmsg.NewPtrListGroupsRequest()) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + resp := decodeKmsgResponse(t, 5, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + func TestACLOffsetFetchDenied(t *testing.T) { t.Setenv("KAFSCALE_ACL_ENABLED", "true") t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) diff --git a/cmd/broker/main_test.go b/cmd/broker/main_test.go index f8af8b3..ba1b7a6 100644 --- a/cmd/broker/main_test.go +++ b/cmd/broker/main_test.go @@ -195,6 +195,105 @@ func TestGenerateApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) { t.Fatal("LIST_GROUPS api version entry not found") } +func TestHandlerApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + header := &protocol.RequestHeader{ + APIKey: protocol.APIKeyApiVersion, + APIVersion: 0, + CorrelationID: 77, + } + payload, err := handler.Handle(context.Background(), header, kmsg.NewPtrApiVersionsRequest()) + if err != nil { + t.Fatalf("Handle ApiVersions: %v", err) + } + + resp := decodeKmsgResponse(t, 0, payload, kmsg.NewPtrApiVersionsResponse) + if resp.ErrorCode != protocol.NONE { + t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode) + } + + for _, entry := range resp.ApiKeys { + if entry.ApiKey != protocol.APIKeyListGroups { + continue + } + if entry.MinVersion != 0 || entry.MaxVersion != 5 { + t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion) + } + return + } + t.Fatal("LIST_GROUPS api version entry not found") +} + +func TestHandleListGroupsSupportsCompatibleVersions(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + group := &metadatapb.ConsumerGroup{ + GroupId: "group-1", + State: "stable", + ProtocolType: "consumer", + Protocol: "range", + Members: map[string]*metadatapb.GroupMember{ + "member-1": {ClientId: "client-1", ClientHost: "127.0.0.1"}, + }, + } + if err := store.PutConsumerGroup(context.Background(), group); err != nil { + t.Fatalf("PutConsumerGroup: %v", err) + } + handler := newTestHandler(store) + + for _, version := range []int16{0, 4, 5} { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + req := kmsg.NewPtrListGroupsRequest() + req.StatesFilter = []string{"Stable"} + req.TypesFilter = []string{"classic"} + + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + APIKey: protocol.APIKeyListGroups, + APIVersion: version, + CorrelationID: int32(100 + version), + }, req) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + + resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.NONE { + t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode) + } + if len(resp.Groups) != 1 { + t.Fatalf("expected 1 group, got %d", len(resp.Groups)) + } + if resp.Groups[0].Group != "group-1" { + t.Fatalf("expected group-1, got %q", resp.Groups[0].Group) + } + }) + } +} + +func TestHandleListGroupsEtcdUnavailable(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(unavailableMetadataStore{Store: store}) + + for _, version := range []int16{0, 5} { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + APIKey: protocol.APIKeyListGroups, + APIVersion: version, + CorrelationID: int32(200 + version), + }, kmsg.NewPtrListGroupsRequest()) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + + resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.REQUEST_TIMED_OUT { + t.Fatalf("expected REQUEST_TIMED_OUT (%d), got %d", protocol.REQUEST_TIMED_OUT, resp.ErrorCode) + } + }) + } +} + func TestHandleFetch(t *testing.T) { store := metadata.NewInMemoryStore(defaultMetadata()) handler := newTestHandler(store) diff --git a/cmd/kafscale-cli/main.go b/cmd/kafscale-cli/main.go index d8c9921..87bbdcf 100644 --- a/cmd/kafscale-cli/main.go +++ b/cmd/kafscale-cli/main.go @@ -202,7 +202,7 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 targetCfg := cloneTopicConfig(sourceCfg) targetCfg.Name = cfg.TargetTopic targetCfg.CreatedAt = time.Now().UTC().Format(time.RFC3339) - if err := store.UpdateTopicConfig(ctx, targetCfg); err != nil { + if err := persistTopicConfig(ctx, store, targetCfg); err != nil { return err } @@ -286,6 +286,20 @@ func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic return nil } +func persistTopicConfig(ctx context.Context, store *metadata.EtcdStore, cfg *metadatapb.TopicConfig) error { + if cfg == nil || cfg.Name == "" { + return metadata.ErrInvalidTopic + } + payload, err := metadata.EncodeTopicConfig(cfg) + if err != nil { + return err + } + putCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + _, err = store.EtcdClient().Put(putCtx, metadata.TopicConfigKey(cfg.Name), string(payload)) + return err +} + func cloneTopicConfig(cfg *metadatapb.TopicConfig) *metadatapb.TopicConfig { if cfg == nil { return nil