Skip to content

Commit

Permalink
kadm: add func to decode AuthorizedOperations
Browse files Browse the repository at this point in the history
The authorized operations is an int32 bitfield
that represents ACL operations. This commit adds
a func that allows to decode the bitfield into
a slice of kmsg.ACLOperation values. This bitfield
is used in the metadata, describe clusters and
describe groups API responses.
  • Loading branch information
weeco committed Dec 5, 2024
1 parent cea7aa5 commit aa1c73c
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 0 deletions.
53 changes: 53 additions & 0 deletions pkg/kadm/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kadm
import (
"context"
"fmt"
"math"
"strings"
"sync"

Expand Down Expand Up @@ -1115,3 +1116,55 @@ func createDelDescACL(b *ACLBuilder) ([]kmsg.DeleteACLsRequestFilter, []*kmsg.De
}
return deletions, describes, nil
}

// DecodeACLOperations decodes an int32 bitfield into a slice of
// kmsg.ACLOperation values.
//
// This function is used to interpret the `AuthorizedOperations` field returned
// by the Kafka APIs, which specifies the operations a client is allowed to
// perform on a cluster, topic, or consumer group. It is utilized in multiple
// Kafka API responses, including Metadata, DescribeCluster, and
// DescribeGroupsResponseGroup.
//
// Caveats with Metadata API
// 1. To include authorized operations in the Metadata response, the client must explicitly
// opt in by setting `IncludeClusterAuthorizedOperations` and/or `IncludeTopicAuthorizedOperations`.
// These options were introduced in Kafka 2.3.0 as part of KIP-430.
// 2. In Kafka 2.8.0 (Metadata v11), the `AuthorizedOperations` for the cluster was removed from the
// Metadata response. Instead, clients should use the DescribeCluster API to retrieve cluster-level
// permissions.
//
// Function Behavior
// - If the bitfield equals `math.MinInt32` (-2147483648), it indicates that "AUTHORIZED_OPERATIONS_OMITTED"
// is set, and the function returns an empty slice.
// - For non-omitted values, the function iterates through all 32 bits of the bitfield. Each bit that
// is set (`1`) corresponds to an ACL operation, which is then mapped to its respective `kmsg.ACLOperation` value.
// - Undefined or unknown bits (e.g., bit 0 for `kmsg.ACLOperationUnknown`) are ignored.
//
// Supported Use Cases
// - Cluster Operations: Retrieved via the DescribeCluster API or older Metadata API versions (v8–v10).
// - Topic Operations: Retrieved via the Metadata API when `IncludeTopicAuthorizedOperations` is set.
// - Group Operations: Retrieved in the DescribeGroups API response.
func DecodeACLOperations(bitfield int32) []kmsg.ACLOperation {
var operations []kmsg.ACLOperation

// MinInt32 represents "AUTHORIZED_OPERATIONS_OMITTED"
if bitfield == math.MinInt32 {
return operations
}

// Helper function to determine if an operation is valid.
isValidOperation := func(op kmsg.ACLOperation) bool {
return op >= kmsg.ACLOperationRead && op <= kmsg.ACLOperationDescribeTokens
}

for i := 0; i < 32; i++ {
if bitfield&(1<<i) != 0 {
operation := kmsg.ACLOperation(i)
if isValidOperation(operation) {
operations = append(operations, operation)
}
}
}
return operations
}
109 changes: 109 additions & 0 deletions pkg/kadm/acls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package kadm

import (
"math"
"reflect"
"testing"

"github.com/twmb/franz-go/pkg/kmsg"
)

func TestDecodeACLOperations(t *testing.T) {
tests := []struct {
name string
bitfield int32
expected []kmsg.ACLOperation
}{
{
name: "Example 264",
bitfield: 264,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationDescribe,
},
},
{
name: "Example 3400",
bitfield: 3400,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationDescribeConfigs,
kmsg.ACLOperationDelete,
},
},
{
name: "Example 3968",
bitfield: 3968,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationAlter,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationDescribeConfigs,
},
},
{
name: "Example 4000",
bitfield: 4000,
expected: []kmsg.ACLOperation{
kmsg.ACLOperationAlter,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationCreate,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationDescribeConfigs,
},
},
{
name: "All Operations",
bitfield: math.MaxInt32, // All bits set
expected: []kmsg.ACLOperation{
kmsg.ACLOperationRead,
kmsg.ACLOperationWrite,
kmsg.ACLOperationCreate,
kmsg.ACLOperationDelete,
kmsg.ACLOperationAlter,
kmsg.ACLOperationDescribe,
kmsg.ACLOperationClusterAction,
kmsg.ACLOperationDescribeConfigs,
kmsg.ACLOperationAlterConfigs,
kmsg.ACLOperationIdempotentWrite,
kmsg.ACLOperationCreateTokens,
kmsg.ACLOperationDescribeTokens,
},
},
{
name: "Invalid Operations Excluded",
bitfield: 1<<15 | 1<<16, // Bits beyond known operations
expected: []kmsg.ACLOperation{},
},
{
name: "Empty Bitfield",
bitfield: math.MinInt32,
expected: []kmsg.ACLOperation{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := DecodeACLOperations(tt.bitfield)

// Compare slices ignoring order
expectedMap := make(map[kmsg.ACLOperation]bool)
for _, op := range tt.expected {
expectedMap[op] = true
}

resultMap := make(map[kmsg.ACLOperation]bool)
for _, op := range result {
resultMap[op] = true
}

if !reflect.DeepEqual(expectedMap, resultMap) {
t.Errorf("DecodeACLOperations(%d) = %v, expected %v", tt.bitfield, result, tt.expected)
}
})
}
}

0 comments on commit aa1c73c

Please sign in to comment.