diff --git a/cmd/broker/acl_test.go b/cmd/broker/acl_test.go index 503689b..262e9ba 100644 --- a/cmd/broker/acl_test.go +++ b/cmd/broker/acl_test.go @@ -116,6 +116,28 @@ func TestACLListOffsetsDenied(t *testing.T) { } } +func TestACLListGroupsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + CorrelationID: 18, + APIVersion: 5, + ClientID: &clientID, + }, kmsg.NewPtrListGroupsRequest()) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + resp := decodeKmsgResponse(t, 5, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + func TestACLOffsetFetchDenied(t *testing.T) { t.Setenv("KAFSCALE_ACL_ENABLED", "true") t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index c1ba108..d999ea0 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -2711,7 +2711,10 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey { {key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5}, {key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5}, - {key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5}, + // Older Kafka admin clients negotiate LIST_GROUPS in range [0,4]. + // Narrow 5-5 advertisement breaks `kafka-consumer-groups --list` even + // though the coordinator handler is version-agnostic. + {key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5}, {key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3}, {key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4}, {key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1}, diff --git a/cmd/broker/main_test.go b/cmd/broker/main_test.go index 4de709a..5a7c73f 100644 --- a/cmd/broker/main_test.go +++ b/cmd/broker/main_test.go @@ -182,6 +182,118 @@ func TestHandlerApiVersionsUnsupported(t *testing.T) { } } +func TestGenerateApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) { + for _, entry := range generateApiVersions() { + if entry.ApiKey != protocol.APIKeyListGroups { + continue + } + if entry.MinVersion != 0 || entry.MaxVersion != 5 { + t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion) + } + return + } + t.Fatal("LIST_GROUPS api version entry not found") +} + +func TestHandlerApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + header := &protocol.RequestHeader{ + APIKey: protocol.APIKeyApiVersion, + APIVersion: 0, + CorrelationID: 77, + } + payload, err := handler.Handle(context.Background(), header, kmsg.NewPtrApiVersionsRequest()) + if err != nil { + t.Fatalf("Handle ApiVersions: %v", err) + } + + resp := decodeKmsgResponse(t, 0, payload, kmsg.NewPtrApiVersionsResponse) + if resp.ErrorCode != protocol.NONE { + t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode) + } + + for _, entry := range resp.ApiKeys { + if entry.ApiKey != protocol.APIKeyListGroups { + continue + } + if entry.MinVersion != 0 || entry.MaxVersion != 5 { + t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion) + } + return + } + t.Fatal("LIST_GROUPS api version entry not found") +} + +func TestHandleListGroupsSupportsCompatibleVersions(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + group := &metadatapb.ConsumerGroup{ + GroupId: "group-1", + State: "stable", + ProtocolType: "consumer", + Protocol: "range", + Members: map[string]*metadatapb.GroupMember{ + "member-1": {ClientId: "client-1", ClientHost: "127.0.0.1"}, + }, + } + if err := store.PutConsumerGroup(context.Background(), group); err != nil { + t.Fatalf("PutConsumerGroup: %v", err) + } + handler := newTestHandler(store) + + for _, version := range []int16{0, 4, 5} { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + req := kmsg.NewPtrListGroupsRequest() + req.StatesFilter = []string{"Stable"} + req.TypesFilter = []string{"classic"} + + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + APIKey: protocol.APIKeyListGroups, + APIVersion: version, + CorrelationID: int32(100 + version), + }, req) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + + resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.NONE { + t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode) + } + if len(resp.Groups) != 1 { + t.Fatalf("expected 1 group, got %d", len(resp.Groups)) + } + if resp.Groups[0].Group != "group-1" { + t.Fatalf("expected group-1, got %q", resp.Groups[0].Group) + } + }) + } +} + +func TestHandleListGroupsEtcdUnavailable(t *testing.T) { + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(unavailableMetadataStore{Store: store}) + + for _, version := range []int16{0, 5} { + t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) { + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{ + APIKey: protocol.APIKeyListGroups, + APIVersion: version, + CorrelationID: int32(200 + version), + }, kmsg.NewPtrListGroupsRequest()) + if err != nil { + t.Fatalf("Handle ListGroups: %v", err) + } + + resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse) + if resp.ErrorCode != protocol.REQUEST_TIMED_OUT { + t.Fatalf("expected REQUEST_TIMED_OUT (%d), got %d", protocol.REQUEST_TIMED_OUT, resp.ErrorCode) + } + }) + } +} + func TestHandleFetch(t *testing.T) { store := metadata.NewInMemoryStore(defaultMetadata()) handler := newTestHandler(store) diff --git a/cmd/kafscale-cli/main.go b/cmd/kafscale-cli/main.go index bb4f32f..a91763f 100644 --- a/cmd/kafscale-cli/main.go +++ b/cmd/kafscale-cli/main.go @@ -303,6 +303,9 @@ func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic } func persistTopicConfig(ctx context.Context, store *metadata.EtcdStore, cfg *metadatapb.TopicConfig) error { + if cfg == nil || cfg.Name == "" { + return metadata.ErrInvalidTopic + } payload, err := metadata.EncodeTopicConfig(cfg) if err != nil { return err