Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the CRE (possibly infinite) loop from the end of state transfer. #2995

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ class InternalSigner : public concord::crypto::ISigner {
std::string getPrivKey() const override { return ""; }
};

// Scaling command may break state transfer itself (if, for example, we scale from n1 to n2 < n1 replicas). For that we
// need a client like mechanism which work asynchronously to the state itself. However, a committer replica, may end
// state transfer and get the scale command in the state, before it was caught by CRE. In this case, the command is
// handled by the reconfiguration state transfer callback (see concordbft/kvbc/include/st_reconfiguraion_sm.hpp). The
// two mechanisms are synchronized via the configurations list. Note that we are unable to synchronize them based on
// epoch number, because CRE is (at the moment) unaware to epochs. Epochs are shared via reserved pages which are
// getting updated at the end of state transfer.
class ScalingReplicaHandler : public IStateHandler {
public:
ScalingReplicaHandler() {}
Expand All @@ -100,7 +107,7 @@ class ScalingReplicaHandler : public IStateHandler {
std::stringstream stream;
stream << configurations_file.rdbuf();
std::string configs = stream.str();
return (configs.empty()) || (configs.find(command.config_descriptor) != std::string::npos);
return (configs.empty()) || (configs.find(command.config_descriptor) == std::string::npos);
}
}
return false;
Expand Down
28 changes: 5 additions & 23 deletions bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,13 @@ void ReplicaForStateTransfer::start() {
stateTransfer->setReconfigurationEngine(cre_);
stateTransfer->addOnTransferringCompleteCallback(
[this](std::uint64_t) {
// TODO - The next lines up to comment 'YYY' do not belong here (CRE) - consider refactor or move outside
if (!config_.isReadOnly) {
// At this point, we, if are not going to have another blocks in state transfer. So, we can safely stop CRE.
// if there is a reconfiguration state change that prevents us from starting another state transfer (i.e.
// scaling) then CRE probably won't work as well.
// 1. First, make sure we handled the most recent available updates.
concord::client::reconfiguration::PollBasedStateClient *pbc =
(concord::client::reconfiguration::PollBasedStateClient *)(cre_->getStateClient());
bool succ = false;
while (!succ) {
auto latestHandledUpdate = cre_->getLatestKnownUpdateBlock();
auto latestReconfUpdates = pbc->getStateUpdate(succ);
if (!succ) {
LOG_WARN(GL, "unable to get the latest reconfiguration updates");
}
for (const auto &update : latestReconfUpdates) {
if (update.blockid > latestHandledUpdate) {
succ = false;
break;
} // else if (!isGettingBlocks)
}
} // while (!succ) {
// CRE may not read all the relevant updates. This may be an issue only in a committer node, which do not run
// state transfer continually. If this is the case, we need to execute the the relevant reconfiguration
// commands, based on the state we gained during state transfer. In any such case, it is the developer
// responsibility to synchronize between the two mechanisms. For example, see the AddRemoveWithWedgeCommand
// handler in concordbft/kvbc/include/st_reconfiguraion_sm.hpp
LOG_INFO(GL, "halting cre");
// 2. Now we can safely halt cre. We know for sure that there are no update in the state transffered
// blocks that haven't been handled yet
cre_->halt();
}
},
Expand Down
29 changes: 27 additions & 2 deletions kvbc/src/st_reconfiguration_sm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,33 @@ bool StReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedg
uint64_t bft_seq_num,
uint64_t current_cp_num,
uint64_t bid) {
return handleWedgeCommands(
command, bid, current_cp_num, bft_seq_num, command.bft_support, true, command.restart, command.restart);
// This callback should work together with the asyncCRE scaling handler. If the scale command has broken state
// transfer itself, we won't even get to that point, and the CRE is expected to handle this case. However, if we did
// manage to complete state transfer, CRE is halted and we need to execute the command based on the state we gained
// during state transfer. In order not to execute the command twice, we do check that this configuration was not
// already executed by reading the local configuration list.
std::ofstream configurations_file;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configurations_file.open(bftEngine::ReplicaConfig::instance().configurationViewFilePath + "/" +
concord::reconfiguration::configurationsFileName + "." +
std::to_string(bftEngine::ReplicaConfig::instance().replicaId),
std::ios_base::app);
if (configurations_file.good()) {
std::stringstream stream;
stream << configurations_file.rdbuf();
std::string configs = stream.str();
if (configs.find(command.config_descriptor) != std::string::npos) {
LOG_INFO(GL, "the scale command was already executed by async CRE, we won't execute it again");
return false;
}
}
bool succ = true;
concord::messages::ReconfigurationResponse response;
for (auto &h : orig_reconf_handlers_) {
// If it was written to the blockchain, it means that this is a valid request.
// We do need to execute every relevant reconfiguration handler to complete the scale command.
succ &= h->handle(command, bft_seq_num, UINT32_MAX, std::nullopt, response);
}
return succ;
}

bool StReconfigurationHandler::handle(const concord::messages::RestartCommand &command,
Expand Down
1 change: 0 additions & 1 deletion tests/apollo/test_skvbc_restart_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,6 @@ async def test_recovering_of_primary_with_initiated_view_change(self, bft_networ

await bft_network.wait_for_replicas_to_reach_at_least_view(replicas_ids=bft_network.all_replicas(), expected_view=view, timeout=20 + timeouts)

@unittest.skip("Unstable")
@with_trio
@with_bft_network(start_replica_cmd, selected_configs=lambda n, f, c: c == 0, rotate_keys=True)
@verify_linearizability()
Expand Down