Skip to content

Commit

Permalink
Make replicas wait for quorum before publishing main key when starting
Browse files Browse the repository at this point in the history
Move message validation to ReplicaBase
Update RO replicas key state after ST via cre (same mechanism as
clients)
  • Loading branch information
WildFireFlum committed Feb 28, 2023
1 parent cf5caeb commit 2155259
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 142 deletions.
5 changes: 4 additions & 1 deletion bftengine/include/bftengine/KeyExchangeManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ class KeyExchangeManager {
void generateConsensusKeyAndSendInternalClientMsg(const SeqNum& sn);
// Send the current main public key of the replica to consensus
void sendMainPublicKey();

void waitForQuorum(const ReplicaImp* repImpInstance);

// Waits for a quorum and calls generateConsensusKeyAndSendInternalClientMsg
void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& = 0);
void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s = 0);
// The execution handler implementation that is called after a key exchange msg passed consensus.
// The new key pair will be used from two checkpoints after kemsg.generated_sn
std::string onKeyExchange(const KeyExchangeMsg& kemsg, const SeqNum& req_sn, const std::string& cid);
Expand Down
8 changes: 6 additions & 2 deletions bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ScalingReplicaHandler : public IStateHandler {
}
};

// TODO(yf): remove
class MainKeyUpdateHandler : public IStateHandler {
public:
MainKeyUpdateHandler() { LOG_INFO(getLogger(), "Created StateTransfer CRE replica main key update handler"); }
Expand Down Expand Up @@ -186,8 +187,11 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(
IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_);
auto cre =
std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>());
if (!bftEngine::ReplicaConfig::instance().isReadOnly) cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
if (bftEngine::ReplicaConfig::instance().isReadOnly) {
cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
} else {
cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
}
return cre;
}

Expand Down
14 changes: 10 additions & 4 deletions bftengine/src/bftengine/KeyExchangeManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,20 @@ void KeyExchangeManager::loadClientPublicKey(const std::string& key,
if (saveToReservedPages) saveClientsPublicKeys(SigManager::instance()->getClientsPublicKeys());
}

