From 2fc060393dcbec320a983067504158d5d61de26e Mon Sep 17 00:00:00 2001 From: "tarunkumar21c@gmail.com" Date: Wed, 3 May 2023 16:10:51 +0000 Subject: [PATCH] Fixed review comments --- .../bcstatetransfer/SimpleBCStateTransfer.hpp | 16 +- bftengine/src/bftengine/ClientsManager.cpp | 5 +- bftengine/src/bftengine/FullNodeReplica.cpp | 168 ++++++------------ bftengine/src/bftengine/FullNodeReplica.hpp | 9 +- bftengine/src/bftengine/ReadOnlyReplica.hpp | 4 +- bftengine/src/bftengine/ReplicaImp.hpp | 4 +- 6 files changed, 73 insertions(+), 133 deletions(-) mode change 100755 => 100644 bftengine/src/bftengine/FullNodeReplica.cpp diff --git a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp index c6943bce1a..ed9d8c353c 100644 --- a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp +++ b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp @@ -193,19 +193,21 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.cVal, c.numReplicas, c.numRoReplicas, + c.numFnReplicas, c.pedanticChecks, c.isReadOnly, + c.isFullNode, c.maxChunkSize, c.maxNumberOfChunksInBatch, c.maxBlockSize, c.maxPendingDataFromSourceReplica, c.maxNumOfReservedPages, c.sizeOfReservedPage, - c.gettingMissingBlocksSummaryWindowSize, - c.minPrePrepareMsgsForPrimaryAwareness, - c.fetchRangeSize); + c.gettingMissingBlocksSummaryWindowSize); os << ","; - os << KVLOG(c.RVT_K, + os << KVLOG(c.minPrePrepareMsgsForPrimaryAwareness, + c.fetchRangeSize, + c.RVT_K, c.refreshTimerMs, c.checkpointSummariesRetransmissionTimeoutMs, c.maxAcceptableMsgDelayMs, @@ -218,11 +220,9 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.sourcePerformanceSnapshotFrequencySec, c.runInSeparateThread, c.enableReservedPages, - c.enableSourceBlocksPreFetch, - c.enableSourceSelectorPrimaryAwareness, - c.enableStoreRvbDataDuringCheckpointing); + c.enableSourceBlocksPreFetch); os << ","; - os << KVLOG(c.numFnReplicas, c.isFullNode); + os << KVLOG(c.enableSourceSelectorPrimaryAwareness, c.enableStoreRvbDataDuringCheckpointing); return os; } // creates an instance of the state transfer module. diff --git a/bftengine/src/bftengine/ClientsManager.cpp b/bftengine/src/bftengine/ClientsManager.cpp index 80e76e37e2..338c108ddb 100644 --- a/bftengine/src/bftengine/ClientsManager.cpp +++ b/bftengine/src/bftengine/ClientsManager.cpp @@ -217,9 +217,8 @@ ClientsManager::ClientsManager(const std::set& proxyClients, ConcordAssert(maxNumOfReqsPerClient_ > 0); reservedPagesPerRequest_ = reservedPagesPerRequest(sizeOfReservedPage(), maxReplySize_); reservedPagesPerClient_ = reservedPagesPerClient(sizeOfReservedPage(), maxReplySize_, maxNumOfReqsPerClient_); - for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + - ReplicaConfig::instance().numFnReplicas; - i++) { + const auto& config = ReplicaConfig::instance(); + for (NodeIdType i{}; i < config.numReplicas + config.numRoReplicas + config.numFnReplicas; ++i) { clientIds_.insert(i); } clientIds_.insert(proxyClients_.begin(), proxyClients_.end()); diff --git a/bftengine/src/bftengine/FullNodeReplica.cpp b/bftengine/src/bftengine/FullNodeReplica.cpp old mode 100755 new mode 100644 index f71a4ee511..e78ed0d3f7 --- a/bftengine/src/bftengine/FullNodeReplica.cpp +++ b/bftengine/src/bftengine/FullNodeReplica.cpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved. +// Copyright (c) 2023 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in // compliance with the Apache 2.0 License. @@ -13,8 +13,8 @@ #include #include -#include -#include +#include "bftengine/Replica.hpp" +#include "messages/StateTransferMsg.hpp" #include "FullNodeReplica.hpp" #include "log/logger.hpp" @@ -33,32 +33,31 @@ #include "communication/StateControl.hpp" using concordUtil::Timers; +using namespace std::placeholders; + +// Note : The changes in files are inclined with RO replica SateTransfer behavior, all the class functions are inherited +// from ReadOnlyReplica. As we know for timebeing StateTransfer functionality is a temporary solution for FullNode, +// until the ASP/BSP is implemented the functions in this class needs to be changed based on the required accordingly. namespace bftEngine::impl { FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, - std::shared_ptr requestsHandler, - IStateTransfer *stateTransfer, - std::shared_ptr msgComm, - std::shared_ptr msgHandlerReg, + std::shared_ptr requests_handler, + IStateTransfer *state_transfer, + std::shared_ptr msg_comm, + std::shared_ptr msg_handler_reg, concordUtil::Timers &timers, - MetadataStorage *metadataStorage) - : ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers), + MetadataStorage *metadata_storage) + : ReplicaForStateTransfer(config, requests_handler, state_transfer, msg_comm, msg_handler_reg, true, timers), fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), metrics_.RegisterCounter("sentAskForCheckpointMsgs"), metrics_.RegisterCounter("receivedInvalidMsgs"), metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, - metadataStorage_{metadataStorage} { + metadata_storage_{metadata_storage} { LOG_INFO(GL, "Initialising Full Node Replica"); repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); - msgHandlers_->registerMsgHandler( - MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); - msgHandlers_->registerMsgHandler( - MsgCode::ClientRequest, - std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); - msgHandlers_->registerMsgHandler( - MsgCode::StateTransfer, - std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + + registerMsgHandlers(); metrics_.Register(); SigManager::init(config_.replicaId, @@ -96,10 +95,8 @@ void FullNodeReplica::stop() { } void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { - lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize; - - fn_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); - last_executed_seq_num_ = lastExecutedSeqNum; + last_executed_seq_num_ = newStateCheckpoint * checkpointWindowSize; + fn_metrics_.last_executed_seq_num_.Get().Set(last_executed_seq_num_); } void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { @@ -111,8 +108,8 @@ void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char * void FullNodeReplica::sendAskForCheckpointMsg() { fn_metrics_.sent_ask_for_checkpoint_msg_++; LOG_INFO(GL, "sending AskForCheckpointMsg"); - auto msg = std::make_unique(config_.replicaId); - for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id); + AskForCheckpointMsg msg{config_.replicaId}; + for (auto id : repsInfo->idsOfPeerReplicas()) send(&msg, id); } template <> @@ -139,24 +136,24 @@ void FullNodeReplica::onMessage(std::unique_ptr ms msg->rvbDataDigest())); // Reconfiguration cmd block is synced to RO replica via reserved pages - EpochNum replicasLastKnownEpochVal = 0; - auto epochNumberFromResPages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); - if (epochNumberFromResPages.has_value()) replicasLastKnownEpochVal = epochNumberFromResPages.value(); + EpochNum replicas_last_known_epoch_val = 0; + auto epoch_number_from_res_pages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); + if (epoch_number_from_res_pages.has_value()) replicas_last_known_epoch_val = epoch_number_from_res_pages.value(); // not relevant if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum || - msg->epochNumber() < replicasLastKnownEpochVal) { + msg->epochNumber() < replicas_last_known_epoch_val) { return; } // no self certificate - static std::map> checkpointsInfo; - const auto msgSeqNum = msg->seqNumber(); - const auto idOfGeneratedReplica = msg->idOfGeneratedReplica(); - checkpointsInfo[msgSeqNum].addCheckpointMsg(msg.release(), idOfGeneratedReplica); + static std::map> checkpoints_info; + const auto msg_seq_num = msg->seqNumber(); + const auto id_of_generated_eplica = msg->idOfGeneratedReplica(); + checkpoints_info[msg_seq_num].addCheckpointMsg(msg.release(), id_of_generated_eplica); // if enough - invoke state transfer - if (checkpointsInfo[msgSeqNum].isCheckpointCertificateComplete()) { - persistCheckpointDescriptor(msgSeqNum, checkpointsInfo[msgSeqNum]); - checkpointsInfo.clear(); + if (checkpoints_info[msg_seq_num].isCheckpointCertificateComplete()) { + persistCheckpointDescriptor(msg_seq_num, checkpoints_info[msg_seq_num]); + checkpoints_info.clear(); LOG_INFO(GL, "call to startCollectingState()"); stateTransfer->startCollectingState(); } @@ -177,103 +174,37 @@ void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const Ch m.second->idOfGeneratedReplica())); } DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs); - const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); - concord::serialize::UniquePtrToChar descBuf(new char[bufLen]); - char *descBufPtr = descBuf.get(); - size_t actualSize = 0; - desc.serialize(descBufPtr, bufLen, actualSize); - ConcordAssertNE(actualSize, 0); + const size_t buf_len = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); + concord::serialize::UniquePtrToChar desc_buf(new char[buf_len]); + char *desc_buf_ptr = desc_buf.get(); + size_t actual_size = 0; + desc.serialize(desc_buf_ptr, buf_len, actual_size); + ConcordAssertNE(actual_size, 0); // TODO [TK] S3KeyGenerator // checkpoints// std::ostringstream oss; oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId; - metadataStorage_->atomicWriteArbitraryObject(oss.str(), descBuf.get(), actualSize); + metadata_storage_->atomicWriteArbitraryObject(oss.str(), desc_buf.get(), actual_size); } template <> void FullNodeReplica::onMessage(std::unique_ptr msg) { - const NodeIdType senderId = msg->senderId(); - const NodeIdType clientId = msg->clientProxyId(); - const bool reconfig_flag = (msg->flags() & MsgFlag::RECONFIG_FLAG) != 0; - const ReqId reqSeqNum = msg->requestSeqNum(); + const NodeIdType sender_id = msg->senderId(); + const NodeIdType client_id = msg->clientProxyId(); + const ReqId req_seq_num = msg->requestSeqNum(); const uint64_t flags = msg->flags(); SCOPED_MDC_CID(msg->getCid()); - LOG_DEBUG(CNSUS, KVLOG(clientId, reqSeqNum, senderId) << " flags: " << std::bitset(flags)); + LOG_DEBUG(CNSUS, KVLOG(client_id, req_seq_num, sender_id) << " flags: " << std::bitset(flags)); const auto &span_context = msg->spanContext::type>(); auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request"); span.setTag("rid", config_.getreplicaId()); span.setTag("cid", msg->getCid()); - span.setTag("seq_num", reqSeqNum); - - // A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and - // the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in - // the committers + span.setTag("seq_num", req_seq_num); - if (reconfig_flag) { - LOG_INFO(GL, "FN replica has received a reconfiguration request"); - executeReadOnlyRequest(span, *(msg.get())); - return; - } -} - -void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) { - auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span); - // full node replica does not know who is the primary, so it always return 0. It is the client responsibility to treat - // the replies accordingly. - ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId()); - const uint16_t clientId = request.clientProxyId(); - - int executionResult = 0; - bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests; - accumulatedRequests.push_back(bftEngine::IRequestsHandler::ExecutionRequest{clientId, - static_cast(lastExecutedSeqNum), - request.getCid(), - request.flags(), - request.requestLength(), - request.requestBuf(), - "", - reply.maxReplyLength(), - reply.replyBuf(), - request.requestSeqNum(), - request.requestIndexInBatch(), - request.result()}); - // DD: Do we need to take care of Time Service here? - bftRequestsHandler_->execute(accumulatedRequests, std::nullopt, request.getCid(), span); - IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back(); - executionResult = single_request.outExecutionStatus; - const uint32_t actualReplyLength = single_request.outActualReplySize; - const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize; - LOG_DEBUG(GL, - "Executed full node request. " << KVLOG(clientId, - lastExecutedSeqNum, - request.requestLength(), - reply.maxReplyLength(), - actualReplyLength, - actualReplicaSpecificInfoLength, - executionResult)); - // TODO(GG): TBD - how do we want to support empty replies? (actualReplyLength==0) - if (!executionResult) { - if (actualReplyLength > 0) { - reply.setReplyLength(actualReplyLength); - reply.setReplicaSpecificInfoLength(actualReplicaSpecificInfoLength); - send(&reply, clientId); - return; - } else { - LOG_WARN(GL, "Received zero size response. " << KVLOG(clientId)); - strcpy(single_request.outReply, "Executed data is empty"); - single_request.outActualReplySize = strlen(single_request.outReply); - executionResult = static_cast(bftEngine::OperationResult::EXEC_DATA_EMPTY); - } - - } else { - LOG_ERROR(GL, "Received error while executing FN request. " << KVLOG(clientId, executionResult)); - } - ClientReplyMsg replyMsg( - 0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult); - send(&replyMsg, clientId); + // TODO: handle reconfiguration request here, refer ReadOnlyReplica class } void FullNodeReplica::registerStatusHandlers() { @@ -292,4 +223,13 @@ void FullNodeReplica::registerStatusHandlers() { concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); } +void FullNodeReplica::registerMsgHandlers() { + msgHandlers_->registerMsgHandler(MsgCode::Checkpoint, + std::bind(&FullNodeReplica::messageHandler, this, _1)); + msgHandlers_->registerMsgHandler(MsgCode::ClientRequest, + std::bind(&FullNodeReplica::messageHandler, this, _1)); + msgHandlers_->registerMsgHandler(MsgCode::StateTransfer, + std::bind(&FullNodeReplica::messageHandler, this, _1)); +} + } // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/FullNodeReplica.hpp b/bftengine/src/bftengine/FullNodeReplica.hpp index 2b2e4f7f37..05ff032003 100644 --- a/bftengine/src/bftengine/FullNodeReplica.hpp +++ b/bftengine/src/bftengine/FullNodeReplica.hpp @@ -33,8 +33,8 @@ class FullNodeReplica : public ReplicaForStateTransfer { void start() override; void stop() override; - virtual bool isReadOnly() const override { return false; } - virtual bool isFullNode() const override { return true; } + virtual bool isReadOnly() const override { return config_.isReadOnly; } + virtual bool isFullNode() const override { return config_.isFullNode; } protected: void sendAskForCheckpointMsg(); @@ -54,7 +54,6 @@ class FullNodeReplica : public ReplicaForStateTransfer { template void onMessage(std::unique_ptr); - void executeReadOnlyRequest(concordUtils::SpanWrapper& parent_span, const ClientRequestMsg& m); void persistCheckpointDescriptor(const SeqNum&, const CheckpointInfo&); protected: @@ -67,7 +66,7 @@ class FullNodeReplica : public ReplicaForStateTransfer { concordMetrics::GaugeHandle last_executed_seq_num_; } fn_metrics_; - std::unique_ptr metadataStorage_; + std::unique_ptr metadata_storage_; std::atomic last_executed_seq_num_{0}; private: @@ -76,6 +75,8 @@ class FullNodeReplica : public ReplicaForStateTransfer { // The full node replica also hasn't got an implementation for InternalMessages which are used by the // ReplicaStatusHandler. void registerStatusHandlers(); + + void registerMsgHandlers(); }; } // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/ReadOnlyReplica.hpp b/bftengine/src/bftengine/ReadOnlyReplica.hpp index 4bc0b5461a..efabe48fa2 100644 --- a/bftengine/src/bftengine/ReadOnlyReplica.hpp +++ b/bftengine/src/bftengine/ReadOnlyReplica.hpp @@ -33,8 +33,8 @@ class ReadOnlyReplica : public ReplicaForStateTransfer { void start() override; void stop() override; - virtual bool isReadOnly() const override { return true; } - virtual bool isFullNode() const override { return false; } + virtual bool isReadOnly() const override { return config_.isReadOnly; } + virtual bool isFullNode() const override { return config_.isFullNode; } protected: void sendAskForCheckpointMsg(); diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index 7d595569ad..4d9aa8a9ab 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -364,9 +364,9 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { std::shared_ptr ticksGenerator() const { return ticks_gen_; } - virtual bool isReadOnly() const override { return false; } + virtual bool isReadOnly() const override { return config_.isReadOnly; } - virtual bool isFullNode() const override { return false; } + virtual bool isFullNode() const override { return config_.isFullNode; } shared_ptr getPersistentStorage() const { return ps_; } std::shared_ptr getSecretsManager() { return sm_; }