diff --git a/pkg/kadm/acls.go b/pkg/kadm/acls.go index c213306c..62676b5b 100644 --- a/pkg/kadm/acls.go +++ b/pkg/kadm/acls.go @@ -663,7 +663,8 @@ type CreateACLsResult struct { Operation ACLOperation // Operation is the operation allowed / denied. Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied. - Err error // Err is the error for this ACL creation. + Err error // Err is the error for this ACL creation. + ErrMessage string // ErrMessage a potential extra message describing any error. } // CreateACLsResults contains all results to created ACLs. @@ -752,7 +753,8 @@ func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResu Operation: c.Operation, Permission: c.PermissionType, - Err: kerr.ErrorForCode(r.ErrorCode), + Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), }) } @@ -770,7 +772,8 @@ type DeletedACL struct { Operation ACLOperation // Operation is this deleted ACL's operation. Permission kmsg.ACLPermissionType // Permission this deleted ACLs permission. - Err error // Err is non-nil if this match has an error. + Err error // Err is non-nil if this match has an error. + ErrMessage string // ErrMessage a potential extra message describing any error. } // DeletedACLs contains ACLs that were deleted from a single delete filter. @@ -794,7 +797,8 @@ type DeleteACLsResult struct { Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched. - Err error // Err is non-nil if this filter has an error. + Err error // Err is non-nil if this filter has an error. + ErrMessage string // ErrMessage a potential extra message describing any error. } // DeleteACLsResults contains all results to deleted ACLs. @@ -841,6 +845,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu Operation: m.Operation, Permission: m.PermissionType, Err: kerr.ErrorForCode(m.ErrorCode), + ErrMessage: unptrStr(m.ErrorMessage), }) } rs = append(rs, DeleteACLsResult{ @@ -853,6 +858,7 @@ func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResu Permission: f.PermissionType, Deleted: ms, Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), }) } return rs, nil @@ -892,7 +898,8 @@ type DescribeACLsResult struct { Described DescribedACLs // Described contains all ACLs this describe filter matched. - Err error // Err is non-nil if this filter has an error. + Err error // Err is non-nil if this filter has an error. + ErrMessage string // ErrMessage a potential extra message describing any error. } // DescribeACLsResults contains all results to described ACLs. @@ -974,6 +981,7 @@ func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLs Permission: f.PermissionType, Described: ds, Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), }) } return rs, nil diff --git a/pkg/kadm/configs.go b/pkg/kadm/configs.go index fb5e2752..e6e37245 100644 --- a/pkg/kadm/configs.go +++ b/pkg/kadm/configs.go @@ -42,9 +42,10 @@ func (c *Config) MaybeValue() string { // ResourceConfig contains the configuration values for a resource (topic, // broker, broker logger). type ResourceConfig struct { - Name string // Name is the name of this resource. - Configs []Config // Configs are the configs for this topic. - Err error // Err is any error preventing configs from loading (likely, an unknown topic). + Name string // Name is the name of this resource. + Configs []Config // Configs are the configs for this topic. + Err error // Err is any error preventing configs from loading (likely, an unknown topic). + ErrMessage string // ErrMessage a potential extra message describing any error. } // ResourceConfigs contains the configuration values for many resources. @@ -124,8 +125,9 @@ func (cl *Client) describeConfigs( return err } rc := ResourceConfig{ - Name: r.ResourceName, - Err: kerr.ErrorForCode(r.ErrorCode), + Name: r.ResourceName, + Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), } for _, c := range r.Configs { rcv := Config{ @@ -183,8 +185,9 @@ type AlterConfig struct { // AlteredConfigsResponse contains the response for an individual alteration. type AlterConfigsResponse struct { - Name string // Name is the name of this resource (topic name or broker number). - Err error // Err is non-nil if the config could not be altered. + Name string // Name is the name of this resource (topic name or broker number). + Err error // Err is non-nil if the config could not be altered. + ErrMessage string // ErrMessage a potential extra message describing any error. } // AlterConfigsResponses contains responses for many alterations. @@ -314,8 +317,9 @@ func (cl *Client) alterConfigs( resp := kr.(*kmsg.IncrementalAlterConfigsResponse) for _, r := range resp.Resources { rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible - Name: r.ResourceName, - Err: kerr.ErrorForCode(r.ErrorCode), + Name: r.ResourceName, + Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), }) } return nil @@ -403,8 +407,9 @@ func (cl *Client) alterConfigsState( resp := kr.(*kmsg.AlterConfigsResponse) for _, r := range resp.Resources { rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible - Name: r.ResourceName, - Err: kerr.ErrorForCode(r.ErrorCode), + Name: r.ResourceName, + Err: kerr.ErrorForCode(r.ErrorCode), + ErrMessage: unptrStr(r.ErrorMessage), }) } return nil diff --git a/pkg/kadm/misc.go b/pkg/kadm/misc.go index de4b786c..05add541 100644 --- a/pkg/kadm/misc.go +++ b/pkg/kadm/misc.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "crypto/sha256" "crypto/sha512" + "errors" "fmt" "sort" "strings" @@ -17,6 +18,25 @@ import ( "github.com/twmb/franz-go/pkg/kversion" ) +// ErrAndMessage is returned as the error from requests that were successfully +// responded to, but the response indicates failure with a message. +type ErrAndMessage struct { + Err error // Err is the response ErrorCode. + ErrMessage string // Message is the response ErrorMessage. +} + +func (e *ErrAndMessage) Error() string { + var ke *kerr.Error + if errors.As(e.Err, &ke) && e.ErrMessage != "" { + return ke.Message + ": " + e.ErrMessage + } + return e.Err.Error() +} + +func (e *ErrAndMessage) Unwrap() error { + return e.Err +} + // FindCoordinatorResponse contains information for the coordinator for a group // or transactional ID. type FindCoordinatorResponse struct { @@ -372,7 +392,7 @@ func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityC return nil, err } if err := kerr.ErrorForCode(resp.ErrorCode); err != nil { - return nil, err + return nil, &ErrAndMessage{err, unptrStr(resp.ErrorMessage)} } var qs DescribedClientQuotas for _, entry := range resp.Entries { diff --git a/pkg/kadm/partas.go b/pkg/kadm/partas.go index 0c6c4a48..84b67790 100644 --- a/pkg/kadm/partas.go +++ b/pkg/kadm/partas.go @@ -108,7 +108,7 @@ func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartit return nil, err } if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil { - return nil, err + return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)} } a := make(AlterPartitionAssignmentsResponses) @@ -187,7 +187,7 @@ func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) ( return nil, err } if err = kerr.ErrorForCode(kresp.ErrorCode); err != nil { - return nil, err + return nil, &ErrAndMessage{err, unptrStr(kresp.ErrorMessage)} } a := make(ListPartitionReassignmentsResponses) diff --git a/pkg/kadm/topics.go b/pkg/kadm/topics.go index 04d1e269..408506e5 100644 --- a/pkg/kadm/topics.go +++ b/pkg/kadm/topics.go @@ -46,6 +46,7 @@ type CreateTopicResponse struct { Topic string // Topic is the topic that was created. ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+. Err error // Err is any error preventing this topic from being created. + ErrMessage string // ErrMessage a potential extra message describing any error. NumPartitions int32 // NumPartitions is the number of partitions in the response, if talking to Kafka v2.4+. ReplicationFactor int16 // ReplicationFactor is how many replicas every partition has for this topic, if talking to Kafka 2.4+. Configs map[string]Config // Configs contains the topic configuration (minus config synonyms), if talking to Kafka 2.4+. @@ -209,6 +210,7 @@ func (cl *Client) createTopics(ctx context.Context, dry bool, p int32, rf int16, Topic: t.Topic, ID: t.TopicID, Err: kerr.ErrorForCode(t.ErrorCode), + ErrMessage: unptrStr(t.ErrorMessage), NumPartitions: t.NumPartitions, ReplicationFactor: t.ReplicationFactor, Configs: make(map[string]Config), diff --git a/pkg/kadm/txn.go b/pkg/kadm/txn.go index d689207d..2b8ccbe2 100644 --- a/pkg/kadm/txn.go +++ b/pkg/kadm/txn.go @@ -97,6 +97,7 @@ type DescribedProducersPartition struct { Partition int32 // Partition is the partition whose producer's were described. ActiveProducers DescribedProducers // ActiveProducers are producer's actively transactionally producing to this partition. Err error // Err is non-nil if describing this partition failed. + ErrMessage string // ErrMessage a potential extra message describing any error. } // DescribedProducersPartitions contains partitions whose producer's were described. @@ -274,6 +275,7 @@ func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (Described Partition: rp.Partition, ActiveProducers: drs, Err: kerr.ErrorForCode(rp.ErrorCode), + ErrMessage: unptrStr(rp.ErrorMessage), } dps[rp.Partition] = dp // one partition globally, no need to exist-check for _, rr := range rp.ActiveProducers {