Skip to content

Commit

Permalink
Merge pull request #2729 from situ-s/st_fix
Browse files Browse the repository at this point in the history
Fix for replica crash in collecting state
  • Loading branch information
situ-s authored Sep 20, 2022
2 parents de4b3b7 + 39cf88c commit 1f3edbb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
26 changes: 16 additions & 10 deletions bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2427,14 +2427,11 @@ void ReplicaImp::onMessage<CheckpointMsg>(CheckpointMsg *msg) {
}
}

if (askForStateTransfer && !stateTransfer->isCollectingState()) {
if (askForStateTransfer && !isCollectingState()) {
if (activeExecutions_ > 0)
isStartCollectingState_ = true;
else {
LOG_INFO(GL, "Call to startCollectingState()");
time_in_state_transfer_.start();
clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout
stateTransfer->startCollectingState();
startCollectingState("On receiving checkpoint message");
}
} else if (msgSenderId == msgGenReplicaId) {
if (msgSeqNum > lastStableSeqNum + kWorkWindowSize) {
Expand Down Expand Up @@ -3365,6 +3362,7 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
if (ps_) {
ps_->endWriteTran(config_.getsyncOnUpdateOfMetadata());
}
setIsCollectingState(false);
return;
}
lastExecutedSeqNum = newCheckpointSeqNum;
Expand Down Expand Up @@ -3430,6 +3428,8 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
LOG_INFO(GL, "tryToEnterView after State Transfer finished ...");
tryToEnterView();
}

setIsCollectingState(false);
}

void ReplicaImp::onSeqNumIsSuperStable(SeqNum superStableSeqNum) {
Expand Down Expand Up @@ -4298,6 +4298,7 @@ ReplicaImp::ReplicaImp(bool firstTime,
shared_ptr<PersistentStorage> ps,
const std::function<void(bool)> &viewChangeCallBack)
: ReplicaForStateTransfer(config, requestsHandler, stateTrans, msgsCommunicator, msgHandlers, firstTime, timers),
isCollectingState_{stateTransfer->isCollectingState()},
viewChangeProtocolEnabled{config.viewChangeProtocolEnabled},
autoPrimaryRotationEnabled{config.autoPrimaryRotationEnabled},
restarted_{!firstTime},
Expand Down Expand Up @@ -5256,13 +5257,18 @@ void ReplicaImp::updateLimitsAndMetrics(PrePrepareMsg *ppMsg) {
}
}

// Puts this replica into the "collecting state" mode and kicks off state transfer.
//
// Steps, in order: raise the replica-local collecting flag, log the caller-supplied
// reason, start the time_in_state_transfer_ timer, drop all pending client requests,
// and finally ask the state-transfer module to begin collecting.
//
// @param reason  Free-form text describing why state collection is starting; it is only
//                written to the log.
void ReplicaImp::startCollectingState(std::string &&reason) {
// Raise the flag before anything else so the replica reports "collecting" from here on.
setIsCollectingState(true);
LOG_INFO(GL, "Start Collecting State" << KVLOG(reason));
time_in_state_transfer_.start();
clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout
// Hand over to the state-transfer module last, after the local bookkeeping is done.
stateTransfer->startCollectingState();
}

void ReplicaImp::handleDeferredRequests() {
if (isStartCollectingState_) {
if (!stateTransfer->isCollectingState()) {
LOG_INFO(GL, "Call to startCollectingState()");
time_in_state_transfer_.start();
clientsManager->clearAllPendingRequests(); // to avoid entering a new view on old request timeout
stateTransfer->startCollectingState();
if (!isCollectingState()) {
startCollectingState("Handle Deferred Requests");
} else {
LOG_ERROR(GL, "Collecting state should be active while we are in onExecutionFinish");
}
Expand Down
13 changes: 12 additions & 1 deletion bftengine/src/bftengine/ReplicaImp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ using concordMetrics::StatusHandle;

class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
protected:
std::atomic_bool isCollectingState_;
const bool viewChangeProtocolEnabled;
const bool autoPrimaryRotationEnabled;

Expand Down Expand Up @@ -368,7 +369,11 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
std::function<bool(MessageBase*)> getMessageValidator();

// InternalReplicaApi
bool isCollectingState() const override { return stateTransfer->isCollectingState(); }
// Returns the replica-local "collecting state" flag (isCollectingState_) rather than
// querying the state-transfer module. The calling thread's id is logged on every call.
// NOTE(review): this logs at INFO on each query; confirm that volume is intended.
bool isCollectingState() const override {
  LOG_INFO(GL, "Thread ID: " << std::this_thread::get_id());
  const bool collecting = isCollectingState_.load();
  return collecting;
}
void startCollectingState(std::string&& reason = "");
bool isValidClient(NodeIdType clientId) const override { return clientsManager->isValidClient(clientId); }
bool isIdOfReplica(NodeIdType id) const override { return repsInfo->isIdOfReplica(id); }
const std::set<ReplicaId>& getIdsOfPeerReplicas() const override { return repsInfo->idsOfPeerReplicas(); }
Expand All @@ -382,6 +387,12 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
bool isClientRequestInProcess(NodeIdType clientId, ReqId reqSeqNum) const override {
return clientsManager->isClientRequestInProcess(clientId, reqSeqNum);
}
// Records whether this replica is currently collecting state.
// Logs the new value together with the id of the calling thread, then stores the
// value into the atomic isCollectingState_ flag.
inline void setIsCollectingState(bool newState) {
  const auto callerThreadId = std::this_thread::get_id();
  LOG_INFO(GL, std::boolalpha << "Setting CollectingState to" << KVLOG(newState) << " Thread ID: " << callerThreadId);
  isCollectingState_.store(newState);
}
SeqNum getPrimaryLastUsedSeqNum() const override { return primaryLastUsedSeqNum; }
uint64_t getRequestsInQueue() const override { return requestsQueueOfPrimary.size(); }
SeqNum getLastExecutedSeqNum() const override { return lastExecutedSeqNum; }
Expand Down
2 changes: 0 additions & 2 deletions bftengine/src/simplestatetransfer/SimpleStateTran.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,6 @@ void SimpleStateTran::startCollectingState() {

// Reports whether the wrapped state-transfer module is currently collecting state.
//
// Only initialization is asserted. This is a read-only status query, so it must not
// abort the process merely because the module is not running at the moment of the call;
// the former ConcordAssert(internalST_->isRunning()) made such a query fatal.
//
// @return the value reported by internalST_->isCollectingState().
bool SimpleStateTran::isCollectingState() const {
  ConcordAssert(isInitialized());
  return internalST_->isCollectingState();
}

Expand Down

0 comments on commit 1f3edbb

Please sign in to comment.