Skip to content

Commit

Permalink
Fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] authored and [email protected] committed May 5, 2023
1 parent babbc5a commit c1204d4
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 133 deletions.
16 changes: 8 additions & 8 deletions bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,21 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) {
c.cVal,
c.numReplicas,
c.numRoReplicas,
c.numFnReplicas,
c.pedanticChecks,
c.isReadOnly,
c.isFullNode,
c.maxChunkSize,
c.maxNumberOfChunksInBatch,
c.maxBlockSize,
c.maxPendingDataFromSourceReplica,
c.maxNumOfReservedPages,
c.sizeOfReservedPage,
c.gettingMissingBlocksSummaryWindowSize,
c.minPrePrepareMsgsForPrimaryAwareness,
c.fetchRangeSize);
c.gettingMissingBlocksSummaryWindowSize);
os << ",";
os << KVLOG(c.RVT_K,
os << KVLOG(c.minPrePrepareMsgsForPrimaryAwareness,
c.fetchRangeSize,
c.RVT_K,
c.refreshTimerMs,
c.checkpointSummariesRetransmissionTimeoutMs,
c.maxAcceptableMsgDelayMs,
Expand All @@ -218,11 +220,9 @@ inline std::ostream &operator<<(std::ostream &os, const Config &c) {
c.sourcePerformanceSnapshotFrequencySec,
c.runInSeparateThread,
c.enableReservedPages,
c.enableSourceBlocksPreFetch,
c.enableSourceSelectorPrimaryAwareness,
c.enableStoreRvbDataDuringCheckpointing);
c.enableSourceBlocksPreFetch);
os << ",";
os << KVLOG(c.numFnReplicas, c.isFullNode);
os << KVLOG(c.enableSourceSelectorPrimaryAwareness, c.enableStoreRvbDataDuringCheckpointing);
return os;
}
// creates an instance of the state transfer module.
Expand Down
5 changes: 2 additions & 3 deletions bftengine/src/bftengine/ClientsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@ ClientsManager::ClientsManager(const std::set<NodeIdType>& proxyClients,
ConcordAssert(maxNumOfReqsPerClient_ > 0);
reservedPagesPerRequest_ = reservedPagesPerRequest(sizeOfReservedPage(), maxReplySize_);
reservedPagesPerClient_ = reservedPagesPerClient(sizeOfReservedPage(), maxReplySize_, maxNumOfReqsPerClient_);
for (NodeIdType i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas +
ReplicaConfig::instance().numFnReplicas;
i++) {
const auto& config = ReplicaConfig::instance();
for (NodeIdType i{}; i < config.numReplicas + config.numRoReplicas + config.numFnReplicas; ++i) {
clientIds_.insert(i);
}
clientIds_.insert(proxyClients_.begin(), proxyClients_.end());
Expand Down
168 changes: 54 additions & 114 deletions bftengine/src/bftengine/FullNodeReplica.cpp
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Concord
//
// Copyright (c) 2018, 2019 VMware, Inc. All Rights Reserved.
// Copyright (c) 2023 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in
// compliance with the Apache 2.0 License.
Expand All @@ -13,8 +13,8 @@
#include <functional>
#include <bitset>

#include <bftengine/Replica.hpp>
#include <messages/StateTransferMsg.hpp>
#include "bftengine/Replica.hpp"
#include "messages/StateTransferMsg.hpp"
#include "FullNodeReplica.hpp"

#include "log/logger.hpp"
Expand All @@ -33,32 +33,31 @@
#include "communication/StateControl.hpp"

using concordUtil::Timers;
using namespace std::placeholders;

// Note : The changes in files are inclined with RO replica SateTransfer behavior, all the class functions are inherited
// from ReadOnlyReplica. As we know for timebeing StateTransfer functionality is a temporary solution for FullNode,
// until the ASP/BSP is implemented the functions in this class needs to be changed based on the required accordingly.

namespace bftEngine::impl {

FullNodeReplica::FullNodeReplica(const ReplicaConfig &config,
std::shared_ptr<IRequestsHandler> requestsHandler,
IStateTransfer *stateTransfer,
std::shared_ptr<MsgsCommunicator> msgComm,
std::shared_ptr<MsgHandlersRegistrator> msgHandlerReg,
std::shared_ptr<IRequestsHandler> requests_handler,
IStateTransfer *state_transfer,
std::shared_ptr<MsgsCommunicator> msg_comm,
std::shared_ptr<MsgHandlersRegistrator> msg_handler_reg,
concordUtil::Timers &timers,
MetadataStorage *metadataStorage)
: ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers),
MetadataStorage *metadata_storage)
: ReplicaForStateTransfer(config, requests_handler, state_transfer, msg_comm, msg_handler_reg, true, timers),
fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"),
metrics_.RegisterCounter("sentAskForCheckpointMsgs"),
metrics_.RegisterCounter("receivedInvalidMsgs"),
metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)},
metadataStorage_{metadataStorage} {
metadata_storage_{metadata_storage} {
LOG_INFO(GL, "Initialising Full Node Replica");
repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs);
msgHandlers_->registerMsgHandler(
MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler<CheckpointMsg>, this, std::placeholders::_1));
msgHandlers_->registerMsgHandler(
MsgCode::ClientRequest,
std::bind(&FullNodeReplica::messageHandler<ClientRequestMsg>, this, std::placeholders::_1));
msgHandlers_->registerMsgHandler(
MsgCode::StateTransfer,
std::bind(&FullNodeReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));

