Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kadm: add ErrMessage to all types it was missing from #759

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions pkg/kadm/acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ type CreateACLsResult struct {
Operation ACLOperation // Operation is the operation allowed / denied.
Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied.

Err error // Err is the error for this ACL creation.
Err error // Err is the error for this ACL creation.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// CreateACLsResults contains all results to created ACLs.
Expand Down Expand Up @@ -752,7 +753,8 @@ func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResu
Operation: c.Operation,
Permission: c.PermissionType,

Err: kerr.ErrorForCode(r.ErrorCode),
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}

Expand All @@ -770,7 +772,8 @@ type DeletedACL struct {
Operation ACLOperation // Operation is this deleted ACL's operation.
Permission kmsg.ACLPermissionType // Permission this deleted ACLs permission.

Err error // Err is non-nil if this match has an error.
Err error // Err is non-nil if this match has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DeletedACLs contains ACLs that were deleted from a single delete filter.
Expand All @@ -794,7 +797,8 @@ type DeleteACLsResult struct {

Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched.

Err error // Err is non-nil if this filter has an error.
Err error // Err is non-nil if this filter has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DeleteACLsResults contains all results to deleted ACLs.
Expand Down Expand Up @@ -841,6 +845,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu
Operation: m.Operation,
Permission: m.PermissionType,
Err: kerr.ErrorForCode(m.ErrorCode),
ErrMessage: unptrStr(m.ErrorMessage),
})
}
rs = append(rs, DeleteACLsResult{
Expand All @@ -853,6 +858,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu
Permission: f.PermissionType,
Deleted: ms,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return rs, nil
Expand Down Expand Up @@ -892,7 +898,8 @@ type DescribeACLsResult struct {

Described DescribedACLs // Described contains all ACLs this describe filter matched.

Err error // Err is non-nil if this filter has an error.
Err error // Err is non-nil if this filter has an error.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DescribeACLsResults contains all results to described ACLs.
Expand Down Expand Up @@ -974,6 +981,7 @@ func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLs
Permission: f.PermissionType,
Described: ds,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return rs, nil
Expand Down
27 changes: 16 additions & 11 deletions pkg/kadm/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (c *Config) MaybeValue() string {
// ResourceConfig contains the configuration values for a resource (topic,
// broker, broker logger).
type ResourceConfig struct {
Name string // Name is the name of this resource.
Configs []Config // Configs are the configs for this topic.
Err error // Err is any error preventing configs from loading (likely, an unknown topic).
Name string // Name is the name of this resource.
Configs []Config // Configs are the configs for this topic.
Err error // Err is any error preventing configs from loading (likely, an unknown topic).
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// ResourceConfigs contains the configuration values for many resources.
Expand Down Expand Up @@ -124,8 +125,9 @@ func (cl *Client) describeConfigs(
return err
}
rc := ResourceConfig{
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
}
for _, c := range r.Configs {
rcv := Config{
Expand Down Expand Up @@ -183,8 +185,9 @@ type AlterConfig struct {

// AlteredConfigsResponse contains the response for an individual alteration.
type AlterConfigsResponse struct {
Name string // Name is the name of this resource (topic name or broker number).
Err error // Err is non-nil if the config could not be altered.
Name string // Name is the name of this resource (topic name or broker number).
Err error // Err is non-nil if the config could not be altered.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// AlterConfigsResponses contains responses for many alterations.
Expand Down Expand Up @@ -314,8 +317,9 @@ func (cl *Client) alterConfigs(
resp := kr.(*kmsg.IncrementalAlterConfigsResponse)
for _, r := range resp.Resources {
rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return nil
Expand Down Expand Up @@ -403,8 +407,9 @@ func (cl *Client) alterConfigsState(
resp := kr.(*kmsg.AlterConfigsResponse)
for _, r := range resp.Resources {
rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
ErrMessage: unptrStr(r.ErrorMessage),
})
}
return nil
Expand Down
22 changes: 21 additions & 1 deletion pkg/kadm/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha256"
"crypto/sha512"
"errors"
"fmt"
"sort"
"strings"
Expand All @@ -17,6 +18,25 @@ import (
"github.com/twmb/franz-go/pkg/kversion"
)

// ErrAndMessage is returned as the error from requests that were successfully
// responded to, but the response indicates failure with a message.
type ErrAndMessage struct {
Err error // Err is the response ErrorCode.
ErrMessage string // Message is the response ErrorMessage.
}

func (e *ErrAndMessage) Error() string {
var ke *kerr.Error
if errors.As(e.Err, &ke) && e.ErrMessage != "" {
return ke.Message + ": " + e.ErrMessage
}
return e.Err.Error()
}

func (e *ErrAndMessage) Unwrap() error {
return e.Err
}

// FindCoordinatorResponse contains information for the coordinator for a group
// or transactional ID.
type FindCoordinatorResponse struct {
Expand Down Expand Up @@ -372,7 +392,7 @@ func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityC
return nil, err
}
if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(resp.ErrorMessage)}
}
var qs DescribedClientQuotas
for _, entry := range resp.Entries {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kadm/partas.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartit
return nil, err
}
if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)}
}

a := make(AlterPartitionAssignmentsResponses)
Expand Down Expand Up @@ -187,7 +187,7 @@ func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (
return nil, err
}
if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil {
return nil, err
return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)}
}

a := make(ListPartitionReassignmentsResponses)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kadm/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type CreateTopicResponse struct {
Topic string // Topic is the topic that was created.
ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+.
Err error // Err is any error preventing this topic from being created.
ErrMessage string // ErrMessage a potential extra message describing any error.
NumPartitions int32 // NumPartitions is the number of partitions in the response, if talking to Kafka v2.4+.
ReplicationFactor int16 // ReplicationFactor is how many replicas every partition has for this topic, if talking to Kafka 2.4+.
Configs map[string]Config // Configs contains the topic configuration (minus config synonyms), if talking to Kafka 2.4+.
Expand Down Expand Up @@ -209,6 +210,7 @@ func (cl *Client) createTopics(ctx context.Context, dry bool, p int32, rf int16,
Topic: t.Topic,
ID: t.TopicID,
Err: kerr.ErrorForCode(t.ErrorCode),
ErrMessage: unptrStr(t.ErrorMessage),
NumPartitions: t.NumPartitions,
ReplicationFactor: t.ReplicationFactor,
Configs: make(map[string]Config),
Expand Down
2 changes: 2 additions & 0 deletions pkg/kadm/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type DescribedProducersPartition struct {
Partition int32 // Partition is the partition whose producer's were described.
ActiveProducers DescribedProducers // ActiveProducers are producer's actively transactionally producing to this partition.
Err error // Err is non-nil if describing this partition failed.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DescribedProducersPartitions contains partitions whose producer's were described.
Expand Down Expand Up @@ -274,6 +275,7 @@ func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (Described
Partition: rp.Partition,
ActiveProducers: drs,
Err: kerr.ErrorForCode(rp.ErrorCode),
ErrMessage: unptrStr(rp.ErrorMessage),
}
dps[rp.Partition] = dp // one partition globally, no need to exist-check
for _, rr := range rp.ActiveProducers {
Expand Down