Skip to content

Commit

Permalink
Fixed client message replay execution after state transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
WildFireFlum committed Feb 28, 2023
1 parent 2155259 commit c45052c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 30 deletions.
19 changes: 14 additions & 5 deletions bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3073,6 +3073,17 @@ bool ReplicaImp::tryToEnterView() {
return enteredView;
}

size_t ReplicaImp::clearClientRequestQueue() {
size_t primaryCombinedReqSize = 0;
LOG_INFO(GL, "clearing client requests" << KVLOG(requestsQueueOfPrimary.size()));
// clear requestsQueueOfPrimary
while (!requestsQueueOfPrimary.empty()) {
primaryCombinedReqSize += requestsQueueOfPrimary.front()->size();
requestsQueueOfPrimary.pop();
}
return primaryCombinedReqSize;
}

void ReplicaImp::onNewView(const std::vector<PrePrepareMsg *> &prePreparesForNewView) {
SCOPED_MDC_SEQ_NUM(std::to_string(getCurrentView()));
SeqNum firstPPSeq = 0;
Expand Down Expand Up @@ -3199,11 +3210,7 @@ void ReplicaImp::onNewView(const std::vector<PrePrepareMsg *> &prePreparesForNew

requestsOfNonPrimary.clear();

// clear requestsQueueOfPrimary
while (!requestsQueueOfPrimary.empty()) {
primaryCombinedReqSize -= requestsQueueOfPrimary.front()->size();
requestsQueueOfPrimary.pop();
}
primaryCombinedReqSize -= clearClientRequestQueue();

primary_queue_size_.Get().Set(requestsQueueOfPrimary.size());

Expand Down Expand Up @@ -3296,6 +3303,8 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
time_in_state_transfer_.end();
LOG_INFO(GL, KVLOG(newStateCheckpoint));
requestsOfNonPrimary.clear();
clearClientRequestQueue();

if (ps_) {
ps_->beginWriteTran();
}
Expand Down
1 change: 1 addition & 0 deletions bftengine/src/bftengine/ReplicaImp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
void addTimers();
void startConsensusProcess(PrePrepareMsgUPtr pp, bool isCreatedEarlier);
void startConsensusProcess(PrePrepareMsgUPtr pp);
void clearClientRequestQueue();
/**
* Updates both seqNumInfo and slow_path metric
* @param seqNumInfo
Expand Down
75 changes: 61 additions & 14 deletions tests/simpleKVBC/TesterReplica/internalCommandsHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unistd.h>
#include <algorithm>
#include <variant>
#include <nlohmann/json.hpp>
#include "ReplicaConfig.hpp"
#include "kvbc_key_types.hpp"

Expand Down Expand Up @@ -64,6 +65,34 @@ static const std::string &keyHashToCategory(const Hash &keyHash) {

static const std::string &keyToCategory(const std::string &key) { return keyHashToCategory(createHash(key)); }

InternalCommandsHandler::InternalCommandsHandler(concord::kvbc::IReader *storage,
concord::kvbc::IBlockAdder *blocksAdder,
concord::kvbc::IBlockMetadata *blockMetadata,
logging::Logger &logger,
bftEngine::IStateTransfer &st,
bool addAllKeysAsPublic,
concord::kvbc::adapter::ReplicaBlockchain *kvbc)
: m_storage(storage),
m_blockAdder(blocksAdder),
m_blockMetadata(blockMetadata),
m_logger(logger),
m_addAllKeysAsPublic{addAllKeysAsPublic},
m_kvbc{kvbc} {
st.addOnTransferringCompleteCallback([this](uint64_t) {
LOG_INFO(GL, "Synchronizing client execution state after state transfer");
auto data = m_storage->getLatest(CLIENT_STATE_CAT_ID, {0x1});
ConcordAssert(data.has_value());
auto raw_json = std::get<concord::kvbc::categorization::VersionedValue>(data.value()).data;
nlohmann::json json2 = nlohmann::json::parse(raw_json);
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.from_json(json2, m_clientToMaxExecutedReqId);
LOG_INFO(GL, "raw client state: " << KVLOG(raw_json));
});
if (m_addAllKeysAsPublic) {
ConcordAssertNE(m_kvbc, nullptr);
}
}

void InternalCommandsHandler::add(std::string &&key,
std::string &&value,
VersionedUpdates &verUpdates,
Expand Down Expand Up @@ -318,7 +347,15 @@ void InternalCommandsHandler::writeAccumulatedBlock(ExecutionRequestsQueue &bloc
"SKVBCWrite message handled; writesCounter=" << m_writesCounter << " currBlock=" << write_rep.latest_block);
}
}
addBlock(verUpdates, merkleUpdates, sn);

nlohmann::json json;
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.to_json(json, m_clientToMaxExecutedReqId);

VersionedUpdates clientStateUpdate;
clientStateUpdate.addUpdate({0x1}, json.dump());

addBlock(verUpdates, merkleUpdates, clientStateUpdate, sn);
}

OperationResult InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize,
Expand Down Expand Up @@ -364,7 +401,10 @@ void InternalCommandsHandler::addKeys(const SKVBCWriteRequest &writeReq,
addMetadataKeyValue(verUpdates, sequenceNum);
}

void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates, BlockMerkleUpdates &merkleUpdates, uint64_t sn) {
void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates,
BlockMerkleUpdates &merkleUpdates,
VersionedUpdates &clientStateVerUpdates,
uint64_t sn) {
BlockId currBlock = m_storage->getLastBlockId();
Updates updates;

Expand Down Expand Up @@ -396,6 +436,7 @@ void InternalCommandsHandler::addBlock(VersionedUpdates &verUpdates, BlockMerkle
updates.add(kConcordInternalCategoryId, std::move(internal_updates));
updates.add(VERSIONED_KV_CAT_ID, std::move(verUpdates));
updates.add(BLOCK_MERKLE_CAT_ID, std::move(merkleUpdates));
updates.add(CLIENT_STATE_CAT_ID, std::move(clientStateVerUpdates));
const auto newBlockId = m_blockAdder->add(std::move(updates));
ConcordAssert(newBlockId == currBlock + 1);
}
Expand Down Expand Up @@ -483,29 +524,35 @@ OperationResult InternalCommandsHandler::executeWriteCommand(uint32_t requestSiz
hasConflict = hasConflict || (!isFirstClientRequest && m_clientToMaxExecutedReqId[clientId] >= requestId);
}

SKVBCReply reply;
reply.reply = SKVBCWriteReply();
SKVBCWriteReply &write_rep = std::get<SKVBCWriteReply>(reply.reply);
write_rep.success = !hasConflict;
if (!hasConflict) {
write_rep.latest_block = currBlock + 1;
auto [iter, isNew] = m_clientToMaxExecutedReqId.emplace(clientId, 0);
UNUSED(iter);
UNUSED(isNew);
m_clientToMaxExecutedReqId[clientId] = std::max(m_clientToMaxExecutedReqId[clientId], batchCid);

if (isBlockAccumulationEnabled) {
// If Block Accumulation is enabled then blocks are added after all requests are processed
addKeys(write_req, sequenceNum, blockAccumulatedVerUpdates, blockAccumulatedMerkleUpdates);
} else {
// If Block Accumulation is not enabled then blocks are added after all requests are processed
VersionedUpdates verUpdates;
BlockMerkleUpdates merkleUpdates;
VersionedUpdates clientVerUpdates;
nlohmann::json json;
nlohmann::json::json_serializer<std::unordered_map<uint16_t, uint64_t>, void> serializer;
serializer.to_json(json, m_clientToMaxExecutedReqId);
auto dump = json.dump();
LOG_INFO(GL, KVLOG(dump));
clientVerUpdates.addUpdate({0x1}, json.dump());
addKeys(write_req, sequenceNum, verUpdates, merkleUpdates);
addBlock(verUpdates, merkleUpdates, sequenceNum);
addBlock(verUpdates, merkleUpdates, clientVerUpdates, sequenceNum);
}
}

SKVBCReply reply;
reply.reply = SKVBCWriteReply();
SKVBCWriteReply &write_rep = std::get<SKVBCWriteReply>(reply.reply);
write_rep.success = !hasConflict;
if (!hasConflict) {
write_rep.latest_block = currBlock + 1;
auto [iter, isNew] = m_clientToMaxExecutedReqId.emplace(clientId, 0);
UNUSED(iter);
UNUSED(isNew);
m_clientToMaxExecutedReqId[clientId] = std::max(m_clientToMaxExecutedReqId[clientId], batchCid);
} else {
write_rep.latest_block = currBlock;
}
Expand Down
16 changes: 5 additions & 11 deletions tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,18 @@

static const std::string VERSIONED_KV_CAT_ID{concord::kvbc::categorization::kExecutionPrivateCategory};
static const std::string BLOCK_MERKLE_CAT_ID{concord::kvbc::categorization::kExecutionProvableCategory};
static constexpr const char *clientReplyStateCategory = "client_state";
static const std::string CLIENT_STATE_CAT_ID{clientReplyStateCategory};

class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
public:
InternalCommandsHandler(concord::kvbc::IReader *storage,
concord::kvbc::IBlockAdder *blocksAdder,
concord::kvbc::IBlockMetadata *blockMetadata,
logging::Logger &logger,
bftEngine::IStateTransfer &st,
bool addAllKeysAsPublic = false,
concord::kvbc::adapter::ReplicaBlockchain *kvbc = nullptr)
: m_storage(storage),
m_blockAdder(blocksAdder),
m_blockMetadata(blockMetadata),
m_logger(logger),
m_addAllKeysAsPublic{addAllKeysAsPublic},
m_kvbc{kvbc} {
if (m_addAllKeysAsPublic) {
ConcordAssertNE(m_kvbc, nullptr);
}
}
concord::kvbc::adapter::ReplicaBlockchain *kvbc = nullptr);

void execute(ExecutionRequestsQueue &requests,
std::optional<bftEngine::Timestamp> timestamp,
Expand Down Expand Up @@ -146,6 +139,7 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler {
uint64_t sn);
void addBlock(concord::kvbc::categorization::VersionedUpdates &verUpdates,
concord::kvbc::categorization::BlockMerkleUpdates &merkleUpdates,
concord::kvbc::categorization::VersionedUpdates &clientStateUpdates,
uint64_t sn);
void addKeys(const skvbc::messages::SKVBCWriteRequest &writeReq,
uint64_t sequenceNum,
Expand Down
2 changes: 2 additions & 0 deletions tests/simpleKVBC/TesterReplica/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void run_replica(int argc, char** argv) {
setup->GetPerformanceManager(),
std::map<std::string, categorization::CATEGORY_TYPE>{
{VERSIONED_KV_CAT_ID, categorization::CATEGORY_TYPE::versioned_kv},
{CLIENT_STATE_CAT_ID, categorization::CATEGORY_TYPE::versioned_kv},
{categorization::kExecutionEventGroupLatestCategory, categorization::CATEGORY_TYPE::versioned_kv},
{BLOCK_MERKLE_CAT_ID, categorization::CATEGORY_TYPE::block_merkle}},
setup->GetSecretManager());
Expand All @@ -151,6 +152,7 @@ void run_replica(int argc, char** argv) {
replica.get(),
blockMetadata,
logger,
replica->getStateTransfer(),
setup->AddAllKeysAsPublic(),
replica->kvBlockchain() ? &replica->kvBlockchain().value() : nullptr);
replica->set_command_handler(cmdHandler);
Expand Down

0 comments on commit c45052c

Please sign in to comment.