diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp index 3afed4d5ab..c51fe6aab1 100644 --- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp +++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp @@ -184,7 +184,8 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create( Config cre_config; cre_config.id_ = repConfig.replicaId; cre_config.interval_timeout_ms_ = 1000; - IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_); + // TODO: fix the reliance on f + 1 responses so that Byzantine replicas are also handled + IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_, true); auto cre = std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>()); if (bftEngine::ReplicaConfig::instance().isReadOnly) { diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp index dea76c0e72..4d1c994aad 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp @@ -97,6 +97,7 @@ void ReplicaForStateTransfer::start() { // on other replicas but not on this one, finishing ST does not mean that missed key exchanges are executed) // This can be done by iterating the saved cryptosystems and updating their private key if their // public key matches the candidate saved in KeyExchangeManager + // TODO: persist the candidate CryptoManager::instance().onCheckpoint(checkpoint); auto [priv, pub] = KeyExchangeManager::instance().getCandidateKeyPair(); CryptoManager::instance().syncPrivateKeyAfterST(priv, pub); @@ -108,6 +109,8 @@ void ReplicaForStateTransfer::start() { auto *pbc = reinterpret_cast<PollBasedStateClient *>(cre_->getStateClient()); + // TODO: remove this loop so that state transfer doesn't hang if it cannot complete reconfiguration requests + // The current implementation expects f + 1 identical responses bool succ = false; while (!succ) { auto latestHandledUpdate = cre_->getLatestKnownUpdateBlock(); diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 6e2959ef02..72de4e28dd 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -2372,14 +2372,14 @@ void ReplicaImp::onMessage(std::unique_ptr<CheckpointMsg> message static uint32_t maxTimeSinceLastExecutionInMainWindowMs = config_.get("concord.bft.st.maxTimeSinceLastExecutionInMainWindowMs", 5000); - Time timeOfLastEcecution = MinTime; + Time timeOfLastExecution = MinTime; if (mainLog->insideActiveWindow(lastExecutedSeqNum)) - timeOfLastEcecution = mainLog->get(lastExecutedSeqNum).lastUpdateTimeOfCommitMsgs(); - if ((getMonotonicTime() - timeOfLastEcecution) > (milliseconds(maxTimeSinceLastExecutionInMainWindowMs))) { + timeOfLastExecution = mainLog->get(lastExecutedSeqNum).lastUpdateTimeOfCommitMsgs(); + if ((getMonotonicTime() - timeOfLastExecution) > (milliseconds(maxTimeSinceLastExecutionInMainWindowMs))) { LOG_INFO(GL, "Number of stable checkpoints in current window: " << numRelevant << " time since last execution: " - << (getMonotonicTime() - timeOfLastEcecution).count() << " ms"); + << (getMonotonicTime() - timeOfLastExecution).count() << " ms"); askForStateTransfer = true; startStReason = "Too much time has passed since last execution"; } @@ -2911,7 +2911,7 @@ void ReplicaImp::onMessage(std::unique_ptr<ViewChangeMsg> message ViewNum maxKnownCorrectView = 0; ViewNum maxKnownAgreedView = 0;
viewsManager->computeCorrectRelevantViewNumbers(&maxKnownCorrectView, &maxKnownAgreedView); - LOG_INFO(VC_LOG, "View Number details: " << KVLOG(maxKnownCorrectView, maxKnownAgreedView)); + LOG_INFO(VC_LOG, "View Number details: " << KVLOG(maxKnownCorrectView, maxKnownAgreedView, getCurrentView())); if (maxKnownCorrectView > getCurrentView()) { // we have at least f+1 view-changes with view number >= maxKnownCorrectView diff --git a/bftengine/src/bftengine/messages/ClientRequestMsg.cpp b/bftengine/src/bftengine/messages/ClientRequestMsg.cpp index 90d81d2ef2..8776cb043d 100644 --- a/bftengine/src/bftengine/messages/ClientRequestMsg.cpp +++ b/bftengine/src/bftengine/messages/ClientRequestMsg.cpp @@ -141,7 +141,7 @@ void ClientRequestMsg::validateImp(const ReplicasInfo& repInfo) const { (repInfo.isIdOfReplica(clientId) || repInfo.isIdOfPeerRoReplica(clientId))) { // Allow every reconfiguration/internal message from replicas (it will be verified in the reconfiguration handler) LOG_INFO(CNSUS, - "Reconfig/Internal replica message not validated" + "Reconfig/Internal replica message validation skipped" << KVLOG(clientId, header->flags & RECONFIG_FLAG, header->flags & INTERNAL_FLAG)); return; } diff --git a/client/reconfiguration/include/client/reconfiguration/poll_based_state_client.hpp b/client/reconfiguration/include/client/reconfiguration/poll_based_state_client.hpp index 672c5378c1..c19e81ad1b 100644 --- a/client/reconfiguration/include/client/reconfiguration/poll_based_state_client.hpp +++ b/client/reconfiguration/include/client/reconfiguration/poll_based_state_client.hpp @@ -29,7 +29,8 @@ class PollBasedStateClient : public IStateClient { PollBasedStateClient(bft::client::Client* client, uint64_t interval_timeout_ms, uint64_t last_known_block, - const uint16_t id_); + const uint16_t id_, + bool use_byzantine_quorum = false); State getNextState() const override; bool updateState(const WriteState& state) override; ~PollBasedStateClient(); @@ -63,6 +64,8 @@ class PollBasedStateClient : public IStateClient { bool halted_ = false; std::condition_variable resume_cond_; std::mutex resume_lock_; + // At the end of state transfer we use an f + 1 quorum + bool use_byzantine_quorum_ = false; }; } // namespace concord::client::reconfiguration \ No newline at end of file diff --git a/client/reconfiguration/src/poll_based_state_client.cpp b/client/reconfiguration/src/poll_based_state_client.cpp index 4b12a93353..598bcdd2b6 100644 --- a/client/reconfiguration/src/poll_based_state_client.cpp +++ b/client/reconfiguration/src/poll_based_state_client.cpp @@ -29,7 +29,14 @@ concord::messages::ReconfigurationResponse PollBasedStateClient::sendReconfigura concord::messages::ReconfigurationResponse rres; try { if (read_request) { - bft::client::ReadConfig read_config{request_config, bft::client::LinearizableQuorum{}}; + // TODO: state transfer can work with f + 1 only as long as there are no Byzantine replicas + bft::client::ReadConfig read_config; + if (use_byzantine_quorum_) { + read_config = bft::client::ReadConfig{request_config, bft::client::ByzantineSafeQuorum{}}; + } else { + read_config = bft::client::ReadConfig{request_config, bft::client::LinearizableQuorum{}}; + } + rep = bftclient_->send(read_config, std::move(msg)); } else { bft::client::WriteConfig write_config{request_config, bft::client::LinearizableQuorum{}}; @@ -57,12 +64,14 @@ State PollBasedStateClient::getNextState() const { PollBasedStateClient::PollBasedStateClient(bft::client::Client* client, uint64_t interval_timeout_ms, uint64_t
last_known_block, - const uint16_t id) + const uint16_t id, + bool use_byzantine_quorum) : bftclient_{client}, id_{id}, interval_timeout_ms_{interval_timeout_ms}, last_known_block_{last_known_block}, - sn_gen_(bft::client::ClientId{id}) {} + sn_gen_(bft::client::ClientId{id}), + use_byzantine_quorum_{use_byzantine_quorum} {} std::vector<State> PollBasedStateClient::getStateUpdate(bool& succ) const { concord::messages::ClientReconfigurationStateRequest creq{id_}; diff --git a/tests/apollo/test_skvbc_backup_restore.py b/tests/apollo/test_skvbc_backup_restore.py index 92976c33bc..8e14c7f20f 100644 --- a/tests/apollo/test_skvbc_backup_restore.py +++ b/tests/apollo/test_skvbc_backup_restore.py @@ -59,16 +59,19 @@ class SkvbcBackupRestoreTest(ApolloTest): @with_bft_network(start_replica_cmd, selected_configs=lambda n, f, c: n == 7) async def test_checkpoint_propagation_after_restarting_replicas(self, bft_network): """ - Here we trigger a checkpoint, restart all replicas in a random order with 5s delay in-between, - both while stopping and starting. We verify checkpoint persisted upon restart and then trigger + Here we trigger a checkpoint and restart all replicas. + We verify the checkpoint persisted upon restart and then trigger another checkpoint. We make sure checkpoint is propagated to all the replicas. 1) Given a BFT network, we make sure all nodes are up 2) Send sufficient number of client requests to trigger checkpoint protocol - 3) Stop all replicas in a random order (with 5s delay in between) - 4) Start all replicas in a random order (with 5s delay in between) + 3) Stop all replicas in a random order + 4) Start all replicas in a random order 5) Make sure the initial view is stable 6) Send sufficient number of client requests to trigger another checkpoint 7) Make sure checkpoint propagates to all the replicas + + Note: the UDP configuration waits 5 seconds before it assumes network communication is established. 
+ A replica can thus trigger a view change if it times out waiting for the primary while communication is still being established. """ bft_network.start_all_replicas() skvbc = kvbc.SimpleKVBCProtocol(bft_network) @@ -87,20 +90,15 @@ async def test_checkpoint_propagation_after_restarting_replicas(self, bft_networ verify_checkpoint_persistency=False ) - # stop n replicas in a random order with a delay of 5s in between - stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5, - exclude_replicas={current_primary}) - bft_network.stop_replica(current_primary) - # start stopped replicas in a random order with a delay of 5s in between - bft_network.start_replica(current_primary) - await self._start_random_replicas_with_delay(bft_network, stopped_replicas, delay=5) - + bft_network.stop_all_replicas() + bft_network.start_all_replicas() + stopped_replicas = bft_network.all_replicas() # verify checkpoint persistence log.log_message(message_type=f"Wait for replicas to reach checkpoint", checkpoint=checkpoint_before+1, - replicas=[current_primary] + list(stopped_replicas)) + replicas=stopped_replicas) await bft_network.wait_for_replicas_to_checkpoint( stopped_replicas, - expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1) + expected_checkpoint_num=lambda ecn: ecn >= checkpoint_before + 1) # verify current view is stable for replica in bft_network.all_replicas(): diff --git a/tests/apollo/test_skvbc_checkpoints.py b/tests/apollo/test_skvbc_checkpoints.py index 41637ddb90..00cb092159 100644 --- a/tests/apollo/test_skvbc_checkpoints.py +++ b/tests/apollo/test_skvbc_checkpoints.py @@ -395,12 +395,11 @@ async def test_checkpoint_propagation_after_f_nodes_including_primary_isolated(s # Once the adversary is gone, the isolated replicas should be able reach the checkpoint await bft_network.wait_for_replicas_to_checkpoint( isolated_replicas, - expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1) + expected_checkpoint_num=lambda ecn: ecn >= checkpoint_before + 1) @with_trio @with_bft_network(start_replica_cmd_with_corrupted_checkpoint_msgs(corrupt_checkpoints_from_replica_ids={ 1 }), selected_configs=lambda n, f, c: n == 7) - async def test_rvt_conflict_detection_after_corrupting_checkpoint_msg_for_single_replica(self, bft_network): await self._test_checkpointing_with_corruptions(bft_network, { 1 }) diff --git a/tests/apollo/util/bft.py b/tests/apollo/util/bft.py index 2e1369da48..c7e9585c6a 100644 --- a/tests/apollo/util/bft.py +++ b/tests/apollo/util/bft.py @@ -819,6 +819,7 @@ def monitor_replica_subproc(subproc: subprocess.Popen, replica_id: int, stop_eve f"return code = {return_code}" log_message(message_type=f"{error_msg}, aborting test", replica_log=stdout_file.name) stdout_file.write(f"####### FATAL ERROR: The process has crashed, subproc return value: {return_code}\n") + print(error_msg, file=sys.stderr) os.kill(os.getpid(), signal.SIGINT) break diff --git a/tests/apollo/util/bft_network_partitioning.py b/tests/apollo/util/bft_network_partitioning.py index 1a465a1656..8331958d91 100644 --- a/tests/apollo/util/bft_network_partitioning.py +++ b/tests/apollo/util/bft_network_partitioning.py @@ -18,6 +18,7 @@ from functools import partial sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "pyclient")) import bft_config +from util import eliot_logging as log class NetworkPartitioningAdversary(ABC): """Represents an adversary capable of inflicting network partitioning""" @@ -42,6 +43,7 @@ def __enter__(self): def __exit__(self, *args): """context manager method for 'with' statements"""
self._remove_bft_network_rule_chain() + log.log_message(message_type="Interference terminated") @abstractmethod def interfere(self): @@ -236,6 +238,7 @@ def __init__(self, bft_network, replicas_to_isolate): super(ReplicaSubsetIsolatingAdversary, self).__init__(bft_network) def interfere(self): + log.log_message(message_type="Disabling replica communication", replicas=self.replicas_to_isolate) other_replicas = set(self.bft_network.all_replicas()) - set(self.replicas_to_isolate) for ir in self.replicas_to_isolate: for r in other_replicas: diff --git a/tests/apollo/util/skvbc.py b/tests/apollo/util/skvbc.py index c524f5fdfb..6ff8a55b9b 100644 --- a/tests/apollo/util/skvbc.py +++ b/tests/apollo/util/skvbc.py @@ -343,17 +343,20 @@ async def assert_successful_put_get(self): reply = await client.write(self.write_req([], [(key, val)], 0)) reply = self.parse_reply(reply) assert reply.success - assert last_block + 1 == reply.last_block_id + last_block_after_write = reply.last_block_id + assert last_block_after_write > last_block, f'last_block_after_write: {last_block_after_write},' \ f'last_block: {last_block}' # Retrieve the last block and ensure that it matches what's expected read_reply = await client.read(self.get_last_block_req()) newest_block = self.parse_reply(read_reply) - assert last_block + 1 == newest_block + assert newest_block >= last_block_after_write, f'newest_block: {newest_block}, ' \ f'last_block_after_write: {last_block_after_write}' # Get the previous put value, and ensure it's correct read_req = self.read_req([key], newest_block) kvpairs = self.parse_reply(await client.read(read_req)) - assert {key: val} == kvpairs + assert {key: val} == kvpairs, '{}, {}'.format({key: val}, kvpairs) def _create_keys(self): """ diff --git a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp index 66489fa5b7..686b276ddb 100644 --- a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp +++ b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp @@ -78,21 +78,39 @@ InternalCommandsHandler::InternalCommandsHandler(concord::kvbc::IReader *storage m_logger(logger), m_addAllKeysAsPublic{addAllKeysAsPublic}, m_kvbc{kvbc} { - st.addOnTransferringCompleteCallback([this](uint64_t) { - LOG_INFO(GL, "Synchronizing client execution state after state transfer"); - auto data = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1}); - ConcordAssert(data.has_value()); - auto raw_json = std::get<categorization::VersionedValue>(data.value()).data; - nlohmann::json json2 = nlohmann::json::parse(raw_json); - nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer; - serializer.from_json(json2, m_clientToMaxExecutedReqId); - LOG_INFO(GL, "raw client state: " << KVLOG(raw_json)); - }); + if (ReplicaConfig::instance().isReadOnly) { + return; + } + + loadClientStateFromStorage(); + st.addOnTransferringCompleteCallback([this](uint64_t) { this->loadClientStateFromStorage(); }); + if (m_addAllKeysAsPublic) { ConcordAssertNE(m_kvbc, nullptr); } } +void InternalCommandsHandler::loadClientStateFromStorage() { + ConcordAssert(!ReplicaConfig::instance().isReadOnly); + LOG_INFO(GL, "Synchronizing client execution state"); + auto data = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1}); + if (!data.has_value()) { + LOG_WARN(GL, "empty client execution state, were any client requests executed?"); + return; + } + auto raw_json = std::get<categorization::VersionedValue>(data.value()).data; + nlohmann::json json2 = nlohmann::json::parse(raw_json); + nlohmann::json::json_serializer<std::vector<std::pair<uint16_t, uint64_t>>, void> serializer; + std::vector<std::pair<uint16_t, uint64_t>> 
deserialized; + serializer.from_json(json2, deserialized); + m_clientToMaxExecutedReqId.clear(); + + for (auto [clientId, reqId] : deserialized) { + m_clientToMaxExecutedReqId[clientId] = reqId; + } + LOG_INFO(GL, "raw client state: " << KVLOG(raw_json)); +} + void InternalCommandsHandler::add(std::string &&key, std::string &&value, VersionedUpdates &verUpdates, @@ -348,16 +366,26 @@ void InternalCommandsHandler::writeAccumulatedBlock(ExecutionRequestsQueue &bloc } } - nlohmann::json json; - nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer; - serializer.to_json(json, m_clientToMaxExecutedReqId); - VersionedUpdates clientStateUpdate; - clientStateUpdate.addUpdate({0x1}, json.dump()); + clientStateUpdate.addUpdate({0x1}, serializeClientState()); addBlock(verUpdates, merkleUpdates, clientStateUpdate, sn); } +std::string InternalCommandsHandler::serializeClientState() const { + nlohmann::json json; + nlohmann::json::json_serializer<std::vector<std::pair<uint16_t, uint64_t>>, void> serializer; + // Need to maintain a fixed order in the blocks so that the replica state won't diverge + std::vector<std::pair<uint16_t, uint64_t>> serialized; + for (auto clientIdReqIdPair : m_clientToMaxExecutedReqId) { + serialized.push_back(clientIdReqIdPair); + } + serializer.to_json(json, serialized); + auto serialized_raw_json = json.dump(); + LOG_INFO(GL, KVLOG(serialized_raw_json)); + return serialized_raw_json; +} + OperationResult InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize, const uint8_t *request, size_t maxReplySize, @@ -543,12 +571,7 @@ OperationResult InternalCommandsHandler::executeWriteCommand(uint32_t requestSiz VersionedUpdates verUpdates; BlockMerkleUpdates merkleUpdates; VersionedUpdates clientVerUpdates; - nlohmann::json json; - nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer; - serializer.to_json(json, m_clientToMaxExecutedReqId); - auto dump = json.dump(); - LOG_INFO(GL, KVLOG(dump)); - clientVerUpdates.addUpdate({0x1}, json.dump()); + clientVerUpdates.addUpdate({0x1}, serializeClientState()); addKeys(write_req, sequenceNum, verUpdates, merkleUpdates); addBlock(verUpdates, merkleUpdates, clientVerUpdates, sequenceNum); } diff --git a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp index 499b77d2b6..51b03d0395 100644 --- a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp +++ b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp @@ -22,7 +22,7 @@ #include "ControlStateManager.hpp" #include #include - +#include <map> #include "log/logger.hpp" #include "skvbc_messages.cmf.hpp" #include "SharedTypes.hpp" @@ -150,6 +150,9 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler { concord::kvbc::categorization::VersionedUpdates &blockAccumulatedVerUpdates, concord::kvbc::categorization::BlockMerkleUpdates &blockAccumulatedMerkleUpdates) const; + std::string serializeClientState() const; + void loadClientStateFromStorage(); + private: concord::kvbc::IReader *m_storage; concord::kvbc::IBlockAdder *m_blockAdder; @@ -161,7 +164,7 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler { std::shared_ptr<concord::performance::PerformanceManager> perfManager_; bool m_addAllKeysAsPublic{false}; // Add all key-values in the block merkle category as public ones. concord::kvbc::adapter::ReplicaBlockchain *m_kvbc{nullptr}; - std::unordered_map<uint16_t, uint64_t> m_clientToMaxExecutedReqId; + std::map<uint16_t, uint64_t> m_clientToMaxExecutedReqId; // This string is used by clients to distinguish blocks that should be ignored by them. // Some tests expect every block to be created by a request issued by test clients.
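
A note on the quorum switch in poll_based_state_client.cpp: per the comments above, bft::client's ByzantineSafeQuorum waits for f + 1 matching replies, while the default LinearizableQuorum waits for a larger set. The pigeonhole argument behind f + 1, together with the caveat the TODOs record, in a minimal self-contained sketch (helper names are illustrative, not part of the PR):

    #include <cassert>
    #include <cstdint>

    // Among m matching replies, at most f can come from faulty replicas,
    // so at least m - f come from correct ones.
    uint32_t minCorrectAmongMatching(uint32_t m, uint32_t f) { return m > f ? m - f : 0; }

    int main() {
      const uint32_t f = 2;  // e.g. n = 3f + 1 = 7 replicas
      // f + 1 matching replies always include at least one correct replica...
      assert(minCorrectAmongMatching(f + 1, f) >= 1);
      // ...but that single correct replica may be lagging, which is why the
      // TODOs note that f + 1 is only adequate when no replica is Byzantine.
      assert(minCorrectAmongMatching(2 * f + 1, f) >= f + 1);
      return 0;
    }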
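
A note on the ordering comment in serializeClientState(): std::unordered_map iteration order is unspecified, so two replicas holding identical entries could serialize them in different orders and produce divergent blocks. A self-contained sketch of the property that switching to std::map restores (names are illustrative, not the PR's):

    #include <cstdint>
    #include <map>
    #include <sstream>
    #include <string>

    // std::map iterates in key order, so replicas holding the same entries
    // serialize to byte-identical strings regardless of insertion order.
    std::string serializeOrdered(const std::map<uint16_t, uint64_t>& clientToReqId) {
      std::ostringstream out;
      for (const auto& [clientId, reqId] : clientToReqId) out << clientId << ':' << reqId << ';';
      return out.str();
    }

    int main() {
      std::map<uint16_t, uint64_t> replicaA{{7, 42}, {3, 99}};  // inserted 7 first
      std::map<uint16_t, uint64_t> replicaB{{3, 99}, {7, 42}};  // inserted 3 first
      return serializeOrdered(replicaA) == serializeOrdered(replicaB) ? 0 : 1;  // returns 0: identical
    }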