From 0d24bb59a8553008f6179b669d97910f9ded5163 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Oct 2024 09:35:06 -0600 Subject: [PATCH] generate / kmsg: update GroupMetadata{Key,Value} Not much changed here. Closes #804. --- generate/definitions/misc | 6 +- pkg/kmsg/generated.go | 201 ++++++++++++++++++++++++++++++++------ 2 files changed, 174 insertions(+), 33 deletions(-) diff --git a/generate/definitions/misc b/generate/definitions/misc index 8166cee2..7c3c5cd8 100644 --- a/generate/definitions/misc +++ b/generate/definitions/misc @@ -272,7 +272,7 @@ GroupMetadataKey => not top level, with version field // // KAFKA-7862 commit 0f995ba6be, proposed in KIP-345 and included in 2.3.0 // released version 3. -GroupMetadataValue => not top level, with version field +GroupMetadataValue => not top level, with version field, flexible v4+ // Version is the version of this value. Version: int16 // ProtocolType is the type of protocol being used for the group @@ -287,7 +287,7 @@ GroupMetadataValue => not top level, with version field Leader: nullable-string // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). - CurrentStateTimestamp: int64 // v2+ + CurrentStateTimestamp: int64(-1) // v2+ // Members are the group members. Members: [=>] // MemberID is a group member. @@ -299,7 +299,7 @@ GroupMetadataValue => not top level, with version field // ClientHost is the hostname of this group member. ClientHost: string // RebalanceTimeoutMillis is the rebalance timeout of this group member. - RebalanceTimeoutMillis: int32 // v1+ + RebalanceTimeoutMillis: int32(-1) // v1+ // SessionTimeoutMillis is the session timeout of this group member. SessionTimeoutMillis: int32 // Subscription is the subscription of this group member. diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 16350a4a..584f6f37 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -922,6 +922,8 @@ type GroupMetadataValueMember struct { ClientHost string // RebalanceTimeoutMillis is the rebalance timeout of this group member. + // + // This field has a default of -1. RebalanceTimeoutMillis int32 // v1+ // SessionTimeoutMillis is the session timeout of this group member. @@ -932,11 +934,15 @@ type GroupMetadataValueMember struct { // Assignment is what the leader assigned this group member. Assignment []byte + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValueMember. func (v *GroupMetadataValueMember) Default() { + v.RebalanceTimeoutMillis = -1 } // NewGroupMetadataValueMember returns a default GroupMetadataValueMember @@ -980,22 +986,33 @@ type GroupMetadataValue struct { // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). + // + // This field has a default of -1. CurrentStateTimestamp int64 // v2+ // Members are the group members. Members []GroupMetadataValueMember + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible { v := v.Version dst = kbin.AppendInt16(dst, v) } { v := v.ProtocolType - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.Generation @@ -1003,11 +1020,19 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Protocol - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.Leader - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } if version >= 2 { v := v.CurrentStateTimestamp @@ -1015,24 +1040,44 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Members - dst = kbin.AppendArrayLen(dst, len(v)) + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } for i := range v { v := &v[i] { v := v.MemberID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 3 { v := v.InstanceID - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.ClientID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.ClientHost - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 1 { v := v.RebalanceTimeoutMillis @@ -1044,14 +1089,30 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Subscription - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } } { v := v.Assignment - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) } } } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } return dst } @@ -1069,13 +1130,23 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v.Version = b.Int16() version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible s := v { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ProtocolType = v } @@ -1085,19 +1156,35 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.Protocol = v } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.Leader = v } @@ -1109,7 +1196,11 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v := s.Members a := v var l int32 - l = b.ArrayLen() + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } if !b.Ok() { return b.Complete() } @@ -1124,36 +1215,68 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.MemberID = v } if version >= 3 { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.InstanceID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientHost = v } @@ -1166,23 +1289,41 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { s.SessionTimeoutMillis = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else { + v = b.Bytes() + } s.Subscription = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else { + v = b.Bytes() + } s.Assignment = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } } v = a s.Members = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } return b.Complete() } +func (v *GroupMetadataValue) IsFlexible() bool { return v.Version >= 4 } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValue. func (v *GroupMetadataValue) Default() { + v.CurrentStateTimestamp = -1 } // NewGroupMetadataValue returns a default GroupMetadataValue