From 39cf88c4fb2bee1a60950abd58276742df2877e7 Mon Sep 17 00:00:00 2001 From: ssitu Date: Mon, 8 Aug 2022 22:28:03 -0700 Subject: [PATCH] Fix for replica crash in collecting state --- bftengine/src/bftengine/ReplicaImp.cpp | 26 ++++++++++++------- bftengine/src/bftengine/ReplicaImp.hpp | 13 +++++++++- .../simplestatetransfer/SimpleStateTran.cpp | 2 -- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 72e34b0640..3c4917eca0 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -2416,14 +2416,11 @@ void ReplicaImp::onMessage(CheckpointMsg *msg) { } } - if (askForStateTransfer && !stateTransfer->isCollectingState()) { + if (askForStateTransfer && !isCollectingState()) { if (activeExecutions_ > 0) isStartCollectingState_ = true; else { - LOG_INFO(GL, "Call to startCollectingState()"); - time_in_state_transfer_.start(); - clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout - stateTransfer->startCollectingState(); + startCollectingState("On receiving checkpoint message"); } } else if (msgSenderId == msgGenReplicaId) { if (msgSeqNum > lastStableSeqNum + kWorkWindowSize) { @@ -3354,6 +3351,7 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) { if (ps_) { ps_->endWriteTran(config_.getsyncOnUpdateOfMetadata()); } + setIsCollectingState(false); return; } lastExecutedSeqNum = newCheckpointSeqNum; @@ -3419,6 +3417,8 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) { LOG_INFO(GL, "tryToEnterView after State Transfer finished ..."); tryToEnterView(); } + + setIsCollectingState(false); } void ReplicaImp::onSeqNumIsSuperStable(SeqNum superStableSeqNum) { @@ -4285,6 +4285,7 @@ ReplicaImp::ReplicaImp(bool firstTime, shared_ptr ps, const std::function &viewChangeCallBack) : ReplicaForStateTransfer(config, requestsHandler, stateTrans, msgsCommunicator, msgHandlers, firstTime, timers), + isCollectingState_{stateTransfer->isCollectingState()}, viewChangeProtocolEnabled{config.viewChangeProtocolEnabled}, autoPrimaryRotationEnabled{config.autoPrimaryRotationEnabled}, restarted_{!firstTime}, @@ -5240,13 +5241,18 @@ void ReplicaImp::updateLimitsAndMetrics(PrePrepareMsg *ppMsg) { } } +void ReplicaImp::startCollectingState(std::string &&reason) { + setIsCollectingState(true); + LOG_INFO(GL, "Start Collecting State" << KVLOG(reason)); + time_in_state_transfer_.start(); + clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout + stateTransfer->startCollectingState(); +} + void ReplicaImp::handleDeferredRequests() { if (isStartCollectingState_) { - if (!stateTransfer->isCollectingState()) { - LOG_INFO(GL, "Call to startCollectingState()"); - time_in_state_transfer_.start(); - clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout - stateTransfer->startCollectingState(); + if (!isCollectingState()) { + startCollectingState("Handle Deferred Requests"); } else { LOG_ERROR(GL, "Collecting state should be active while we are in onExecutionFinish"); } diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index c0a52ca29d..cf5b00a467 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -78,6 +78,7 @@ using concordMetrics::StatusHandle; class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { protected: + std::atomic_bool isCollectingState_; const bool viewChangeProtocolEnabled; const bool autoPrimaryRotationEnabled; @@ -368,7 +369,11 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { std::function getMessageValidator(); // InternalReplicaApi - bool isCollectingState() const override { return stateTransfer->isCollectingState(); } + bool isCollectingState() const override { + LOG_INFO(GL, "Thread ID: " << std::this_thread::get_id()); + return isCollectingState_; + } + void startCollectingState(std::string&& reason = ""); bool isValidClient(NodeIdType clientId) const override { return clientsManager->isValidClient(clientId); } bool isIdOfReplica(NodeIdType id) const override { return repsInfo->isIdOfReplica(id); } const std::set& getIdsOfPeerReplicas() const override { return repsInfo->idsOfPeerReplicas(); } @@ -382,6 +387,12 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { bool isClientRequestInProcess(NodeIdType clientId, ReqId reqSeqNum) const override { return clientsManager->isClientRequestInProcess(clientId, reqSeqNum); } + inline void setIsCollectingState(bool newState) { + LOG_INFO(GL, + std::boolalpha << "Setting CollectingState to" << KVLOG(newState) + << " Thread ID: " << std::this_thread::get_id()); + isCollectingState_ = newState; + } SeqNum getPrimaryLastUsedSeqNum() const override { return primaryLastUsedSeqNum; } uint64_t getRequestsInQueue() const override { return requestsQueueOfPrimary.size(); } SeqNum getLastExecutedSeqNum() const override { return lastExecutedSeqNum; } diff --git a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp index 7817dd22e2..2f8e36c54c 100644 --- a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp +++ b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp @@ -531,8 +531,6 @@ void SimpleStateTran::startCollectingState() { bool SimpleStateTran::isCollectingState() const { ConcordAssert(isInitialized()); - ConcordAssert(internalST_->isRunning()); - return internalST_->isCollectingState(); }