diff --git a/bftengine/include/bftengine/KeyExchangeManager.hpp b/bftengine/include/bftengine/KeyExchangeManager.hpp
index 7d54617856..f374e163aa 100644
--- a/bftengine/include/bftengine/KeyExchangeManager.hpp
+++ b/bftengine/include/bftengine/KeyExchangeManager.hpp
@@ -35,8 +35,11 @@ class KeyExchangeManager {
   void generateConsensusKeyAndSendInternalClientMsg(const SeqNum& sn);
   // Send the current main public key of the replica to consensus
   void sendMainPublicKey();
+
+  void waitForQuorum(const ReplicaImp* repImpInstance);
+
   // Waits for a quorum and calls generateConsensusKeyAndSendInternalClientMsg
-  void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& = 0);
+  void waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s = 0);
   // The execution handler implementation that is called after a key exchange msg passed consensus.
   // The new key pair will be used from two checkpoints after kemsg.generated_sn
   std::string onKeyExchange(const KeyExchangeMsg& kemsg, const SeqNum& req_sn, const std::string& cid);
diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
index ac5076b34a..3afed4d5ab 100644
--- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
+++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
@@ -130,6 +130,7 @@ class ScalingReplicaHandler : public IStateHandler {
   }
 };
 
+// TODO(yf): remove
 class MainKeyUpdateHandler : public IStateHandler {
  public:
   MainKeyUpdateHandler() { LOG_INFO(getLogger(), "Created StateTransfer CRE replica main key update handler"); }
@@ -186,8 +187,11 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(
   IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_);
 
   auto cre = std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>());
-  if (!bftEngine::ReplicaConfig::instance().isReadOnly) cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
-  cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
+  if (bftEngine::ReplicaConfig::instance().isReadOnly) {
+    cre->registerHandler(std::make_shared<MainKeyUpdateHandler>());
+  } else {
+    cre->registerHandler(std::make_shared<ScalingReplicaHandler>());
+  }
   return cre;
 }
diff --git a/bftengine/src/bftengine/KeyExchangeManager.cpp b/bftengine/src/bftengine/KeyExchangeManager.cpp
index a91b996de3..77bca63157 100644
--- a/bftengine/src/bftengine/KeyExchangeManager.cpp
+++ b/bftengine/src/bftengine/KeyExchangeManager.cpp
@@ -355,14 +355,20 @@ void KeyExchangeManager::loadClientPublicKey(const std::string& key,
   if (saveToReservedPages) saveClientsPublicKeys(SigManager::instance()->getClientsPublicKeys());
 }
 
-void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum& s) {
-  std::unique_lock lock(startup_mutex_);
-  SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
-  if (!ReplicaConfig::instance().waitForFullCommOnStartup) {
+void KeyExchangeManager::waitForQuorum(const ReplicaImp* repImpInstance) {
+  bool partialQuorum = !ReplicaConfig::instance().waitForFullCommOnStartup;
+  LOG_INFO(KEY_EX_LOG, "Waiting for quorum" << KVLOG(partialQuorum));
+  if (partialQuorum) {
     waitForLiveQuorum(repImpInstance);
   } else {
     waitForFullCommunication();
   }
+}
+
+void KeyExchangeManager::waitForQuorumAndTriggerConsensusExchange(const ReplicaImp* repImpInstance, const SeqNum s) {
+  std::unique_lock lock(startup_mutex_);
+  SCOPED_MDC(MDC_REPLICA_ID_KEY, std::to_string(ReplicaConfig::instance().replicaId));
+  waitForQuorum(repImpInstance);
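+  // Quorum reached; generate this replica's consensus key and propose it through an internal client message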
   generateConsensusKeyAndSendInternalClientMsg(s);
   metrics_->sent_key_exchange_on_start_status.Get().Set("True");
diff --git a/bftengine/src/bftengine/ReadOnlyReplica.cpp b/bftengine/src/bftengine/ReadOnlyReplica.cpp
index 1e36e441c5..82d234163f 100644
--- a/bftengine/src/bftengine/ReadOnlyReplica.cpp
+++ b/bftengine/src/bftengine/ReadOnlyReplica.cpp
@@ -60,12 +60,6 @@ ReadOnlyReplica::ReadOnlyReplica(const ReplicaConfig &config,
   msgHandlers_->registerMsgHandler(
       MsgCode::StateTransfer,
       std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
-  msgHandlers_->registerMsgHandler(
-      MsgCode::StateTransfer,
-      std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
-  msgHandlers_->registerMsgHandler(
-      MsgCode::StateTransfer,
-      std::bind(&ReadOnlyReplica::messageHandler<StateTransferMsg>, this, std::placeholders::_1));
   metrics_.Register();
 
   SigManager::init(config_.replicaId,
diff --git a/bftengine/src/bftengine/ReplicaBase.hpp b/bftengine/src/bftengine/ReplicaBase.hpp
index 7799fb4820..86cd8d0f81 100644
--- a/bftengine/src/bftengine/ReplicaBase.hpp
+++ b/bftengine/src/bftengine/ReplicaBase.hpp
@@ -26,6 +26,7 @@ namespace bftEngine::impl {
 class MsgHandlersRegistrator;
 class MsgsCommunicator;
 class ReplicasInfo;
+class CheckpointMsg;
 
 using concordMetrics::GaugeHandle;
 using concordMetrics::StatusHandle;
@@ -80,7 +81,25 @@ class ReplicaBase {
 
   void sendRaw(MessageBase* m, NodeIdType dest);
 
-  bool validateMessage(MessageBase* msg) {
+  template <typename MessageType>
+  bool validateMessage(MessageType* msg) {
+    if (config_.debugStatisticsEnabled) {
+      DebugStatistics::onReceivedExMessage(msg->type());
+    }
+    try {
+      if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
+        msg->validate(*repsInfo, false);
+      } else {
+        msg->validate(*repsInfo);
+      }
+      return true;
+    } catch (std::exception& e) {
+      onReportAboutInvalidMessage(msg, e.what());
+      return false;
+    }
+  }
+
+  /*bool validateMessage(MessageBase* msg) {
     try {
       if (config_.debugStatisticsEnabled) DebugStatistics::onReceivedExMessage(msg->type());
 
@@ -90,7 +109,7 @@ class ReplicaBase {
       onReportAboutInvalidMessage(msg, e.what());
       return false;
     }
-  }
+  }*/
 
  protected:
   static const uint16_t ALL_OTHER_REPLICAS = UINT16_MAX;
diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp
index 7fe7ee2767..dea76c0e72 100644
--- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp
+++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp
@@ -87,23 +87,20 @@ void ReplicaForStateTransfer::start() {
     if (!config_.isReadOnly) {
       // Load the public keys of the other replicas from reserved pages
      // so that their responses can be validated
-      cre_->halt();
       KeyExchangeManager::instance().loadPublicKeys();
+
+      // Make sure to sign the reconfiguration client messages using the key
+      // other replicas expect
+      SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);
+
       // Need to update private key to match the loaded public key in case they differ (key exchange was executed
       // on other replicas but not on this one, finishing ST does not mean that missed key exchanges are executed)
       // This can be done by iterating the saved cryptosystems and updating their private key if their
       // public key matches the candidate saved in KeyExchangeManager
-
-      // Clear old keys
       CryptoManager::instance().onCheckpoint(checkpoint);
       auto [priv, pub] = KeyExchangeManager::instance().getCandidateKeyPair();
       CryptoManager::instance().syncPrivateKeyAfterST(priv, pub);
-      // Make sure to sign the reconfiguration client messages using the key
-      // other replicas expect
-      SigManager::instance()->setReplicaLastExecutedSeq(checkpoint * checkpointWindowSize);
-      cre_->resume();
-
       // At this point, we are not going to have more blocks in state transfer, so we can safely stop CRE.
       // if there is a reconfiguration state change that prevents us from starting another state transfer (i.e.
       // scaling) then CRE probably won't work as well.
diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp
index f67b95da83..5312d7d215 100644
--- a/bftengine/src/bftengine/ReplicaImp.cpp
+++ b/bftengine/src/bftengine/ReplicaImp.cpp
@@ -260,24 +260,6 @@ void ReplicaImp::validatedMessageHandler(CarrierMesssage *msg) {
   }
 }
 
-template <typename MessageType>
-bool ReplicaImp::validateMessage(MessageType *msg) {
-  if (config_.debugStatisticsEnabled) {
-    DebugStatistics::onReceivedExMessage(msg->type());
-  }
-  try {
-    if constexpr (std::is_same_v<MessageType, CheckpointMsg>) {
-      msg->validate(*repsInfo, false);
-    } else {
-      msg->validate(*repsInfo);
-    }
-    return true;
-  } catch (std::exception &e) {
-    onReportAboutInvalidMessage(msg, e.what());
-    return false;
-  }
-}
-
 /**
  * asyncValidateMessage This is a family of asynchronous message which just schedules
  * the validation in a thread bag and returns peacefully. This will also translate the message
@@ -4663,6 +4645,7 @@ void ReplicaImp::start() {
 
   // If key exchange is disabled, first publish the replica's main key to clients
   if (ReplicaConfig::instance().singleSignatureScheme || ReplicaConfig::instance().publishReplicasMasterKeyOnStartup) {
+    KeyExchangeManager::instance().waitForQuorum(this);
    KeyExchangeManager::instance().sendMainPublicKey();
   }
 }
diff --git a/bftengine/src/bftengine/RequestHandler.cpp b/bftengine/src/bftengine/RequestHandler.cpp
index c4d759075f..3e0da5173b 100644
--- a/bftengine/src/bftengine/RequestHandler.cpp
+++ b/bftengine/src/bftengine/RequestHandler.cpp
@@ -161,7 +161,7 @@ void RequestHandler::execute(IRequestsHandler::ExecutionRequestsQueue& requests,
     } else {
       // this replica has not reached stable seqNum yet to create snapshot at requested seqNum
       // add a callback to be called when seqNum is stable. We need to create snapshot on stable
-      // seq num because checkpoint msg certificate is stored on stable seq num and is used for intergrity
+      // seq num because checkpoint msg certificate is stored on stable seq num and is used for integrity
       // check of db snapshots
       const auto& seqNumToCreateSanpshot = createDbChkPtMsg.seqNum;
       DbCheckpointManager::instance().setCheckpointInProcess(true, *blockId);
diff --git a/bftengine/src/bftengine/SigManager.cpp b/bftengine/src/bftengine/SigManager.cpp
index 2860008199..955e3d7444 100644
--- a/bftengine/src/bftengine/SigManager.cpp
+++ b/bftengine/src/bftengine/SigManager.cpp
@@ -325,7 +325,8 @@ bool SigManager::verifyOwnSignature(const concord::Byte* data,
                                     const concord::Byte* expectedSignature) const {
   std::vector<concord::Byte> sig(getMySigLength());
   if (ReplicaConfig::instance().singleSignatureScheme) {
-    for (auto signer : CryptoManager::instance().getLatestSigners()) {
+    auto signers = CryptoManager::instance().getLatestSigners();
+    for (auto& signer : signers) {
       signer->signBuffer(data, dataLength, sig.data());
 
       if (std::memcmp(sig.data(), expectedSignature, getMySigLength()) == 0) {
diff --git a/bftengine/src/preprocessor/tests/preprocessor_test.cpp b/bftengine/src/preprocessor/tests/preprocessor_test.cpp
index 99194f80c3..0ede21dddd 100644
--- a/bftengine/src/preprocessor/tests/preprocessor_test.cpp
+++ b/bftengine/src/preprocessor/tests/preprocessor_test.cpp
@@ -36,7 +36,6 @@ using namespace std;
 using namespace bft::communication;
 using namespace bftEngine;
 using namespace preprocessor;
-using concord::crypto::SignatureAlgorithm;
 
 namespace {
diff --git a/kvbc/src/reconfiguration_kvbc_handler.cpp b/kvbc/src/reconfiguration_kvbc_handler.cpp
index d08a05bacc..71c688c8af 100644
--- a/kvbc/src/reconfiguration_kvbc_handler.cpp
+++ b/kvbc/src/reconfiguration_kvbc_handler.cpp
@@ -443,6 +443,7 @@ concord::messages::ClientStateReply KvbcClientReconfigurationHandler::buildRepli
   } else if (command_type == std::string{kvbc::keyTypes::reconfiguration_rep_main_key}) {
     concord::messages::ReplicaMainKeyUpdate cmd;
     concord::messages::deserialize(data_buf, cmd);
+    cmd.seq_num -= (cmd.seq_num < 2 * checkpointWindowSize ? 0 : 2 * checkpointWindowSize);
     creply.response = cmd;
   }
   auto epoch_data =
diff --git a/kvbc/tools/object_store_utility/integrity_checker.cpp b/kvbc/tools/object_store_utility/integrity_checker.cpp
index 78d7f55bdf..1d4b7777ba 100644
--- a/kvbc/tools/object_store_utility/integrity_checker.cpp
+++ b/kvbc/tools/object_store_utility/integrity_checker.cpp
@@ -28,7 +28,6 @@ using namespace std::placeholders;
 using concordUtils::Status;
 using bftEngine::bcst::impl::BCStateTran;
 using kvbc::v1DirectKeyValue::S3StorageFactory;
-using crypto::KeyFormat;
 
 void IntegrityChecker::initKeysConfig(const fs::path& keys_file) {
   LOG_DEBUG(logger_, keys_file);
diff --git a/tests/apollo/test_skvbc_backup_restore.py b/tests/apollo/test_skvbc_backup_restore.py
index a2b74e0c24..92976c33bc 100644
--- a/tests/apollo/test_skvbc_backup_restore.py
+++ b/tests/apollo/test_skvbc_backup_restore.py
@@ -88,17 +88,16 @@ async def test_checkpoint_propagation_after_restarting_replicas(self, bft_networ
         )
 
         # stop n replicas in a random order with a delay of 5s in between
-        #bft_network.stop_all_replicas()
-        #stopped_replicas = bft_network.all_replicas()
-        stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5)
-
+        stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5,
+                                                                       exclude_replicas={current_primary})
+        bft_network.stop_replica(current_primary)
         # start stopped replicas in a random order with a delay of 5s in between
-        await self._start_random_replicas_with_delay(bft_network, stopped_replicas, current_primary, delay=5)
-        #bft_network.start_all_replicas()
+        bft_network.start_replica(current_primary)
+        await self._start_random_replicas_with_delay(bft_network, stopped_replicas, delay=5)
 
         # verify checkpoint persistence
         log.log_message(message_type=f"Wait for replicas to reach checkpoint", checkpoint=checkpoint_before+1,
-                        replicas=list(stopped_replicas))
+                        replicas=[current_primary] + list(stopped_replicas))
         await bft_network.wait_for_replicas_to_checkpoint(
             stopped_replicas,
             expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1)
@@ -313,12 +312,12 @@ async def _stop_random_replicas_with_delay(bft_network, delay=10, exclude_replic
         return list(all_replicas)
 
     @staticmethod
-    async def _start_random_replicas_with_delay(bft_network, stopped_replicas, initial_primary,
+    async def _start_random_replicas_with_delay(bft_network, stopped_replicas, initial_primary=None,
                                                 f_replicas_stopped_early=None, delay=10):
         random.shuffle(stopped_replicas)
         if f_replicas_stopped_early:
             stopped_replicas.extend(f_replicas_stopped_early)
-        if initial_primary not in stopped_replicas:
+        if initial_primary and initial_primary not in stopped_replicas:
             stopped_replicas.append(initial_primary)
         for replica in stopped_replicas:
             log.log_message(message_type=f"starting replica: {replica}")
diff --git a/tests/apollo/test_skvbc_checkpoints.py b/tests/apollo/test_skvbc_checkpoints.py
index 2baef9f26b..41637ddb90 100644
--- a/tests/apollo/test_skvbc_checkpoints.py
+++ b/tests/apollo/test_skvbc_checkpoints.py
@@ -320,7 +320,7 @@ async def test_checkpoint_propagation_after_primary_isolation(self, bft_network)
             self.assertEqual(current_view, expected_next_primary)
 
             # send sufficient number of client requests to trigger checkpoint protocol
-            # verify checkpoint propagation to all the nodes except the the initial primary
+            # verify checkpoint propagation to all the nodes except the initial primary
             await skvbc.fill_and_wait_for_checkpoint(
                 initial_nodes=bft_network.all_replicas(without={initial_primary}),
                num_of_checkpoints_to_add=1,
diff --git a/tests/apollo/test_skvbc_dbsnapshot.py b/tests/apollo/test_skvbc_dbsnapshot.py
index c7d84eacba..f4ee4bb67f 100644
--- a/tests/apollo/test_skvbc_dbsnapshot.py
+++ b/tests/apollo/test_skvbc_dbsnapshot.py
@@ -509,8 +509,7 @@ async def state_snapshot_req_existing_checkpoint(self, bft_network, tracker, exp
             await skvbc.send_kv_set(client, set(), [(key, value)], 0)
         await bft_network.wait_for_stable_checkpoint(bft_network.all_replicas(), stable_seqnum=DB_CHECKPOINT_HIGH_WIN_SIZE)
 
-        # Expect that a snapshot/checkpoint with an ID of 600 is available. For that, we assume that the snapshot/checkpoint ID
-        # is the last block ID at which the snapshot/checkpoint is created.
+        # Expect that a snapshot/checkpoint with an ID of 600 or greater is available.
         await bft_network.wait_for_created_db_snapshots_metric(bft_network.all_replicas(), 1)
         for replica_id in bft_network.all_replicas():
             last_block_id = await bft_network.last_db_checkpoint_block_id(replica_id)
@@ -523,11 +522,12 @@ async def state_snapshot_req_existing_checkpoint(self, bft_network, tracker, exp
             resp = cmf_msgs.ReconfigurationResponse.deserialize(rep)[0]
             self.assertTrue(resp.success)
             self.assertIsNotNone(resp.response.data)
-            self.assertGreaterEqual(resp.response.data.snapshot_id, DB_CHECKPOINT_HIGH_WIN_SIZE)
+            self.assertEqual(resp.response.data.snapshot_id, last_block_id)
             # TODO: add test for BlockchainHeightType.EventGroupId here (including support for it in TesterReplica).
-            self.assertGreaterEqual(resp.response.data.blockchain_height, DB_CHECKPOINT_HIGH_WIN_SIZE)
+            self.assertGreaterEqual(resp.response.data.blockchain_height, last_block_id)
             self.assertEqual(resp.response.data.blockchain_height_type, cmf_msgs.BlockchainHeightType.BlockId)
-            self.assertEqual(resp.response.data.key_value_count_estimate, expected_key_value_count_estimate)
+            # First blocks are main key updates
+            self.assertGreaterEqual(resp.response.data.key_value_count_estimate, expected_key_value_count_estimate - bft_network.config.n)
 
     @with_trio
     @with_bft_network(start_replica_cmd_with_operator_and_public_keys, selected_configs=lambda n, f, c: n == 7)
@@ -564,13 +564,13 @@ async def state_snapshot_req_non_existent_checkpoint(self, bft_network, tracker,
             self.assertGreaterEqual(resp.response.data.blockchain_height, 100)
             # TODO: add test for BlockchainHeightType.EventGroupId here (including support for it in TesterReplica).
             self.assertEqual(resp.response.data.blockchain_height_type, cmf_msgs.BlockchainHeightType.BlockId)
-            self.assertEqual(resp.response.data.key_value_count_estimate, expected_key_value_count_estimate)
+            # First blocks are main key updates
+            self.assertGreaterEqual(resp.response.data.key_value_count_estimate, expected_key_value_count_estimate - bft_network.config.n)
 
             # Expect that a snapshot/checkpoint with an ID of 100 is available. For that, we assume that the snapshot/checkpoint ID
             # is the last block ID at which the snapshot/checkpoint is created.
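             # (the exact ID can exceed 100 when the first blocks are replica main key updates,
             # so the snapshot_id reported in the response is used below)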
-            last_block_id = 100
             for replica_id in bft_network.all_replicas():
-                await bft_network.wait_for_db_snapshot(replica_id, last_block_id)
+                await bft_network.wait_for_db_snapshot(replica_id, resp.response.data.snapshot_id)
 
     @with_trio
     @with_bft_network(start_replica_cmd_with_high_db_window_size, selected_configs=lambda n, f, c: n == 7)
@@ -768,7 +768,7 @@ async def test_signed_public_state_hash_req_existing_checkpoint(self, bft_networ
             await bft_network.wait_for_db_snapshot(replica_id, last_block_id)
 
         op = operator.Operator(bft_network.config, client, bft_network.builddir)
-        ser_resp = await op.signed_public_state_hash_req(DB_CHECKPOINT_HIGH_WIN_SIZE)
+        ser_resp = await op.signed_public_state_hash_req(last_block_id)
         ser_rsis = op.get_rsi_replies()
         resp = cmf_msgs.ReconfigurationResponse.deserialize(ser_resp)[0]
         self.assertTrue(resp.success)
@@ -777,8 +777,8 @@ async def test_signed_public_state_hash_req_existing_checkpoint(self, bft_networ
         for ser_rsi in ser_rsis.values():
             rsi_resp = cmf_msgs.ReconfigurationResponse.deserialize(ser_rsi)[0]
             self.assertEqual(rsi_resp.response.status, cmf_msgs.SnapshotResponseStatus.Success)
-            self.assertGreaterEqual(rsi_resp.response.data.snapshot_id, DB_CHECKPOINT_HIGH_WIN_SIZE)
-            self.assertGreaterEqual(rsi_resp.response.data.block_id, DB_CHECKPOINT_HIGH_WIN_SIZE)
+            self.assertEqual(rsi_resp.response.data.snapshot_id, last_block_id)
+            self.assertEqual(rsi_resp.response.data.block_id, last_block_id)
             # Expect the SHA3-256 hash of the empty string.
             empty_string_sha3_256 = bytes.fromhex("a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a")
             self.assertEqual(bytearray(rsi_resp.response.data.hash), empty_string_sha3_256)
@@ -828,7 +828,7 @@ async def test_signed_state_snapshot_read_as_of_req_without_public_keys(self, bf
         op = operator.Operator(bft_network.config, client, bft_network.builddir)
 
         # Try to read two of the keys that we wrote. We shouldn't be able to get them, though, because they are not public.
-        ser_resp = await op.state_snapshot_read_as_of_req(DB_CHECKPOINT_HIGH_WIN_SIZE, [kvs[0][0], kvs[1][0]])
+        ser_resp = await op.state_snapshot_read_as_of_req(last_block_id, [kvs[0][0], kvs[1][0]])
         ser_rsis = op.get_rsi_replies()
         resp = cmf_msgs.ReconfigurationResponse.deserialize(ser_resp)[0]
         self.assertTrue(resp.success)
@@ -864,7 +864,7 @@ async def test_signed_state_snapshot_read_as_of_req_with_public_keys(self, bft_n
         op = operator.Operator(bft_network.config, client, bft_network.builddir)
 
         # Read two of the keys that we wrote.
-        ser_resp = await op.state_snapshot_read_as_of_req(DB_CHECKPOINT_HIGH_WIN_SIZE, [kvs[0][0], kvs[1][0]])
+        ser_resp = await op.state_snapshot_read_as_of_req(last_block_id, [kvs[0][0], kvs[1][0]])
         ser_rsis = op.get_rsi_replies()
         resp = cmf_msgs.ReconfigurationResponse.deserialize(ser_resp)[0]
         self.assertTrue(resp.success)
@@ -882,10 +882,7 @@ async def test_signed_state_snapshot_read_as_of_req_invalid_key_with_public_keys
         bft_network.start_all_replicas()
         client = bft_network.random_client()
         skvbc = kvbc.SimpleKVBCProtocol(bft_network, tracker)
-        for i in range(DB_CHECKPOINT_HIGH_WIN_SIZE):
-            key = skvbc.unique_random_key()
-            value = skvbc.random_value()
-            await skvbc.send_kv_set(client, set(), [(key, value)], 0)
+        await skvbc.send_n_kvs_sequentially(DB_CHECKPOINT_HIGH_WIN_SIZE)
         await bft_network.wait_for_stable_checkpoint(bft_network.all_replicas(), stable_seqnum=DB_CHECKPOINT_HIGH_WIN_SIZE)
 
         # Expect that a snapshot/checkpoint with an ID of 600 is available. For that, we assume that the snapshot/checkpoint ID
@@ -898,7 +895,7 @@ async def test_signed_state_snapshot_read_as_of_req_invalid_key_with_public_keys
         op = operator.Operator(bft_network.config, client, bft_network.builddir)
 
         # Read two keys that we haven't written.
-        ser_resp = await op.state_snapshot_read_as_of_req(DB_CHECKPOINT_HIGH_WIN_SIZE,
+        ser_resp = await op.state_snapshot_read_as_of_req(last_block_id,
                                                           [skvbc.unique_random_key().decode(),
                                                            skvbc.unique_random_key().decode()])
         ser_rsis = op.get_rsi_replies()
diff --git a/tests/apollo/test_skvbc_persistence.py b/tests/apollo/test_skvbc_persistence.py
index d01ac66989..9c64d16815 100644
--- a/tests/apollo/test_skvbc_persistence.py
+++ b/tests/apollo/test_skvbc_persistence.py
@@ -345,7 +345,7 @@ async def test_st_when_fetcher_and_sender_crash(self, bft_network, tracker):
 
         client, known_key, known_val = \
             await skvbc.prime_for_state_transfer(stale_nodes={stale_node},
-                                                 checkpoints_num=random.randint(10, 13))
+                                                 checkpoints_num=10)
 
         # exclude the primary and the stale node
         non_primary_replicas = bft_network.all_replicas(without={0, stale_node})
diff --git a/tests/apollo/test_skvbc_reconfiguration.py b/tests/apollo/test_skvbc_reconfiguration.py
index b7e5d99294..fd641058b0 100644
--- a/tests/apollo/test_skvbc_reconfiguration.py
+++ b/tests/apollo/test_skvbc_reconfiguration.py
@@ -552,7 +552,7 @@ async def test_tls_exchange_replicas_replicas_with_ror(self, bft_network):
         bft_network.restart_clients(False, False)
         skvbc = kvbc.SimpleKVBCProtocol(bft_network)
         # Make sure that the read only replica is able to complete another state transfer
-        for i in range(500): # Produce 500 new blocks
+        for i in range(300):
             await skvbc.send_write_kv_set()
         await self._wait_for_st(bft_network, ro_replica_id, 600)
 
@@ -1959,17 +1959,17 @@ async def try_to_unwedge(self, bft_network, bft, restart, quorum=None):
         except trio.TooSlowError:
             pass
 
-    async def wait_for_network_to_stop_after_wedge(self, stop_condition, client, op, quorum, fullWedge):
-        stopped_replicas = set()
-        while len(stopped_replicas) < stop_condition:
+    async def wait_for_network_wedge_status(self, stop_condition, client, op, quorum, fullWedge, waitForStop):
+        replica_set = set()
+        while len(replica_set) < stop_condition:
             await op.wedge_status(quorum=quorum, fullWedge=fullWedge)
             rsi_rep = client.get_rsi_replies()
             for replica_id, r in rsi_rep.items():
                 res = cmf_msgs.ReconfigurationResponse.deserialize(r)
                 status = res[0].response.stopped
-                if status and replica_id not in stopped_replicas:
-                    stopped_replicas.add(replica_id)
-                    log.log_message(message_type=f"Replica stopped after wedge", replica_id=replica_id)
+                if status == waitForStop and replica_id not in replica_set:
+                    replica_set.add(replica_id)
+                    log.log_message(message_type=f"Replica {'stopped' if waitForStop else 'started'}", replica_id=replica_id)
             await trio.sleep(1)
 
     async def validate_stop_on_wedge_point(self, bft_network, skvbc, fullWedge=False):
@@ -1980,21 +1980,21 @@ async def validate_stop_on_wedge_point(self, bft_network, skvbc, fullWedge=False
             op = operator.Operator(bft_network.config, client, bft_network.builddir)
             quorum = None if fullWedge is True else bft_client.MofNQuorum.LinearizableQuorum(bft_network.config, [r.id for r in bft_network.replicas])
             stop_condition = bft_network.config.n if fullWedge is True else (bft_network.config.n - bft_network.config.f)
-            await self.wait_for_network_to_stop_after_wedge(stop_condition, client, op, quorum, fullWedge)
+            await self.wait_for_network_wedge_status(stop_condition, client, op, quorum,
+                                                     fullWedge, waitForStop=True)
 
             with log.start_action(action_type='expect_kv_failure_due_to_wedge'):
                 with self.assertRaises(trio.TooSlowError):
                     await skvbc.send_write_kv_set()
 
     async def validate_start_on_unwedge(self, bft_network, skvbc, fullWedge=False):
-        with log.start_action(action_type="validate_start_on_unwedge") as action:
+        with log.start_action(action_type="validate_start_on_unwedge", fullWedge=fullWedge) as action:
             with trio.fail_after(seconds=90):
                 client = bft_network.random_client()
                 client.config._replace(req_timeout_milli=10000)
                 op = operator.Operator(bft_network.config, client, bft_network.builddir)
                 quorum = None if fullWedge is True else bft_client.MofNQuorum.LinearizableQuorum(bft_network.config, [r.id for r in bft_network.replicas])
                 stop_condition = bft_network.config.n if fullWedge is True else (bft_network.config.n - bft_network.config.f)
-                await self.wait_for_network_to_stop_after_wedge(stop_condition, client, op, quorum, fullWedge)
+                await self.wait_for_network_wedge_status(stop_condition, client, op, quorum, fullWedge, waitForStop=False)
 
     async def validate_stop_on_super_stable_checkpoint(self, bft_network, skvbc):
         with log.start_action(action_type="validate_stop_on_super_stable_checkpoint") as action:
@@ -2025,15 +2025,14 @@ async def validate_stop_on_super_stable_checkpoint(self, bft_network, skvbc):
     async def verify_replicas_are_in_wedged_checkpoint(self, bft_network, previous_checkpoint, replicas):
         with log.start_action(action_type="verify_replicas_are_in_wedged_checkpoint", previous_checkpoint=previous_checkpoint):
             for replica_id in replicas:
-                with log.start_action(action_type="verify_replica", replica=replica_id):
-                    with trio.fail_after(seconds=60):
-                        while True:
-                            with trio.move_on_after(seconds=1):
-                                checkpoint_after = await bft_network.wait_for_checkpoint(replica_id=replica_id)
-                                if checkpoint_after == previous_checkpoint + 2:
-                                    break
-                                else:
-                                    await trio.sleep(1)
+                with trio.fail_after(seconds=60), log.start_action(action_type="verify_replicas_are_in_wedged_checkpoint", replica=replica_id):
+                    while True:
+                        with trio.move_on_after(seconds=1):
+                            checkpoint_after = await bft_network.wait_for_checkpoint(replica_id=replica_id)
+                            if checkpoint_after == previous_checkpoint + 2:
+                                break
+                            else:
+                                await trio.sleep(1)
 
     async def verify_last_executed_seq_num(self, bft_network, previous_checkpoint):
         expectedSeqNum = (previous_checkpoint + 2) * 150
diff --git a/tests/apollo/test_skvbc_ro_replica.py b/tests/apollo/test_skvbc_ro_replica.py
index 039dc04029..58f81979d1 100644
--- a/tests/apollo/test_skvbc_ro_replica.py
+++ b/tests/apollo/test_skvbc_ro_replica.py
@@ -272,11 +272,8 @@ async def test_ro_replica_state_fetch_after_exchanges(self, bft_network):
         Start ROR
         Verify ROR fetches state
         """
-        print("hi1")
         bft_network.start_all_replicas()
-        print("hi2")
         skvbc = kvbc.SimpleKVBCProtocol(bft_network)
-        print("hi3")
 
         await skvbc.fill_and_wait_for_checkpoint(
             initial_nodes=bft_network.all_replicas(),
diff --git a/tests/apollo/util/bft.py b/tests/apollo/util/bft.py
index 0d96604742..2e1369da48 100644
--- a/tests/apollo/util/bft.py
+++ b/tests/apollo/util/bft.py
@@ -35,6 +35,7 @@
 import trio
 
 from util.test_base import repeat_test
+from util.consts import CHECKPOINT_SEQUENCES
 
 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "pyclient"))
 
@@ -494,7 +495,6 @@ async def change_configuration(self, config, generate_tls=False, use_unified_cer
         # Generate certificates for replicas, clients, and reserved clients
         self.generate_tls_certs(self.num_total_replicas() +
                                config.num_clients + RESERVED_CLIENTS_QUOTA + generate_cre, use_unified_certs=use_unified_certs)
 
-    @log_call
     def restart_clients(self, generate_tx_signing_keys=True, restart_replicas=True):
         with log.start_action(action_type="restart_clients", generate_tx_signing_keys=generate_tx_signing_keys,
                               restart_replicas=restart_replicas):
@@ -1304,7 +1304,15 @@ async def wait_for_fetching_state(self, replica_id):
         """
         with log.start_action(action_type="wait_for_fetching_state", replica=replica_id) as action:
             async def replica_to_be_in_fetching_state():
-                is_fetching = await self.is_fetching(replica_id)
+                has_metric = False
+                while not has_metric:
+                    try:
+                        is_fetching = await self.is_fetching(replica_id)
+                        has_metric = True
+                    except KeyError:
+                        # If a replica was down, the metric server might be up prior to the replica
+                        # registering its state transfer related metrics
+                        pass
                 source_replica_id = await self.source_replica(replica_id)
                 if is_fetching:
                     action.add_success_fields(source_replica_id=source_replica_id)
@@ -1340,7 +1348,8 @@ async def wait_for_state_transfer_to_start(self, replica_ids: Optional[Sequence[
                                        nursery.cancel_scope)
 
     async def wait_for_replicas_to_collect_stable_checkpoint(self, replicas, checkpoint, timeout=30):
-        with log.start_action(action_type="wait_for_replicas_to_collect_stable_checkpoint", replicas=replicas) as action:
+        with log.start_action(action_type="wait_for_replicas_to_collect_stable_checkpoint", replicas=replicas,
+                              checkpoint=checkpoint) as action:
             with trio.fail_after(seconds=timeout):
                 last_stable_seqs = []
                 while True:
@@ -1349,7 +1358,7 @@ async def wait_for_replicas_to_collect_stable_checkpoint(self, replicas, checkpo
                         last_stable_seqs.append(last_stable)
                         action.log(message_type="lastStableSeqNum", replica=replica_id, last_stable=last_stable)
                         assert checkpoint >= last_stable / 150, "Probably got wrong checkpoint as input"
-                    if sum(x == 150 * checkpoint for x in last_stable_seqs) == len(replicas):
+                    if sum(x >= 150 * checkpoint for x in last_stable_seqs) == len(replicas):
                         break
                     else:
                         last_stable_seqs.clear()
@@ -1388,26 +1397,30 @@ async def wait_for_state_transfer_to_stop(self, up_to_date_node: int, stale_node
                 with trio.move_on_after(.5):  # seconds
                     metrics = await self.metrics.get_all(stale_node)
                     try:
-                        n = self.metrics.get_local(metrics, *key)
+                        stale_node_metric = self.metrics.get_local(metrics, *key)
                     except KeyError:
                         # ignore - the metric will eventually become available
                         await trio.sleep(0.1)
                     else:
                         # Debugging
-                        if n != last_n:
-                            last_n = n
+                        if stale_node_metric != last_n:
+                            last_n = stale_node_metric
                             last_stored_checkpoint = self.metrics.get_local(metrics, 'bc_state_transfer', 'Gauges', 'last_stored_checkpoint')
                             on_transferring_complete = self.metrics.get_local(metrics, 'bc_state_transfer', 'Counters', 'on_transferring_complete')
                             action.log(message_type="Not complete yet",
-                                       seq_num=n, expected_seq_num=expected_seq_num,
+                                       seq_num=stale_node_metric, expected_seq_num=expected_seq_num,
                                        last_stored_checkpoint=last_stored_checkpoint,
                                        on_transferring_complete=on_transferring_complete)
-
-                        # Exit condition
-                        if n >= expected_seq_num:
-                            action.add_success_fields(n=n, expected_seq_num=expected_seq_num)
+                            log.log_message(message_type="Not complete yet",
+                                            seq_num=stale_node_metric, expected_seq_num=expected_seq_num,
+                                            last_stored_checkpoint=last_stored_checkpoint,
+                                            on_transferring_complete=on_transferring_complete)
+
+                        # Exit condition - make sure that same checkpoint as live replica is reached
+                        if (stale_node_metric // CHECKPOINT_SEQUENCES) == (expected_seq_num // CHECKPOINT_SEQUENCES):
+                            action.add_success_fields(n=stale_node_metric, expected_seq_num=expected_seq_num)
                             return
 
                 await trio.sleep(0.5)
@@ -1518,11 +1531,10 @@ async def wait_for_checkpoint(self, replica_id, expected_checkpoint_num=None):
 
             async def expected_checkpoint_to_be_reached():
                 key = ['bc_state_transfer', 'Gauges', 'last_stored_checkpoint']
                 last_stored_checkpoint = await self.retrieve_metric(replica_id, *key)
-                action.log(message_type=f'[checkpoint] #{last_stored_checkpoint}, replica=#{replica_id}')
                 if last_stored_checkpoint is not None and expected_checkpoint_num(last_stored_checkpoint):
+                    action.log(message_type=f'[checkpoint] #{last_stored_checkpoint} reached by replica=#{replica_id}')
                     action.add_success_fields(last_stored_checkpoint=last_stored_checkpoint)
                     return last_stored_checkpoint
@@ -1697,18 +1709,16 @@ async def wait_for(self, task, timeout, interval):
         returns None before interval expires. This only matters in that it uses
         more CPU.
         """
-        with log.start_action(action_type="wait_for"):
-            with trio.fail_after(timeout):
-                while True:
-                    with trio.move_on_after(interval):
-                        if inspect.iscoroutinefunction(task):
-                            result = await task()
-                            if result is not None:
-                                return result
-                            else:
-                                await trio.sleep(0.1)
-                        else:
-                            raise TypeError
+        assert inspect.iscoroutinefunction(task)
+        with trio.fail_after(timeout):
+            while True:
+                with trio.move_on_after(interval):
+                    result = await task()
+                    if result is not None:
+                        return result
+                    else:
+                        await trio.sleep(0.1)
+
 
     async def retrieve_metric(self, replica_id, component_name, type_, key):
         try:
@@ -1990,7 +2000,8 @@ def remove_metadata(self, replica_id):
 
     async def wait_for_stable_checkpoint(self, replicas, stable_seqnum):
-        with trio.fail_after(seconds=30):
+        with trio.fail_after(seconds=30), log.start_action(action_type="wait_for_stable_checkpoint",
+                                                           replicas=replicas, stable_seqnum=stable_seqnum):
             all_in_checkpoint = False
             while all_in_checkpoint is False:
                 all_in_checkpoint = True
@@ -2018,22 +2029,22 @@ async def wait_for_created_db_snapshots_metric(self, replicas, expected_num_of_c
                 break
 
     def db_snapshot_exists(self, replica_id, snapshot_id=None):
-        with log.start_action(action_type="db_snapshot_exists()"):
-            snapshot_db_dir = os.path.join(
-                self.testdir, DB_SNAPSHOT_PREFIX + str(replica_id))
-            if snapshot_id is not None:
-                snapshot_db_dir = os.path.join(snapshot_db_dir, str(snapshot_id))
-            if not os.path.exists(snapshot_db_dir):
-                return False
-
-            # Make sure that checkpoint folder is not empty.
-            size = 0
-            for element in os.scandir(snapshot_db_dir):
-                size += os.path.getsize(element)
-            return (size > 0)
+        snapshot_db_dir = os.path.join(
+            self.testdir, DB_SNAPSHOT_PREFIX + str(replica_id))
+        if snapshot_id is not None:
+            snapshot_db_dir = os.path.join(snapshot_db_dir, str(snapshot_id))
+        if not os.path.exists(snapshot_db_dir):
+            return False
+
+        # Make sure that checkpoint folder is not empty.
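+        # (an empty snapshot directory can exist before the checkpoint files are fully written,
+        # so require a non-zero total size of its entries)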
+        size = 0
+        for element in os.scandir(snapshot_db_dir):
+            size += os.path.getsize(element)
+        return (size > 0)
 
     async def wait_for_db_snapshot(self, replica_id, snapshot_id=None):
-        with trio.fail_after(seconds=30):
+        with trio.fail_after(seconds=30), log.start_action(action_type="wait_for_db_snapshot", replica_id=replica_id,
+                                                           snapshot_id=snapshot_id):
             while True:
                 if self.db_snapshot_exists(replica_id, snapshot_id) == True:
                     break
diff --git a/tests/apollo/util/skvbc.py b/tests/apollo/util/skvbc.py
index fdfad79048..c524f5fdfb 100644
--- a/tests/apollo/util/skvbc.py
+++ b/tests/apollo/util/skvbc.py
@@ -295,10 +295,11 @@ async def fill_and_wait_for_checkpoint(
 
         await self.network_wait_for_checkpoint(
             initial_nodes,
-            expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + num_of_checkpoints_to_add,
+            expected_checkpoint_num=lambda ecn: ecn >= checkpoint_before + num_of_checkpoints_to_add,
             verify_checkpoint_persistency=verify_checkpoint_persistency,
             assert_state_transfer_not_started=assert_state_transfer_not_started,
             timeout=timeout)
+
     async def network_wait_for_checkpoint(
             self, initial_nodes,
             expected_checkpoint_num=lambda ecn: ecn == 2,
diff --git a/tests/simpleKVBC/TesterReplica/main.cpp b/tests/simpleKVBC/TesterReplica/main.cpp
index 83f9c01cc7..30d27eeb16 100644
--- a/tests/simpleKVBC/TesterReplica/main.cpp
+++ b/tests/simpleKVBC/TesterReplica/main.cpp
@@ -118,6 +118,10 @@ void run_replica(int argc, char** argv) {
   MDC_PUT(MDC_REPLICA_ID_KEY, std::to_string(setup->GetReplicaConfig().replicaId));
   MDC_PUT(MDC_THREAD_KEY, "main");
 
+  // Start metrics server before the creation of the replica so that we handle the case where
+  // a replica waits for an unresponsive primary on startup but still updates state-transfer related metrics
+  setup->GetMetricsServer().Start();
+
   replica = std::make_shared<concord::kvbc::Replica>(
       setup->GetCommunication(),
       setup->GetReplicaConfig(),
@@ -167,10 +171,7 @@ void run_replica(int argc, char** argv) {
   // Setup a test cron table, if requested in configuration.
   cronSetup(*setup, *replica);
 
-  // Start metrics server after creation of the replica so that we ensure
-  // registration of metrics from the replica with the aggregator and don't
-  // return empty metrics from the metrics server.
-  setup->GetMetricsServer().Start();
+
   while (replica->isRunning()) {
     if (timeToExit) {
       setup->GetMetricsServer().Stop();