From d8a412d5a64f2e9e42f146dd890d5db5c1651f15 Mon Sep 17 00:00:00 2001 From: Tarunkumar Banda Date: Wed, 5 Apr 2023 18:49:36 +0000 Subject: [PATCH 1/3] Changes for ST in fullNode --- bftengine/CMakeLists.txt | 1 + .../bcstatetransfer/SimpleBCStateTransfer.hpp | 4 + bftengine/include/bftengine/ReplicaConfig.hpp | 9 +- .../include/bftengine/ReplicaFactory.hpp | 7 + .../bcstatetransfer/AsyncStateTransferCRE.cpp | 4 +- bftengine/src/bcstatetransfer/BCStateTran.cpp | 10 +- bftengine/src/bftengine/ClientsManager.cpp | 4 +- bftengine/src/bftengine/FullNodeReplica.cpp | 295 ++++++++++++++++++ bftengine/src/bftengine/FullNodeReplica.hpp | 80 +++++ bftengine/src/bftengine/ReplicaFactory.cpp | 25 +- .../src/bftengine/ReplicaForStateTransfer.cpp | 4 +- bftengine/src/bftengine/ReplicasInfo.cpp | 22 +- bftengine/src/bftengine/ReplicasInfo.hpp | 2 + bftengine/src/bftengine/SigManager.cpp | 17 +- bftengine/src/preprocessor/PreProcessor.cpp | 3 +- .../simplestatetransfer/SimpleStateTran.cpp | 2 + .../bcstatetransfer/bcstatetransfer_tests.cpp | 2 + examples/replica/src/SetupReplica.cpp | 4 +- kvbc/src/Replica.cpp | 31 +- kvbc/src/reconfiguration_kvbc_handler.cpp | 8 +- tests/simpleKVBC/TesterReplica/setup.cpp | 4 +- 21 files changed, 495 insertions(+), 43 deletions(-) create mode 100755 bftengine/src/bftengine/FullNodeReplica.cpp create mode 100644 bftengine/src/bftengine/FullNodeReplica.hpp diff --git a/bftengine/CMakeLists.txt b/bftengine/CMakeLists.txt index 9651286269..e991c3a2f0 100644 --- a/bftengine/CMakeLists.txt +++ b/bftengine/CMakeLists.txt @@ -11,6 +11,7 @@ set(corebft_source_files src/bftengine/DebugStatistics.cpp src/bftengine/SeqNumInfo.cpp src/bftengine/ReadOnlyReplica.cpp + src/bftengine/FullNodeReplica.cpp src/bftengine/ReplicaBase.cpp src/bftengine/ReplicaForStateTransfer.cpp src/bftengine/ReplicaImp.cpp diff --git a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp index 
79f67222d1..c6943bce1a 100644 --- a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp +++ b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp @@ -150,8 +150,10 @@ struct Config { uint16_t cVal = 0; uint16_t numReplicas = 0; // number of consensus replicas uint16_t numRoReplicas = 0; + uint16_t numFnReplicas = 0; bool pedanticChecks = false; bool isReadOnly = false; + bool isFullNode = false; // sizes uint32_t maxChunkSize = 0; @@ -219,6 +221,8 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.enableSourceBlocksPreFetch, c.enableSourceSelectorPrimaryAwareness, c.enableStoreRvbDataDuringCheckpointing); + os << ","; + os << KVLOG(c.numFnReplicas, c.isFullNode); return os; } // creates an instance of the state transfer module. diff --git a/bftengine/include/bftengine/ReplicaConfig.hpp b/bftengine/include/bftengine/ReplicaConfig.hpp index 2cd6af5ce2..fba4452d41 100644 --- a/bftengine/include/bftengine/ReplicaConfig.hpp +++ b/bftengine/include/bftengine/ReplicaConfig.hpp @@ -46,8 +46,10 @@ class ReplicaConfig : public concord::serialize::SerializableFactory= 1"); CONFIG_PARAM(cVal, uint16_t, 0, "C value. 
cVal >=0"); CONFIG_PARAM(replicaId, @@ -327,8 +329,10 @@ class ReplicaConfig : public concord::serialize::SerializableFactory, + IStateTransfer *, + bft::communication::ICommunication *, + MetadataStorage *); + static void setAggregator(const std::shared_ptr &aggregator); static logging::Logger logger_; diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp index ddb1cab14e..fe0d8d63ca 100644 --- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp +++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp @@ -148,7 +148,9 @@ std::shared_ptr CreFactory::create(std::shared_ptr< for (uint16_t i = 0; i < repConfig.numReplicas; i++) { bftClientConf.all_replicas.emplace(bft::client::ReplicaId{i}); } - for (uint16_t i = repConfig.numReplicas; i < repConfig.numReplicas + repConfig.numRoReplicas; i++) { + for (uint16_t i = repConfig.numReplicas; + i < repConfig.numReplicas + repConfig.numRoReplicas + repConfig.numFnReplicas; + i++) { bftClientConf.ro_replicas.emplace(bft::client::ReplicaId{i}); } bftClientConf.replicas_master_key_folder_path = std::nullopt; diff --git a/bftengine/src/bcstatetransfer/BCStateTran.cpp b/bftengine/src/bcstatetransfer/BCStateTran.cpp index a981ce68af..8ca2a6ed1c 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.cpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.cpp @@ -308,7 +308,7 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt // Validate input parameters and some of the configuration ConcordAssertNE(stateApi, nullptr); ConcordAssertGE(replicas_.size(), 3U * config_.fVal + 1U); - ConcordAssert(replicas_.count(config_.myReplicaId) == 1 || config.isReadOnly); + ConcordAssert(replicas_.count(config_.myReplicaId) == 1 || config.isReadOnly || config.isFullNode); ConcordAssertLT(finalizePutblockTimeoutMilli_, config_.refreshTimerMs); ConcordAssertEQ(RejectFetchingMsg::reasonMessages.size(), RejectFetchingMsg::Reason::LAST - 1); 
if (config_.sourceSessionExpiryDurationMs > 0) { @@ -1159,7 +1159,7 @@ void BCStateTran::handleStateTransferMessageImpl(char *msg, time_in_incoming_events_queue_rec_.end(); histograms_.incoming_events_queue_size->record(incomingEventsQ_->size()); } - bool invalidSender = (senderId >= (config_.numReplicas + config_.numRoReplicas)); + bool invalidSender = (senderId >= (config_.numReplicas + config_.numRoReplicas + config_.numFnReplicas)); bool sentFromSelf = senderId == config_.myReplicaId; bool msgSizeTooSmall = msgLen < sizeof(BCStateTranBaseMsg); if (msgSizeTooSmall || sentFromSelf || invalidSender) { @@ -3971,13 +3971,13 @@ void BCStateTran::computeDigestOfPage( if (checkpointNumber > 0) { digestGenerator.update(page, pageSize); } - digestGenerator.writeDigest(outDigest.getForUpdate()); + digestGenerator.writeDigest(reinterpret_cast(&outDigest)); } void BCStateTran::computeDigestOfPagesDescriptor(const DataStore::ResPagesDescriptor *pagesDesc, Digest &outDigest) { DigestGenerator digestGenerator; digestGenerator.update(reinterpret_cast(pagesDesc), pagesDesc->size()); - digestGenerator.writeDigest(outDigest.getForUpdate()); + digestGenerator.writeDigest(reinterpret_cast(&outDigest)); } void BCStateTran::computeDigestOfBlockImpl(const uint64_t blockNum, @@ -3996,7 +3996,7 @@ void BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize, Digest *outDigest) { - computeDigestOfBlockImpl(blockNum, block, blockSize, outDigest->getForUpdate()); + computeDigestOfBlockImpl(blockNum, block, blockSize, reinterpret_cast(outDigest)); } BlockDigest BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize) { diff --git a/bftengine/src/bftengine/ClientsManager.cpp b/bftengine/src/bftengine/ClientsManager.cpp index e1465d6955..80e76e37e2 100644 --- a/bftengine/src/bftengine/ClientsManager.cpp +++ b/bftengine/src/bftengine/ClientsManager.cpp @@ -217,7 +217,9 @@ 
ClientsManager::ClientsManager(const std::set& proxyClients, ConcordAssert(maxNumOfReqsPerClient_ > 0); reservedPagesPerRequest_ = reservedPagesPerRequest(sizeOfReservedPage(), maxReplySize_); reservedPagesPerClient_ = reservedPagesPerClient(sizeOfReservedPage(), maxReplySize_, maxNumOfReqsPerClient_); - for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) { + for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; + i++) { clientIds_.insert(i); } clientIds_.insert(proxyClients_.begin(), proxyClients_.end()); diff --git a/bftengine/src/bftengine/FullNodeReplica.cpp b/bftengine/src/bftengine/FullNodeReplica.cpp new file mode 100755 index 0000000000..4151d793b8 --- /dev/null +++ b/bftengine/src/bftengine/FullNodeReplica.cpp @@ -0,0 +1,295 @@ +// Concord +// +// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in +// compliance with the Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright notices and license terms. Your use of +// these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE +// file. 
+ +#include +#include +#include + +#include +#include +#include "FullNodeReplica.hpp" + +#include "log/logger.hpp" +#include "MsgHandlersRegistrator.hpp" +#include "messages/CheckpointMsg.hpp" +#include "messages/AskForCheckpointMsg.hpp" +#include "messages/ClientRequestMsg.hpp" +#include "messages/ClientReplyMsg.hpp" +#include "util/kvstream.h" +#include "PersistentStorage.hpp" +#include "MsgsCommunicator.hpp" +#include "SigManager.hpp" +#include "ReconfigurationCmd.hpp" +#include "util/json_output.hpp" +#include "SharedTypes.hpp" +#include "communication/StateControl.hpp" + +using concordUtil::Timers; + +namespace bftEngine::impl { + +FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, + std::shared_ptr requestsHandler, + IStateTransfer *stateTransfer, + std::shared_ptr msgComm, + std::shared_ptr msgHandlerReg, + concordUtil::Timers &timers, + MetadataStorage *metadataStorage) + : ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers), + ro_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), + metrics_.RegisterCounter("sentAskForCheckpointMsgs"), + metrics_.RegisterCounter("receivedInvalidMsgs"), + metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, + metadataStorage_{metadataStorage} { + LOG_INFO(GL, "Initialising ReadOnly Replica"); + repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); + msgHandlers_->registerMsgHandler( + MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + msgHandlers_->registerMsgHandler( + MsgCode::ClientRequest, + std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + msgHandlers_->registerMsgHandler( + MsgCode::StateTransfer, + std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + metrics_.Register(); + + SigManager::init(config_.replicaId, + config_.replicaPrivateKey, + config_.publicKeysOfReplicas, + 
concord::crypto::KeyFormat::HexaDecimalStrippedFormat, + ReplicaConfig::instance().getPublicKeysOfClients(), + concord::crypto::KeyFormat::PemFormat, + {{repsInfo->getIdOfOperator(), + ReplicaConfig::instance().getOperatorPublicKey(), + concord::crypto::KeyFormat::PemFormat}}, + *repsInfo); + + // Register status handler for Read-Only replica + registerStatusHandlers(); + bft::communication::StateControl::instance().setGetPeerPubKeyMethod( + [&](uint32_t id) { return SigManager::instance()->getPublicKeyOfVerifier(id); }); +} + +void FullNodeReplica::start() { + ReplicaForStateTransfer::start(); + size_t sendAskForCheckpointMsgPeriodSec = config_.get("concord.bft.ro.sendAskForCheckpointMsgPeriodSec", 30); + askForCheckpointMsgTimer_ = timers_.add( + std::chrono::seconds(sendAskForCheckpointMsgPeriodSec), Timers::Timer::RECURRING, [this](Timers::Handle) { + if (!this->isCollectingState()) { + sendAskForCheckpointMsg(); + } + }); + msgsCommunicator_->startMsgsProcessing(config_.replicaId); +} + +void FullNodeReplica::stop() { + timers_.cancel(askForCheckpointMsgTimer_); + ReplicaForStateTransfer::stop(); +} + +void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { + lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize; + + ro_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); + last_executed_seq_num_ = lastExecutedSeqNum; +} + +void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { + ro_metrics_.received_invalid_msg_++; + LOG_WARN(GL, + "Node " << config_.replicaId << " received invalid message from Node " << msg->senderId() + << " type=" << msg->type() << " reason: " << reason); +} +void FullNodeReplica::sendAskForCheckpointMsg() { + ro_metrics_.sent_ask_for_checkpoint_msg_++; + LOG_INFO(GL, "sending AskForCheckpointMsg"); + auto msg = std::make_unique(config_.replicaId); + for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id); +} + +template <> +void 
FullNodeReplica::onMessage(std::unique_ptr msg) { + ReplicaForStateTransfer::onMessage(move(msg)); +} + +template <> +void FullNodeReplica::onMessage(std::unique_ptr msg) { + if (isCollectingState()) { + return; + } + ro_metrics_.received_checkpoint_msg_++; + LOG_INFO(GL, + KVLOG(msg->senderId(), + msg->idOfGeneratedReplica(), + msg->seqNumber(), + msg->epochNumber(), + msg->size(), + msg->isStableState(), + msg->state(), + msg->stateDigest(), + msg->reservedPagesDigest(), + msg->rvbDataDigest())); + + // Reconfiguration cmd block is synced to RO replica via reserved pages + EpochNum replicasLastKnownEpochVal = 0; + auto epochNumberFromResPages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); + if (epochNumberFromResPages.has_value()) replicasLastKnownEpochVal = epochNumberFromResPages.value(); + + // not relevant + if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum || + msg->epochNumber() < replicasLastKnownEpochVal) { + return; + } + // no self certificate + static std::map> checkpointsInfo; + const auto msgSeqNum = msg->seqNumber(); + const auto idOfGeneratedReplica = msg->idOfGeneratedReplica(); + checkpointsInfo[msgSeqNum].addCheckpointMsg(msg.release(), idOfGeneratedReplica); + // if enough - invoke state transfer + if (checkpointsInfo[msgSeqNum].isCheckpointCertificateComplete()) { + persistCheckpointDescriptor(msgSeqNum, checkpointsInfo[msgSeqNum]); + checkpointsInfo.clear(); + LOG_INFO(GL, "call to startCollectingState()"); + stateTransfer->startCollectingState(); + } +} + +void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const CheckpointInfo &chckpinfo) { + std::vector msgs; + msgs.reserve(chckpinfo.getAllCheckpointMsgs().size()); + for (const auto &m : chckpinfo.getAllCheckpointMsgs()) { + msgs.push_back(m.second); + LOG_INFO(GL, + KVLOG(m.second->seqNumber(), + m.second->epochNumber(), + m.second->state(), + m.second->stateDigest(), + m.second->reservedPagesDigest(), + 
m.second->rvbDataDigest(), + m.second->idOfGeneratedReplica())); + } + DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs); + const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); + concord::serialize::UniquePtrToChar descBuf(new char[bufLen]); + char *descBufPtr = descBuf.get(); + size_t actualSize = 0; + desc.serialize(descBufPtr, bufLen, actualSize); + ConcordAssertNE(actualSize, 0); + + // TODO [TK] S3KeyGenerator + // checkpoints// + std::ostringstream oss; + oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId; + metadataStorage_->atomicWriteArbitraryObject(oss.str(), descBuf.get(), actualSize); +} + +template <> +void FullNodeReplica::onMessage(std::unique_ptr msg) { + const NodeIdType senderId = msg->senderId(); + const NodeIdType clientId = msg->clientProxyId(); + const bool reconfig_flag = (msg->flags() & MsgFlag::RECONFIG_FLAG) != 0; + const ReqId reqSeqNum = msg->requestSeqNum(); + const uint64_t flags = msg->flags(); + + SCOPED_MDC_CID(msg->getCid()); + LOG_DEBUG(CNSUS, KVLOG(clientId, reqSeqNum, senderId) << " flags: " << std::bitset(flags)); + + const auto &span_context = msg->spanContext::type>(); + auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request"); + span.setTag("rid", config_.getreplicaId()); + span.setTag("cid", msg->getCid()); + span.setTag("seq_num", reqSeqNum); + + // A read only replica can handle only reconfiguration requests. Those requests are signed by the operator and + // the validation is done in the reconfiguration engine. 
Thus, we don't need to check the client validity as in + // the committers + + if (reconfig_flag) { + LOG_INFO(GL, "ro replica has received a reconfiguration request"); + executeReadOnlyRequest(span, *(msg.get())); + return; + } +} + +void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) { + auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span); + // Read only replica does not know who is the primary, so it always return 0. It is the client responsibility to treat + // the replies accordingly. + ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId()); + const uint16_t clientId = request.clientProxyId(); + + int executionResult = 0; + bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests; + accumulatedRequests.push_back(bftEngine::IRequestsHandler::ExecutionRequest{clientId, + static_cast(lastExecutedSeqNum), + request.getCid(), + request.flags(), + request.requestLength(), + request.requestBuf(), + "", + reply.maxReplyLength(), + reply.replyBuf(), + request.requestSeqNum(), + request.requestIndexInBatch(), + request.result()}); + // DD: Do we need to take care of Time Service here? + bftRequestsHandler_->execute(accumulatedRequests, std::nullopt, request.getCid(), span); + IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back(); + executionResult = single_request.outExecutionStatus; + const uint32_t actualReplyLength = single_request.outActualReplySize; + const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize; + LOG_DEBUG(GL, + "Executed read only request. " << KVLOG(clientId, + lastExecutedSeqNum, + request.requestLength(), + reply.maxReplyLength(), + actualReplyLength, + actualReplicaSpecificInfoLength, + executionResult)); + // TODO(GG): TBD - how do we want to support empty replies? 
(actualReplyLength==0) + if (!executionResult) { + if (actualReplyLength > 0) { + reply.setReplyLength(actualReplyLength); + reply.setReplicaSpecificInfoLength(actualReplicaSpecificInfoLength); + send(&reply, clientId); + return; + } else { + LOG_WARN(GL, "Received zero size response. " << KVLOG(clientId)); + strcpy(single_request.outReply, "Executed data is empty"); + single_request.outActualReplySize = strlen(single_request.outReply); + executionResult = static_cast(bftEngine::OperationResult::EXEC_DATA_EMPTY); + } + + } else { + LOG_ERROR(GL, "Received error while executing RO request. " << KVLOG(clientId, executionResult)); + } + ClientReplyMsg replyMsg( + 0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult); + send(&replyMsg, clientId); +} + +void FullNodeReplica::registerStatusHandlers() { + auto h = concord::diagnostics::StatusHandler( + "replica-sequence-numbers", "Last executed sequence number of the read-only replica", [this]() { + concordUtils::BuildJson bj; + + bj.startJson(); + bj.startNested("sequenceNumbers"); + bj.addKv("lastExecutedSeqNum", last_executed_seq_num_); + bj.endNested(); + bj.endJson(); + + return bj.getJson(); + }); + concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); +} + +} // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/FullNodeReplica.hpp b/bftengine/src/bftengine/FullNodeReplica.hpp new file mode 100644 index 0000000000..61f1ca1578 --- /dev/null +++ b/bftengine/src/bftengine/FullNodeReplica.hpp @@ -0,0 +1,80 @@ +// Concord +// +// Copyright (c) 2018-2019 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in +// compliance with the Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright notices and license terms. 
Your use of +// these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE +// file. + +#pragma once + +#include "ReplicaForStateTransfer.hpp" +#include "util/Timers.hpp" +#include "CheckpointInfo.hpp" + +namespace bftEngine::impl { + +class ClientRequestMsg; +/** + * + */ +class FullNodeReplica : public ReplicaForStateTransfer { + public: + FullNodeReplica(const ReplicaConfig&, + std::shared_ptr, + IStateTransfer*, + std::shared_ptr, + std::shared_ptr, + concordUtil::Timers& timers, + MetadataStorage* metadataStorage); + + void start() override; + void stop() override; + virtual bool isReadOnly() const override { return true; } + + protected: + void sendAskForCheckpointMsg(); + + void onTransferringCompleteImp(uint64_t newStateCheckpoint) override; + void onReportAboutInvalidMessage(MessageBase* msg, const char* reason) override; + + template + void messageHandler(std::unique_ptr msg) { + auto trueTypeObj = std::make_unique(msg.get()); + msg.reset(); + if (validateMessage(trueTypeObj.get())) { + onMessage(std::move(trueTypeObj)); + } + } + + template + void onMessage(std::unique_ptr); + + void executeReadOnlyRequest(concordUtils::SpanWrapper& parent_span, const ClientRequestMsg& m); + void persistCheckpointDescriptor(const SeqNum&, const CheckpointInfo&); + + protected: + concordUtil::Timers::Handle askForCheckpointMsgTimer_; + + struct Metrics { + concordMetrics::CounterHandle received_checkpoint_msg_; + concordMetrics::CounterHandle sent_ask_for_checkpoint_msg_; + concordMetrics::CounterHandle received_invalid_msg_; + concordMetrics::GaugeHandle last_executed_seq_num_; + } ro_metrics_; + + std::unique_ptr metadataStorage_; + std::atomic last_executed_seq_num_{0}; + + private: + // This function serves as an ReplicaStatusHandlers alternative for FullNodeReplica. 
The reason to use this function + // is that regular and read-only replicas expose different metrics and the status handlers are not interchangeable. + // The read-only replica also hasn't got an implementation for InternalMessages which are used by the + // ReplicaStatusHandler. + void registerStatusHandlers(); +}; + +} // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/ReplicaFactory.cpp b/bftengine/src/bftengine/ReplicaFactory.cpp index f2a4653333..1bf85d2873 100644 --- a/bftengine/src/bftengine/ReplicaFactory.cpp +++ b/bftengine/src/bftengine/ReplicaFactory.cpp @@ -143,8 +143,9 @@ ReplicaFactory::IReplicaPtr ReplicaFactory::createReplica( replicaConfig.numReplicas, replicaConfig.fVal, replicaConfig.cVal, - replicaConfig.numReplicas + replicaConfig.numRoReplicas + replicaConfig.numOfClientProxies + - replicaConfig.numOfExternalClients + replicaConfig.numOfClientServices + replicaConfig.numReplicas, + replicaConfig.numReplicas + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas + + replicaConfig.numOfClientProxies + replicaConfig.numOfExternalClients + replicaConfig.numOfClientServices + + replicaConfig.numReplicas, replicaConfig.clientBatchingMaxMsgsNbr)); unique_ptr metadataStoragePtr(metadataStorage); auto objectDescriptors = static_cast(persistentStoragePtr.get()) @@ -269,6 +270,26 @@ ReplicaFactory::IReplicaPtr ReplicaFactory::createRoReplica(const ReplicaConfig return replicaInternal; } +ReplicaFactory::IReplicaPtr ReplicaFactory::createFullNodeReplica(const ReplicaConfig &replicaConfig, + std::shared_ptr requestsHandler, + IStateTransfer *stateTransfer, + bft::communication::ICommunication *communication, + MetadataStorage *metadataStorage) { + LOG_INFO(logger_, "ReplicaFactory::createFullNodeReplica START"); + auto replicaInternal = std::make_unique(); + auto msgHandlers = std::make_shared(); + auto incomingMsgsStorageImpPtr = + std::make_unique(msgHandlers, timersResolution, replicaConfig.replicaId); + auto &timers = 
incomingMsgsStorageImpPtr->timers(); + std::shared_ptr incomingMsgsStorage{std::move(incomingMsgsStorageImpPtr)}; + auto msgReceiver = std::make_shared(incomingMsgsStorage); + auto msgsCommunicator = std::make_shared(communication, incomingMsgsStorage, msgReceiver); + replicaInternal->setReplica(std::make_unique( + replicaConfig, requestsHandler, stateTransfer, msgsCommunicator, msgHandlers, timers, metadataStorage)); + LOG_INFO(logger_, "ReplicaFactory::createFullNodeReplica END"); + return replicaInternal; +} + std::unique_ptr ReplicaFactory::createPreProcessor( const ReplicaConfig &replicaConfig, shared_ptr &msgsCommunicator, diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp index 956bb75281..0387e1a596 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp @@ -49,8 +49,8 @@ ReplicaForStateTransfer::ReplicaForStateTransfer(const ReplicaConfig &config, // Reserved Pages and State Transfer initialization ClientsManager::setNumResPages( - (config.numReplicas + config.numRoReplicas + config.numOfClientProxies + config.numOfExternalClients + - config.numReplicas + config.numOfClientServices) * + (config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + + config.numOfExternalClients + config.numReplicas + config.numOfClientServices) * ClientsManager::reservedPagesPerClient( config.getsizeOfReservedPage(), config.maxReplyMessageSize, diff --git a/bftengine/src/bftengine/ReplicasInfo.cpp b/bftengine/src/bftengine/ReplicasInfo.cpp index c8dc186aa4..b4c0138fef 100644 --- a/bftengine/src/bftengine/ReplicasInfo.cpp +++ b/bftengine/src/bftengine/ReplicasInfo.cpp @@ -57,13 +57,14 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, : _myId{config.replicaId}, _numberOfReplicas{config.numReplicas}, _numberOfRoReplicas{config.numRoReplicas}, + _numberOfFnReplicas{config.numFnReplicas}, 
_numOfClientProxies{config.numOfClientProxies}, _numberOfExternalClients{config.numOfExternalClients}, _numberOfClientServices{config.numOfClientServices}, _numberOfInternalClients{config.numReplicas}, - _maxValidPrincipalId{static_cast(config.numReplicas + config.numRoReplicas + config.numOfClientProxies + - config.numOfExternalClients + _numberOfInternalClients + - _numberOfClientServices - 1)}, + _maxValidPrincipalId{static_cast(config.numReplicas + config.numRoReplicas + config.numFnReplicas + + config.numOfClientProxies + config.numOfExternalClients + + _numberOfInternalClients + _numberOfClientServices - 1)}, _fVal{config.fVal}, _cVal{config.cVal}, _dynamicCollectorForPartialProofs{dynamicCollectorForPartialProofs}, @@ -87,7 +88,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfPeerROReplicas{[&config]() { std::set ret; uint16_t start = config.numReplicas; - uint16_t end = start + config.numRoReplicas; + uint16_t end = start + config.numRoReplicas + config.numFnReplicas; for (uint16_t i{start}; i < end; ++i) if (i != config.replicaId) { ret.insert(i); @@ -98,7 +99,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfClientProxies{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas; + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas; auto end = start + config.numOfClientProxies; for (auto i = start; i < end; ++i) { ret.insert(i); @@ -109,7 +110,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfExternalClients{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies; + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies; auto end = start + config.numOfExternalClients; for (auto i = start; i < (end - ((uint16_t)config.operatorEnabled_)); ++i) { ret.insert(i); @@ -124,7 +125,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, 
_idsOfClientServices{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies + + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + config.numOfExternalClients - ((uint16_t)config.operatorEnabled_); auto end = start + config.numOfClientServices; for (auto i = start; i < end; ++i) { @@ -137,7 +138,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfInternalClients{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies + + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + config.numOfExternalClients + config.numOfClientServices; auto end = start + config.numReplicas; for (auto i = start; i < end; ++i) { @@ -147,8 +148,9 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, return ret; }()} { _operator_id = config.operatorEnabled_ - ? static_cast(config.numReplicas + config.numRoReplicas + config.numOfClientProxies + - config.numOfExternalClients + config.numOfClientServices - 1) + ? 
static_cast(config.numReplicas + config.numRoReplicas + config.numFnReplicas + + config.numOfClientProxies + config.numOfExternalClients + + config.numOfClientServices - 1) : 0; ConcordAssert(_numberOfReplicas == (3 * _fVal + 2 * _cVal + 1)); } diff --git a/bftengine/src/bftengine/ReplicasInfo.hpp b/bftengine/src/bftengine/ReplicasInfo.hpp index f012a82990..a5a30c383e 100644 --- a/bftengine/src/bftengine/ReplicasInfo.hpp +++ b/bftengine/src/bftengine/ReplicasInfo.hpp @@ -81,6 +81,7 @@ class ReplicasInfo { uint16_t getNumberOfReplicas() { return _numberOfReplicas; } uint16_t getNumberOfRoReplicas() { return _numberOfRoReplicas; } + uint16_t getNumberOfFnReplicas() { return _numberOfFnReplicas; } uint16_t getNumOfClientProxies() { return _numOfClientProxies; } uint16_t getNumberOfExternalClients() { return _numberOfExternalClients; } uint16_t getNumberOfInternalClients() { return _numberOfInternalClients; } @@ -90,6 +91,7 @@ class ReplicasInfo { const ReplicaId _myId = 0; const uint16_t _numberOfReplicas = 0; const uint16_t _numberOfRoReplicas = 0; + const uint16_t _numberOfFnReplicas = 0; const uint16_t _numOfClientProxies = 0; const uint16_t _numberOfExternalClients = 0; const uint16_t _numberOfClientServices = 0; diff --git a/bftengine/src/bftengine/SigManager.cpp b/bftengine/src/bftengine/SigManager.cpp index 929ade0987..3e92b53d80 100644 --- a/bftengine/src/bftengine/SigManager.cpp +++ b/bftengine/src/bftengine/SigManager.cpp @@ -51,18 +51,23 @@ SigManager* SigManager::initImpl( size_t lowBound, highBound; auto numReplicas = replicasInfo.getNumberOfReplicas(); auto numRoReplicas = replicasInfo.getNumberOfRoReplicas(); + auto numFnReplicas = replicasInfo.getNumberOfFnReplicas(); auto numOfClientProxies = replicasInfo.getNumOfClientProxies(); auto numOfExternalClients = replicasInfo.getNumberOfExternalClients(); auto numOfInternalClients = replicasInfo.getNumberOfInternalClients(); auto numOfClientServices = replicasInfo.getNumberOfClientServices(); - LOG_INFO( - 
GL, - "Compute publicKeysMapping and publickeys: " << KVLOG( - myId, numReplicas, numRoReplicas, numOfClientProxies, numOfExternalClients, publicKeysOfReplicas.size())); + LOG_INFO(GL, + "Compute publicKeysMapping and publickeys: " << KVLOG(myId, + numReplicas, + numRoReplicas, + numFnReplicas, + numOfClientProxies, + numOfExternalClients, + publicKeysOfReplicas.size())); SigManager::KeyIndex i{0}; - highBound = numReplicas + numRoReplicas - 1; + highBound = numReplicas + numRoReplicas + numFnReplicas - 1; for (const auto& repIdToKeyPair : publicKeysOfReplicas) { // each replica sign with a unique private key (1 to 1 relation) ConcordAssert(repIdToKeyPair.first <= highBound); @@ -74,7 +79,7 @@ SigManager* SigManager::initImpl( // Multiple clients might be signing with the same private key (1 to many relation) // Also, we do not enforce to have all range between [lowBound, highBound] construcred. We might want to have less // principal ids mapped to keys than what is stated in the range. 
- lowBound = numRoReplicas + numReplicas + numOfClientProxies; + lowBound = numRoReplicas + numFnReplicas + numReplicas + numOfClientProxies; highBound = lowBound + numOfExternalClients + numOfInternalClients + numOfClientServices - 1; for (const auto& p : (*publicKeysOfClients)) { ConcordAssert(!p.first.empty()); diff --git a/bftengine/src/preprocessor/PreProcessor.cpp b/bftengine/src/preprocessor/PreProcessor.cpp index bdc3e86da5..062933ac42 100644 --- a/bftengine/src/preprocessor/PreProcessor.cpp +++ b/bftengine/src/preprocessor/PreProcessor.cpp @@ -407,7 +407,8 @@ PreProcessor::PreProcessor(shared_ptr &msgsCommunicator, myReplicaId_(myReplica.getReplicaConfig().replicaId), maxExternalMsgSize_(myReplica.getReplicaConfig().maxExternalMessageSize), responseSizeInternalOverhead_{/* for conflict detection BlockId */ sizeof(uint64_t)}, - numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas), + numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas + + myReplica.getReplicaConfig().numFnReplicas), numOfClientProxies_(myReplica.getReplicaConfig().numOfClientProxies), clientBatchingEnabled_(myReplica.getReplicaConfig().clientBatchingEnabled), threadPool_("PreProcessor::threadPool"), diff --git a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp index 5f6c1957e4..7beab6b88a 100644 --- a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp +++ b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp @@ -313,8 +313,10 @@ SimpleStateTran::SimpleStateTran( cVal, // cVal (uint16_t)(3 * fVal + 2 * cVal + 1), // numReplicas 0, // numRoReplicas + 0, // numFnReplicas pedanticChecks, // pedanticChecks false, // isReadOnly + false, // isFullNode 128, // maxChunkSize 256, // maxNumberOfChunksInBatch 1024, // maxBlockSize diff --git a/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp 
b/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp index 025c9d5331..eedefd206f 100644 --- a/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp +++ b/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp @@ -84,8 +84,10 @@ Config targetConfig() { 0, // cVal 7, // numReplicas 0, // numRoReplicas + 0, // numFnReplicas false, // pedanticChecks false, // isReadOnly + false, // isFullNode 1024, // maxChunkSize 24, // maxNumberOfChunksInBatch 1024, // maxBlockSize diff --git a/examples/replica/src/SetupReplica.cpp b/examples/replica/src/SetupReplica.cpp index 1833a29e43..485e78ba65 100644 --- a/examples/replica/src/SetupReplica.cpp +++ b/examples/replica/src/SetupReplica.cpp @@ -202,8 +202,8 @@ bft::communication::ICommunication* SetupReplica::createCommunication( logging::Logger logger = getLogger(); TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig); - uint16_t numOfReplicas = - (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); + uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? 
replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; #ifdef USE_COMM_PLAIN_TCP diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index 2dff5b6df8..94c6229f41 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -58,16 +58,23 @@ Status Replica::initInternals() { m_currentRepStatus = RepStatus::Starting; - if (replicaConfig_.isReadOnly) { - LOG_INFO(logger, - "ReadOnly Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) { auto requestHandler = bftEngine::IRequestsHandler::createRequestsHandler(m_cmdHandler, cronTableRegistry_); requestHandler->setReconfigurationHandler(std::make_shared( bftEngine::ReplicaConfig::instance().pathToOperatorPublicKey_, bftEngine::ReplicaConfig::instance().operatorMsgSigningAlgo, *this)); - m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica( - replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) { + LOG_INFO(logger, + "Read Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica( + replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + } else { + LOG_INFO(logger, + "FullNode Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + m_replicaPtr = bftEngine::ReplicaFactory::createFullNodeReplica( + replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + } m_stateTransfer->addOnTransferringCompleteCallback([this](std::uint64_t) { std::vector stateFromReservedPages; uint64_t wedgePt{0}; @@ -330,7 +337,7 @@ void Replica::createReplicaAndSyncState() { const auto lastExecutedSeqNum = m_replicaPtr->getLastExecutedSequenceNum(); LOG_INFO(logger, KVLOG(lastExecutedSeqNum)); - if (!replicaConfig_.isReadOnly && 
!m_stateTransfer->isCollectingState()) { + if (!replicaConfig_.isReadOnly && !replicaConfig_.isFullNode && !m_stateTransfer->isCollectingState()) { try { const auto maxNumOfBlocksToDelete = replicaConfig_.maxNumOfRequestsInBatch; const auto removedBlocksNum = replicaStateSync_->execute( @@ -492,8 +499,10 @@ Replica::Replica(ICommunication *comm, replicaConfig_.cVal, replicaConfig_.numReplicas, replicaConfig_.numRoReplicas, + replicaConfig_.numFnReplicas, replicaConfig_.get("concord.bft.st.pedanticChecks", false), replicaConfig_.isReadOnly, + replicaConfig_.isFullNode, #if defined USE_COMM_PLAIN_TCP || defined USE_COMM_TLS_TCP replicaConfig_.get("concord.bft.st.maxChunkSize", 30 * 1024 * 1024), @@ -543,7 +552,7 @@ Replica::Replica(ICommunication *comm, stConfig.gettingMissingBlocksSummaryWindowSize = 50; } - if (!replicaConfig.isReadOnly) { + if (!replicaConfig.isReadOnly && !replicaConfig.isFullNode) { const auto linkStChain = true; { auto [it, inserted] = kvbc_categories.insert( @@ -592,12 +601,18 @@ Replica::Replica(ICommunication *comm, return oss.str(); }); registrar.status.registerHandler(handler); + } else if (replicaConfig.isFullNode) { + op_kvBlockchain.emplace(storage::rocksdb::NativeClient::fromIDBClient(m_dbSet.dataDBClient), + true, + kvbc_categories, + concord::kvbc::adapter::aux::AdapterAuxTypes(this->aggregator_)); + m_kvBlockchain = &(op_kvBlockchain.value()); } m_dbSet.dataDBClient->setAggregator(aggregator_); m_dbSet.metadataDBClient->setAggregator(aggregator_); auto stKeyManipulator = std::shared_ptr{storageFactory->newSTKeyManipulator()}; m_stateTransfer = bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_); - if (!replicaConfig.isReadOnly) { + if (!replicaConfig.isReadOnly && !replicaConfig.isFullNode) { stReconfigurationSM_ = std::make_unique(*m_stateTransfer, *this); m_metadataStorage = new DBMetadataStorage(m_metadataDBClient.get(), storageFactory->newMetadataKeyManipulator()); } else { diff --git 
a/kvbc/src/reconfiguration_kvbc_handler.cpp b/kvbc/src/reconfiguration_kvbc_handler.cpp index ba2e0adbde..ac634fbddc 100644 --- a/kvbc/src/reconfiguration_kvbc_handler.cpp +++ b/kvbc/src/reconfiguration_kvbc_handler.cpp @@ -465,7 +465,8 @@ bool KvbcClientReconfigurationHandler::handle(const concord::messages::ClientRec const std::optional& ts, concord::messages::ReconfigurationResponse& rres) { concord::messages::ClientReconfigurationStateReply rep; - uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; + uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; if (sender_id > first_client_id) { for (uint8_t i = kvbc::keyTypes::CLIENT_COMMAND_TYPES::start_ + 1; i < kvbc::keyTypes::CLIENT_COMMAND_TYPES::end_; i++) { @@ -699,7 +700,10 @@ bool ReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedgeC auto execute_key_prefix = std::string{kvbc::keyTypes::reconfiguration_client_data_prefix, static_cast(kvbc::keyTypes::CLIENT_COMMAND_TYPES::CLIENT_SCALING_EXECUTE_COMMAND)}; - for (uint64_t i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) { + + for (auto i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; + i++) { concord::messages::ClientsAddRemoveExecuteCommand cmd; cmd.config_descriptor = command.config_descriptor; if (token.find(i) == token.end()) continue; diff --git a/tests/simpleKVBC/TesterReplica/setup.cpp b/tests/simpleKVBC/TesterReplica/setup.cpp index af8e1620df..4d74a34c41 100644 --- a/tests/simpleKVBC/TesterReplica/setup.cpp +++ b/tests/simpleKVBC/TesterReplica/setup.cpp @@ -370,8 +370,8 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, 
&replicaConfig); - uint16_t numOfReplicas = - (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); + uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; std::shared_ptr sm_ = From b9f0c6708e3260b6153cd3bc36d1be8b83cd5a9c Mon Sep 17 00:00:00 2001 From: Tarunkumar Banda Date: Mon, 24 Apr 2023 09:36:23 +0000 Subject: [PATCH 2/3] Refactoring the ST changes in FullNode --- .../bcstatetransfer/AsyncStateTransferCRE.cpp | 4 +-- bftengine/src/bcstatetransfer/BCStateTran.cpp | 6 ++--- bftengine/src/bftengine/FullNodeReplica.cpp | 26 +++++++++---------- bftengine/src/bftengine/FullNodeReplica.hpp | 9 ++++--- bftengine/src/bftengine/ReadOnlyReplica.hpp | 1 + bftengine/src/bftengine/ReplicaBase.hpp | 2 ++ bftengine/src/bftengine/ReplicaFactory.cpp | 2 +- bftengine/src/bftengine/ReplicaImp.hpp | 2 ++ bftengine/src/preprocessor/PreProcessor.cpp | 3 +-- examples/replica/src/SetupReplica.cpp | 4 +-- kvbc/src/Replica.cpp | 4 +-- kvbc/src/reconfiguration_kvbc_handler.cpp | 8 ++---- tests/simpleKVBC/TesterReplica/setup.cpp | 4 +-- 13 files changed, 37 insertions(+), 38 deletions(-) diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp index fe0d8d63ca..ddb1cab14e 100644 --- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp +++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp @@ -148,9 +148,7 @@ std::shared_ptr CreFactory::create(std::shared_ptr< for (uint16_t i = 0; i < repConfig.numReplicas; i++) { bftClientConf.all_replicas.emplace(bft::client::ReplicaId{i}); } - for (uint16_t i = repConfig.numReplicas; - i < repConfig.numReplicas + repConfig.numRoReplicas + repConfig.numFnReplicas; - i++) { + for (uint16_t i = 
repConfig.numReplicas; i < repConfig.numReplicas + repConfig.numRoReplicas; i++) { bftClientConf.ro_replicas.emplace(bft::client::ReplicaId{i}); } bftClientConf.replicas_master_key_folder_path = std::nullopt; diff --git a/bftengine/src/bcstatetransfer/BCStateTran.cpp b/bftengine/src/bcstatetransfer/BCStateTran.cpp index 8ca2a6ed1c..f774f2e810 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.cpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.cpp @@ -3971,13 +3971,13 @@ void BCStateTran::computeDigestOfPage( if (checkpointNumber > 0) { digestGenerator.update(page, pageSize); } - digestGenerator.writeDigest(reinterpret_cast(&outDigest)); + digestGenerator.writeDigest(outDigest.getForUpdate()); } void BCStateTran::computeDigestOfPagesDescriptor(const DataStore::ResPagesDescriptor *pagesDesc, Digest &outDigest) { DigestGenerator digestGenerator; digestGenerator.update(reinterpret_cast(pagesDesc), pagesDesc->size()); - digestGenerator.writeDigest(reinterpret_cast(&outDigest)); + digestGenerator.writeDigest(outDigest.getForUpdate()); } void BCStateTran::computeDigestOfBlockImpl(const uint64_t blockNum, @@ -3996,7 +3996,7 @@ void BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize, Digest *outDigest) { - computeDigestOfBlockImpl(blockNum, block, blockSize, reinterpret_cast(outDigest)); + computeDigestOfBlockImpl(blockNum, block, blockSize, outDigest->getForUpdate()); } BlockDigest BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize) { diff --git a/bftengine/src/bftengine/FullNodeReplica.cpp b/bftengine/src/bftengine/FullNodeReplica.cpp index 4151d793b8..f71a4ee511 100755 --- a/bftengine/src/bftengine/FullNodeReplica.cpp +++ b/bftengine/src/bftengine/FullNodeReplica.cpp @@ -44,12 +44,12 @@ FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, concordUtil::Timers &timers, MetadataStorage *metadataStorage) : ReplicaForStateTransfer(config, requestsHandler, 
stateTransfer, msgComm, msgHandlerReg, true, timers), - ro_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), + fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), metrics_.RegisterCounter("sentAskForCheckpointMsgs"), metrics_.RegisterCounter("receivedInvalidMsgs"), metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, metadataStorage_{metadataStorage} { - LOG_INFO(GL, "Initialising ReadOnly Replica"); + LOG_INFO(GL, "Initialising Full Node Replica"); repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); msgHandlers_->registerMsgHandler( MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); @@ -72,7 +72,7 @@ FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, concord::crypto::KeyFormat::PemFormat}}, *repsInfo); - // Register status handler for Read-Only replica + // Register status handler for Full Node replica registerStatusHandlers(); bft::communication::StateControl::instance().setGetPeerPubKeyMethod( [&](uint32_t id) { return SigManager::instance()->getPublicKeyOfVerifier(id); }); @@ -98,18 +98,18 @@ void FullNodeReplica::stop() { void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize; - ro_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); + fn_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); last_executed_seq_num_ = lastExecutedSeqNum; } void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { - ro_metrics_.received_invalid_msg_++; + fn_metrics_.received_invalid_msg_++; LOG_WARN(GL, "Node " << config_.replicaId << " received invalid message from Node " << msg->senderId() << " type=" << msg->type() << " reason: " << reason); } void FullNodeReplica::sendAskForCheckpointMsg() { - ro_metrics_.sent_ask_for_checkpoint_msg_++; + fn_metrics_.sent_ask_for_checkpoint_msg_++; LOG_INFO(GL, 
"sending AskForCheckpointMsg"); auto msg = std::make_unique(config_.replicaId); for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id); @@ -125,7 +125,7 @@ void FullNodeReplica::onMessage(std::unique_ptr ms if (isCollectingState()) { return; } - ro_metrics_.received_checkpoint_msg_++; + fn_metrics_.received_checkpoint_msg_++; LOG_INFO(GL, KVLOG(msg->senderId(), msg->idOfGeneratedReplica(), @@ -208,12 +208,12 @@ void FullNodeReplica::onMessage(std::unique_ptrgetCid()); span.setTag("seq_num", reqSeqNum); - // A read only replica can handle only reconfiguration requests. Those requests are signed by the operator and + // A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and // the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in // the committers if (reconfig_flag) { - LOG_INFO(GL, "ro replica has received a reconfiguration request"); + LOG_INFO(GL, "FN replica has received a reconfiguration request"); executeReadOnlyRequest(span, *(msg.get())); return; } @@ -221,7 +221,7 @@ void FullNodeReplica::onMessage(std::unique_ptr metadataStorage_; std::atomic last_executed_seq_num_{0}; private: // This function serves as an ReplicaStatusHandlers alternative for FullNodeReplica. The reason to use this function - // is that regular and read-only replicas expose different metrics and the status handlers are not interchangeable. - // The read-only replica also hasn't got an implementation for InternalMessages which are used by the + // is that regular and full node replicas expose different metrics and the status handlers are not interchangeable. + // The full node replica also hasn't got an implementation for InternalMessages which are used by the // ReplicaStatusHandler. 
void registerStatusHandlers(); }; diff --git a/bftengine/src/bftengine/ReadOnlyReplica.hpp b/bftengine/src/bftengine/ReadOnlyReplica.hpp index 5d1951b173..4bc0b5461a 100644 --- a/bftengine/src/bftengine/ReadOnlyReplica.hpp +++ b/bftengine/src/bftengine/ReadOnlyReplica.hpp @@ -34,6 +34,7 @@ class ReadOnlyReplica : public ReplicaForStateTransfer { void start() override; void stop() override; virtual bool isReadOnly() const override { return true; } + virtual bool isFullNode() const override { return false; } protected: void sendAskForCheckpointMsg(); diff --git a/bftengine/src/bftengine/ReplicaBase.hpp b/bftengine/src/bftengine/ReplicaBase.hpp index 7799fb4820..f1650969b9 100644 --- a/bftengine/src/bftengine/ReplicaBase.hpp +++ b/bftengine/src/bftengine/ReplicaBase.hpp @@ -50,6 +50,8 @@ class ReplicaBase { virtual bool isReadOnly() const = 0; + virtual bool isFullNode() const = 0; + std::shared_ptr getMsgsCommunicator() const { return msgsCommunicator_; } std::shared_ptr getMsgHandlersRegistrator() const { return msgHandlers_; } concordUtil::Timers* getTimers() { return &timers_; } diff --git a/bftengine/src/bftengine/ReplicaFactory.cpp b/bftengine/src/bftengine/ReplicaFactory.cpp index 1bf85d2873..c8879fb368 100644 --- a/bftengine/src/bftengine/ReplicaFactory.cpp +++ b/bftengine/src/bftengine/ReplicaFactory.cpp @@ -96,7 +96,7 @@ void ReplicaInternal::restartForDebug(uint32_t delayMillisec) { } } - if (!replica_->isReadOnly()) { + if (!replica_->isReadOnly() && !replica_->isFullNode()) { auto replicaImp = dynamic_cast(replica_.get()); shared_ptr persistentStorage(replicaImp->getPersistentStorage()); diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index 44c319f67e..7d595569ad 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -366,6 +366,8 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { virtual bool isReadOnly() const override { return false; 
} + virtual bool isFullNode() const override { return false; } + shared_ptr getPersistentStorage() const { return ps_; } std::shared_ptr getSecretsManager() { return sm_; } diff --git a/bftengine/src/preprocessor/PreProcessor.cpp b/bftengine/src/preprocessor/PreProcessor.cpp index 062933ac42..bdc3e86da5 100644 --- a/bftengine/src/preprocessor/PreProcessor.cpp +++ b/bftengine/src/preprocessor/PreProcessor.cpp @@ -407,8 +407,7 @@ PreProcessor::PreProcessor(shared_ptr &msgsCommunicator, myReplicaId_(myReplica.getReplicaConfig().replicaId), maxExternalMsgSize_(myReplica.getReplicaConfig().maxExternalMessageSize), responseSizeInternalOverhead_{/* for conflict detection BlockId */ sizeof(uint64_t)}, - numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas + - myReplica.getReplicaConfig().numFnReplicas), + numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas), numOfClientProxies_(myReplica.getReplicaConfig().numOfClientProxies), clientBatchingEnabled_(myReplica.getReplicaConfig().clientBatchingEnabled), threadPool_("PreProcessor::threadPool"), diff --git a/examples/replica/src/SetupReplica.cpp b/examples/replica/src/SetupReplica.cpp index 485e78ba65..1833a29e43 100644 --- a/examples/replica/src/SetupReplica.cpp +++ b/examples/replica/src/SetupReplica.cpp @@ -202,8 +202,8 @@ bft::communication::ICommunication* SetupReplica::createCommunication( logging::Logger logger = getLogger(); TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig); - uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + - replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); + uint16_t numOfReplicas = + (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? 
replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; #ifdef USE_COMM_PLAIN_TCP diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index 94c6229f41..227a12c227 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -64,9 +64,9 @@ Status Replica::initInternals() { bftEngine::ReplicaConfig::instance().pathToOperatorPublicKey_, bftEngine::ReplicaConfig::instance().operatorMsgSigningAlgo, *this)); - if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) { + if (replicaConfig_.isReadOnly) { LOG_INFO(logger, - "Read Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + "ReadOnly Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica( replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); } else { diff --git a/kvbc/src/reconfiguration_kvbc_handler.cpp b/kvbc/src/reconfiguration_kvbc_handler.cpp index ac634fbddc..ba2e0adbde 100644 --- a/kvbc/src/reconfiguration_kvbc_handler.cpp +++ b/kvbc/src/reconfiguration_kvbc_handler.cpp @@ -465,8 +465,7 @@ bool KvbcClientReconfigurationHandler::handle(const concord::messages::ClientRec const std::optional& ts, concord::messages::ReconfigurationResponse& rres) { concord::messages::ClientReconfigurationStateReply rep; - uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + - ReplicaConfig::instance().numFnReplicas; + uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; if (sender_id > first_client_id) { for (uint8_t i = kvbc::keyTypes::CLIENT_COMMAND_TYPES::start_ + 1; i < kvbc::keyTypes::CLIENT_COMMAND_TYPES::end_; i++) { @@ -700,10 +699,7 @@ bool ReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedgeC auto execute_key_prefix = std::string{kvbc::keyTypes::reconfiguration_client_data_prefix, 
static_cast(kvbc::keyTypes::CLIENT_COMMAND_TYPES::CLIENT_SCALING_EXECUTE_COMMAND)}; - - for (auto i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + - ReplicaConfig::instance().numFnReplicas; - i++) { + for (uint64_t i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) { concord::messages::ClientsAddRemoveExecuteCommand cmd; cmd.config_descriptor = command.config_descriptor; if (token.find(i) == token.end()) continue; diff --git a/tests/simpleKVBC/TesterReplica/setup.cpp b/tests/simpleKVBC/TesterReplica/setup.cpp index 4d74a34c41..af8e1620df 100644 --- a/tests/simpleKVBC/TesterReplica/setup.cpp +++ b/tests/simpleKVBC/TesterReplica/setup.cpp @@ -370,8 +370,8 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig); - uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + - replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); + uint16_t numOfReplicas = + (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? 
replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; std::shared_ptr sm_ = From 2fc060393dcbec320a983067504158d5d61de26e Mon Sep 17 00:00:00 2001 From: "tarunkumar21c@gmail.com" Date: Wed, 3 May 2023 16:10:51 +0000 Subject: [PATCH 3/3] Fixed review comments --- .../bcstatetransfer/SimpleBCStateTransfer.hpp | 16 +- bftengine/src/bftengine/ClientsManager.cpp | 5 +- bftengine/src/bftengine/FullNodeReplica.cpp | 168 ++++++------------ bftengine/src/bftengine/FullNodeReplica.hpp | 9 +- bftengine/src/bftengine/ReadOnlyReplica.hpp | 4 +- bftengine/src/bftengine/ReplicaImp.hpp | 4 +- 6 files changed, 73 insertions(+), 133 deletions(-) mode change 100755 => 100644 bftengine/src/bftengine/FullNodeReplica.cpp diff --git a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp index c6943bce1a..ed9d8c353c 100644 --- a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp +++ b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp @@ -193,19 +193,21 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.cVal, c.numReplicas, c.numRoReplicas, + c.numFnReplicas, c.pedanticChecks, c.isReadOnly, + c.isFullNode, c.maxChunkSize, c.maxNumberOfChunksInBatch, c.maxBlockSize, c.maxPendingDataFromSourceReplica, c.maxNumOfReservedPages, c.sizeOfReservedPage, - c.gettingMissingBlocksSummaryWindowSize, - c.minPrePrepareMsgsForPrimaryAwareness, - c.fetchRangeSize); + c.gettingMissingBlocksSummaryWindowSize); os << ","; - os << KVLOG(c.RVT_K, + os << KVLOG(c.minPrePrepareMsgsForPrimaryAwareness, + c.fetchRangeSize, + c.RVT_K, c.refreshTimerMs, c.checkpointSummariesRetransmissionTimeoutMs, c.maxAcceptableMsgDelayMs, @@ -218,11 +220,9 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.sourcePerformanceSnapshotFrequencySec, c.runInSeparateThread, c.enableReservedPages, - c.enableSourceBlocksPreFetch, - c.enableSourceSelectorPrimaryAwareness, - 
c.enableStoreRvbDataDuringCheckpointing); + c.enableSourceBlocksPreFetch); os << ","; - os << KVLOG(c.numFnReplicas, c.isFullNode); + os << KVLOG(c.enableSourceSelectorPrimaryAwareness, c.enableStoreRvbDataDuringCheckpointing); return os; } // creates an instance of the state transfer module. diff --git a/bftengine/src/bftengine/ClientsManager.cpp b/bftengine/src/bftengine/ClientsManager.cpp index 80e76e37e2..338c108ddb 100644 --- a/bftengine/src/bftengine/ClientsManager.cpp +++ b/bftengine/src/bftengine/ClientsManager.cpp @@ -217,9 +217,8 @@ ClientsManager::ClientsManager(const std::set& proxyClients, ConcordAssert(maxNumOfReqsPerClient_ > 0); reservedPagesPerRequest_ = reservedPagesPerRequest(sizeOfReservedPage(), maxReplySize_); reservedPagesPerClient_ = reservedPagesPerClient(sizeOfReservedPage(), maxReplySize_, maxNumOfReqsPerClient_); - for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + - ReplicaConfig::instance().numFnReplicas; - i++) { + const auto& config = ReplicaConfig::instance(); + for (NodeIdType i{}; i < config.numReplicas + config.numRoReplicas + config.numFnReplicas; ++i) { clientIds_.insert(i); } clientIds_.insert(proxyClients_.begin(), proxyClients_.end()); diff --git a/bftengine/src/bftengine/FullNodeReplica.cpp b/bftengine/src/bftengine/FullNodeReplica.cpp old mode 100755 new mode 100644 index f71a4ee511..e78ed0d3f7 --- a/bftengine/src/bftengine/FullNodeReplica.cpp +++ b/bftengine/src/bftengine/FullNodeReplica.cpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved. +// Copyright (c) 2023 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in // compliance with the Apache 2.0 License. 
@@ -13,8 +13,8 @@ #include #include -#include -#include +#include "bftengine/Replica.hpp" +#include "messages/StateTransferMsg.hpp" #include "FullNodeReplica.hpp" #include "log/logger.hpp" @@ -33,32 +33,31 @@ #include "communication/StateControl.hpp" using concordUtil::Timers; +using namespace std::placeholders; + +// Note: the changes in these files are aligned with the RO replica StateTransfer behavior; all the class functions are +// taken from ReadOnlyReplica. For the time being, StateTransfer is a temporary solution for FullNode; once ASP/BSP is +// implemented, the functions in this class need to be changed accordingly. namespace bftEngine::impl { FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, - std::shared_ptr requestsHandler, - IStateTransfer *stateTransfer, - std::shared_ptr msgComm, - std::shared_ptr msgHandlerReg, + std::shared_ptr requests_handler, + IStateTransfer *state_transfer, + std::shared_ptr msg_comm, + std::shared_ptr msg_handler_reg, concordUtil::Timers &timers, - MetadataStorage *metadataStorage) - : ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers), + MetadataStorage *metadata_storage) + : ReplicaForStateTransfer(config, requests_handler, state_transfer, msg_comm, msg_handler_reg, true, timers), fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), metrics_.RegisterCounter("sentAskForCheckpointMsgs"), metrics_.RegisterCounter("receivedInvalidMsgs"), metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, - metadataStorage_{metadataStorage} { + metadata_storage_{metadata_storage} { LOG_INFO(GL, "Initialising Full Node Replica"); repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); - msgHandlers_->registerMsgHandler( - MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); - msgHandlers_->registerMsgHandler( -
MsgCode::ClientRequest, - std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); - msgHandlers_->registerMsgHandler( - MsgCode::StateTransfer, - std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + + registerMsgHandlers(); metrics_.Register(); SigManager::init(config_.replicaId, @@ -96,10 +95,8 @@ void FullNodeReplica::stop() { } void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { - lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize; - - fn_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); - last_executed_seq_num_ = lastExecutedSeqNum; + last_executed_seq_num_ = newStateCheckpoint * checkpointWindowSize; + fn_metrics_.last_executed_seq_num_.Get().Set(last_executed_seq_num_); } void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { @@ -111,8 +108,8 @@ void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char * void FullNodeReplica::sendAskForCheckpointMsg() { fn_metrics_.sent_ask_for_checkpoint_msg_++; LOG_INFO(GL, "sending AskForCheckpointMsg"); - auto msg = std::make_unique(config_.replicaId); - for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id); + AskForCheckpointMsg msg{config_.replicaId}; + for (auto id : repsInfo->idsOfPeerReplicas()) send(&msg, id); } template <> @@ -139,24 +136,24 @@ void FullNodeReplica::onMessage(std::unique_ptr ms msg->rvbDataDigest())); // Reconfiguration cmd block is synced to RO replica via reserved pages - EpochNum replicasLastKnownEpochVal = 0; - auto epochNumberFromResPages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); - if (epochNumberFromResPages.has_value()) replicasLastKnownEpochVal = epochNumberFromResPages.value(); + EpochNum replicas_last_known_epoch_val = 0; + auto epoch_number_from_res_pages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); + if (epoch_number_from_res_pages.has_value()) 
replicas_last_known_epoch_val = epoch_number_from_res_pages.value(); // not relevant if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum || - msg->epochNumber() < replicasLastKnownEpochVal) { + msg->epochNumber() < replicas_last_known_epoch_val) { return; } // no self certificate - static std::map> checkpointsInfo; - const auto msgSeqNum = msg->seqNumber(); - const auto idOfGeneratedReplica = msg->idOfGeneratedReplica(); - checkpointsInfo[msgSeqNum].addCheckpointMsg(msg.release(), idOfGeneratedReplica); + static std::map> checkpoints_info; + const auto msg_seq_num = msg->seqNumber(); + const auto id_of_generated_eplica = msg->idOfGeneratedReplica(); + checkpoints_info[msg_seq_num].addCheckpointMsg(msg.release(), id_of_generated_eplica); // if enough - invoke state transfer - if (checkpointsInfo[msgSeqNum].isCheckpointCertificateComplete()) { - persistCheckpointDescriptor(msgSeqNum, checkpointsInfo[msgSeqNum]); - checkpointsInfo.clear(); + if (checkpoints_info[msg_seq_num].isCheckpointCertificateComplete()) { + persistCheckpointDescriptor(msg_seq_num, checkpoints_info[msg_seq_num]); + checkpoints_info.clear(); LOG_INFO(GL, "call to startCollectingState()"); stateTransfer->startCollectingState(); } @@ -177,103 +174,37 @@ void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const Ch m.second->idOfGeneratedReplica())); } DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs); - const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); - concord::serialize::UniquePtrToChar descBuf(new char[bufLen]); - char *descBufPtr = descBuf.get(); - size_t actualSize = 0; - desc.serialize(descBufPtr, bufLen, actualSize); - ConcordAssertNE(actualSize, 0); + const size_t buf_len = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); + concord::serialize::UniquePtrToChar desc_buf(new char[buf_len]); + char *desc_buf_ptr = 
desc_buf.get(); + size_t actual_size = 0; + desc.serialize(desc_buf_ptr, buf_len, actual_size); + ConcordAssertNE(actual_size, 0); // TODO [TK] S3KeyGenerator // checkpoints// std::ostringstream oss; oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId; - metadataStorage_->atomicWriteArbitraryObject(oss.str(), descBuf.get(), actualSize); + metadata_storage_->atomicWriteArbitraryObject(oss.str(), desc_buf.get(), actual_size); } template <> void FullNodeReplica::onMessage(std::unique_ptr msg) { - const NodeIdType senderId = msg->senderId(); - const NodeIdType clientId = msg->clientProxyId(); - const bool reconfig_flag = (msg->flags() & MsgFlag::RECONFIG_FLAG) != 0; - const ReqId reqSeqNum = msg->requestSeqNum(); + const NodeIdType sender_id = msg->senderId(); + const NodeIdType client_id = msg->clientProxyId(); + const ReqId req_seq_num = msg->requestSeqNum(); const uint64_t flags = msg->flags(); SCOPED_MDC_CID(msg->getCid()); - LOG_DEBUG(CNSUS, KVLOG(clientId, reqSeqNum, senderId) << " flags: " << std::bitset(flags)); + LOG_DEBUG(CNSUS, KVLOG(client_id, req_seq_num, sender_id) << " flags: " << std::bitset(flags)); const auto &span_context = msg->spanContext::type>(); auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request"); span.setTag("rid", config_.getreplicaId()); span.setTag("cid", msg->getCid()); - span.setTag("seq_num", reqSeqNum); - - // A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and - // the validation is done in the reconfiguration engine. 
Thus, we don't need to check the client validity as in - // the committers + span.setTag("seq_num", req_seq_num); - if (reconfig_flag) { - LOG_INFO(GL, "FN replica has received a reconfiguration request"); - executeReadOnlyRequest(span, *(msg.get())); - return; - } -} - -void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) { - auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span); - // full node replica does not know who is the primary, so it always return 0. It is the client responsibility to treat - // the replies accordingly. - ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId()); - const uint16_t clientId = request.clientProxyId(); - - int executionResult = 0; - bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests; - accumulatedRequests.push_back(bftEngine::IRequestsHandler::ExecutionRequest{clientId, - static_cast(lastExecutedSeqNum), - request.getCid(), - request.flags(), - request.requestLength(), - request.requestBuf(), - "", - reply.maxReplyLength(), - reply.replyBuf(), - request.requestSeqNum(), - request.requestIndexInBatch(), - request.result()}); - // DD: Do we need to take care of Time Service here? - bftRequestsHandler_->execute(accumulatedRequests, std::nullopt, request.getCid(), span); - IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back(); - executionResult = single_request.outExecutionStatus; - const uint32_t actualReplyLength = single_request.outActualReplySize; - const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize; - LOG_DEBUG(GL, - "Executed full node request. " << KVLOG(clientId, - lastExecutedSeqNum, - request.requestLength(), - reply.maxReplyLength(), - actualReplyLength, - actualReplicaSpecificInfoLength, - executionResult)); - // TODO(GG): TBD - how do we want to support empty replies? 
(actualReplyLength==0) - if (!executionResult) { - if (actualReplyLength > 0) { - reply.setReplyLength(actualReplyLength); - reply.setReplicaSpecificInfoLength(actualReplicaSpecificInfoLength); - send(&reply, clientId); - return; - } else { - LOG_WARN(GL, "Received zero size response. " << KVLOG(clientId)); - strcpy(single_request.outReply, "Executed data is empty"); - single_request.outActualReplySize = strlen(single_request.outReply); - executionResult = static_cast(bftEngine::OperationResult::EXEC_DATA_EMPTY); - } - - } else { - LOG_ERROR(GL, "Received error while executing FN request. " << KVLOG(clientId, executionResult)); - } - ClientReplyMsg replyMsg( - 0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult); - send(&replyMsg, clientId); + // TODO: handle reconfiguration request here, refer ReadOnlyReplica class } void FullNodeReplica::registerStatusHandlers() { @@ -292,4 +223,13 @@ void FullNodeReplica::registerStatusHandlers() { concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); } +void FullNodeReplica::registerMsgHandlers() { + msgHandlers_->registerMsgHandler(MsgCode::Checkpoint, + std::bind(&FullNodeReplica::messageHandler, this, _1)); + msgHandlers_->registerMsgHandler(MsgCode::ClientRequest, + std::bind(&FullNodeReplica::messageHandler, this, _1)); + msgHandlers_->registerMsgHandler(MsgCode::StateTransfer, + std::bind(&FullNodeReplica::messageHandler, this, _1)); +} + } // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/FullNodeReplica.hpp b/bftengine/src/bftengine/FullNodeReplica.hpp index 2b2e4f7f37..05ff032003 100644 --- a/bftengine/src/bftengine/FullNodeReplica.hpp +++ b/bftengine/src/bftengine/FullNodeReplica.hpp @@ -33,8 +33,8 @@ class FullNodeReplica : public ReplicaForStateTransfer { void start() override; void stop() override; - virtual bool isReadOnly() const override { return false; } - virtual bool isFullNode() const override { return true; } + 
virtual bool isReadOnly() const override { return config_.isReadOnly; } + virtual bool isFullNode() const override { return config_.isFullNode; } protected: void sendAskForCheckpointMsg(); @@ -54,7 +54,6 @@ class FullNodeReplica : public ReplicaForStateTransfer { template void onMessage(std::unique_ptr); - void executeReadOnlyRequest(concordUtils::SpanWrapper& parent_span, const ClientRequestMsg& m); void persistCheckpointDescriptor(const SeqNum&, const CheckpointInfo&); protected: @@ -67,7 +66,7 @@ class FullNodeReplica : public ReplicaForStateTransfer { concordMetrics::GaugeHandle last_executed_seq_num_; } fn_metrics_; - std::unique_ptr metadataStorage_; + std::unique_ptr metadata_storage_; std::atomic last_executed_seq_num_{0}; private: @@ -76,6 +75,8 @@ class FullNodeReplica : public ReplicaForStateTransfer { // The full node replica also hasn't got an implementation for InternalMessages which are used by the // ReplicaStatusHandler. void registerStatusHandlers(); + + void registerMsgHandlers(); }; } // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/ReadOnlyReplica.hpp b/bftengine/src/bftengine/ReadOnlyReplica.hpp index 4bc0b5461a..efabe48fa2 100644 --- a/bftengine/src/bftengine/ReadOnlyReplica.hpp +++ b/bftengine/src/bftengine/ReadOnlyReplica.hpp @@ -33,8 +33,8 @@ class ReadOnlyReplica : public ReplicaForStateTransfer { void start() override; void stop() override; - virtual bool isReadOnly() const override { return true; } - virtual bool isFullNode() const override { return false; } + virtual bool isReadOnly() const override { return config_.isReadOnly; } + virtual bool isFullNode() const override { return config_.isFullNode; } protected: void sendAskForCheckpointMsg(); diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index 7d595569ad..4d9aa8a9ab 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -364,9 +364,9 @@ class ReplicaImp : public 
InternalReplicaApi, public ReplicaForStateTransfer { std::shared_ptr ticksGenerator() const { return ticks_gen_; } - virtual bool isReadOnly() const override { return false; } + virtual bool isReadOnly() const override { return config_.isReadOnly; } - virtual bool isFullNode() const override { return false; } + virtual bool isFullNode() const override { return config_.isFullNode; } shared_ptr getPersistentStorage() const { return ps_; } std::shared_ptr getSecretsManager() { return sm_; }