registerMsgHandlers();
metrics_.Register();

SigManager::init(config_.replicaId,
Expand Down Expand Up @@ -96,10 +95,8 @@ void FullNodeReplica::stop() {
}

void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize;

fn_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum);
last_executed_seq_num_ = lastExecutedSeqNum;
last_executed_seq_num_ = newStateCheckpoint * checkpointWindowSize;
fn_metrics_.last_executed_seq_num_.Get().Set(last_executed_seq_num_);
}

void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) {
Expand All @@ -111,8 +108,8 @@ void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *
void FullNodeReplica::sendAskForCheckpointMsg() {
fn_metrics_.sent_ask_for_checkpoint_msg_++;
LOG_INFO(GL, "sending AskForCheckpointMsg");
auto msg = std::make_unique<AskForCheckpointMsg>(config_.replicaId);
for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id);
AskForCheckpointMsg msg{config_.replicaId};
for (auto id : repsInfo->idsOfPeerReplicas()) send(&msg, id);
}

template <>
Expand All @@ -139,24 +136,24 @@ void FullNodeReplica::onMessage<CheckpointMsg>(std::unique_ptr<CheckpointMsg> ms
msg->rvbDataDigest()));

// Reconfiguration cmd block is synced to RO replica via reserved pages
EpochNum replicasLastKnownEpochVal = 0;
auto epochNumberFromResPages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber();
if (epochNumberFromResPages.has_value()) replicasLastKnownEpochVal = epochNumberFromResPages.value();
EpochNum replicas_last_known_epoch_val = 0;
auto epoch_number_from_res_pages = ReconfigurationCmd::instance().getReconfigurationCommandEpochNumber();
if (epoch_number_from_res_pages.has_value()) replicas_last_known_epoch_val = epoch_number_from_res_pages.value();

// not relevant
if (!msg->isStableState() || msg->seqNumber() <= lastExecutedSeqNum ||
msg->epochNumber() < replicasLastKnownEpochVal) {
msg->epochNumber() < replicas_last_known_epoch_val) {
return;
}
// no self certificate
static std::map<SeqNum, CheckpointInfo<false>> checkpointsInfo;
const auto msgSeqNum = msg->seqNumber();
const auto idOfGeneratedReplica = msg->idOfGeneratedReplica();
checkpointsInfo[msgSeqNum].addCheckpointMsg(msg.release(), idOfGeneratedReplica);
static std::map<SeqNum, CheckpointInfo<false>> checkpoints_info;
const auto msg_seq_num = msg->seqNumber();
const auto id_of_generated_eplica = msg->idOfGeneratedReplica();
checkpoints_info[msg_seq_num].addCheckpointMsg(msg.release(), id_of_generated_eplica);
// if enough - invoke state transfer
if (checkpointsInfo[msgSeqNum].isCheckpointCertificateComplete()) {
persistCheckpointDescriptor(msgSeqNum, checkpointsInfo[msgSeqNum]);
checkpointsInfo.clear();
if (checkpoints_info[msg_seq_num].isCheckpointCertificateComplete()) {
persistCheckpointDescriptor(msg_seq_num, checkpoints_info[msg_seq_num]);
checkpoints_info.clear();
LOG_INFO(GL, "call to startCollectingState()");
stateTransfer->startCollectingState();
}
Expand All @@ -177,103 +174,37 @@ void FullNodeReplica::persistCheckpointDescriptor(const SeqNum &seqnum, const Ch
m.second->idOfGeneratedReplica()));
}
DescriptorOfLastStableCheckpoint desc(ReplicaConfig::instance().getnumReplicas(), msgs);
const size_t bufLen = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas());
concord::serialize::UniquePtrToChar descBuf(new char[bufLen]);
char *descBufPtr = descBuf.get();
size_t actualSize = 0;
desc.serialize(descBufPtr, bufLen, actualSize);
ConcordAssertNE(actualSize, 0);
const size_t buf_len = DescriptorOfLastStableCheckpoint::maxSize(ReplicaConfig::instance().getnumReplicas());
concord::serialize::UniquePtrToChar desc_buf(new char[buf_len]);
char *desc_buf_ptr = desc_buf.get();
size_t actual_size = 0;
desc.serialize(desc_buf_ptr, buf_len, actual_size);
ConcordAssertNE(actual_size, 0);

