Skip to content
22 changes: 22 additions & 0 deletions cmd/broker/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ func TestACLListOffsetsDenied(t *testing.T) {
}
}

func TestACLListGroupsDenied(t *testing.T) {
t.Setenv("KAFSCALE_ACL_ENABLED", "true")
t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`)

store := metadata.NewInMemoryStore(defaultMetadata())
handler := newTestHandler(store)

clientID := "client-a"
payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{
CorrelationID: 18,
APIVersion: 5,
ClientID: &clientID,
}, kmsg.NewPtrListGroupsRequest())
if err != nil {
t.Fatalf("Handle ListGroups: %v", err)
}
resp := decodeKmsgResponse(t, 5, payload, kmsg.NewPtrListGroupsResponse)
if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED {
t.Fatalf("expected group auth failed, got %d", resp.ErrorCode)
}
}

func TestACLOffsetFetchDenied(t *testing.T) {
t.Setenv("KAFSCALE_ACL_ENABLED", "true")
t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`)
Expand Down
5 changes: 4 additions & 1 deletion cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2711,7 +2711,10 @@ func generateApiVersions() []kmsg.ApiVersionsResponseApiKey {
{key: protocol.APIKeyOffsetCommit, minVersion: 3, maxVersion: 3},
{key: protocol.APIKeyOffsetFetch, minVersion: 5, maxVersion: 5},
{key: protocol.APIKeyDescribeGroups, minVersion: 5, maxVersion: 5},
{key: protocol.APIKeyListGroups, minVersion: 5, maxVersion: 5},
// Older Kafka admin clients negotiate LIST_GROUPS in range [0,4].
// Narrow 5-5 advertisement breaks `kafka-consumer-groups --list` even
// though the coordinator handler is version-agnostic.
{key: protocol.APIKeyListGroups, minVersion: 0, maxVersion: 5},
{key: protocol.APIKeyOffsetForLeaderEpoch, minVersion: 3, maxVersion: 3},
{key: protocol.APIKeyDescribeConfigs, minVersion: 4, maxVersion: 4},
{key: protocol.APIKeyAlterConfigs, minVersion: 1, maxVersion: 1},
Expand Down
112 changes: 112 additions & 0 deletions cmd/broker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,118 @@ func TestHandlerApiVersionsUnsupported(t *testing.T) {
}
}

func TestGenerateApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) {
for _, entry := range generateApiVersions() {
if entry.ApiKey != protocol.APIKeyListGroups {
continue
}
if entry.MinVersion != 0 || entry.MaxVersion != 5 {
t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion)
}
return
}
t.Fatal("LIST_GROUPS api version entry not found")
}

func TestHandlerApiVersionsAdvertisesListGroupsCompatibility(t *testing.T) {
store := metadata.NewInMemoryStore(defaultMetadata())
handler := newTestHandler(store)

header := &protocol.RequestHeader{
APIKey: protocol.APIKeyApiVersion,
APIVersion: 0,
CorrelationID: 77,
}
payload, err := handler.Handle(context.Background(), header, kmsg.NewPtrApiVersionsRequest())
if err != nil {
t.Fatalf("Handle ApiVersions: %v", err)
}

resp := decodeKmsgResponse(t, 0, payload, kmsg.NewPtrApiVersionsResponse)
if resp.ErrorCode != protocol.NONE {
t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode)
}

for _, entry := range resp.ApiKeys {
if entry.ApiKey != protocol.APIKeyListGroups {
continue
}
if entry.MinVersion != 0 || entry.MaxVersion != 5 {
t.Fatalf("expected LIST_GROUPS version range 0..5, got %d..%d", entry.MinVersion, entry.MaxVersion)
}
return
}
t.Fatal("LIST_GROUPS api version entry not found")
}

func TestHandleListGroupsSupportsCompatibleVersions(t *testing.T) {
store := metadata.NewInMemoryStore(defaultMetadata())
group := &metadatapb.ConsumerGroup{
GroupId: "group-1",
State: "stable",
ProtocolType: "consumer",
Protocol: "range",
Members: map[string]*metadatapb.GroupMember{
"member-1": {ClientId: "client-1", ClientHost: "127.0.0.1"},
},
}
if err := store.PutConsumerGroup(context.Background(), group); err != nil {
t.Fatalf("PutConsumerGroup: %v", err)
}
handler := newTestHandler(store)

for _, version := range []int16{0, 4, 5} {
t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
req := kmsg.NewPtrListGroupsRequest()
req.StatesFilter = []string{"Stable"}
req.TypesFilter = []string{"classic"}

payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{
APIKey: protocol.APIKeyListGroups,
APIVersion: version,
CorrelationID: int32(100 + version),
}, req)
if err != nil {
t.Fatalf("Handle ListGroups: %v", err)
}

resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse)
if resp.ErrorCode != protocol.NONE {
t.Fatalf("expected error code %d, got %d", protocol.NONE, resp.ErrorCode)
}
if len(resp.Groups) != 1 {
t.Fatalf("expected 1 group, got %d", len(resp.Groups))
}
if resp.Groups[0].Group != "group-1" {
t.Fatalf("expected group-1, got %q", resp.Groups[0].Group)
}
})
}
}

func TestHandleListGroupsEtcdUnavailable(t *testing.T) {
store := metadata.NewInMemoryStore(defaultMetadata())
handler := newTestHandler(unavailableMetadataStore{Store: store})

for _, version := range []int16{0, 5} {
t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{
APIKey: protocol.APIKeyListGroups,
APIVersion: version,
CorrelationID: int32(200 + version),
}, kmsg.NewPtrListGroupsRequest())
if err != nil {
t.Fatalf("Handle ListGroups: %v", err)
}

resp := decodeKmsgResponse(t, version, payload, kmsg.NewPtrListGroupsResponse)
if resp.ErrorCode != protocol.REQUEST_TIMED_OUT {
t.Fatalf("expected REQUEST_TIMED_OUT (%d), got %d", protocol.REQUEST_TIMED_OUT, resp.ErrorCode)
}
})
}
}

func TestHandleFetch(t *testing.T) {
store := metadata.NewInMemoryStore(defaultMetadata())
handler := newTestHandler(store)
Expand Down
3 changes: 3 additions & 0 deletions cmd/kafscale-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic
}

func persistTopicConfig(ctx context.Context, store *metadata.EtcdStore, cfg *metadatapb.TopicConfig) error {
if cfg == nil || cfg.Name == "" {
return metadata.ErrInvalidTopic
}
payload, err := metadata.EncodeTopicConfig(cfg)
if err != nil {
return err
Expand Down
Loading