diff --git a/bftengine/CMakeLists.txt b/bftengine/CMakeLists.txt index 9651286269..e991c3a2f0 100644 --- a/bftengine/CMakeLists.txt +++ b/bftengine/CMakeLists.txt @@ -11,6 +11,7 @@ set(corebft_source_files src/bftengine/DebugStatistics.cpp src/bftengine/SeqNumInfo.cpp src/bftengine/ReadOnlyReplica.cpp + src/bftengine/FullNodeReplica.cpp src/bftengine/ReplicaBase.cpp src/bftengine/ReplicaForStateTransfer.cpp src/bftengine/ReplicaImp.cpp diff --git a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp index 79f67222d1..c6943bce1a 100644 --- a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp +++ b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp @@ -150,8 +150,10 @@ struct Config { uint16_t cVal = 0; uint16_t numReplicas = 0; // number of consensus replicas uint16_t numRoReplicas = 0; + uint16_t numFnReplicas = 0; bool pedanticChecks = false; bool isReadOnly = false; + bool isFullNode = false; // sizes uint32_t maxChunkSize = 0; @@ -219,6 +221,8 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) { c.enableSourceBlocksPreFetch, c.enableSourceSelectorPrimaryAwareness, c.enableStoreRvbDataDuringCheckpointing); + os << ","; + os << KVLOG(c.numFnReplicas, c.isFullNode); return os; } // creates an instance of the state transfer module. diff --git a/bftengine/include/bftengine/ReplicaConfig.hpp b/bftengine/include/bftengine/ReplicaConfig.hpp index 2cd6af5ce2..fba4452d41 100644 --- a/bftengine/include/bftengine/ReplicaConfig.hpp +++ b/bftengine/include/bftengine/ReplicaConfig.hpp @@ -46,8 +46,10 @@ class ReplicaConfig : public concord::serialize::SerializableFactory= 1"); CONFIG_PARAM(cVal, uint16_t, 0, "C value. cVal >=0"); CONFIG_PARAM(replicaId, @@ -327,8 +329,10 @@ class ReplicaConfig : public concord::serialize::SerializableFactory, + IStateTransfer *, + bft::communication::ICommunication *, + MetadataStorage *); + static void setAggregator(const std::shared_ptr &aggregator); static logging::Logger logger_; diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp index ddb1cab14e..fe0d8d63ca 100644 --- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp +++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp @@ -148,7 +148,9 @@ std::shared_ptr CreFactory::create(std::shared_ptr< for (uint16_t i = 0; i < repConfig.numReplicas; i++) { bftClientConf.all_replicas.emplace(bft::client::ReplicaId{i}); } - for (uint16_t i = repConfig.numReplicas; i < repConfig.numReplicas + repConfig.numRoReplicas; i++) { + for (uint16_t i = repConfig.numReplicas; + i < repConfig.numReplicas + repConfig.numRoReplicas + repConfig.numFnReplicas; + i++) { bftClientConf.ro_replicas.emplace(bft::client::ReplicaId{i}); } bftClientConf.replicas_master_key_folder_path = std::nullopt; diff --git a/bftengine/src/bcstatetransfer/BCStateTran.cpp b/bftengine/src/bcstatetransfer/BCStateTran.cpp index a981ce68af..8ca2a6ed1c 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.cpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.cpp @@ -308,7 +308,7 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt // Validate input parameters and some of the configuration ConcordAssertNE(stateApi, nullptr); ConcordAssertGE(replicas_.size(), 3U * config_.fVal + 1U); - ConcordAssert(replicas_.count(config_.myReplicaId) == 1 || config.isReadOnly); + ConcordAssert(replicas_.count(config_.myReplicaId) == 1 || config.isReadOnly || config.isFullNode); ConcordAssertLT(finalizePutblockTimeoutMilli_, config_.refreshTimerMs); ConcordAssertEQ(RejectFetchingMsg::reasonMessages.size(), RejectFetchingMsg::Reason::LAST - 1); if (config_.sourceSessionExpiryDurationMs > 0) { @@ -1159,7 +1159,7 @@ void BCStateTran::handleStateTransferMessageImpl(char *msg, time_in_incoming_events_queue_rec_.end(); histograms_.incoming_events_queue_size->record(incomingEventsQ_->size()); } - bool invalidSender = (senderId >= (config_.numReplicas + config_.numRoReplicas)); + bool invalidSender = (senderId >= (config_.numReplicas + config_.numRoReplicas + config_.numFnReplicas)); bool sentFromSelf = senderId == config_.myReplicaId; bool msgSizeTooSmall = msgLen < sizeof(BCStateTranBaseMsg); if (msgSizeTooSmall || sentFromSelf || invalidSender) { @@ -3971,13 +3971,13 @@ void BCStateTran::computeDigestOfPage( if (checkpointNumber > 0) { digestGenerator.update(page, pageSize); } - digestGenerator.writeDigest(outDigest.getForUpdate()); + digestGenerator.writeDigest(reinterpret_cast(&outDigest)); } void BCStateTran::computeDigestOfPagesDescriptor(const DataStore::ResPagesDescriptor *pagesDesc, Digest &outDigest) { DigestGenerator digestGenerator; digestGenerator.update(reinterpret_cast(pagesDesc), pagesDesc->size()); - digestGenerator.writeDigest(outDigest.getForUpdate()); + digestGenerator.writeDigest(reinterpret_cast(&outDigest)); } void BCStateTran::computeDigestOfBlockImpl(const uint64_t blockNum, @@ -3996,7 +3996,7 @@ void BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize, Digest *outDigest) { - computeDigestOfBlockImpl(blockNum, block, blockSize, outDigest->getForUpdate()); + computeDigestOfBlockImpl(blockNum, block, blockSize, reinterpret_cast(outDigest)); } BlockDigest BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize) { diff --git a/bftengine/src/bftengine/ClientsManager.cpp b/bftengine/src/bftengine/ClientsManager.cpp index e1465d6955..80e76e37e2 100644 --- a/bftengine/src/bftengine/ClientsManager.cpp +++ b/bftengine/src/bftengine/ClientsManager.cpp @@ -217,7 +217,9 @@ ClientsManager::ClientsManager(const std::set& proxyClients, ConcordAssert(maxNumOfReqsPerClient_ > 0); reservedPagesPerRequest_ = reservedPagesPerRequest(sizeOfReservedPage(), maxReplySize_); reservedPagesPerClient_ = reservedPagesPerClient(sizeOfReservedPage(), maxReplySize_, maxNumOfReqsPerClient_); - for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) { + for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; + i++) { clientIds_.insert(i); } clientIds_.insert(proxyClients_.begin(), proxyClients_.end()); diff --git a/bftengine/src/bftengine/FullNodeReplica.cpp b/bftengine/src/bftengine/FullNodeReplica.cpp new file mode 100755 index 0000000000..4151d793b8 --- /dev/null +++ b/bftengine/src/bftengine/FullNodeReplica.cpp @@ -0,0 +1,295 @@ +// Concord +// +// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in +// compliance with the Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright notices and license terms. Your use of +// these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE +// file. + +#include +#include +#include + +#include +#include +#include "FullNodeReplica.hpp" + +#include "log/logger.hpp" +#include "MsgHandlersRegistrator.hpp" +#include "messages/CheckpointMsg.hpp" +#include "messages/AskForCheckpointMsg.hpp" +#include "messages/ClientRequestMsg.hpp" +#include "messages/ClientReplyMsg.hpp" +#include "util/kvstream.h" +#include "PersistentStorage.hpp" +#include "MsgsCommunicator.hpp" +#include "SigManager.hpp" +#include "ReconfigurationCmd.hpp" +#include "util/json_output.hpp" +#include "SharedTypes.hpp" +#include "communication/StateControl.hpp" + +using concordUtil::Timers; + +namespace bftEngine::impl { + +FullNodeReplica::FullNodeReplica(const ReplicaConfig &config, + std::shared_ptr requestsHandler, + IStateTransfer *stateTransfer, + std::shared_ptr msgComm, + std::shared_ptr msgHandlerReg, + concordUtil::Timers &timers, + MetadataStorage *metadataStorage) + : ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers), + ro_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"), + metrics_.RegisterCounter("sentAskForCheckpointMsgs"), + metrics_.RegisterCounter("receivedInvalidMsgs"), + metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)}, + metadataStorage_{metadataStorage} { + LOG_INFO(GL, "Initialising ReadOnly Replica"); + repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs); + msgHandlers_->registerMsgHandler( + MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + msgHandlers_->registerMsgHandler( + MsgCode::ClientRequest, + std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + msgHandlers_->registerMsgHandler( + MsgCode::StateTransfer, + std::bind(&FullNodeReplica::messageHandler, this, std::placeholders::_1)); + metrics_.Register(); + + SigManager::init(config_.replicaId, + config_.replicaPrivateKey, + config_.publicKeysOfReplicas, + concord::crypto::KeyFormat::HexaDecimalStrippedFormat, + ReplicaConfig::instance().getPublicKeysOfClients(), + concord::crypto::KeyFormat::PemFormat, + {{repsInfo->getIdOfOperator(), + ReplicaConfig::instance().getOperatorPublicKey(), + concord::crypto::KeyFormat::PemFormat}}, + *repsInfo); + + // Register status handler for Read-Only replica + registerStatusHandlers(); + bft::communication::StateControl::instance().setGetPeerPubKeyMethod( + [&](uint32_t id) { return SigManager::instance()->getPublicKeyOfVerifier(id); }); +} + +void FullNodeReplica::start() { + ReplicaForStateTransfer::start(); + size_t sendAskForCheckpointMsgPeriodSec = config_.get("concord.bft.ro.sendAskForCheckpointMsgPeriodSec", 30); + askForCheckpointMsgTimer_ = timers_.add( + std::chrono::seconds(sendAskForCheckpointMsgPeriodSec), Timers::Timer::RECURRING, [this](Timers::Handle) { + if (!this->isCollectingState()) { + sendAskForCheckpointMsg(); + } + }); + msgsCommunicator_->startMsgsProcessing(config_.replicaId); +} + +void FullNodeReplica::stop() { + timers_.cancel(askForCheckpointMsgTimer_); + ReplicaForStateTransfer::stop(); +} + +void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) { + lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize; + + ro_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum); + last_executed_seq_num_ = lastExecutedSeqNum; +} + +void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) { + ro_metrics_.received_invalid_msg_++; + LOG_WARN(GL, + "Node " << config_.replicaId << " received invalid message from Node " << msg->senderId() + << " type=" << msg->type() << " reason: " << reason); +} +void FullNodeReplica::sendAskForCheckpointMsg() { + ro_metrics_.sent_ask_for_checkpoint_msg_++; + LOG_INFO(GL, "sending AskForCheckpointMsg"); + auto msg = std::make_unique(config_.replicaId); + for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id); +} + +template <> +void FullNodeReplica::onMessage(std::unique_ptr msg) { + ReplicaForStateTransfer::onMessage(move(msg)); +} + +template <> +void FullNodeReplica::onMessage(std::unique_ptr msg) { + if (isCollectingState()) { + return; + } + ro_metrics_.received_checkpoint_msg_++; + LOG_INFO(GL, + KVLOG(msg->senderId(), + msg->idOfGeneratedReplica(), + msg->seqNumber(), + msg->epochNumber(), + msg->size(), + msg->isStableState(), + msg->state(), + msg->stateDigest(), + msg->reservedPagesDigest(), + msg->rvbDataDigest())); + + // Reconfiguration cmd block is synced to RO replica via reserved pages + EpochNum replicasLastKnownEpochVal = 0; + auto epochNumberFromResPages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber(); + if (epochNumberFromResPages.has_value()) replicasLastKnownEpochVal = epochNumberFromResPages.value(); + + // not relevant + if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum || + msg->epochNumber() < replicasLastKnownEpochVal) { + return; + } + // no self certificate + static std::map> checkpointsInfo; + const auto msgSeqNum = msg->seqNumber(); + const auto idOfGeneratedReplica = msg->idOfGeneratedReplica(); + checkpointsInfo[msgSeqNum].addCheckpointMsg(msg.release(), idOfGeneratedReplica); + // if enough - invoke state transfer + if (checkpointsInfo[msgSeqNum].isCheckpointCertificateComplete()) { + persistCheckpointDescriptor(msgSeqNum, checkpointsInfo[msgSeqNum]); + checkpointsInfo.clear(); + LOG_INFO(GL, "call to startCollectingState()"); + stateTransfer->startCollectingState(); + } +} + +void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const CheckpointInfo &chckpinfo) { + std::vector msgs; + msgs.reserve(chckpinfo.getAllCheckpointMsgs().size()); + for (const auto &m : chckpinfo.getAllCheckpointMsgs()) { + msgs.push_back(m.second); + LOG_INFO(GL, + KVLOG(m.second->seqNumber(), + m.second->epochNumber(), + m.second->state(), + m.second->stateDigest(), + m.second->reservedPagesDigest(), + m.second->rvbDataDigest(), + m.second->idOfGeneratedReplica())); + } + DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs); + const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas()); + concord::serialize::UniquePtrToChar descBuf(new char[bufLen]); + char *descBufPtr = descBuf.get(); + size_t actualSize = 0; + desc.serialize(descBufPtr, bufLen, actualSize); + ConcordAssertNE(actualSize, 0); + + // TODO [TK] S3KeyGenerator + // checkpoints// + std::ostringstream oss; + oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId; + metadataStorage_->atomicWriteArbitraryObject(oss.str(), descBuf.get(), actualSize); +} + +template <> +void FullNodeReplica::onMessage(std::unique_ptr msg) { + const NodeIdType senderId = msg->senderId(); + const NodeIdType clientId = msg->clientProxyId(); + const bool reconfig_flag = (msg->flags() & MsgFlag::RECONFIG_FLAG) != 0; + const ReqId reqSeqNum = msg->requestSeqNum(); + const uint64_t flags = msg->flags(); + + SCOPED_MDC_CID(msg->getCid()); + LOG_DEBUG(CNSUS, KVLOG(clientId, reqSeqNum, senderId) << " flags: " << std::bitset(flags)); + + const auto &span_context = msg->spanContext::type>(); + auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request"); + span.setTag("rid", config_.getreplicaId()); + span.setTag("cid", msg->getCid()); + span.setTag("seq_num", reqSeqNum); + + // A read only replica can handle only reconfiguration requests. Those requests are signed by the operator and + // the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in + // the committers + + if (reconfig_flag) { + LOG_INFO(GL, "ro replica has received a reconfiguration request"); + executeReadOnlyRequest(span, *(msg.get())); + return; + } +} + +void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) { + auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span); + // Read only replica does not know who is the primary, so it always return 0. It is the client responsibility to treat + // the replies accordingly. + ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId()); + const uint16_t clientId = request.clientProxyId(); + + int executionResult = 0; + bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests; + accumulatedRequests.push_back(bftEngine::IRequestsHandler::ExecutionRequest{clientId, + static_cast(lastExecutedSeqNum), + request.getCid(), + request.flags(), + request.requestLength(), + request.requestBuf(), + "", + reply.maxReplyLength(), + reply.replyBuf(), + request.requestSeqNum(), + request.requestIndexInBatch(), + request.result()}); + // DD: Do we need to take care of Time Service here? + bftRequestsHandler_->execute(accumulatedRequests, std::nullopt, request.getCid(), span); + IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back(); + executionResult = single_request.outExecutionStatus; + const uint32_t actualReplyLength = single_request.outActualReplySize; + const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize; + LOG_DEBUG(GL, + "Executed read only request. " << KVLOG(clientId, + lastExecutedSeqNum, + request.requestLength(), + reply.maxReplyLength(), + actualReplyLength, + actualReplicaSpecificInfoLength, + executionResult)); + // TODO(GG): TBD - how do we want to support empty replies? (actualReplyLength==0) + if (!executionResult) { + if (actualReplyLength > 0) { + reply.setReplyLength(actualReplyLength); + reply.setReplicaSpecificInfoLength(actualReplicaSpecificInfoLength); + send(&reply, clientId); + return; + } else { + LOG_WARN(GL, "Received zero size response. " << KVLOG(clientId)); + strcpy(single_request.outReply, "Executed data is empty"); + single_request.outActualReplySize = strlen(single_request.outReply); + executionResult = static_cast(bftEngine::OperationResult::EXEC_DATA_EMPTY); + } + + } else { + LOG_ERROR(GL, "Received error while executing RO request. " << KVLOG(clientId, executionResult)); + } + ClientReplyMsg replyMsg( + 0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult); + send(&replyMsg, clientId); +} + +void FullNodeReplica::registerStatusHandlers() { + auto h = concord::diagnostics::StatusHandler( + "replica-sequence-numbers", "Last executed sequence number of the read-only replica", [this]() { + concordUtils::BuildJson bj; + + bj.startJson(); + bj.startNested("sequenceNumbers"); + bj.addKv("lastExecutedSeqNum", last_executed_seq_num_); + bj.endNested(); + bj.endJson(); + + return bj.getJson(); + }); + concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); +} + +} // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/FullNodeReplica.hpp b/bftengine/src/bftengine/FullNodeReplica.hpp new file mode 100644 index 0000000000..61f1ca1578 --- /dev/null +++ b/bftengine/src/bftengine/FullNodeReplica.hpp @@ -0,0 +1,80 @@ +// Concord +// +// Copyright (c) 2018-2019 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in +// compliance with the Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright notices and license terms. Your use of +// these subcomponents is subject to the terms and conditions of the sub-component's license, as noted in the LICENSE +// file. + +#pragma once + +#include "ReplicaForStateTransfer.hpp" +#include "util/Timers.hpp" +#include "CheckpointInfo.hpp" + +namespace bftEngine::impl { + +class ClientRequestMsg; +/** + * + */ +class FullNodeReplica : public ReplicaForStateTransfer { + public: + FullNodeReplica(const ReplicaConfig&, + std::shared_ptr, + IStateTransfer*, + std::shared_ptr, + std::shared_ptr, + concordUtil::Timers& timers, + MetadataStorage* metadataStorage); + + void start() override; + void stop() override; + virtual bool isReadOnly() const override { return true; } + + protected: + void sendAskForCheckpointMsg(); + + void onTransferringCompleteImp(uint64_t newStateCheckpoint) override; + void onReportAboutInvalidMessage(MessageBase* msg, const char* reason) override; + + template + void messageHandler(std::unique_ptr msg) { + auto trueTypeObj = std::make_unique(msg.get()); + msg.reset(); + if (validateMessage(trueTypeObj.get())) { + onMessage(std::move(trueTypeObj)); + } + } + + template + void onMessage(std::unique_ptr); + + void executeReadOnlyRequest(concordUtils::SpanWrapper& parent_span, const ClientRequestMsg& m); + void persistCheckpointDescriptor(const SeqNum&, const CheckpointInfo&); + + protected: + concordUtil::Timers::Handle askForCheckpointMsgTimer_; + + struct Metrics { + concordMetrics::CounterHandle received_checkpoint_msg_; + concordMetrics::CounterHandle sent_ask_for_checkpoint_msg_; + concordMetrics::CounterHandle received_invalid_msg_; + concordMetrics::GaugeHandle last_executed_seq_num_; + } ro_metrics_; + + std::unique_ptr metadataStorage_; + std::atomic last_executed_seq_num_{0}; + + private: + // This function serves as an ReplicaStatusHandlers alternative for FullNodeReplica. The reason to use this function + // is that regular and read-only replicas expose different metrics and the status handlers are not interchangeable. + // The read-only replica also hasn't got an implementation for InternalMessages which are used by the + // ReplicaStatusHandler. + void registerStatusHandlers(); +}; + +} // namespace bftEngine::impl diff --git a/bftengine/src/bftengine/ReplicaFactory.cpp b/bftengine/src/bftengine/ReplicaFactory.cpp index f2a4653333..1bf85d2873 100644 --- a/bftengine/src/bftengine/ReplicaFactory.cpp +++ b/bftengine/src/bftengine/ReplicaFactory.cpp @@ -143,8 +143,9 @@ ReplicaFactory::IReplicaPtr ReplicaFactory::createReplica( replicaConfig.numReplicas, replicaConfig.fVal, replicaConfig.cVal, - replicaConfig.numReplicas + replicaConfig.numRoReplicas + replicaConfig.numOfClientProxies + - replicaConfig.numOfExternalClients + replicaConfig.numOfClientServices + replicaConfig.numReplicas, + replicaConfig.numReplicas + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas + + replicaConfig.numOfClientProxies + replicaConfig.numOfExternalClients + replicaConfig.numOfClientServices + + replicaConfig.numReplicas, replicaConfig.clientBatchingMaxMsgsNbr)); unique_ptr metadataStoragePtr(metadataStorage); auto objectDescriptors = static_cast(persistentStoragePtr.get()) @@ -269,6 +270,26 @@ ReplicaFactory::IReplicaPtr ReplicaFactory::createRoReplica(const ReplicaConfig return replicaInternal; } +ReplicaFactory::IReplicaPtr ReplicaFactory::createFullNodeReplica(const ReplicaConfig &replicaConfig, + std::shared_ptr requestsHandler, + IStateTransfer *stateTransfer, + bft::communication::ICommunication *communication, + MetadataStorage *metadataStorage) { + LOG_INFO(logger_, "ReplicaFactory::createFullNodeReplica START"); + auto replicaInternal = std::make_unique(); + auto msgHandlers = std::make_shared(); + auto incomingMsgsStorageImpPtr = + std::make_unique(msgHandlers, timersResolution, replicaConfig.replicaId); + auto &timers = incomingMsgsStorageImpPtr->timers(); + std::shared_ptr incomingMsgsStorage{std::move(incomingMsgsStorageImpPtr)}; + auto msgReceiver = std::make_shared(incomingMsgsStorage); + auto msgsCommunicator = std::make_shared(communication, incomingMsgsStorage, msgReceiver); + replicaInternal->setReplica(std::make_unique( + replicaConfig, requestsHandler, stateTransfer, msgsCommunicator, msgHandlers, timers, metadataStorage)); + LOG_INFO(logger_, "ReplicaFactory::createFullNodeReplica END"); + return replicaInternal; +} + std::unique_ptr ReplicaFactory::createPreProcessor( const ReplicaConfig &replicaConfig, shared_ptr &msgsCommunicator, diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp index 956bb75281..0387e1a596 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp @@ -49,8 +49,8 @@ ReplicaForStateTransfer::ReplicaForStateTransfer(const ReplicaConfig &config, // Reserved Pages and State Transfer initialization ClientsManager::setNumResPages( - (config.numReplicas + config.numRoReplicas + config.numOfClientProxies + config.numOfExternalClients + - config.numReplicas + config.numOfClientServices) * + (config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + + config.numOfExternalClients + config.numReplicas + config.numOfClientServices) * ClientsManager::reservedPagesPerClient( config.getsizeOfReservedPage(), config.maxReplyMessageSize, diff --git a/bftengine/src/bftengine/ReplicasInfo.cpp b/bftengine/src/bftengine/ReplicasInfo.cpp index c8dc186aa4..b4c0138fef 100644 --- a/bftengine/src/bftengine/ReplicasInfo.cpp +++ b/bftengine/src/bftengine/ReplicasInfo.cpp @@ -57,13 +57,14 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, : _myId{config.replicaId}, _numberOfReplicas{config.numReplicas}, _numberOfRoReplicas{config.numRoReplicas}, + _numberOfFnReplicas{config.numFnReplicas}, _numOfClientProxies{config.numOfClientProxies}, _numberOfExternalClients{config.numOfExternalClients}, _numberOfClientServices{config.numOfClientServices}, _numberOfInternalClients{config.numReplicas}, - _maxValidPrincipalId{static_cast(config.numReplicas + config.numRoReplicas + config.numOfClientProxies + - config.numOfExternalClients + _numberOfInternalClients + - _numberOfClientServices - 1)}, + _maxValidPrincipalId{static_cast(config.numReplicas + config.numRoReplicas + config.numFnReplicas + + config.numOfClientProxies + config.numOfExternalClients + + _numberOfInternalClients + _numberOfClientServices - 1)}, _fVal{config.fVal}, _cVal{config.cVal}, _dynamicCollectorForPartialProofs{dynamicCollectorForPartialProofs}, @@ -87,7 +88,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfPeerROReplicas{[&config]() { std::set ret; uint16_t start = config.numReplicas; - uint16_t end = start + config.numRoReplicas; + uint16_t end = start + config.numRoReplicas + config.numFnReplicas; for (uint16_t i{start}; i < end; ++i) if (i != config.replicaId) { ret.insert(i); @@ -98,7 +99,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfClientProxies{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas; + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas; auto end = start + config.numOfClientProxies; for (auto i = start; i < end; ++i) { ret.insert(i); @@ -109,7 +110,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfExternalClients{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies; + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies; auto end = start + config.numOfExternalClients; for (auto i = start; i < (end - ((uint16_t)config.operatorEnabled_)); ++i) { ret.insert(i); @@ -124,7 +125,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfClientServices{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies + + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + config.numOfExternalClients - ((uint16_t)config.operatorEnabled_); auto end = start + config.numOfClientServices; for (auto i = start; i < end; ++i) { @@ -137,7 +138,7 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, _idsOfInternalClients{[&config]() { std::set ret; - auto start = config.numReplicas + config.numRoReplicas + config.numOfClientProxies + + auto start = config.numReplicas + config.numRoReplicas + config.numFnReplicas + config.numOfClientProxies + config.numOfExternalClients + config.numOfClientServices; auto end = start + config.numReplicas; for (auto i = start; i < end; ++i) { @@ -147,8 +148,9 @@ ReplicasInfo::ReplicasInfo(const ReplicaConfig& config, return ret; }()} { _operator_id = config.operatorEnabled_ - ? static_cast(config.numReplicas + config.numRoReplicas + config.numOfClientProxies + - config.numOfExternalClients + config.numOfClientServices - 1) + ? static_cast(config.numReplicas + config.numRoReplicas + config.numFnReplicas + + config.numOfClientProxies + config.numOfExternalClients + + config.numOfClientServices - 1) : 0; ConcordAssert(_numberOfReplicas == (3 * _fVal + 2 * _cVal + 1)); } diff --git a/bftengine/src/bftengine/ReplicasInfo.hpp b/bftengine/src/bftengine/ReplicasInfo.hpp index f012a82990..a5a30c383e 100644 --- a/bftengine/src/bftengine/ReplicasInfo.hpp +++ b/bftengine/src/bftengine/ReplicasInfo.hpp @@ -81,6 +81,7 @@ class ReplicasInfo { uint16_t getNumberOfReplicas() { return _numberOfReplicas; } uint16_t getNumberOfRoReplicas() { return _numberOfRoReplicas; } + uint16_t getNumberOfFnReplicas() { return _numberOfFnReplicas; } uint16_t getNumOfClientProxies() { return _numOfClientProxies; } uint16_t getNumberOfExternalClients() { return _numberOfExternalClients; } uint16_t getNumberOfInternalClients() { return _numberOfInternalClients; } @@ -90,6 +91,7 @@ class ReplicasInfo { const ReplicaId _myId = 0; const uint16_t _numberOfReplicas = 0; const uint16_t _numberOfRoReplicas = 0; + const uint16_t _numberOfFnReplicas = 0; const uint16_t _numOfClientProxies = 0; const uint16_t _numberOfExternalClients = 0; const uint16_t _numberOfClientServices = 0; diff --git a/bftengine/src/bftengine/SigManager.cpp b/bftengine/src/bftengine/SigManager.cpp index 929ade0987..3e92b53d80 100644 --- a/bftengine/src/bftengine/SigManager.cpp +++ b/bftengine/src/bftengine/SigManager.cpp @@ -51,18 +51,23 @@ SigManager* SigManager::initImpl( size_t lowBound, highBound; auto numReplicas = replicasInfo.getNumberOfReplicas(); auto numRoReplicas = replicasInfo.getNumberOfRoReplicas(); + auto numFnReplicas = replicasInfo.getNumberOfFnReplicas(); auto numOfClientProxies = replicasInfo.getNumOfClientProxies(); auto numOfExternalClients = replicasInfo.getNumberOfExternalClients(); auto numOfInternalClients = replicasInfo.getNumberOfInternalClients(); auto numOfClientServices = replicasInfo.getNumberOfClientServices(); - LOG_INFO( - GL, - "Compute publicKeysMapping and publickeys: " << KVLOG( - myId, numReplicas, numRoReplicas, numOfClientProxies, numOfExternalClients, publicKeysOfReplicas.size())); + LOG_INFO(GL, + "Compute publicKeysMapping and publickeys: " << KVLOG(myId, + numReplicas, + numRoReplicas, + numFnReplicas, + numOfClientProxies, + numOfExternalClients, + publicKeysOfReplicas.size())); SigManager::KeyIndex i{0}; - highBound = numReplicas + numRoReplicas - 1; + highBound = numReplicas + numRoReplicas + numFnReplicas - 1; for (const auto& repIdToKeyPair : publicKeysOfReplicas) { // each replica sign with a unique private key (1 to 1 relation) ConcordAssert(repIdToKeyPair.first <= highBound); @@ -74,7 +79,7 @@ SigManager* SigManager::initImpl( // Multiple clients might be signing with the same private key (1 to many relation) // Also, we do not enforce to have all range between [lowBound, highBound] construcred. We might want to have less // principal ids mapped to keys than what is stated in the range. - lowBound = numRoReplicas + numReplicas + numOfClientProxies; + lowBound = numRoReplicas + numFnReplicas + numReplicas + numOfClientProxies; highBound = lowBound + numOfExternalClients + numOfInternalClients + numOfClientServices - 1; for (const auto& p : (*publicKeysOfClients)) { ConcordAssert(!p.first.empty()); diff --git a/bftengine/src/preprocessor/PreProcessor.cpp b/bftengine/src/preprocessor/PreProcessor.cpp index bdc3e86da5..062933ac42 100644 --- a/bftengine/src/preprocessor/PreProcessor.cpp +++ b/bftengine/src/preprocessor/PreProcessor.cpp @@ -407,7 +407,8 @@ PreProcessor::PreProcessor(shared_ptr &msgsCommunicator, myReplicaId_(myReplica.getReplicaConfig().replicaId), maxExternalMsgSize_(myReplica.getReplicaConfig().maxExternalMessageSize), responseSizeInternalOverhead_{/* for conflict detection BlockId */ sizeof(uint64_t)}, - numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas), + numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas + + myReplica.getReplicaConfig().numFnReplicas), numOfClientProxies_(myReplica.getReplicaConfig().numOfClientProxies), clientBatchingEnabled_(myReplica.getReplicaConfig().clientBatchingEnabled), threadPool_("PreProcessor::threadPool"), diff --git a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp index 5f6c1957e4..7beab6b88a 100644 --- a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp +++ b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp @@ -313,8 +313,10 @@ SimpleStateTran::SimpleStateTran( cVal, // cVal (uint16_t)(3 * fVal + 2 * cVal + 1), // numReplicas 0, // numRoReplicas + 0, // numFnReplicas pedanticChecks, // pedanticChecks false, // isReadOnly + false, // isFullNode 128, // maxChunkSize 256, // maxNumberOfChunksInBatch 1024, // maxBlockSize diff --git a/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp b/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp index 025c9d5331..eedefd206f 100644 --- a/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp +++ b/bftengine/tests/bcstatetransfer/bcstatetransfer_tests.cpp @@ -84,8 +84,10 @@ Config targetConfig() { 0, // cVal 7, // numReplicas 0, // numRoReplicas + 0, // numFnReplicas false, // pedanticChecks false, // isReadOnly + false, // isFullNode 1024, // maxChunkSize 24, // maxNumberOfChunksInBatch 1024, // maxBlockSize diff --git a/examples/replica/src/SetupReplica.cpp b/examples/replica/src/SetupReplica.cpp index 1833a29e43..485e78ba65 100644 --- a/examples/replica/src/SetupReplica.cpp +++ b/examples/replica/src/SetupReplica.cpp @@ -202,8 +202,8 @@ bft::communication::ICommunication* SetupReplica::createCommunication( logging::Logger logger = getLogger(); TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig); - uint16_t numOfReplicas = - (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); + uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; #ifdef USE_COMM_PLAIN_TCP diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index 2dff5b6df8..94c6229f41 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -58,16 +58,23 @@ Status Replica::initInternals() { m_currentRepStatus = RepStatus::Starting; - if (replicaConfig_.isReadOnly) { - LOG_INFO(logger, - "ReadOnly Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) { auto requestHandler = bftEngine::IRequestsHandler::createRequestsHandler(m_cmdHandler, cronTableRegistry_); requestHandler->setReconfigurationHandler(std::make_shared( bftEngine::ReplicaConfig::instance().pathToOperatorPublicKey_, bftEngine::ReplicaConfig::instance().operatorMsgSigningAlgo, *this)); - m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica( - replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) { + LOG_INFO(logger, + "Read Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica( + replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + } else { + LOG_INFO(logger, + "FullNode Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum())); + m_replicaPtr = bftEngine::ReplicaFactory::createFullNodeReplica( + replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage); + } m_stateTransfer->addOnTransferringCompleteCallback([this](std::uint64_t) { std::vector stateFromReservedPages; uint64_t wedgePt{0}; @@ -330,7 +337,7 @@ void Replica::createReplicaAndSyncState() { const auto lastExecutedSeqNum = m_replicaPtr->getLastExecutedSequenceNum(); LOG_INFO(logger, KVLOG(lastExecutedSeqNum)); - if (!replicaConfig_.isReadOnly && !m_stateTransfer->isCollectingState()) { + if (!replicaConfig_.isReadOnly && !replicaConfig_.isFullNode && !m_stateTransfer->isCollectingState()) { try { const auto maxNumOfBlocksToDelete = replicaConfig_.maxNumOfRequestsInBatch; const auto removedBlocksNum = replicaStateSync_->execute( @@ -492,8 +499,10 @@ Replica::Replica(ICommunication *comm, replicaConfig_.cVal, replicaConfig_.numReplicas, replicaConfig_.numRoReplicas, + replicaConfig_.numFnReplicas, replicaConfig_.get("concord.bft.st.pedanticChecks", false), replicaConfig_.isReadOnly, + replicaConfig_.isFullNode, #if defined USE_COMM_PLAIN_TCP || defined USE_COMM_TLS_TCP replicaConfig_.get("concord.bft.st.maxChunkSize", 30 * 1024 * 1024), @@ -543,7 +552,7 @@ Replica::Replica(ICommunication *comm, stConfig.gettingMissingBlocksSummaryWindowSize = 50; } - if (!replicaConfig.isReadOnly) { + if (!replicaConfig.isReadOnly && !replicaConfig.isFullNode) { const auto linkStChain = true; { auto [it, inserted] = kvbc_categories.insert( @@ -592,12 +601,18 @@ Replica::Replica(ICommunication *comm, return oss.str(); }); registrar.status.registerHandler(handler); + } else if (replicaConfig.isFullNode) { + op_kvBlockchain.emplace(storage::rocksdb::NativeClient::fromIDBClient(m_dbSet.dataDBClient), + true, + kvbc_categories, + concord::kvbc::adapter::aux::AdapterAuxTypes(this->aggregator_)); + m_kvBlockchain = &(op_kvBlockchain.value()); } m_dbSet.dataDBClient->setAggregator(aggregator_); m_dbSet.metadataDBClient->setAggregator(aggregator_); auto stKeyManipulator = std::shared_ptr{storageFactory->newSTKeyManipulator()}; m_stateTransfer = bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_); - if (!replicaConfig.isReadOnly) { + if (!replicaConfig.isReadOnly && !replicaConfig.isFullNode) { stReconfigurationSM_ = std::make_unique(*m_stateTransfer, *this); m_metadataStorage = new DBMetadataStorage(m_metadataDBClient.get(), storageFactory->newMetadataKeyManipulator()); } else { diff --git a/kvbc/src/reconfiguration_kvbc_handler.cpp b/kvbc/src/reconfiguration_kvbc_handler.cpp index 7e2a284cd8..c4f72f7489 100644 --- a/kvbc/src/reconfiguration_kvbc_handler.cpp +++ b/kvbc/src/reconfiguration_kvbc_handler.cpp @@ -464,7 +464,8 @@ bool KvbcClientReconfigurationHandler::handle(const concord::messages::ClientRec const std::optional& ts, concord::messages::ReconfigurationResponse& rres) { concord::messages::ClientReconfigurationStateReply rep; - uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; + uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; if (sender_id > first_client_id) { for (uint8_t i = kvbc::keyTypes::CLIENT_COMMAND_TYPES::start_ + 1; i < kvbc::keyTypes::CLIENT_COMMAND_TYPES::end_; i++) { @@ -698,7 +699,10 @@ bool ReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedgeC auto execute_key_prefix = std::string{kvbc::keyTypes::reconfiguration_client_data_prefix, static_cast(kvbc::keyTypes::CLIENT_COMMAND_TYPES::CLIENT_SCALING_EXECUTE_COMMAND)}; - for (uint64_t i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) { + + for (auto i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas + + ReplicaConfig::instance().numFnReplicas; + i++) { concord::messages::ClientsAddRemoveExecuteCommand cmd; cmd.config_descriptor = command.config_descriptor; if (token.find(i) == token.end()) continue; diff --git a/tests/simpleKVBC/TesterReplica/setup.cpp b/tests/simpleKVBC/TesterReplica/setup.cpp index af8e1620df..4d74a34c41 100644 --- a/tests/simpleKVBC/TesterReplica/setup.cpp +++ b/tests/simpleKVBC/TesterReplica/setup.cpp @@ -370,8 +370,8 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { TestCommConfig testCommConfig(logger); testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig); - uint16_t numOfReplicas = - (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas); + uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + + replicaConfig.numRoReplicas + replicaConfig.numFnReplicas); auto numOfClients = replicaConfig.numOfClientProxies ? replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients; std::shared_ptr sm_ =