// TODO [TK] S3KeyGenerator
// checkpoints/<BlockId>/<RepId>
std::ostringstream oss;
oss << "checkpoints/" << msgs[0]->state() << "/" << config_.replicaId;
metadataStorage_->atomicWriteArbitraryObject(oss.str(), descBuf.get(), actualSize);
metadata_storage_->atomicWriteArbitraryObject(oss.str(), desc_buf.get(), actual_size);
}

template <>
void FullNodeReplica::onMessage<ClientRequestMsg>(std::unique_ptr<ClientRequestMsg> msg) {
const NodeIdType senderId = msg->senderId();
const NodeIdType clientId = msg->clientProxyId();
const bool reconfig_flag = (msg->flags() & MsgFlag::RECONFIG_FLAG) != 0;
const ReqId reqSeqNum = msg->requestSeqNum();
const NodeIdType sender_id = msg->senderId();
const NodeIdType client_id = msg->clientProxyId();
const ReqId req_seq_num = msg->requestSeqNum();
const uint64_t flags = msg->flags();

SCOPED_MDC_CID(msg->getCid());
LOG_DEBUG(CNSUS, KVLOG(clientId, reqSeqNum, senderId) << " flags: " << std::bitset<sizeof(uint64_t) * 8>(flags));
LOG_DEBUG(CNSUS, KVLOG(client_id, req_seq_num, sender_id) << " flags: " << std::bitset<sizeof(uint64_t) * 8>(flags));

const auto &span_context = msg->spanContext<std::remove_pointer<ClientRequestMsg>::type>();
auto span = concordUtils::startChildSpanFromContext(span_context, "bft_client_request");
span.setTag("rid", config_.getreplicaId());
span.setTag("cid", msg->getCid());
span.setTag("seq_num", reqSeqNum);

// A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and
// the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in
// the committers
span.setTag("seq_num", req_seq_num);

if (reconfig_flag) {
LOG_INFO(GL, "FN replica has received a reconfiguration request");
executeReadOnlyRequest(span, *(msg.get()));
return;
}
}

void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) {
auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span);
// full node replica does not know who is the primary, so it always return 0. It is the client responsibility to treat
// the replies accordingly.
ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId());
const uint16_t clientId = request.clientProxyId();

int executionResult = 0;
bftEngine::IRequestsHandler::ExecutionRequestsQueue accumulatedRequests;
accumulatedRequests.push_back(bftEngine::IRequestsHandler::ExecutionRequest{clientId,
static_cast<uint64_t>(lastExecutedSeqNum),
request.getCid(),
request.flags(),
request.requestLength(),
request.requestBuf(),
"",
reply.maxReplyLength(),
reply.replyBuf(),
request.requestSeqNum(),
request.requestIndexInBatch(),
request.result()});
// DD: Do we need to take care of Time Service here?
bftRequestsHandler_->execute(accumulatedRequests, std::nullopt, request.getCid(), span);
IRequestsHandler::ExecutionRequest &single_request = accumulatedRequests.back();
executionResult = single_request.outExecutionStatus;
const uint32_t actualReplyLength = single_request.outActualReplySize;
const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize;
LOG_DEBUG(GL,
"Executed full node request. " << KVLOG(clientId,
lastExecutedSeqNum,
request.requestLength(),
reply.maxReplyLength(),
actualReplyLength,
actualReplicaSpecificInfoLength,
executionResult));
// TODO(GG): TBD - how do we want to support empty replies? (actualReplyLength==0)
if (!executionResult) {
if (actualReplyLength > 0) {
reply.setReplyLength(actualReplyLength);
reply.setReplicaSpecificInfoLength(actualReplicaSpecificInfoLength);
send(&reply, clientId);
return;
} else {
LOG_WARN(GL, "Received zero size response. " << KVLOG(clientId));
strcpy(single_request.outReply, "Executed data is empty");
single_request.outActualReplySize = strlen(single_request.outReply);
executionResult = static_cast<uint32_t>(bftEngine::OperationResult::EXEC_DATA_EMPTY);
}

} else {
LOG_ERROR(GL, "Received error while executing FN request. " << KVLOG(clientId, executionResult));
}
ClientReplyMsg replyMsg(
0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult);
send(&replyMsg, clientId);
// TODO: handle reconfiguration request here, refer ReadOnlyReplica class
}