void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& s) {
std::unique_lock<std::mutex> lock(startup_mutex_);
SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
if (!ReplicaConfig::instance().waitForFullCommOnStartup) {
void KeyExchangeManager::waitForQuorum(const ReplicaImp* repImpInstance) {
bool partialQuorum = !ReplicaConfig::instance().waitForFullCommOnStartup;
LOG_INFO(KEY_EX_LOG, "Waiting for quorum" << KVLOG(partialQuorum));
if (partialQuorum) {
waitForLiveQuorum(repImpInstance);
} else {
waitForFullCommunication();
}
}

void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s) {
std::unique_lock<std::mutex> lock(startup_mutex_);
SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
waitForQuorum(repImpInstance);

generateConsensusKeyAndSendInternalClientMsg(s);
metrics_->sent_key_exchange_on_start_status.Get().Set("True");
Expand Down
6 changes: 0 additions & 6 deletions bftengine/src/bftengine/ReadOnlyReplica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ ReadOnlyReplica::ReadOnlyReplica(const ReplicaConfig &config,
msgHandlers_->registerMsgHandler(
MsgCode::StateTransfer,
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
msgHandlers_->registerMsgHandler(
MsgCode::StateTransfer,
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
msgHandlers_->registerMsgHandler(
MsgCode::StateTransfer,
std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
metrics_.Register();

SigManager::init(config_.replicaId,
Expand Down
23 changes: 21 additions & 2 deletions bftengine/src/bftengine/ReplicaBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace bftEngine::impl {
class MsgHandlersRegistrator;
class MsgsCommunicator;
class ReplicasInfo;
class CheckpointMsg;

using concordMetrics::GaugeHandle;
using concordMetrics::StatusHandle;
Expand Down Expand Up @@ -80,7 +81,25 @@ class ReplicaBase {

void sendRaw(MessageBase* m, NodeIdType dest);

bool validateMessage(MessageBase* msg) {
template <typename MessageType>
bool validateMessage(MessageType* msg) {
if (config_.debugStatisticsEnabled) {
DebugStatistics::onReceivedExMessage(msg->type());
}
try {
if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
msg->validate(*repsInfo, false);
} else {
msg->validate(*repsInfo);
}
return true;
} catch (std::exception& e) {
onReportAboutInvalidMessage(msg, e.what());
return false;
}
}

/*bool validateMessage(MessageBase* msg) {
try {
if (config_.debugStatisticsEnabled) DebugStatistics::onReceivedExMessage(msg->type());
Expand All @@ -90,7 +109,7 @@ class ReplicaBase {
onReportAboutInvalidMessage(msg, e.what());
return false;
}
}
}*/

protected:
static const uint16_t ALL_OTHER_REPLICAS = UINT16_MAX;
Expand Down
13 changes: 5 additions & 8 deletions bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,20 @@ void ReplicaForStateTransfer::start() {
if (!config_.isReadOnly) {
// Load the public keys of the other replicas from reserved pages
// so that their responses can be validated
cre_->halt();
KeyExchangeManager::instance().loadPublicKeys();

// Make sure to sign the reconfiguration client messages using the key
// other replicas expect
SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);

// Need to update private key to match the loaded public key in case they differ (key exchange was executed
// on other replicas but not on this one, finishing ST does not mean that missed key exchanges are executed)
// This can be done by iterating the saved cryptosystems and updating their private key if their
// public key matches the candidate saved in KeyExchangeManager

// Clear old keys
CryptoManager::instance().onCheckpoint(checkpoint);
auto [priv, pub] = KeyExchangeManager::instance().getCandidateKeyPair();
CryptoManager::instance().syncPrivateKeyAfterST(priv, pub);

// Make sure to sign the reconfiguration client messages using the key
// other replicas expect
SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);
cre_->resume();

// At this point, we, if are not going to have another blocks in state transfer. So, we can safely stop CRE.
// if there is a reconfiguration state change that prevents us from starting another state transfer (i.e.
// scaling) then CRE probably won't work as well.
Expand Down
19 changes: 1 addition & 18 deletions bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,6 @@ void ReplicaImp::validatedMessageHandler(CarrierMesssage *msg) {
}
}

template <typename MessageType>
bool ReplicaImp::validateMessage(MessageType *msg) {
if (config_.debugStatisticsEnabled) {
DebugStatistics::onReceivedExMessage(msg->type());
}
try {
if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
msg->validate(*repsInfo, false);
} else {
msg->validate(*repsInfo);
}
return true;
} catch (std::exception &e) {
onReportAboutInvalidMessage(msg, e.what());
return false;
}
}

/**
* asyncValidateMessage<T> This is a family of asynchronous message which just schedules
* the validation in a thread bag and returns peacefully. This will also translate the message
Expand Down Expand Up @@ -4663,6 +4645,7 @@ void ReplicaImp::start() {
// If key exchange is disabled, first publish the replica's main key to clients
if (ReplicaConfig::instance().singleSignatureScheme ||
ReplicaConfig::instance().publishReplicasMasterKeyOnStartup) {
KeyExchangeManager::instance().waitForQuorum(this);
KeyExchangeManager::instance().sendMainPublicKey();
}
}
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/RequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void RequestHandler::execute(IRequestsHandler::ExecutionRequestsQueue& requests,
} else {
// this replica has not reached stable seqNum yet to create snapshot at requested seqNum
// add a callback to be called when seqNum is stable. We need to create snapshot on stable
// seq num because checkpoint msg certificate is stored on stable seq num and is used for intergrity
// seq num because checkpoint msg certificate is stored on stable seq num and is used for integrity
// check of db snapshots
const auto& seqNumToCreateSanpshot = createDbChkPtMsg.seqNum;
DbCheckpointManager::instance().setCheckpointInProcess(true, *blockId);
Expand Down
3 changes: 2 additions & 1 deletion bftengine/src/bftengine/SigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ bool SigManager::verifyOwnSignature(const concord::Byte* data,
const concord::Byte* expectedSignature) const {
std::vector<concord::Byte> sig(getMySigLength());
if (ReplicaConfig::instance().singleSignatureScheme) {
for (auto signer : CryptoManager::instance().getLatestSigners()) {
auto signers = CryptoManager::instance().getLatestSigners();
for (auto& signer : signers) {
signer->signBuffer(data, dataLength, sig.data());

if (std::memcmp(sig.data(), expectedSignature, getMySigLength()) == 0) {
Expand Down
1 change: 0 additions & 1 deletion bftengine/src/preprocessor/tests/preprocessor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ using namespace std;
using namespace bft::communication;
using namespace bftEngine;
using namespace preprocessor;
using concord::crypto::SignatureAlgorithm;

namespace {

Expand Down
1 change: 1 addition & 0 deletions kvbc/src/reconfiguration_kvbc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ concord::messages::ClientStateReply KvbcClientReconfigurationHandler::buildRepli
} else if (command_type == std::string{kvbc::keyTypes::reconfiguration_rep_main_key}) {
concord::messages::ReplicaMainKeyUpdate cmd;
concord::messages::deserialize(data_buf, cmd);
cmd.seq_num -= (cmd.seq_num < 2 * checkpointWindowSize ? 0 : 2 * checkpointWindowSize);
creply.response = cmd;
}
auto epoch_data =
Expand Down
1 change: 0 additions & 1 deletion kvbc/tools/object_store_utility/integrity_checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ using namespace std::placeholders;
using concordUtils::Status;
using bftEngine::bcst::impl::BCStateTran;
using kvbc::v1DirectKeyValue::S3StorageFactory;
using crypto::KeyFormat;

void IntegrityChecker::initKeysConfig(const fs::path& keys_file) {
LOG_DEBUG(logger_, keys_file);
Expand Down
17 changes: 8 additions & 9 deletions tests/apollo/test_skvbc_backup_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,16 @@ async def test_checkpoint_propagation_after_restarting_replicas(self, bft_networ
)

# stop n replicas in a random order with a delay of 5s in between
#bft_network.stop_all_replicas()
#stopped_replicas = bft_network.all_replicas()
stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5)

stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5,
exclude_replicas={current_primary})
bft_network.stop_replica(current_primary)
# start stopped replicas in a random order with a delay of 5s in between
await self._start_random_replicas_with_delay(bft_network, stopped_replicas, current_primary, delay=5)
#bft_network.start_all_replicas()
bft_network.start_replica(current_primary)
await self._start_random_replicas_with_delay(bft_network, stopped_replicas, delay=5)

# verify checkpoint persistence
log.log_message(message_type=f"Wait for replicas to reach checkpoint", checkpoint=checkpoint_before+1,
replicas=list(stopped_replicas))
replicas=[current_primary] + list(stopped_replicas))
await bft_network.wait_for_replicas_to_checkpoint(
stopped_replicas,
expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1)
Expand Down Expand Up @@ -313,12 +312,12 @@ async def _stop_random_replicas_with_delay(bft_network, delay=10, exclude_replic
return list(all_replicas)

@staticmethod
async def _start_random_replicas_with_delay(bft_network, stopped_replicas, initial_primary,
async def _start_random_replicas_with_delay(bft_network, stopped_replicas, initial_primary=None,
f_replicas_stopped_early=None, delay=10):
random.shuffle(stopped_replicas)
if f_replicas_stopped_early:
stopped_replicas.extend(f_replicas_stopped_early)
if initial_primary not in stopped_replicas:
if initial_primary and initial_primary not in stopped_replicas:
stopped_replicas.append(initial_primary)
for replica in stopped_replicas:
log.log_message(message_type=f"starting replica: {replica}")
Expand Down
2 changes: 1 addition & 1 deletion tests/apollo/test_skvbc_checkpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def test_checkpoint_propagation_after_primary_isolation(self, bft_network)
self.assertEqual(current_view, expected_next_primary)

# send sufficient number of client requests to trigger checkpoint protocol
# verify checkpoint propagation to all the nodes except the the initial primary
# verify checkpoint propagation to all the nodes except the initial primary
await skvbc.fill_and_wait_for_checkpoint(
initial_nodes=bft_network.all_replicas(without={initial_primary}),
num_of_checkpoints_to_add=1,
Expand Down
Loading

0 comments on commit 2155259

Please sign in to comment.