Skip to content

Commit

Permalink
Fix more errors
Browse files Browse the repository at this point in the history
  • Loading branch information
WildFireFlum committed Feb 28, 2023
1 parent c45052c commit 98f5f3b
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 53 deletions.
3 changes: 2 additions & 1 deletion bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ std::shared_ptr<ClientReconfigurationEngine> CreFactory::create(
Config cre_config;
cre_config.id_ = repConfig.replicaId;
cre_config.interval_timeout_ms_ = 1000;
IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_);
// TODO: fix relying on f + 1, so that byzantine replicas are also handled
IStateClient* pbc = new PollBasedStateClient(bftClient, cre_config.interval_timeout_ms_, 0, cre_config.id_, true);
auto cre =
std::make_shared<ClientReconfigurationEngine>(cre_config, pbc, std::make_shared<concordMetrics::Aggregator>());
if (bftEngine::ReplicaConfig::instance().isReadOnly) {
Expand Down
3 changes: 3 additions & 0 deletions bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void ReplicaForStateTransfer::start() {
// on other replicas but not on this one, finishing ST does not mean that missed key exchanges are executed)
// This can be done by iterating the saved cryptosystems and updating their private key if their
// public key matches the candidate saved in KeyExchangeManager
// TODO: persist the candidate
CryptoManager::instance().onCheckpoint(checkpoint);
auto [priv, pub] = KeyExchangeManager::instance().getCandidateKeyPair();
CryptoManager::instance().syncPrivateKeyAfterST(priv, pub);
Expand All @@ -108,6 +109,8 @@ void ReplicaForStateTransfer::start() {
auto *pbc =
reinterpret_cast<concord::client::reconfiguration::PollBasedStateClient *>(cre_->getStateClient());

// TODO: remove loop so that state transfer doesn't hang if it cannot complete reconfiguration requests
// The current implementation expects f + 1 identical responses
bool succ = false;
while (!succ) {
auto latestHandledUpdate = cre_->getLatestKnownUpdateBlock();
Expand Down
10 changes: 5 additions & 5 deletions bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2372,14 +2372,14 @@ void ReplicaImp::onMessage<CheckpointMsg>(std::unique_ptr<CheckpointMsg> message
static uint32_t maxTimeSinceLastExecutionInMainWindowMs =
config_.get<uint32_t>("concord.bft.st.maxTimeSinceLastExecutionInMainWindowMs", 5000);

Time timeOfLastEcecution = MinTime;
Time timeOfLastExecution = MinTime;
if (mainLog->insideActiveWindow(lastExecutedSeqNum))
timeOfLastEcecution = mainLog->get(lastExecutedSeqNum).lastUpdateTimeOfCommitMsgs();
if ((getMonotonicTime() - timeOfLastEcecution) > (milliseconds(maxTimeSinceLastExecutionInMainWindowMs))) {
timeOfLastExecution = mainLog->get(lastExecutedSeqNum).lastUpdateTimeOfCommitMsgs();
if ((getMonotonicTime() - timeOfLastExecution) > (milliseconds(maxTimeSinceLastExecutionInMainWindowMs))) {
LOG_INFO(GL,
"Number of stable checkpoints in current window: "
<< numRelevant << " time since last execution: "
<< (getMonotonicTime() - timeOfLastEcecution).count() << " ms");
<< (getMonotonicTime() - timeOfLastExecution).count() << " ms");
askForStateTransfer = true;
startStReason = "Too much time has passed since last execution";
}
Expand Down Expand Up @@ -2911,7 +2911,7 @@ void ReplicaImp::onMessage<ViewChangeMsg>(std::unique_ptr<ViewChangeMsg> message
ViewNum maxKnownCorrectView = 0;
ViewNum maxKnownAgreedView = 0;
viewsManager->computeCorrectRelevantViewNumbers(&maxKnownCorrectView, &maxKnownAgreedView);
LOG_INFO(VC_LOG, "View Number details: " << KVLOG(maxKnownCorrectView, maxKnownAgreedView));
LOG_INFO(VC_LOG, "View Number details: " << KVLOG(maxKnownCorrectView, maxKnownAgreedView, getCurrentView()));

if (maxKnownCorrectView > getCurrentView()) {
// we have at least f+1 view-changes with view number >= maxKnownCorrectView
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/messages/ClientRequestMsg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void ClientRequestMsg::validateImp(const ReplicasInfo& repInfo) const {
(repInfo.isIdOfReplica(clientId) || repInfo.isIdOfPeerRoReplica(clientId))) {
// Allow every reconfiguration/internal message from replicas (it will be verified in the reconfiguration handler)
LOG_INFO(CNSUS,
"Reconfig/Internal replica message not validated"
"Reconfig/Internal replica message validation skipped"
<< KVLOG(clientId, header->flags & RECONFIG_FLAG, header->flags & INTERNAL_FLAG));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class PollBasedStateClient : public IStateClient {
PollBasedStateClient(bft::client::Client* client,
uint64_t interval_timeout_ms,
uint64_t last_known_block,
const uint16_t id_);
const uint16_t id_,
bool use_byzantine_quorum = false);
State getNextState() const override;
bool updateState(const WriteState& state) override;
~PollBasedStateClient();
Expand Down Expand Up @@ -63,6 +64,8 @@ class PollBasedStateClient : public IStateClient {
bool halted_ = false;
std::condition_variable resume_cond_;
std::mutex resume_lock_;
// At the end of state transfer we use an f + 1 quorum
bool use_byzantine_quorum_ = false;
};

} // namespace concord::client::reconfiguration
15 changes: 12 additions & 3 deletions client/reconfiguration/src/poll_based_state_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ concord::messages::ReconfigurationResponse PollBasedStateClient::sendReconfigura
concord::messages::ReconfigurationResponse rres;
try {
if (read_request) {
bft::client::ReadConfig read_config{request_config, bft::client::LinearizableQuorum{}};
// TODO: State transfer can work with f + 1 as long as there are no byzantine replicas
bft::client::ReadConfig read_config;
if (use_byzantine_quorum_) {
read_config = bft::client::ReadConfig{request_config, bft::client::ByzantineSafeQuorum{}};
} else {
read_config = bft::client::ReadConfig{request_config, bft::client::LinearizableQuorum{}};
}

rep = bftclient_->send(read_config, std::move(msg));
} else {
bft::client::WriteConfig write_config{request_config, bft::client::LinearizableQuorum{}};
Expand Down Expand Up @@ -57,12 +64,14 @@ State PollBasedStateClient::getNextState() const {
PollBasedStateClient::PollBasedStateClient(bft::client::Client* client,
uint64_t interval_timeout_ms,
uint64_t last_known_block,
const uint16_t id)
const uint16_t id,
bool use_byzantine_quorum)
: bftclient_{client},
id_{id},
interval_timeout_ms_{interval_timeout_ms},
last_known_block_{last_known_block},
sn_gen_(bft::client::ClientId{id}) {}
sn_gen_(bft::client::ClientId{id}),
use_byzantine_quorum_{use_byzantine_quorum} {}

std::vector<State> PollBasedStateClient::getStateUpdate(bool& succ) const {
concord::messages::ClientReconfigurationStateRequest creq{id_};
Expand Down
26 changes: 12 additions & 14 deletions tests/apollo/test_skvbc_backup_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,19 @@ class SkvbcBackupRestoreTest(ApolloTest):
@with_bft_network(start_replica_cmd, selected_configs=lambda n, f, c: n == 7)
async def test_checkpoint_propagation_after_restarting_replicas(self, bft_network):
"""
Here we trigger a checkpoint, restart all replicas in a random order with 5s delay in-between,
both while stopping and starting. We verify checkpoint persisted upon restart and then trigger
Here we trigger a checkpoint, restart all replicas.
We verify checkpoint persisted upon restart and then trigger
another checkpoint. We make sure checkpoint is propagated to all the replicas.
1) Given a BFT network, we make sure all nodes are up
2) Send sufficient number of client requests to trigger checkpoint protocol
3) Stop all replicas in a random order (with 5s delay in between)
4) Start all replicas in a random order (with 5s delay in between)
3) Stop all replicas in a random order
4) Start all replicas in a random order
5) Make sure the initial view is stable
6) Send sufficient number of client requests to trigger another checkpoint
7) Make sure checkpoint propagates to all the replicas
Note: UDP configuration waits for 5 seconds until it assumes network communication is established.
A replica can thus trigger a view change if it
"""
bft_network.start_all_replicas()
skvbc = kvbc.SimpleKVBCProtocol(bft_network)
Expand All @@ -87,20 +90,15 @@ async def test_checkpoint_propagation_after_restarting_replicas(self, bft_networ
verify_checkpoint_persistency=False
)

# stop n replicas in a random order with a delay of 5s in between
stopped_replicas = await self._stop_random_replicas_with_delay(bft_network, delay=5,
exclude_replicas={current_primary})
bft_network.stop_replica(current_primary)
# start stopped replicas in a random order with a delay of 5s in between
bft_network.start_replica(current_primary)
await self._start_random_replicas_with_delay(bft_network, stopped_replicas, delay=5)

bft_network.stop_all_replicas()
bft_network.start_all_replicas()
stopped_replicas = bft_network.all_replicas()
# verify checkpoint persistence
log.log_message(message_type=f"Wait for replicas to reach checkpoint", checkpoint=checkpoint_before+1,
replicas=[current_primary] + list(stopped_replicas))
replicas=stopped_replicas)
await bft_network.wait_for_replicas_to_checkpoint(
stopped_replicas,
expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1)
expected_checkpoint_num=lambda ecn: ecn >= checkpoint_before + 1)

# verify current view is stable
for replica in bft_network.all_replicas():
Expand Down
3 changes: 1 addition & 2 deletions tests/apollo/test_skvbc_checkpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,11 @@ async def test_checkpoint_propagation_after_f_nodes_including_primary_isolated(s
# Once the adversary is gone, the isolated replicas should be able to reach the checkpoint
await bft_network.wait_for_replicas_to_checkpoint(
isolated_replicas,
expected_checkpoint_num=lambda ecn: ecn == checkpoint_before + 1)
expected_checkpoint_num=lambda ecn: ecn >= checkpoint_before + 1)

@with_trio
@with_bft_network(start_replica_cmd_with_corrupted_checkpoint_msgs(corrupt_checkpoints_from_replica_ids={ 1 }),
selected_configs=lambda n, f, c: n == 7)

async def test_rvt_conflict_detection_after_corrupting_checkpoint_msg_for_single_replica(self, bft_network):
await self._test_checkpointing_with_corruptions(bft_network, { 1 })

Expand Down
1 change: 1 addition & 0 deletions tests/apollo/util/bft.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ def monitor_replica_subproc(subproc: subprocess.Popen, replica_id: int, stop_eve
f"return code = {return_code}"
log_message(message_type=f"{error_msg}, aborting test", replica_log=stdout_file.name)
stdout_file.write(f"####### FATAL ERROR: The process has crashed, subproc return value: {return_code}\n")
print(error_msg, file=sys.stderr)
os.kill(os.getpid(), signal.SIGINT)
break

Expand Down
3 changes: 3 additions & 0 deletions tests/apollo/util/bft_network_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from functools import partial
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "pyclient"))
import bft_config
from util import eliot_logging as log

class NetworkPartitioningAdversary(ABC):
"""Represents an adversary capable of inflicting network partitioning"""
Expand All @@ -42,6 +43,7 @@ def __enter__(self):
def __exit__(self, *args):
"""context manager method for 'with' statements"""
self._remove_bft_network_rule_chain()
log.log_message(message_type=f"Interference terminated")

@abstractmethod
def interfere(self):
Expand Down Expand Up @@ -236,6 +238,7 @@ def __init__(self, bft_network, replicas_to_isolate):
super(ReplicaSubsetIsolatingAdversary, self).__init__(bft_network)

def interfere(self):
log.log_message(message_type=f"Disabling replicas communication", replicas=self.replicas_to_isolate)
other_replicas = set(self.bft_network.all_replicas()) - set(self.replicas_to_isolate)
for ir in self.replicas_to_isolate:
for r in other_replicas:
Expand Down
9 changes: 6 additions & 3 deletions tests/apollo/util/skvbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,20 @@ async def assert_successful_put_get(self):
reply = await client.write(self.write_req([], [(key, val)], 0))
reply = self.parse_reply(reply)
assert reply.success
assert last_block + 1 == reply.last_block_id
last_block_after_write = reply.last_block_id
assert last_block_after_write > last_block, f'last_block_after_write: {last_block_after_write},' \
f'last_block: {last_block}'

# Retrieve the last block and ensure that it matches what's expected
read_reply = await client.read(self.get_last_block_req())
newest_block = self.parse_reply(read_reply)
assert last_block + 1 == newest_block
assert newest_block >= last_block_after_write, f'newest_block: {newest_block}, ' \
f'last_block_after_write: {last_block_after_write}'

# Get the previous put value, and ensure it's correct
read_req = self.read_req([key], newest_block)
kvpairs = self.parse_reply(await client.read(read_req))
assert {key: val} == kvpairs
assert {key: val} == kvpairs, '{}, {}'.format({key: val}, kvpairs)

def _create_keys(self):
"""
Expand Down
65 changes: 44 additions & 21 deletions tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,39 @@ InternalCommandsHandler::InternalCommandsHandler(concord::kvbc::IReader *storage
m_logger(logger),
m_addAllKeysAsPublic{addAllKeysAsPublic},
m_kvbc{kvbc} {
st.addOnTransferringCompleteCallback([this](uint64_t) {
LOG_INFO(GL, "Synchronizing client execution state after state transfer");
auto data = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1});
ConcordAssert(data.has_value());
auto raw_json = std::get<concord::kvbc::categorization::VersionedValue>(data.value()).data;
nlohmann::json json2 = nlohmann::json::parse(raw_json);
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.from_json(json2, m_clientToMaxExecutedReqId);
LOG_INFO(GL, "raw client state: " << KVLOG(raw_json));
});
if (ReplicaConfig::instance().isReadOnly) {
return;
}

loadClientStateFromStorage();
st.addOnTransferringCompleteCallback([this](uint64_t) { this->loadClientStateFromStorage(); });

if (m_addAllKeysAsPublic) {
ConcordAssertNE(m_kvbc, nullptr);
}
}

// Rebuilds m_clientToMaxExecutedReqId from the latest value stored under
// CLIENT_STATE_CAT_ID / key {0x1}.
//
// Asserts that the replica is not read-only. If storage holds no value for that key,
// a warning is logged and the in-memory map is left untouched. Otherwise the stored JSON
// is deserialized into (client id, request id) pairs, the map is cleared and repopulated
// from them.
void InternalCommandsHandler::loadClientStateFromStorage() {
  using ClientStateEntries = std::vector<std::pair<uint16_t, uint64_t>>;

  ConcordAssert(!ReplicaConfig::instance().isReadOnly);
  LOG_INFO(GL, "Synchronizing client execution state");

  auto latestClientState = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1});
  if (!latestClientState.has_value()) {
    LOG_WARN(GL, "empty client execution state, were any client requests executed?");
    return;
  }

  // NOTE(review): keep this local named raw_json - KVLOG presumably emits the variable
  // name as the log key, so renaming it would change the log output.
  auto raw_json = std::get<concord::kvbc::categorization::VersionedValue>(latestClientState.value()).data;
  nlohmann::json parsed = nlohmann::json::parse(raw_json);

  ClientStateEntries entries;
  nlohmann::json::json_serializer<ClientStateEntries, void>::from_json(parsed, entries);

  // The map is only reset once parsing and deserialization have gone through.
  m_clientToMaxExecutedReqId.clear();
  for (const auto &entry : entries) {
    m_clientToMaxExecutedReqId[entry.first] = entry.second;
  }
  LOG_INFO(GL, "raw client state: " << KVLOG(raw_json));
}

void InternalCommandsHandler::add(std::string &&key,
std::string &&value,
VersionedUpdates &verUpdates,
Expand Down Expand Up @@ -348,16 +366,26 @@ void InternalCommandsHandler::writeAccumulatedBlock(ExecutionRequestsQueue &bloc
}
}

nlohmann::json json;
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.to_json(json, m_clientToMaxExecutedReqId);

VersionedUpdates clientStateUpdate;
clientStateUpdate.addUpdate({0x1}, json.dump());
clientStateUpdate.addUpdate({0x1}, serializeClientState());

addBlock(verUpdates, merkleUpdates, clientStateUpdate, sn);
}

// Serializes m_clientToMaxExecutedReqId into a JSON string that is written into blocks
// as the client execution state.
//
// The map is first copied into a vector of (client id, request id) pairs in the map's
// iteration order, and that vector is what gets converted to JSON.
//
// Returns the dumped JSON text; the same text is also logged at INFO level.
std::string InternalCommandsHandler::serializeClientState() const {
  using ClientStateEntries = std::vector<std::pair<uint16_t, uint64_t>>;

  // Need to maintain a fixed order in the blocks so that the replica state won't diverge
  ClientStateEntries serialized(m_clientToMaxExecutedReqId.begin(), m_clientToMaxExecutedReqId.end());

  nlohmann::json json;
  nlohmann::json::json_serializer<ClientStateEntries, void> serializer;
  serializer.to_json(json, serialized);

  // Dump once and reuse the result for both the log line and the return value.
  // The local keeps its name because KVLOG presumably logs the variable name as the key.
  auto serialized_raw_json = json.dump();
  LOG_INFO(GL, KVLOG(serialized_raw_json));
  return serialized_raw_json;
}

OperationResult InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize,
const uint8_t *request,
size_t maxReplySize,
Expand Down Expand Up @@ -543,12 +571,7 @@ OperationResult InternalCommandsHandler::executeWriteCommand(uint32_t requestSiz
VersionedUpdates verUpdates;
BlockMerkleUpdates merkleUpdates;
VersionedUpdates clientVerUpdates;
nlohmann::json json;
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.to_json(json, m_clientToMaxExecutedReqId);
auto dump = json.dump();
LOG_INFO(GL, KVLOG(dump));
clientVerUpdates.addUpdate({0x1}, json.dump());
clientVerUpdates.addUpdate({0x1}, serializeClientState());
addKeys(write_req, sequenceNum, verUpdates, merkleUpdates);
addBlock(verUpdates, merkleUpdates, clientVerUpdates, sequenceNum);
}
Expand Down
7 changes: 5 additions & 2 deletions tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "ControlStateManager.hpp"
#include <chrono>
#include <thread>

#include <map>
#include "log/logger.hpp"
#include "skvbc_messages.cmf.hpp"
#include "SharedTypes.hpp"
Expand Down Expand Up @@ -150,6 +150,9 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
concord::kvbc::categorization::VersionedUpdates &blockAccumulatedVerUpdates,
concord::kvbc::categorization::BlockMerkleUpdates &blockAccumulatedMerkleUpdates) const;

std::string serializeClientState() const;
void loadClientStateFromStorage();

private:
concord::kvbc::IReader *m_storage;
concord::kvbc::IBlockAdder *m_blockAdder;
Expand All @@ -161,7 +164,7 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
std::shared_ptr<concord::performance::PerformanceManager> perfManager_;
bool m_addAllKeysAsPublic{false}; // Add all key-values in the block merkle category as public ones.
concord::kvbc::adapter::ReplicaBlockchain *m_kvbc{nullptr};
std::unordered_map<uint16_t, uint64_t> m_clientToMaxExecutedReqId;
std::map<uint16_t, uint64_t> m_clientToMaxExecutedReqId;

// This string is used by clients to distinguish blocks that should be ignored by them.
// Some tests expect every block to be created by a request issued by test clients.
Expand Down

0 comments on commit 98f5f3b

Please sign in to comment.