void FullNodeReplica::registerStatusHandlers() {
Expand All @@ -292,4 +223,13 @@ void FullNodeReplica::registerStatusHandlers() {
concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h);
}

void FullNodeReplica::registerMsgHandlers() {
msgHandlers_->registerMsgHandler(MsgCode::Checkpoint,
std::bind(&FullNodeReplica::messageHandler<CheckpointMsg>, this, _1));
msgHandlers_->registerMsgHandler(MsgCode::ClientRequest,
std::bind(&FullNodeReplica::messageHandler<ClientRequestMsg>, this, _1));
msgHandlers_->registerMsgHandler(MsgCode::StateTransfer,
std::bind(&FullNodeReplica::messageHandler<StateTransferMsg>, this, _1));
}

} // namespace bftEngine::impl
9 changes: 5 additions & 4 deletions bftengine/src/bftengine/FullNodeReplica.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class FullNodeReplica : public ReplicaForStateTransfer {

void start() override;
void stop() override;
virtual bool isReadOnly() const override { return false; }
virtual bool isFullNode() const override { return true; }
virtual bool isReadOnly() const override { return config_.isReadOnly; }
virtual bool isFullNode() const override { return config_.isFullNode; }

protected:
void sendAskForCheckpointMsg();
Expand All @@ -54,7 +54,6 @@ class FullNodeReplica : public ReplicaForStateTransfer {
template <class T>
void onMessage(std::unique_ptr<T>);

void executeReadOnlyRequest(concordUtils::SpanWrapper& parent_span, const ClientRequestMsg& m);
void persistCheckpointDescriptor(const SeqNum&, const CheckpointInfo<false>&);

protected:
Expand All @@ -67,7 +66,7 @@ class FullNodeReplica : public ReplicaForStateTransfer {
concordMetrics::GaugeHandle last_executed_seq_num_;
} fn_metrics_;

std::unique_ptr<MetadataStorage> metadataStorage_;
std::unique_ptr<MetadataStorage> metadata_storage_;
std::atomic<SeqNum> last_executed_seq_num_{0};

private:
Expand All @@ -76,6 +75,8 @@ class FullNodeReplica : public ReplicaForStateTransfer {
// The full node replica also hasn't got an implementation for InternalMessages which are used by the
// ReplicaStatusHandler.
void registerStatusHandlers();

void registerMsgHandlers();
};

} // namespace bftEngine::impl
4 changes: 2 additions & 2 deletions bftengine/src/bftengine/ReadOnlyReplica.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class ReadOnlyReplica : public ReplicaForStateTransfer {

void start() override;
void stop() override;
virtual bool isReadOnly() const override { return true; }
virtual bool isFullNode() const override { return false; }
virtual bool isReadOnly() const override { return config_.isReadOnly; }
virtual bool isFullNode() const override { return config_.isFullNode; }

protected:
void sendAskForCheckpointMsg();
Expand Down
4 changes: 2 additions & 2 deletions bftengine/src/bftengine/ReplicaImp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {

std::shared_ptr<concord::cron::TicksGenerator> ticksGenerator() const { return ticks_gen_; }

virtual bool isReadOnly() const override { return false; }
virtual bool isReadOnly() const override { return config_.isReadOnly; }

virtual bool isFullNode() const override { return false; }
virtual bool isFullNode() const override { return config_.isFullNode; }

shared_ptr<PersistentStorage> getPersistentStorage() const { return ps_; }
std::shared_ptr<concord::secretsmanager::ISecretsManagerImpl> getSecretsManager() { return sm_; }
Expand Down

0 comments on commit c1204d4

Please sign in to comment.