diff --git a/floyd/example/simple/remove_server.cc b/floyd/example/simple/remove_server.cc index 7cbd680..97c52c8 100644 --- a/floyd/example/simple/remove_server.cc +++ b/floyd/example/simple/remove_server.cc @@ -15,6 +15,20 @@ uint64_t NowMicros() { return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; } +bool print_members(Floyd* f) { + std::set nodes; + Status s = f->GetAllServers(&nodes); + if (!s.ok()) { + return false; + } + printf("Membership: "); + for (const std::string& n : nodes) { + printf(" %s", n.c_str()); + } + printf("\n"); + return true; +} + int main() { slash::Status s; @@ -44,9 +58,7 @@ int main() std::string msg; while (1) { - if (f1->HasLeader()) { - f1->GetServerStatus(&msg); - printf("%s\n", msg.c_str()); + if (print_members(f1)) { break; } printf("electing leader... sleep 2s\n"); @@ -74,9 +86,7 @@ int main() printf("Remove out server4 status %s\n", s.ToString().c_str()); while (1) { - if (f1->HasLeader()) { - f1->GetServerStatus(&msg); - printf("%s\n", msg.c_str()); + if (print_members(f1)) { break; } printf("electing leader after server 4 leave the cluster... sleep 2s\n"); @@ -97,9 +107,7 @@ int main() delete f5; printf("Remove out server5 status %s\n", s.ToString().c_str()); while (1) { - if (f1->HasLeader()) { - f1->GetServerStatus(&msg); - printf("%s\n", msg.c_str()); + if (print_members(f1)) { break; } printf("electing leader after server 5 leave the cluster... sleep 2s\n"); diff --git a/floyd/include/floyd.h b/floyd/include/floyd.h index 24a64f9..8194aba 100644 --- a/floyd/include/floyd.h +++ b/floyd/include/floyd.h @@ -39,8 +39,8 @@ class Floyd { virtual bool GetLeader(std::string* ip_port) = 0; virtual bool GetLeader(std::string* ip, int* port) = 0; virtual bool HasLeader() = 0; - virtual bool GetAllNodes(std::set* nodes) = 0; virtual bool IsLeader() = 0; + virtual Status GetAllServers(std::set* nodes) = 0; // used for debug virtual bool GetServerStatus(std::string* msg) = 0; diff --git a/floyd/proto/floyd.proto b/floyd/proto/floyd.proto index 7fb1e71..267067c 100644 --- a/floyd/proto/floyd.proto +++ b/floyd/proto/floyd.proto @@ -12,6 +12,7 @@ message Entry { kUnLock = 5; kAddServer = 6; kRemoveServer = 7; + kGetAllServers = 8; } // used in key value operator optional uint64 term = 1; @@ -36,6 +37,7 @@ enum Type { kUnLock = 6; kAddServer = 11; kRemoveServer = 12; + kGetAllServers = 13; // Raft RPC kRequestVote = 8; @@ -142,6 +144,8 @@ message CmdResponse { optional uint64 last_applied = 10; } optional ServerStatus server_status = 7; + + optional Membership all_servers = 8; } /* @@ -164,3 +168,4 @@ message Membership { } + diff --git a/floyd/src/floyd.pb.cc b/floyd/src/floyd.pb.cc index b0a6e7d..3bd35c1 100644 --- a/floyd/src/floyd.pb.cc +++ b/floyd/src/floyd.pb.cc @@ -247,7 +247,7 @@ void protobuf_AssignDesc_floyd_2eproto() { ::google::protobuf::MessageFactory::generated_factory(), sizeof(CmdRequest_ServerStatus)); CmdResponse_descriptor_ = file->message_type(2); - static const int CmdResponse_offsets_[7] = { + static const int CmdResponse_offsets_[8] = { GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, type_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, code_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, request_vote_res_), @@ -255,6 +255,7 @@ void protobuf_AssignDesc_floyd_2eproto() { GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, msg_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, kv_response_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, server_status_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(CmdResponse, all_servers_), }; CmdResponse_reflection_ = new ::google::protobuf::internal::GeneratedMessageReflection( @@ -462,63 +463,65 @@ void protobuf_AddDesc_floyd_2eproto() { GOOGLE_PROTOBUF_VERIFY_VERSION; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - "\n\013floyd.proto\022\005floyd\"\365\001\n\005Entry\022\014\n\004term\030\001" + "\n\013floyd.proto\022\005floyd\"\211\002\n\005Entry\022\014\n\004term\030\001" " \001(\004\022\013\n\003key\030\002 \001(\t\022\r\n\005value\030\003 \001(\014\022#\n\006opty" "pe\030\004 \002(\0162\023.floyd.Entry.OpType\022\016\n\006holder\030" "\005 \001(\014\022\021\n\tlease_end\030\006 \001(\004\022\016\n\006server\030\007 \001(\014" - "\"j\n\006OpType\022\t\n\005kRead\020\000\022\n\n\006kWrite\020\001\022\013\n\007kDe" + "\"~\n\006OpType\022\t\n\005kRead\020\000\022\n\n\006kWrite\020\001\022\013\n\007kDe" "lete\020\002\022\014\n\010kTryLock\020\004\022\013\n\007kUnLock\020\005\022\016\n\nkAd" - "dServer\020\006\022\021\n\rkRemoveServer\020\007\"\307\007\n\nCmdRequ" - "est\022\031\n\004type\030\001 \002(\0162\013.floyd.Type\0223\n\014reques" - "t_vote\030\002 \001(\0132\035.floyd.CmdRequest.RequestV" - "ote\0227\n\016append_entries\030\003 \001(\0132\037.floyd.CmdR" - "equest.AppendEntries\022/\n\nkv_request\030\004 \001(\013" - "2\033.floyd.CmdRequest.KvRequest\0223\n\014lock_re" - "quest\030\005 \001(\0132\035.floyd.CmdRequest.LockReque" - "st\022>\n\022add_server_request\030\007 \001(\0132\".floyd.C" - "mdRequest.AddServerRequest\022D\n\025remove_ser" - "ver_request\030\010 \001(\0132%.floyd.CmdRequest.Rem" - "oveServerRequest\0225\n\rserver_status\030\006 \001(\0132" - "\036.floyd.CmdRequest.ServerStatus\032d\n\013Reque" - "stVote\022\014\n\004term\030\001 \002(\004\022\n\n\002ip\030\002 \002(\014\022\014\n\004port" - "\030\003 \002(\005\022\026\n\016last_log_index\030\004 \002(\004\022\025\n\rlast_l" - "og_term\030\005 \002(\004\032\234\001\n\rAppendEntries\022\014\n\004term\030" - "\001 \002(\004\022\n\n\002ip\030\002 \002(\014\022\014\n\004port\030\003 \002(\005\022\026\n\016prev_" - "log_index\030\004 \002(\004\022\025\n\rprev_log_term\030\005 \002(\004\022\025" - "\n\rleader_commit\030\006 \002(\004\022\035\n\007entries\030\007 \003(\0132\014" - ".floyd.Entry\032\'\n\tKvRequest\022\013\n\003key\030\001 \002(\014\022\r" - "\n\005value\030\002 \001(\014\032>\n\013LockRequest\022\014\n\004name\030\001 \002" - "(\014\022\016\n\006holder\030\002 \002(\014\022\021\n\tlease_end\030\003 \001(\004\032&\n" - "\020AddServerRequest\022\022\n\nnew_server\030\001 \002(\014\032)\n" - "\023RemoveServerRequest\022\022\n\nold_server\030\001 \002(\014" - "\032L\n\014ServerStatus\022\014\n\004term\030\001 \002(\003\022\024\n\014commit" - "_index\030\002 \002(\003\022\n\n\002ip\030\003 \001(\014\022\014\n\004port\030\004 \001(\005\"\320" - "\005\n\013CmdResponse\022\031\n\004type\030\001 \002(\0162\013.floyd.Typ" - "e\022\037\n\004code\030\002 \001(\0162\021.floyd.StatusCode\022@\n\020re" - "quest_vote_res\030\003 \001(\0132&.floyd.CmdResponse" - ".RequestVoteResponse\022D\n\022append_entries_r" - "es\030\004 \001(\0132(.floyd.CmdResponse.AppendEntri" - "esResponse\022\013\n\003msg\030\005 \001(\014\0222\n\013kv_response\030\006" - " \001(\0132\035.floyd.CmdResponse.KvResponse\0226\n\rs" - "erver_status\030\007 \001(\0132\037.floyd.CmdResponse.S" - "erverStatus\0329\n\023RequestVoteResponse\022\014\n\004te" - "rm\030\001 \002(\004\022\024\n\014vote_granted\030\002 \002(\010\032N\n\025Append" - "EntriesResponse\022\014\n\004term\030\001 \002(\004\022\017\n\007success" - "\030\002 \002(\010\022\026\n\016last_log_index\030\003 \001(\004\032\033\n\nKvResp" - "onse\022\r\n\005value\030\001 \001(\014\032\333\001\n\014ServerStatus\022\014\n\004" - "term\030\001 \002(\004\022\024\n\014commit_index\030\002 \002(\004\022\014\n\004role" - "\030\003 \002(\014\022\021\n\tleader_ip\030\004 \001(\014\022\023\n\013leader_port" - "\030\005 \001(\005\022\024\n\014voted_for_ip\030\006 \001(\014\022\026\n\016voted_fo" - "r_port\030\007 \001(\005\022\025\n\rlast_log_term\030\010 \001(\004\022\026\n\016l" - "ast_log_index\030\t \001(\004\022\024\n\014last_applied\030\n \001(" - "\004\")\n\004Lock\022\016\n\006holder\030\001 \002(\014\022\021\n\tlease_end\030\002" - " \002(\004\"\033\n\nMembership\022\r\n\005nodes\030\001 \003(\014*\241\001\n\004Ty" - "pe\022\t\n\005kRead\020\000\022\n\n\006kWrite\020\001\022\013\n\007kDelete\020\003\022\014" - "\n\010kTryLock\020\005\022\013\n\007kUnLock\020\006\022\016\n\nkAddServer\020" - "\013\022\021\n\rkRemoveServer\020\014\022\020\n\014kRequestVote\020\010\022\022" + "dServer\020\006\022\021\n\rkRemoveServer\020\007\022\022\n\016kGetAllS" + "ervers\020\010\"\307\007\n\nCmdRequest\022\031\n\004type\030\001 \002(\0162\013." + "floyd.Type\0223\n\014request_vote\030\002 \001(\0132\035.floyd" + ".CmdRequest.RequestVote\0227\n\016append_entrie" + "s\030\003 \001(\0132\037.floyd.CmdRequest.AppendEntries" + "\022/\n\nkv_request\030\004 \001(\0132\033.floyd.CmdRequest." + "KvRequest\0223\n\014lock_request\030\005 \001(\0132\035.floyd." + "CmdRequest.LockRequest\022>\n\022add_server_req" + "uest\030\007 \001(\0132\".floyd.CmdRequest.AddServerR" + "equest\022D\n\025remove_server_request\030\010 \001(\0132%." + "floyd.CmdRequest.RemoveServerRequest\0225\n\r" + "server_status\030\006 \001(\0132\036.floyd.CmdRequest.S" + "erverStatus\032d\n\013RequestVote\022\014\n\004term\030\001 \002(\004" + "\022\n\n\002ip\030\002 \002(\014\022\014\n\004port\030\003 \002(\005\022\026\n\016last_log_i" + "ndex\030\004 \002(\004\022\025\n\rlast_log_term\030\005 \002(\004\032\234\001\n\rAp" + "pendEntries\022\014\n\004term\030\001 \002(\004\022\n\n\002ip\030\002 \002(\014\022\014\n" + "\004port\030\003 \002(\005\022\026\n\016prev_log_index\030\004 \002(\004\022\025\n\rp" + "rev_log_term\030\005 \002(\004\022\025\n\rleader_commit\030\006 \002(" + "\004\022\035\n\007entries\030\007 \003(\0132\014.floyd.Entry\032\'\n\tKvRe" + "quest\022\013\n\003key\030\001 \002(\014\022\r\n\005value\030\002 \001(\014\032>\n\013Loc" + "kRequest\022\014\n\004name\030\001 \002(\014\022\016\n\006holder\030\002 \002(\014\022\021" + "\n\tlease_end\030\003 \001(\004\032&\n\020AddServerRequest\022\022\n" + "\nnew_server\030\001 \002(\014\032)\n\023RemoveServerRequest" + "\022\022\n\nold_server\030\001 \002(\014\032L\n\014ServerStatus\022\014\n\004" + "term\030\001 \002(\003\022\024\n\014commit_index\030\002 \002(\003\022\n\n\002ip\030\003" + " \001(\014\022\014\n\004port\030\004 \001(\005\"\370\005\n\013CmdResponse\022\031\n\004ty" + "pe\030\001 \002(\0162\013.floyd.Type\022\037\n\004code\030\002 \001(\0162\021.fl" + "oyd.StatusCode\022@\n\020request_vote_res\030\003 \001(\013" + "2&.floyd.CmdResponse.RequestVoteResponse" + "\022D\n\022append_entries_res\030\004 \001(\0132(.floyd.Cmd" + "Response.AppendEntriesResponse\022\013\n\003msg\030\005 " + "\001(\014\0222\n\013kv_response\030\006 \001(\0132\035.floyd.CmdResp" + "onse.KvResponse\0226\n\rserver_status\030\007 \001(\0132\037" + ".floyd.CmdResponse.ServerStatus\022&\n\013all_s" + "ervers\030\010 \001(\0132\021.floyd.Membership\0329\n\023Reque" + "stVoteResponse\022\014\n\004term\030\001 \002(\004\022\024\n\014vote_gra" + "nted\030\002 \002(\010\032N\n\025AppendEntriesResponse\022\014\n\004t" + "erm\030\001 \002(\004\022\017\n\007success\030\002 \002(\010\022\026\n\016last_log_i" + "ndex\030\003 \001(\004\032\033\n\nKvResponse\022\r\n\005value\030\001 \001(\014\032" + "\333\001\n\014ServerStatus\022\014\n\004term\030\001 \002(\004\022\024\n\014commit" + "_index\030\002 \002(\004\022\014\n\004role\030\003 \002(\014\022\021\n\tleader_ip\030" + "\004 \001(\014\022\023\n\013leader_port\030\005 \001(\005\022\024\n\014voted_for_" + "ip\030\006 \001(\014\022\026\n\016voted_for_port\030\007 \001(\005\022\025\n\rlast" + "_log_term\030\010 \001(\004\022\026\n\016last_log_index\030\t \001(\004\022" + "\024\n\014last_applied\030\n \001(\004\")\n\004Lock\022\016\n\006holder\030" + "\001 \002(\014\022\021\n\tlease_end\030\002 \002(\004\"\033\n\nMembership\022\r" + "\n\005nodes\030\001 \003(\014*\265\001\n\004Type\022\t\n\005kRead\020\000\022\n\n\006kWr" + "ite\020\001\022\013\n\007kDelete\020\003\022\014\n\010kTryLock\020\005\022\013\n\007kUnL" + "ock\020\006\022\016\n\nkAddServer\020\013\022\021\n\rkRemoveServer\020\014" + "\022\022\n\016kGetAllServers\020\r\022\020\n\014kRequestVote\020\010\022\022" "\n\016kAppendEntries\020\t\022\021\n\rkServerStatus\020\n*=\n" "\nStatusCode\022\007\n\003kOk\020\000\022\r\n\tkNotFound\020\001\022\n\n\006k" - "Error\020\002\022\013\n\007kLocked\020\003", 2260); + "Error\020\002\022\013\n\007kLocked\020\003", 2340); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "floyd.proto", &protobuf_RegisterTypes); Entry::default_instance_ = new Entry(); @@ -578,6 +581,7 @@ bool Type_IsValid(int value) { case 10: case 11: case 12: + case 13: return true; default: return false; @@ -616,6 +620,7 @@ bool Entry_OpType_IsValid(int value) { case 5: case 6: case 7: + case 8: return true; default: return false; @@ -630,6 +635,7 @@ const Entry_OpType Entry::kTryLock; const Entry_OpType Entry::kUnLock; const Entry_OpType Entry::kAddServer; const Entry_OpType Entry::kRemoveServer; +const Entry_OpType Entry::kGetAllServers; const Entry_OpType Entry::OpType_MIN; const Entry_OpType Entry::OpType_MAX; const int Entry::OpType_ARRAYSIZE; @@ -5216,6 +5222,7 @@ const int CmdResponse::kAppendEntriesResFieldNumber; const int CmdResponse::kMsgFieldNumber; const int CmdResponse::kKvResponseFieldNumber; const int CmdResponse::kServerStatusFieldNumber; +const int CmdResponse::kAllServersFieldNumber; #endif // !_MSC_VER CmdResponse::CmdResponse() @@ -5228,6 +5235,7 @@ void CmdResponse::InitAsDefaultInstance() { append_entries_res_ = const_cast< ::floyd::CmdResponse_AppendEntriesResponse*>(&::floyd::CmdResponse_AppendEntriesResponse::default_instance()); kv_response_ = const_cast< ::floyd::CmdResponse_KvResponse*>(&::floyd::CmdResponse_KvResponse::default_instance()); server_status_ = const_cast< ::floyd::CmdResponse_ServerStatus*>(&::floyd::CmdResponse_ServerStatus::default_instance()); + all_servers_ = const_cast< ::floyd::Membership*>(&::floyd::Membership::default_instance()); } CmdResponse::CmdResponse(const CmdResponse& from) @@ -5245,6 +5253,7 @@ void CmdResponse::SharedCtor() { msg_ = const_cast< ::std::string*>(&::google::protobuf::internal::kEmptyString); kv_response_ = NULL; server_status_ = NULL; + all_servers_ = NULL; ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -5261,6 +5270,7 @@ void CmdResponse::SharedDtor() { delete append_entries_res_; delete kv_response_; delete server_status_; + delete all_servers_; } } @@ -5306,6 +5316,9 @@ void CmdResponse::Clear() { if (has_server_status()) { if (server_status_ != NULL) server_status_->::floyd::CmdResponse_ServerStatus::Clear(); } + if (has_all_servers()) { + if (all_servers_ != NULL) all_servers_->::floyd::Membership::Clear(); + } } ::memset(_has_bits_, 0, sizeof(_has_bits_)); mutable_unknown_fields()->Clear(); @@ -5424,6 +5437,20 @@ bool CmdResponse::MergePartialFromCodedStream( } else { goto handle_uninterpreted; } + if (input->ExpectTag(66)) goto parse_all_servers; + break; + } + + // optional .floyd.Membership all_servers = 8; + case 8: { + if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) { + parse_all_servers: + DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual( + input, mutable_all_servers())); + } else { + goto handle_uninterpreted; + } if (input->ExpectAtEnd()) return true; break; } @@ -5488,6 +5515,12 @@ void CmdResponse::SerializeWithCachedSizes( 7, this->server_status(), output); } + // optional .floyd.Membership all_servers = 8; + if (has_all_servers()) { + ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray( + 8, this->all_servers(), output); + } + if (!unknown_fields().empty()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( unknown_fields(), output); @@ -5543,6 +5576,13 @@ ::google::protobuf::uint8* CmdResponse::SerializeWithCachedSizesToArray( 7, this->server_status(), target); } + // optional .floyd.Membership all_servers = 8; + if (has_all_servers()) { + target = ::google::protobuf::internal::WireFormatLite:: + WriteMessageNoVirtualToArray( + 8, this->all_servers(), target); + } + if (!unknown_fields().empty()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( unknown_fields(), target); @@ -5601,6 +5641,13 @@ int CmdResponse::ByteSize() const { this->server_status()); } + // optional .floyd.Membership all_servers = 8; + if (has_all_servers()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual( + this->all_servers()); + } + } if (!unknown_fields().empty()) { total_size += @@ -5649,6 +5696,9 @@ void CmdResponse::MergeFrom(const CmdResponse& from) { if (from.has_server_status()) { mutable_server_status()->::floyd::CmdResponse_ServerStatus::MergeFrom(from.server_status()); } + if (from.has_all_servers()) { + mutable_all_servers()->::floyd::Membership::MergeFrom(from.all_servers()); + } } mutable_unknown_fields()->MergeFrom(from.unknown_fields()); } @@ -5689,6 +5739,7 @@ void CmdResponse::Swap(CmdResponse* other) { std::swap(msg_, other->msg_); std::swap(kv_response_, other->kv_response_); std::swap(server_status_, other->server_status_); + std::swap(all_servers_, other->all_servers_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.Swap(&other->_unknown_fields_); std::swap(_cached_size_, other->_cached_size_); diff --git a/floyd/src/floyd.pb.h b/floyd/src/floyd.pb.h index cfce487..facf323 100644 --- a/floyd/src/floyd.pb.h +++ b/floyd/src/floyd.pb.h @@ -58,11 +58,12 @@ enum Entry_OpType { Entry_OpType_kTryLock = 4, Entry_OpType_kUnLock = 5, Entry_OpType_kAddServer = 6, - Entry_OpType_kRemoveServer = 7 + Entry_OpType_kRemoveServer = 7, + Entry_OpType_kGetAllServers = 8 }; bool Entry_OpType_IsValid(int value); const Entry_OpType Entry_OpType_OpType_MIN = Entry_OpType_kRead; -const Entry_OpType Entry_OpType_OpType_MAX = Entry_OpType_kRemoveServer; +const Entry_OpType Entry_OpType_OpType_MAX = Entry_OpType_kGetAllServers; const int Entry_OpType_OpType_ARRAYSIZE = Entry_OpType_OpType_MAX + 1; const ::google::protobuf::EnumDescriptor* Entry_OpType_descriptor(); @@ -83,13 +84,14 @@ enum Type { kUnLock = 6, kAddServer = 11, kRemoveServer = 12, + kGetAllServers = 13, kRequestVote = 8, kAppendEntries = 9, kServerStatus = 10 }; bool Type_IsValid(int value); const Type Type_MIN = kRead; -const Type Type_MAX = kRemoveServer; +const Type Type_MAX = kGetAllServers; const int Type_ARRAYSIZE = Type_MAX + 1; const ::google::protobuf::EnumDescriptor* Type_descriptor(); @@ -185,6 +187,7 @@ class Entry : public ::google::protobuf::Message { static const OpType kUnLock = Entry_OpType_kUnLock; static const OpType kAddServer = Entry_OpType_kAddServer; static const OpType kRemoveServer = Entry_OpType_kRemoveServer; + static const OpType kGetAllServers = Entry_OpType_kGetAllServers; static inline bool OpType_IsValid(int value) { return Entry_OpType_IsValid(value); } @@ -1861,6 +1864,15 @@ class CmdResponse : public ::google::protobuf::Message { inline ::floyd::CmdResponse_ServerStatus* release_server_status(); inline void set_allocated_server_status(::floyd::CmdResponse_ServerStatus* server_status); + // optional .floyd.Membership all_servers = 8; + inline bool has_all_servers() const; + inline void clear_all_servers(); + static const int kAllServersFieldNumber = 8; + inline const ::floyd::Membership& all_servers() const; + inline ::floyd::Membership* mutable_all_servers(); + inline ::floyd::Membership* release_all_servers(); + inline void set_allocated_all_servers(::floyd::Membership* all_servers); + // @@protoc_insertion_point(class_scope:floyd.CmdResponse) private: inline void set_has_type(); @@ -1877,6 +1889,8 @@ class CmdResponse : public ::google::protobuf::Message { inline void clear_has_kv_response(); inline void set_has_server_status(); inline void clear_has_server_status(); + inline void set_has_all_servers(); + inline void clear_has_all_servers(); ::google::protobuf::UnknownFieldSet _unknown_fields_; @@ -1887,9 +1901,10 @@ class CmdResponse : public ::google::protobuf::Message { ::std::string* msg_; ::floyd::CmdResponse_KvResponse* kv_response_; ::floyd::CmdResponse_ServerStatus* server_status_; + ::floyd::Membership* all_servers_; mutable int _cached_size_; - ::google::protobuf::uint32 _has_bits_[(7 + 31) / 32]; + ::google::protobuf::uint32 _has_bits_[(8 + 31) / 32]; friend void protobuf_AddDesc_floyd_2eproto(); friend void protobuf_AssignDesc_floyd_2eproto(); @@ -4532,6 +4547,44 @@ inline void CmdResponse::set_allocated_server_status(::floyd::CmdResponse_Server } } +// optional .floyd.Membership all_servers = 8; +inline bool CmdResponse::has_all_servers() const { + return (_has_bits_[0] & 0x00000080u) != 0; +} +inline void CmdResponse::set_has_all_servers() { + _has_bits_[0] |= 0x00000080u; +} +inline void CmdResponse::clear_has_all_servers() { + _has_bits_[0] &= ~0x00000080u; +} +inline void CmdResponse::clear_all_servers() { + if (all_servers_ != NULL) all_servers_->::floyd::Membership::Clear(); + clear_has_all_servers(); +} +inline const ::floyd::Membership& CmdResponse::all_servers() const { + return all_servers_ != NULL ? *all_servers_ : *default_instance_->all_servers_; +} +inline ::floyd::Membership* CmdResponse::mutable_all_servers() { + set_has_all_servers(); + if (all_servers_ == NULL) all_servers_ = new ::floyd::Membership; + return all_servers_; +} +inline ::floyd::Membership* CmdResponse::release_all_servers() { + clear_has_all_servers(); + ::floyd::Membership* temp = all_servers_; + all_servers_ = NULL; + return temp; +} +inline void CmdResponse::set_allocated_all_servers(::floyd::Membership* all_servers) { + delete all_servers_; + all_servers_ = all_servers; + if (all_servers) { + set_has_all_servers(); + } else { + clear_has_all_servers(); + } +} + // ------------------------------------------------------------------- // Lock diff --git a/floyd/src/floyd_apply.cc b/floyd/src/floyd_apply.cc index 8ada075..96ae4bd 100644 --- a/floyd/src/floyd_apply.cc +++ b/floyd/src/floyd_apply.cc @@ -178,6 +178,9 @@ rocksdb::Status FloydApply::Apply(const Entry& entry) { LOGV(INFO_LEVEL, info_log_, "FloydApply::Apply Remove server %s to cluster", entry.server().c_str()); break; + case Entry_OpType_kGetAllServers: + ret = rocksdb::Status::OK(); + break; default: ret = rocksdb::Status::Corruption("Unknown entry type"); } diff --git a/floyd/src/floyd_impl.cc b/floyd/src/floyd_impl.cc index 86511d2..8d9e43a 100644 --- a/floyd/src/floyd_impl.cc +++ b/floyd/src/floyd_impl.cc @@ -87,6 +87,10 @@ static void BuildRemoveServerRequest(const std::string& old_server, CmdRequest* remove_server_request->set_old_server(old_server); } +static void BuildGetAllServersRequest(CmdRequest* cmd) { + cmd->set_type(Type::kGetAllServers); +} + static void BuildRequestVoteResponse(uint64_t term, bool granted, CmdResponse* response) { response->set_type(Type::kRequestVote); @@ -130,6 +134,8 @@ static void BuildLogEntry(const CmdRequest& cmd, uint64_t current_term, Entry* e } else if (cmd.type() == Type::kRemoveServer) { entry->set_optype(Entry_OpType_kRemoveServer); entry->set_server(cmd.remove_server_request().old_server()); + } else if (cmd.type() == Type::kGetAllServers) { + entry->set_optype(Entry_OpType_kGetAllServers); } } @@ -204,11 +210,6 @@ bool FloydImpl::HasLeader() { return true; } -bool FloydImpl::GetAllNodes(std::set* nodes) { - *nodes = context_->members; - return true; -} - void FloydImpl::set_log_level(const int log_level) { if (info_log_) { info_log_->set_log_level(log_level); @@ -494,6 +495,25 @@ Status FloydImpl::RemoveServer(const std::string& old_server) { return Status::Corruption("RemoveServer Error"); } +Status FloydImpl::GetAllServers(std::set* nodes) { + CmdRequest request; + BuildGetAllServersRequest(&request); + CmdResponse response; + Status s = DoCommand(request, &response); + if (!s.ok()) { + return s; + } + if (response.code() == StatusCode::kOk) { + nodes->clear(); + for (int i = 0; i < response.all_servers().nodes_size(); i++) { + nodes->insert(response.all_servers().nodes(i)); + } + return Status::OK(); + } + return Status::Corruption("GetALlServers Error"); +} + + bool FloydImpl::GetServerStatus(std::string* msg) { LOGV(DEBUG_LEVEL, info_log_, "FloydImpl::GetServerStatus start"); slash::MutexLock l(&context_->global_mu); @@ -711,6 +731,16 @@ Status FloydImpl::ExecuteCommand(const CmdRequest& request, case Type::kRemoveServer: response->set_code(StatusCode::kOk); break; + case Type::kGetAllServers: + rs = db_->Get(rocksdb::ReadOptions(), kMemberConfigKey, &value); + if (!rs.ok()) { + return Status::Corruption(rs.ToString()); + } + if(!response->mutable_all_servers()->ParseFromString(value)) { + return Status::Corruption("Parse failed"); + } + response->set_code(StatusCode::kOk); + break; default: return Status::Corruption("Unknown request type"); } diff --git a/floyd/src/floyd_impl.h b/floyd/src/floyd_impl.h index e6ee3eb..1f3d0a7 100644 --- a/floyd/src/floyd_impl.h +++ b/floyd/src/floyd_impl.h @@ -58,12 +58,12 @@ class FloydImpl : public Floyd { // membership change interface virtual Status AddServer(const std::string& new_server) override; virtual Status RemoveServer(const std::string& out_server) override; + virtual Status GetAllServers(std::set* nodes) override; // return true if leader has been elected virtual bool GetLeader(std::string* ip_port) override; virtual bool GetLeader(std::string* ip, int* port) override; virtual bool HasLeader() override; - virtual bool GetAllNodes(std::set* nodes) override; virtual bool IsLeader() override; int GetLocalPort() { diff --git a/floyd/src/floyd_worker.cc b/floyd/src/floyd_worker.cc index 4c12d79..8a90e28 100644 --- a/floyd/src/floyd_worker.cc +++ b/floyd/src/floyd_worker.cc @@ -80,6 +80,10 @@ int FloydWorkerConn::DealMessage() { response_.set_type(Type::kRemoveServer); floyd_->DoCommand(request_, &response_); break; + case Type::kGetAllServers: + response_.set_type(Type::kGetAllServers); + floyd_->DoCommand(request_, &response_); + break; case Type::kRequestVote: response_.set_type(Type::kRequestVote); floyd_->ReplyRequestVote(request_, &response_);