Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kadm: add func to decode AuthorizedOperations #870

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/kadm/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kadm
import (
"context"
"fmt"
"math"
"strings"
"sync"

Expand Down Expand Up @@ -1115,3 +1116,55 @@ func createDelDescACL(b *ACLBuilder) ([]kmsg.DeleteACLsRequestFilter, []*kmsg.De
}
return deletions, describes, nil
}

// DecodeACLOperations decodes an int32 bitfield into a slice of
// kmsg.ACLOperation values.
//
// This function is used to interpret the `AuthorizedOperations` field returned
// by the Kafka APIs, which specifies the operations a client is allowed to
// perform on a cluster, topic, or consumer group. It is utilized in multiple
// Kafka API responses, including Metadata, DescribeCluster, and
// DescribeGroupsResponseGroup.
//
// Caveats with Metadata API
// 1. To include authorized operations in the Metadata response, the client must explicitly
// opt in by setting `IncludeClusterAuthorizedOperations` and/or `IncludeTopicAuthorizedOperations`.
// These options were introduced in Kafka 2.3.0 as part of KIP-430.
// 2. In Kafka 2.8.0 (Metadata v11), the `AuthorizedOperations` for the cluster was removed from the
// Metadata response. Instead, clients should use the DescribeCluster API to retrieve cluster-level
// permissions.
//
// Function Behavior
// - If the bitfield equals `math.MinInt32` (-2147483648), it indicates that "AUTHORIZED_OPERATIONS_OMITTED"
// is set, and the function returns an empty slice.
// - For non-omitted values, the function iterates through all 32 bits of the bitfield. Each bit that
// is set (`1`) corresponds to an ACL operation, which is then mapped to its respective `kmsg.ACLOperation` value.
// - Undefined or unknown bits (e.g., bit 0 for `kmsg.ACLOperationUnknown`) are ignored.
//
// Supported Use Cases
// - Cluster Operations: Retrieved via the DescribeCluster API or older Metadata API versions (v8–v10).
// - Topic Operations: Retrieved via the Metadata API when `IncludeTopicAuthorizedOperations` is set.
// - Group Operations: Retrieved in the DescribeGroups API response.
func DecodeACLOperations(bitfield int32) []kmsg.ACLOperation {
var operations []kmsg.ACLOperation

// MinInt32 represents "AUTHORIZED_OPERATIONS_OMITTED"
if bitfield == math.MinInt32 {
return operations
}

// Helper function to determine if an operation is valid.
isValidOperation := func(op kmsg.ACLOperation) bool {
return op >= kmsg.ACLOperationRead && op <= kmsg.ACLOperationDescribeTokens
}

for i := 0; i < 32; i++ {
if bitfield&(1<<i) != 0 {
operation := kmsg.ACLOperation(i)
if isValidOperation(operation) {
operations = append(operations, operation)
}
}
}
return operations
}
109 changes: 109 additions & 0 deletions pkg/kadm/acls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package kadm

import (
"math"
"reflect"
"testing"

"github.com/twmb/franz-go/pkg/kmsg"
)

func TestDecodeACLOperations(t *testing.T) {
tests := []struct {
name string
bitfield int32
expected []kmsg.ACLOperation
}{
{
name: "Example 264",
bitfield: 264,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationDescribe,
},
},
{
name: "Example 3400",
bitfield: 3400,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationDescribeConfigs,
kmsg.ACLOperationDelete,
},
},
{
name: "Example 3968",
bitfield: 3968,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationAlter,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationDescribeConfigs,
},
},
{
name: "Example 4000",
bitfield: 4000,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationAlter,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationCreate,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationDescribeConfigs,
},
},
{
name: "All Operations",
bitfield: math.MaxInt32, // All bits set
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationWrite,
kmsg.ACLOperationCreate,
kmsg.ACLOperationDelete,
kmsg.ACLOperationAlter,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationDescribeConfigs,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationIdempotentWrite,
kmsg.ACLOperationCreateTokens,
kmsg.ACLOperationDescribeTokens,
},
},
{
name: "Invalid Operations Excluded",
bitfield: 1<<15 | 1<<16, // Bits beyond known operations
expected: []kmsg.ACLOperation{},
},
{
name: "Empty Bitfield",
bitfield: math.MinInt32,
expected: []kmsg.ACLOperation{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := DecodeACLOperations(tt.bitfield)

// Compare slices ignoring order
expectedMap := make(map[kmsg.ACLOperation]bool)
for _, op := range tt.expected {
expectedMap[op] = true
}

resultMap := make(map[kmsg.ACLOperation]bool)
for _, op := range result {
resultMap[op] = true
}

if !reflect.DeepEqual(expectedMap, resultMap) {
t.Errorf("DecodeACLOperations(%d) = %v, expected %v", tt.bitfield, result, tt.expected)
}
})
}
}
Loading