diff --git a/cluster/gossip.pb.go b/cluster/gossip.pb.go index 314d37b7..5f5b1aa9 100644 --- a/cluster/gossip.pb.go +++ b/cluster/gossip.pb.go @@ -26,8 +26,8 @@ type GossipRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - MemberId string `protobuf:"bytes,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` - State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` + FromMemberId string `protobuf:"bytes,2,opt,name=from_member_id,json=fromMemberId,proto3" json:"from_member_id,omitempty"` + State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` } func (x *GossipRequest) Reset() { @@ -62,9 +62,9 @@ func (*GossipRequest) Descriptor() ([]byte, []int) { return file_gossip_proto_rawDescGZIP(), []int{0} } -func (x *GossipRequest) GetMemberId() string { +func (x *GossipRequest) GetFromMemberId() string { if x != nil { - return x.MemberId + return x.FromMemberId } return "" } @@ -131,7 +131,7 @@ type GossipState struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Members map[string]*GossipState_GossipMemberState `protobuf:"bytes,1,rep,name=members,proto3" json:"members,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Members map[string]*GossipMemberState `protobuf:"bytes,1,rep,name=members,proto3" json:"members,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *GossipState) Reset() { @@ -166,32 +166,23 @@ func (*GossipState) Descriptor() ([]byte, []int) { return file_gossip_proto_rawDescGZIP(), []int{2} } -func (x *GossipState) GetMembers() map[string]*GossipState_GossipMemberState { +func (x *GossipState) GetMembers() map[string]*GossipMemberState { if x != nil { return x.Members } return nil } -// a known key might be heartbeat. if we locally tag each entry with a local timestamp -// this means that we can measure if we have not received a new heartbeat from one member in some time -// even if we don't know the exact time the heartbeat was issued, due to clock differences. -// we still know when _we_ as in this node, got this data. -// and we can measure time from then til now. -// -// if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead -type GossipKeyValue struct { +type GossipMemberState struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - SequenceNumber int64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member - Value *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` //value is any format - LocalTimestampUnixMilliseconds int64 `protobuf:"varint,5,opt,name=local_timestamp_unix_milliseconds,json=localTimestampUnixMilliseconds,proto3" json:"local_timestamp_unix_milliseconds,omitempty"` + Values map[string]*GossipKeyValue `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (x *GossipKeyValue) Reset() { - *x = GossipKeyValue{} +func (x *GossipMemberState) Reset() { + *x = GossipMemberState{} if protoimpl.UnsafeEnabled { mi := &file_gossip_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -199,13 +190,13 @@ func (x *GossipKeyValue) Reset() { } } -func (x *GossipKeyValue) String() string { +func (x *GossipMemberState) String() string { return protoimpl.X.MessageStringOf(x) } -func (*GossipKeyValue) ProtoMessage() {} +func (*GossipMemberState) ProtoMessage() {} -func (x *GossipKeyValue) ProtoReflect() protoreflect.Message { +func (x *GossipMemberState) ProtoReflect() protoreflect.Message { mi := &file_gossip_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -217,44 +208,37 @@ func (x *GossipKeyValue) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead. -func (*GossipKeyValue) Descriptor() ([]byte, []int) { +// Deprecated: Use GossipMemberState.ProtoReflect.Descriptor instead. +func (*GossipMemberState) Descriptor() ([]byte, []int) { return file_gossip_proto_rawDescGZIP(), []int{3} } -func (x *GossipKeyValue) GetSequenceNumber() int64 { +func (x *GossipMemberState) GetValues() map[string]*GossipKeyValue { if x != nil { - return x.SequenceNumber - } - return 0 -} - -func (x *GossipKeyValue) GetValue() *anypb.Any { - if x != nil { - return x.Value + return x.Values } return nil } -func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64 { - if x != nil { - return x.LocalTimestampUnixMilliseconds - } - return 0 -} - -// represents a value that can be sent in form of a delta change -// instead of a full value replace -type GossipDeltaValue struct { +// a known key might be heartbeat. if we locally tag each entry with a local timestamp +// this means that we can measure if we have not received a new heartbeat from one member in some time +// even if we don't know the exact time the heartbeat was issued, due to clock differences. +// we still know when _we_ as in this node, got this data. +// and we can measure time from then til now. +// +// if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead +type GossipKeyValue struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Entries []*GossipDeltaValue_GossipDeltaEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"` + SequenceNumber int64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member + Value *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` //value is any format + LocalTimestampUnixMilliseconds int64 `protobuf:"varint,5,opt,name=local_timestamp_unix_milliseconds,json=localTimestampUnixMilliseconds,proto3" json:"local_timestamp_unix_milliseconds,omitempty"` } -func (x *GossipDeltaValue) Reset() { - *x = GossipDeltaValue{} +func (x *GossipKeyValue) Reset() { + *x = GossipKeyValue{} if protoimpl.UnsafeEnabled { mi := &file_gossip_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -262,13 +246,13 @@ func (x *GossipDeltaValue) Reset() { } } -func (x *GossipDeltaValue) String() string { +func (x *GossipKeyValue) String() string { return protoimpl.X.MessageStringOf(x) } -func (*GossipDeltaValue) ProtoMessage() {} +func (*GossipKeyValue) ProtoMessage() {} -func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message { +func (x *GossipKeyValue) ProtoReflect() protoreflect.Message { mi := &file_gossip_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -280,186 +264,84 @@ func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use GossipDeltaValue.ProtoReflect.Descriptor instead. -func (*GossipDeltaValue) Descriptor() ([]byte, []int) { +// Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead. +func (*GossipKeyValue) Descriptor() ([]byte, []int) { return file_gossip_proto_rawDescGZIP(), []int{4} } -func (x *GossipDeltaValue) GetEntries() []*GossipDeltaValue_GossipDeltaEntry { +func (x *GossipKeyValue) GetSequenceNumber() int64 { if x != nil { - return x.Entries - } - return nil -} - -type GossipState_GossipMemberState struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Values map[string]*GossipKeyValue `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` -} - -func (x *GossipState_GossipMemberState) Reset() { - *x = GossipState_GossipMemberState{} - if protoimpl.UnsafeEnabled { - mi := &file_gossip_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GossipState_GossipMemberState) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GossipState_GossipMemberState) ProtoMessage() {} - -func (x *GossipState_GossipMemberState) ProtoReflect() protoreflect.Message { - mi := &file_gossip_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms + return x.SequenceNumber } - return mi.MessageOf(x) -} - -// Deprecated: Use GossipState_GossipMemberState.ProtoReflect.Descriptor instead. -func (*GossipState_GossipMemberState) Descriptor() ([]byte, []int) { - return file_gossip_proto_rawDescGZIP(), []int{2, 0} + return 0 } -func (x *GossipState_GossipMemberState) GetValues() map[string]*GossipKeyValue { +func (x *GossipKeyValue) GetValue() *anypb.Any { if x != nil { - return x.Values + return x.Value } return nil } -// these are the entries of a delta value -// this can be seen as an array with data, where each element in the array is tagged with a sequence number -type GossipDeltaValue_GossipDeltaEntry struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - SequenceNumber int64 `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` -} - -func (x *GossipDeltaValue_GossipDeltaEntry) Reset() { - *x = GossipDeltaValue_GossipDeltaEntry{} - if protoimpl.UnsafeEnabled { - mi := &file_gossip_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GossipDeltaValue_GossipDeltaEntry) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage() {} - -func (x *GossipDeltaValue_GossipDeltaEntry) ProtoReflect() protoreflect.Message { - mi := &file_gossip_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GossipDeltaValue_GossipDeltaEntry.ProtoReflect.Descriptor instead. -func (*GossipDeltaValue_GossipDeltaEntry) Descriptor() ([]byte, []int) { - return file_gossip_proto_rawDescGZIP(), []int{4, 0} -} - -func (x *GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber() int64 { +func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64 { if x != nil { - return x.SequenceNumber + return x.LocalTimestampUnixMilliseconds } return 0 } -func (x *GossipDeltaValue_GossipDeltaEntry) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - var File_gossip_proto protoreflect.FileDescriptor var file_gossip_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0x58, 0x0a, 0x0d, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, - 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x3c, 0x0a, 0x0e, - 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, - 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0xe4, 0x02, 0x0a, 0x0b, 0x47, - 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x6d, 0x65, - 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x63, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, 0x61, 0x74, - 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, - 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x1a, 0xb3, 0x01, 0x0a, 0x11, 0x47, 0x6f, 0x73, 0x73, - 0x69, 0x70, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x4a, 0x0a, - 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x32, 0x2e, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x1a, 0x52, 0x0a, 0x0b, 0x56, 0x61, 0x6c, - 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2d, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, - 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x62, 0x0a, - 0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x74, 0x6f, 0x22, 0x61, 0x0a, 0x0d, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6d, 0x65, 0x6d, 0x62, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x72, 0x6f, + 0x6d, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, + 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x3c, 0x0a, 0x0e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x22, 0xa2, 0x01, 0x0a, 0x0b, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, + 0x61, 0x74, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, + 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, + 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, + 0x1a, 0x56, 0x0a, 0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x30, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, + 0x69, 0x70, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa7, 0x01, 0x0a, 0x11, 0x47, 0x6f, 0x73, + 0x73, 0x69, 0x70, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x3e, + 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4d, + 0x65, 0x6d, 0x62, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x1a, 0x52, + 0x0a, 0x0b, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, - 0x3c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, - 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4d, 0x65, 0x6d, 0x62, 0x65, - 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xb0, 0x01, 0x0a, 0x0e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4b, 0x65, 0x79, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, - 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x73, - 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2a, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, - 0x6e, 0x79, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x49, 0x0a, 0x21, 0x6c, 0x6f, 0x63, - 0x61, 0x6c, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x75, 0x6e, 0x69, - 0x78, 0x5f, 0x6d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x1e, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x65, 0x63, - 0x6f, 0x6e, 0x64, 0x73, 0x22, 0xa9, 0x01, 0x0a, 0x10, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x44, - 0x65, 0x6c, 0x74, 0x61, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x44, 0x0a, 0x07, 0x65, 0x6e, 0x74, - 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x63, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x44, 0x65, 0x6c, 0x74, 0x61, - 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x44, 0x65, 0x6c, 0x74, - 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x1a, - 0x4f, 0x0a, 0x10, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x5f, - 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x73, 0x65, - 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, - 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x42, 0x2c, 0x5a, 0x2a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x61, 0x73, 0x79, 0x6e, 0x6b, 0x72, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x61, 0x63, - 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4b, + 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x22, 0xb0, 0x01, 0x0a, 0x0e, 0x47, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x4b, 0x65, 0x79, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, + 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, + 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2a, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x41, 0x6e, 0x79, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x49, 0x0a, 0x21, 0x6c, 0x6f, + 0x63, 0x61, 0x6c, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x75, 0x6e, + 0x69, 0x78, 0x5f, 0x6d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1e, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x69, 0x6c, 0x6c, 0x69, 0x73, 0x65, + 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x42, 0x2c, 0x5a, 0x2a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x6b, 0x72, 0x6f, 0x6e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -474,33 +356,30 @@ func file_gossip_proto_rawDescGZIP() []byte { return file_gossip_proto_rawDescData } -var file_gossip_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_gossip_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_gossip_proto_goTypes = []interface{}{ - (*GossipRequest)(nil), // 0: cluster.GossipRequest - (*GossipResponse)(nil), // 1: cluster.GossipResponse - (*GossipState)(nil), // 2: cluster.GossipState - (*GossipKeyValue)(nil), // 3: cluster.GossipKeyValue - (*GossipDeltaValue)(nil), // 4: cluster.GossipDeltaValue - (*GossipState_GossipMemberState)(nil), // 5: cluster.GossipState.GossipMemberState - nil, // 6: cluster.GossipState.MembersEntry - nil, // 7: cluster.GossipState.GossipMemberState.ValuesEntry - (*GossipDeltaValue_GossipDeltaEntry)(nil), // 8: cluster.GossipDeltaValue.GossipDeltaEntry - (*anypb.Any)(nil), // 9: google.protobuf.Any + (*GossipRequest)(nil), // 0: cluster.GossipRequest + (*GossipResponse)(nil), // 1: cluster.GossipResponse + (*GossipState)(nil), // 2: cluster.GossipState + (*GossipMemberState)(nil), // 3: cluster.GossipMemberState + (*GossipKeyValue)(nil), // 4: cluster.GossipKeyValue + nil, // 5: cluster.GossipState.MembersEntry + nil, // 6: cluster.GossipMemberState.ValuesEntry + (*anypb.Any)(nil), // 7: google.protobuf.Any } var file_gossip_proto_depIdxs = []int32{ 2, // 0: cluster.GossipRequest.state:type_name -> cluster.GossipState 2, // 1: cluster.GossipResponse.state:type_name -> cluster.GossipState - 6, // 2: cluster.GossipState.members:type_name -> cluster.GossipState.MembersEntry - 9, // 3: cluster.GossipKeyValue.value:type_name -> google.protobuf.Any - 8, // 4: cluster.GossipDeltaValue.entries:type_name -> cluster.GossipDeltaValue.GossipDeltaEntry - 7, // 5: cluster.GossipState.GossipMemberState.values:type_name -> cluster.GossipState.GossipMemberState.ValuesEntry - 5, // 6: cluster.GossipState.MembersEntry.value:type_name -> cluster.GossipState.GossipMemberState - 3, // 7: cluster.GossipState.GossipMemberState.ValuesEntry.value:type_name -> cluster.GossipKeyValue - 8, // [8:8] is the sub-list for method output_type - 8, // [8:8] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 5, // 2: cluster.GossipState.members:type_name -> cluster.GossipState.MembersEntry + 6, // 3: cluster.GossipMemberState.values:type_name -> cluster.GossipMemberState.ValuesEntry + 7, // 4: cluster.GossipKeyValue.value:type_name -> google.protobuf.Any + 3, // 5: cluster.GossipState.MembersEntry.value:type_name -> cluster.GossipMemberState + 4, // 6: cluster.GossipMemberState.ValuesEntry.value:type_name -> cluster.GossipKeyValue + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_gossip_proto_init() } @@ -546,7 +425,7 @@ func file_gossip_proto_init() { } } file_gossip_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GossipKeyValue); i { + switch v := v.(*GossipMemberState); i { case 0: return &v.state case 1: @@ -558,31 +437,7 @@ func file_gossip_proto_init() { } } file_gossip_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GossipDeltaValue); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_gossip_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GossipState_GossipMemberState); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_gossip_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GossipDeltaValue_GossipDeltaEntry); i { + switch v := v.(*GossipKeyValue); i { case 0: return &v.state case 1: @@ -600,7 +455,7 @@ func file_gossip_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_gossip_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/gossip.proto b/cluster/gossip.proto index b89cc1fe..2effa65e 100644 --- a/cluster/gossip.proto +++ b/cluster/gossip.proto @@ -5,7 +5,7 @@ import "google/protobuf/any.proto"; message GossipRequest { - string member_id = 2; + string from_member_id = 2; GossipState state = 1; } @@ -17,14 +17,12 @@ message GossipResponse { //two GossipState objects can be merged //key + member_id gets it's own entry, if collision, highest version is selected message GossipState { - message GossipMemberState { - map values = 1; - } - map members = 1; } - +message GossipMemberState { + map values = 1; +} //a known key might be heartbeat. if we locally tag each entry with a local timestamp //this means that we can measure if we have not received a new heartbeat from one member in some time @@ -38,18 +36,3 @@ message GossipKeyValue { google.protobuf.Any value = 4; //value is any format int64 local_timestamp_unix_milliseconds = 5; } - -//represents a value that can be sent in form of a delta change -//instead of a full value replace -message GossipDeltaValue -{ - //these are the entries of a delta value - //this can be seen as an array with data, where each element in the array is tagged with a sequence number - message GossipDeltaEntry - { - int64 sequence_number = 1; - bytes data = 2; - } - - repeated GossipDeltaEntry entries = 1; -} \ No newline at end of file diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index 2c4658b6..05de0580 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -90,8 +90,8 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { } ga.ReceiveState(r.State, ctx) - if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) { - ctx.Logger().Warn("Got gossip request from unknown member", slog.String("MemberId", r.MemberId)) + if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.FromMemberId) { + ctx.Logger().Warn("Got gossip request from unknown member", slog.String("MemberId", r.FromMemberId)) // nothing to send, do not provide sender or state payload // ctx.Respond(&GossipResponse{State: &GossipState{Members: make(map[string]*GossipState_GossipMemberState)}}) @@ -100,9 +100,9 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { return } - memberState := ga.gossip.GetMemberStateDelta(r.MemberId) + memberState := ga.gossip.GetMemberStateDelta(r.FromMemberId) if !memberState.HasState { - ctx.Logger().Warn("Got gossip request from member, but no state was found", slog.String("MemberId", r.MemberId)) + ctx.Logger().Warn("Got gossip request from member, but no state was found", slog.String("MemberId", r.FromMemberId)) // nothing to send, do not provide sender or state payload ctx.Respond(&GossipResponse{}) @@ -177,8 +177,8 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem // for timeout, blocking other gossips from getting through msg := GossipRequest{ - MemberId: member.Id, - State: memberStateDelta.State, + FromMemberId: member.Id, + State: memberStateDelta.State, } future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout) diff --git a/cluster/gossip_state_management.go b/cluster/gossip_state_management.go index 816209c7..b0bf32ad 100644 --- a/cluster/gossip_state_management.go +++ b/cluster/gossip_state_management.go @@ -8,7 +8,6 @@ import ( ) // convenience type alias -type GossipMemberState = GossipState_GossipMemberState func ensureEntryExists(memberState *GossipMemberState, key string) *GossipKeyValue { value, ok := memberState.Values[key] @@ -55,7 +54,7 @@ func setKey(state *GossipState, key string, value proto.Message, memberID string // merges the local and the incoming remote states into a new states slice and return it func mergeState(localState *GossipState, remoteState *GossipState) ([]*GossipUpdate, *GossipState, map[string]empty) { // make a copy of the localState (we do not want to modify localState just yet) - mergedState := &GossipState{Members: make(map[string]*GossipState_GossipMemberState)} + mergedState := &GossipState{Members: make(map[string]*GossipMemberState)} for id, member := range localState.Members { mergedState.Members[id] = member } diff --git a/cluster/informer.go b/cluster/informer.go index c3d39240..85981b87 100644 --- a/cluster/informer.go +++ b/cluster/informer.go @@ -48,7 +48,7 @@ func newInformer(myID string, getBlockedMembers func() set.Set[string], fanOut i informer := Informer{ myID: myID, state: &GossipState{ - Members: map[string]*GossipState_GossipMemberState{}, + Members: map[string]*GossipMemberState{}, }, committedOffsets: map[string]int64{}, activeMemberIDs: map[string]empty{}, @@ -145,13 +145,13 @@ func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelt var count int // newState will old the final new state to be sent - newState := GossipState{Members: make(map[string]*GossipState_GossipMemberState)} + newState := GossipState{Members: make(map[string]*GossipMemberState)} // hashmaps in Go are random by nature so no need to randomize state.Members pendingOffsets := inf.committedOffsets // create a new map with gossipMaxSend entries max - members := make(map[string]*GossipState_GossipMemberState) + members := make(map[string]*GossipMemberState) // add ourselves to the gossip list if we are in the members state if member, ok := inf.state.Members[inf.myID]; ok { @@ -179,7 +179,7 @@ func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelt for memberID, memberState := range members { // create an empty state - newMemberState := GossipState_GossipMemberState{ + newMemberState := GossipMemberState{ Values: make(map[string]*GossipKeyValue), }