Skip to content

Commit

Permalink
Refactoring the ST changes in FullNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Tarunkumar Banda committed Apr 25, 2023
1 parent e471315 commit 6059ae8
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 38 deletions.
4 changes: 1 addition & 3 deletions bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(std::shared_ptr<
for (uint16_t i = 0; i < repConfig.numReplicas; i++) {
bftClientConf.all_replicas.emplace(bft::client::ReplicaId{i});
}
for (uint16_t i = repConfig.numReplicas;
i < repConfig.numReplicas + repConfig.numRoReplicas + repConfig.numFnReplicas;
i++) {
for (uint16_t i = repConfig.numReplicas; i < repConfig.numReplicas + repConfig.numRoReplicas; i++) {
bftClientConf.ro_replicas.emplace(bft::client::ReplicaId{i});
}
bftClientConf.replicas_master_key_folder_path = std::nullopt;
Expand Down
6 changes: 3 additions & 3 deletions bftengine/src/bcstatetransfer/BCStateTran.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3971,13 +3971,13 @@ void BCStateTran::computeDigestOfPage(
if (checkpointNumber > 0) {
digestGenerator.update(page, pageSize);
}
digestGenerator.writeDigest(reinterpret_cast<char *>(&outDigest));
digestGenerator.writeDigest(outDigest.getForUpdate());
}

void BCStateTran::computeDigestOfPagesDescriptor(const DataStore::ResPagesDescriptor *pagesDesc, Digest &outDigest) {
DigestGenerator digestGenerator;
digestGenerator.update(reinterpret_cast<const char *>(pagesDesc), pagesDesc->size());
digestGenerator.writeDigest(reinterpret_cast<char *>(&outDigest));
digestGenerator.writeDigest(outDigest.getForUpdate());
}

void BCStateTran::computeDigestOfBlockImpl(const uint64_t blockNum,
Expand All @@ -3996,7 +3996,7 @@ void BCStateTran::computeDigestOfBlock(const uint64_t blockNum,
const char *block,
const uint32_t blockSize,
Digest *outDigest) {
computeDigestOfBlockImpl(blockNum, block, blockSize, reinterpret_cast<char *>(outDigest));
computeDigestOfBlockImpl(blockNum, block, blockSize, outDigest->getForUpdate());
}

BlockDigest BCStateTran::computeDigestOfBlock(const uint64_t blockNum, const char *block, const uint32_t blockSize) {
Expand Down
26 changes: 13 additions & 13 deletions bftengine/src/bftengine/FullNodeReplica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ FullNodeReplica::FullNodeReplica(const ReplicaConfig &config,
concordUtil::Timers &timers,
MetadataStorage *metadataStorage)
: ReplicaForStateTransfer(config, requestsHandler, stateTransfer, msgComm, msgHandlerReg, true, timers),
ro_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"),
fn_metrics_{metrics_.RegisterCounter("receivedCheckpointMsgs"),
metrics_.RegisterCounter("sentAskForCheckpointMsgs"),
metrics_.RegisterCounter("receivedInvalidMsgs"),
metrics_.RegisterGauge("lastExecutedSeqNum", lastExecutedSeqNum)},
metadataStorage_{metadataStorage} {
LOG_INFO(GL, "Initialising ReadOnly Replica");
LOG_INFO(GL, "Initialising Full Node Replica");
repsInfo = new ReplicasInfo(config, dynamicCollectorForPartialProofs, dynamicCollectorForExecutionProofs);
msgHandlers_->registerMsgHandler(
MsgCode::Checkpoint, std::bind(&FullNodeReplica::messageHandler<CheckpointMsg>, this, std::placeholders::_1));
Expand All @@ -72,7 +72,7 @@ FullNodeReplica::FullNodeReplica(const ReplicaConfig &config,
concord::crypto::KeyFormat::PemFormat}},
*repsInfo);

// Register status handler for Read-Only replica
// Register status handler for Full Node replica
registerStatusHandlers();
bft::communication::StateControl::instance().setGetPeerPubKeyMethod(
[&](uint32_t id) { return SigManager::instance()->getPublicKeyOfVerifier(id); });
Expand All @@ -98,18 +98,18 @@ void FullNodeReplica::stop() {
void FullNodeReplica::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
lastExecutedSeqNum = newStateCheckpoint * checkpointWindowSize;

ro_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum);
fn_metrics_.last_executed_seq_num_.Get().Set(lastExecutedSeqNum);
last_executed_seq_num_ = lastExecutedSeqNum;
}

void FullNodeReplica::onReportAboutInvalidMessage(MessageBase *msg, const char *reason) {
ro_metrics_.received_invalid_msg_++;
fn_metrics_.received_invalid_msg_++;
LOG_WARN(GL,
"Node " << config_.replicaId << " received invalid message from Node " << msg->senderId()
<< " type=" << msg->type() << " reason: " << reason);
}
void FullNodeReplica::sendAskForCheckpointMsg() {
ro_metrics_.sent_ask_for_checkpoint_msg_++;
fn_metrics_.sent_ask_for_checkpoint_msg_++;
LOG_INFO(GL, "sending AskForCheckpointMsg");
auto msg = std::make_unique<AskForCheckpointMsg>(config_.replicaId);
for (auto id : repsInfo->idsOfPeerReplicas()) send(msg.get(), id);
Expand All @@ -125,7 +125,7 @@ void FullNodeReplica::onMessage<CheckpointMsg>(std::unique_ptr<CheckpointMsg> ms
if (isCollectingState()) {
return;
}
ro_metrics_.received_checkpoint_msg_++;
fn_metrics_.received_checkpoint_msg_++;
LOG_INFO(GL,
KVLOG(msg->senderId(),
msg->idOfGeneratedReplica(),
Expand Down Expand Up @@ -208,20 +208,20 @@ void FullNodeReplica::onMessage<ClientRequestMsg>(std::unique_ptr<ClientRequestM
span.setTag("cid", msg->getCid());
span.setTag("seq_num", reqSeqNum);

// A read only replica can handle only reconfiguration requests. Those requests are signed by the operator and
// A full node replica can handle only reconfiguration requests. Those requests are signed by the operator and
// the validation is done in the reconfiguration engine. Thus, we don't need to check the client validity as in
// the committers

if (reconfig_flag) {
LOG_INFO(GL, "ro replica has received a reconfiguration request");
LOG_INFO(GL, "FN replica has received a reconfiguration request");
executeReadOnlyRequest(span, *(msg.get()));
return;
}
}

void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_span, const ClientRequestMsg &request) {
auto span = concordUtils::startChildSpan("bft_execute_read_only_request", parent_span);
// Read only replica does not know who is the primary, so it always return 0. It is the client responsibility to treat
// full node replica does not know who is the primary, so it always return 0. It is the client responsibility to treat
// the replies accordingly.
ClientReplyMsg reply(0, request.requestSeqNum(), config_.getreplicaId());
const uint16_t clientId = request.clientProxyId();
Expand All @@ -247,7 +247,7 @@ void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_s
const uint32_t actualReplyLength = single_request.outActualReplySize;
const uint32_t actualReplicaSpecificInfoLength = single_request.outReplicaSpecificInfoSize;
LOG_DEBUG(GL,
"Executed read only request. " << KVLOG(clientId,
"Executed full node request. " << KVLOG(clientId,
lastExecutedSeqNum,
request.requestLength(),
reply.maxReplyLength(),
Expand All @@ -269,7 +269,7 @@ void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_s
}

} else {
LOG_ERROR(GL, "Received error while executing RO request. " << KVLOG(clientId, executionResult));
LOG_ERROR(GL, "Received error while executing FN request. " << KVLOG(clientId, executionResult));
}
ClientReplyMsg replyMsg(
0, request.requestSeqNum(), single_request.outReply, single_request.outActualReplySize, executionResult);
Expand All @@ -278,7 +278,7 @@ void FullNodeReplica::executeReadOnlyRequest(concordUtils::SpanWrapper &parent_s

void FullNodeReplica::registerStatusHandlers() {
auto h = concord::diagnostics::StatusHandler(
"replica-sequence-numbers", "Last executed sequence number of the read-only replica", [this]() {
"replica-sequence-numbers", "Last executed sequence number of the full node replica", [this]() {
concordUtils::BuildJson bj;

bj.startJson();
Expand Down
9 changes: 5 additions & 4 deletions bftengine/src/bftengine/FullNodeReplica.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class FullNodeReplica : public ReplicaForStateTransfer {

void start() override;
void stop() override;
virtual bool isReadOnly() const override { return true; }
virtual bool isReadOnly() const override { return false; }
virtual bool isFullNode() const override { return true; }

protected:
void sendAskForCheckpointMsg();
Expand Down Expand Up @@ -64,15 +65,15 @@ class FullNodeReplica : public ReplicaForStateTransfer {
concordMetrics::CounterHandle sent_ask_for_checkpoint_msg_;
concordMetrics::CounterHandle received_invalid_msg_;
concordMetrics::GaugeHandle last_executed_seq_num_;
} ro_metrics_;
} fn_metrics_;

std::unique_ptr<MetadataStorage> metadataStorage_;
std::atomic<SeqNum> last_executed_seq_num_{0};

private:
// This function serves as an ReplicaStatusHandlers alternative for FullNodeReplica. The reason to use this function
// is that regular and read-only replicas expose different metrics and the status handlers are not interchangeable.
// The read-only replica also hasn't got an implementation for InternalMessages which are used by the
// is that regular and full node replicas expose different metrics and the status handlers are not interchangeable.
// The full node replica also hasn't got an implementation for InternalMessages which are used by the
// ReplicaStatusHandler.
void registerStatusHandlers();
};
Expand Down
1 change: 1 addition & 0 deletions bftengine/src/bftengine/ReadOnlyReplica.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ReadOnlyReplica : public ReplicaForStateTransfer {
void start() override;
void stop() override;
virtual bool isReadOnly() const override { return true; }
virtual bool isFullNode() const override { return false; }

protected:
void sendAskForCheckpointMsg();
Expand Down
2 changes: 2 additions & 0 deletions bftengine/src/bftengine/ReplicaBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ReplicaBase {

virtual bool isReadOnly() const = 0;

virtual bool isFullNode() const = 0;

std::shared_ptr<MsgsCommunicator> getMsgsCommunicator() const { return msgsCommunicator_; }
std::shared_ptr<MsgHandlersRegistrator> getMsgHandlersRegistrator() const { return msgHandlers_; }
concordUtil::Timers* getTimers() { return &timers_; }
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/ReplicaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void ReplicaInternal::restartForDebug(uint32_t delayMillisec) {
}
}

if (!replica_->isReadOnly()) {
if (!replica_->isReadOnly() && !replica_->isFullNode()) {
auto replicaImp = dynamic_cast<ReplicaImp *>(replica_.get());

shared_ptr<PersistentStorage> persistentStorage(replicaImp->getPersistentStorage());
Expand Down
2 changes: 2 additions & 0 deletions bftengine/src/bftengine/ReplicaImp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {

virtual bool isReadOnly() const override { return false; }

virtual bool isFullNode() const override { return false; }

shared_ptr<PersistentStorage> getPersistentStorage() const { return ps_; }
std::shared_ptr<concord::secretsmanager::ISecretsManagerImpl> getSecretsManager() { return sm_; }

Expand Down
3 changes: 1 addition & 2 deletions bftengine/src/preprocessor/PreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,7 @@ PreProcessor::PreProcessor(shared_ptr<MsgsCommunicator> &msgsCommunicator,
myReplicaId_(myReplica.getReplicaConfig().replicaId),
maxExternalMsgSize_(myReplica.getReplicaConfig().maxExternalMessageSize),
responseSizeInternalOverhead_{/* for conflict detection BlockId */ sizeof(uint64_t)},
numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas +
myReplica.getReplicaConfig().numFnReplicas),
numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas),
numOfClientProxies_(myReplica.getReplicaConfig().numOfClientProxies),
clientBatchingEnabled_(myReplica.getReplicaConfig().clientBatchingEnabled),
threadPool_("PreProcessor::threadPool"),
Expand Down
4 changes: 2 additions & 2 deletions examples/replica/src/SetupReplica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ bft::communication::ICommunication* SetupReplica::createCommunication(
logging::Logger logger = getLogger();
TestCommConfig testCommConfig(logger);
testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig);
uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 +
replicaConfig.numRoReplicas + replicaConfig.numFnReplicas);
uint16_t numOfReplicas =
(uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas);
auto numOfClients =
replicaConfig.numOfClientProxies ? replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients;
#ifdef USE_COMM_PLAIN_TCP
Expand Down
4 changes: 2 additions & 2 deletions kvbc/src/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ Status Replica::initInternals() {
bftEngine::ReplicaConfig::instance().pathToOperatorPublicKey_,
bftEngine::ReplicaConfig::instance().operatorMsgSigningAlgo,
*this));
if (replicaConfig_.isReadOnly || replicaConfig_.isFullNode) {
if (replicaConfig_.isReadOnly) {
LOG_INFO(logger,
"Read Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum()));
"ReadOnly Replica Status:" << KVLOG(getLastBlockNum(), getLastBlockId(), getLastReachableBlockNum()));
m_replicaPtr = bftEngine::ReplicaFactory::createRoReplica(
replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage);
} else {
Expand Down
8 changes: 2 additions & 6 deletions kvbc/src/reconfiguration_kvbc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,7 @@ bool KvbcClientReconfigurationHandler::handle(const concord::messages::ClientRec
const std::optional<bftEngine::Timestamp>& ts,
concord::messages::ReconfigurationResponse& rres) {
concord::messages::ClientReconfigurationStateReply rep;
uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas +
ReplicaConfig::instance().numFnReplicas;
uint16_t first_client_id = ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas;
if (sender_id > first_client_id) {
for (uint8_t i = kvbc::keyTypes::CLIENT_COMMAND_TYPES::start_ + 1; i < kvbc::keyTypes::CLIENT_COMMAND_TYPES::end_;
i++) {
Expand Down Expand Up @@ -699,10 +698,7 @@ bool ReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedgeC
auto execute_key_prefix =
std::string{kvbc::keyTypes::reconfiguration_client_data_prefix,
static_cast<char>(kvbc::keyTypes::CLIENT_COMMAND_TYPES::CLIENT_SCALING_EXECUTE_COMMAND)};

for (auto i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas +
ReplicaConfig::instance().numFnReplicas;
i++) {
for (uint64_t i = 0; i < ReplicaConfig::instance().numReplicas + ReplicaConfig::instance().numRoReplicas; i++) {
concord::messages::ClientsAddRemoveExecuteCommand cmd;
cmd.config_descriptor = command.config_descriptor;
if (token.find(i) == token.end()) continue;
Expand Down
4 changes: 2 additions & 2 deletions tests/simpleKVBC/TesterReplica/setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ std::unique_ptr<TestSetup> TestSetup::ParseArgs(int argc, char** argv) {

TestCommConfig testCommConfig(logger);
testCommConfig.GetReplicaConfig(replicaConfig.replicaId, keysFilePrefix, &replicaConfig);
uint16_t numOfReplicas = (uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 +
replicaConfig.numRoReplicas + replicaConfig.numFnReplicas);
uint16_t numOfReplicas =
(uint16_t)(3 * replicaConfig.fVal + 2 * replicaConfig.cVal + 1 + replicaConfig.numRoReplicas);
auto numOfClients =
replicaConfig.numOfClientProxies ? replicaConfig.numOfClientProxies : replicaConfig.numOfExternalClients;
std::shared_ptr<concord::secretsmanager::ISecretsManagerImpl> sm_ =
Expand Down

0 comments on commit 6059ae8

Please sign in to comment.