Skip to content

Commit

Permalink
Merge pull request #759 from twmb/kadm-errmsg
Browse files Browse the repository at this point in the history
kadm: add ErrMessage to all types it was missing from
  • Loading branch information
twmb authored Jul 29, 2024
2 parents 8b955b4 + af493a7 commit 36265e0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 19 deletions.
18 changes: 13 additions & 5 deletions pkg/kadm/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ type CreateACLsResult struct {
Operation ACLOperation // Operation is the operation allowed / denied.
Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied.

Err error // Err is the error for this ACL creation.
Err error // Err is the error for this ACL creation.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// CreateACLsResults contains all results to created ACLs.
Expand Down Expand Up @@ -752,7 +753,8 @@ func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResu
Operation: c.Operation,
Permission: c.PermissionType,

Err: kerr.ErrorForCode(r.ErrorCode),
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}

Expand All @@ -770,7 +772,8 @@ type DeletedACL struct {
Operation ACLOperation // Operation is this deleted ACL's operation.
Permission kmsg.ACLPermissionType // Permission this deleted ACLs permission.

Err error // Err is non-nil if this match has an error.
Err error // Err is non-nil if this match has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DeletedACLs contains ACLs that were deleted from a single delete filter.
Expand All @@ -794,7 +797,8 @@ type DeleteACLsResult struct {

Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched.

Err error // Err is non-nil if this filter has an error.
Err error // Err is non-nil if this filter has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DeleteACLsResults contains all results to deleted ACLs.
Expand Down Expand Up @@ -841,6 +845,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu
Operation: m.Operation,
Permission: m.PermissionType,
Err: kerr.ErrorForCode(m.ErrorCode),
ErrMessage: unptrStr(m.ErrorMessage),
})
}
rs = append(rs, DeleteACLsResult{
Expand All @@ -853,6 +858,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu
Permission: f.PermissionType,
Deleted: ms,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return rs, nil
Expand Down Expand Up @@ -892,7 +898,8 @@ type DescribeACLsResult struct {

Described DescribedACLs // Described contains all ACLs this describe filter matched.

Err error // Err is non-nil if this filter has an error.
Err error // Err is non-nil if this filter has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DescribeACLsResults contains all results to described ACLs.
Expand Down Expand Up @@ -974,6 +981,7 @@ func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLs
Permission: f.PermissionType,
Described: ds,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return rs, nil
Expand Down
27 changes: 16 additions & 11 deletions pkg/kadm/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (c *Config) MaybeValue() string {
// ResourceConfig contains the configuration values for a resource (topic,
// broker, broker logger).
type ResourceConfig struct {
Name string // Name is the name of this resource.
Configs []Config // Configs are the configs for this topic.
Err error // Err is any error preventing configs from loading (likely, an unknown topic).
Name string // Name is the name of this resource.
Configs []Config // Configs are the configs for this topic.
Err error // Err is any error preventing configs from loading (likely, an unknown topic).
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// ResourceConfigs contains the configuration values for many resources.
Expand Down Expand Up @@ -124,8 +125,9 @@ func (cl *Client) describeConfigs(
return err
}
rc := ResourceConfig{
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
}
for _, c := range r.Configs {
rcv := Config{
Expand Down Expand Up @@ -183,8 +185,9 @@ type AlterConfig struct {

// AlteredConfigsResponse contains the response for an individual alteration.
type AlterConfigsResponse struct {
Name string // Name is the name of this resource (topic name or broker number).
Err error // Err is non-nil if the config could not be altered.
Name string // Name is the name of this resource (topic name or broker number).
Err error // Err is non-nil if the config could not be altered.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// AlterConfigsResponses contains responses for many alterations.
Expand Down Expand Up @@ -314,8 +317,9 @@ func (cl *Client) alterConfigs(
resp := kr.(*kmsg.IncrementalAlterConfigsResponse)
for _, r := range resp.Resources {
rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return nil
Expand Down Expand Up @@ -403,8 +407,9 @@ func (cl *Client) alterConfigsState(
resp := kr.(*kmsg.AlterConfigsResponse)
for _, r := range resp.Resources {
rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return nil
Expand Down
22 changes: 21 additions & 1 deletion pkg/kadm/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha256"
"crypto/sha512"
"errors"
"fmt"
"sort"
"strings"
Expand All @@ -17,6 +18,25 @@ import (
"github.com/twmb/franz-go/pkg/kversion"
)

// ErrAndMessage is returned as the error from requests that were successfully
// responded to, but the response indicates failure with a message.
type ErrAndMessage struct {
Err error // Err is the response ErrorCode.
ErrMessage string // Message is the response ErrorMessage.
}

func (e *ErrAndMessage) Error() string {
var ke *kerr.Error
if errors.As(e.Err, &ke) && e.ErrMessage != "" {
return ke.Message + ": " + e.ErrMessage
}
return e.Err.Error()
}

func (e *ErrAndMessage) Unwrap() error {
return e.Err
}

// FindCoordinatorResponse contains information for the coordinator for a group
// or transactional ID.
type FindCoordinatorResponse struct {
Expand Down Expand Up @@ -372,7 +392,7 @@ func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityC
return nil, err
}
if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(resp.ErrorMessage)}
}
var qs DescribedClientQuotas
for _, entry := range resp.Entries {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kadm/partas.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartit
return nil, err
}
if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)}
}

a := make(AlterPartitionAssignmentsResponses)
Expand Down Expand Up @@ -187,7 +187,7 @@ func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (
return nil, err
}
if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)}
}

a := make(ListPartitionReassignmentsResponses)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kadm/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type CreateTopicResponse struct {
Topic string // Topic is the topic that was created.
ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+.
Err error // Err is any error preventing this topic from being created.
ErrMessage string // ErrMessage a potential extra message describing any error.
NumPartitions int32 // NumPartitions is the number of partitions in the response, if talking to Kafka v2.4+.
ReplicationFactor int16 // ReplicationFactor is how many replicas every partition has for this topic, if talking to Kafka 2.4+.
Configs map[string]Config // Configs contains the topic configuration (minus config synonyms), if talking to Kafka 2.4+.
Expand Down Expand Up @@ -209,6 +210,7 @@ func (cl *Client) createTopics(ctx context.Context, dry bool, p int32, rf int16,
Topic: t.Topic,
ID: t.TopicID,
Err: kerr.ErrorForCode(t.ErrorCode),
ErrMessage: unptrStr(t.ErrorMessage),
NumPartitions: t.NumPartitions,
ReplicationFactor: t.ReplicationFactor,
Configs: make(map[string]Config),
Expand Down
2 changes: 2 additions & 0 deletions pkg/kadm/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type DescribedProducersPartition struct {
Partition int32 // Partition is the partition whose producer's were described.
ActiveProducers DescribedProducers // ActiveProducers are producer's actively transactionally producing to this partition.
Err error // Err is non-nil if describing this partition failed.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DescribedProducersPartitions contains partitions whose producer's were described.
Expand Down Expand Up @@ -274,6 +275,7 @@ func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (Described
Partition: rp.Partition,
ActiveProducers: drs,
Err: kerr.ErrorForCode(rp.ErrorCode),
ErrMessage: unptrStr(rp.ErrorMessage),
}
dps[rp.Partition] = dp // one partition globally, no need to exist-check
for _, rr := range rp.ActiveProducers {
Expand Down

0 comments on commit 36265e0

Please sign in to comment.