From 665faf8d2a5256a944f9511ffa56d63c82a36ad3 Mon Sep 17 00:00:00 2001 From: Alex Upshaw Date: Thu, 8 Jul 2021 03:26:29 +0000 Subject: [PATCH] Migrate SKVBC to use CMF messages This commit modifies SimpleKVBC (implemented in tests/simpleKVBC) to use the CMF messages recently introduced in tests/simpleKVBC/cmf/skvbc_messages.cmf for client requests and replies to them, replacing SimpleKVBC's existing ad hoc message implementation which effectively used packed C structs. This commit also modifies Apollo's SimpleKVBC client implementation in tests/apollo/util/skvbc.py to use these CMF messages and remain compatible with the SimpleKVBC replicas. --- tests/apollo/util/skvbc.py | 136 +++---- tests/simpleKVBC/TesterClient/CMakeLists.txt | 2 +- tests/simpleKVBC/TesterReplica/CMakeLists.txt | 2 +- .../TesterReplica/internalCommandsHandler.cpp | 345 +++++++++------- .../TesterReplica/internalCommandsHandler.hpp | 21 +- tests/simpleKVBC/basicRandomTestsRunner.cpp | 78 ++-- tests/simpleKVBC/basicRandomTestsRunner.hpp | 8 +- tests/simpleKVBC/simpleKVBTestsBuilder.cpp | 383 +++++++++++------- tests/simpleKVBC/simpleKVBTestsBuilder.hpp | 316 +-------------- 9 files changed, 552 insertions(+), 739 deletions(-) diff --git a/tests/apollo/util/skvbc.py b/tests/apollo/util/skvbc.py index 8306c60ae3..421f260720 100644 --- a/tests/apollo/util/skvbc.py +++ b/tests/apollo/util/skvbc.py @@ -1,6 +1,6 @@ # Concord # -# Copyright (c) 2019 VMware, Inc. All Rights Reserved. +# Copyright (c) 2019-2021 VMware, Inc. All Rights Reserved. # # This product is licensed to you under the Apache 2.0 license (the "License"). # You may not use this product except in compliance with the Apache 2.0 License. @@ -10,7 +10,6 @@ # terms and conditions of the subcomponent's license, as noted in the LICENSE # file. -import struct import copy import random import trio @@ -22,6 +21,11 @@ from util import bft from enum import Enum +import os.path +import sys +sys.path.append(os.path.abspath("../../build/tests/apollo/util/")) +import skvbc_messages + WriteReply = namedtuple('WriteReply', ['success', 'last_block_id']) class ExitPolicy(Enum): @@ -29,14 +33,14 @@ class ExitPolicy(Enum): TIME = 1 class SimpleKVBCProtocol: - KV_LEN = 21 ## SimpleKVBC requies fixed size keys and values right now - READ_LATEST = 0xFFFFFFFFFFFFFFFF + # Length to use for randomly generated key and value strings; this length is + # effectively arbitrary as the SimpleKVBC protocol permits key and value + # bytestrings of any length (usually up to some practical + # implementation-imposed upper length limits). + LENGTH_FOR_RANDOM_KVS = 21 - READ = 1 - WRITE = 2 - GET_LAST_BLOCK = 3 - GET_BLOCK_DATA = 4 - LONG_EXEC_WRITE = 5 + # Maximum value for a 64-bit unsigned integer. + READ_LATEST = 0xFFFFFFFFFFFFFFFF """ An implementation of the wire protocol for SimpleKVBC requests. @@ -55,93 +59,61 @@ def __init__(self, bft_network, tracker = None, pre_process=False): @classmethod def write_req(cls, readset, writeset, block_id, long_exec=False): - data = bytearray() - # A conditional write request type - if long_exec is True: - data.append(cls.LONG_EXEC_WRITE) - else: - data.append(cls.WRITE) - # SimpleConditionalWriteHeader - data.extend( - struct.pack("serialize(sequenceNum)); } -Sliver InternalCommandsHandler::buildSliverFromStaticBuf(char *buf) { - char *newBuf = new char[KV_LEN]; - memcpy(newBuf, buf, KV_LEN); - return Sliver(newBuf, KV_LEN); -} - std::optional InternalCommandsHandler::get(const std::string &key, BlockId blockId) const { const auto v = m_storage->get(keyToCategory(key), key, blockId); if (!v) { @@ -144,10 +148,10 @@ std::optional InternalCommandsHandler::get(const std::string &key, std::string InternalCommandsHandler::getAtMost(const std::string &key, BlockId current) const { if (m_storage->getLastBlockId() == 0 || m_storage->getGenesisBlockId() == 0 || current == 0) { - return std::string(KV_LEN, '\0'); + return std::string(); } - auto value = std::string(KV_LEN, '\0'); + auto value = std::string(); do { const auto v = get(key, current); if (v) { @@ -162,7 +166,7 @@ std::string InternalCommandsHandler::getAtMost(const std::string &key, BlockId c std::string InternalCommandsHandler::getLatest(const std::string &key) const { const auto v = m_storage->getLatest(keyToCategory(key), key); if (!v) { - return std::string(KV_LEN, '\0'); + return std::string(); } return std::visit([](const auto &v) { return v.data; }, *v); } @@ -217,31 +221,47 @@ void InternalCommandsHandler::writeAccumulatedBlock(ExecutionRequestsQueue &bloc for (auto &req : blockedRequests) { if (req.flags & bftEngine::MsgFlag::HAS_PRE_PROCESSED_FLAG) { - auto *reply = (SimpleReply_ConditionalWrite *)req.outReply; - reply->latestBlock = currBlock + 1; + SKVBCReply reply; + size_t existing_reply_size = req.outActualReplySize; + static_assert(sizeof(*(req.outReply)) == sizeof(uint8_t), + "Byte pointer type used by bftEngine::IRequestsHandler::ExecutionRequest is incompatible with byte " + "pointer type used by CMF."); + const uint8_t *reply_buffer_as_uint8 = reinterpret_cast(req.outReply); + deserialize(reply_buffer_as_uint8, reply_buffer_as_uint8 + req.outActualReplySize, reply); + SKVBCWriteReply &write_rep = std::get(reply.reply); + write_rep.latest_block = currBlock + 1; + vector serialized_reply; + serialize(serialized_reply, reply); + + // We expect modifying the value of latest_block in the SKVBCWriteReply + // will not alter the length of its serialization. + ConcordAssert(existing_reply_size == serialized_reply.size()); + + copy(serialized_reply.begin(), serialized_reply.end(), req.outReply); LOG_INFO( m_logger, - "ConditionalWrite message handled; writesCounter=" << m_writesCounter << " currBlock=" << reply->latestBlock); + "SKVBCWrite message handled; writesCounter=" << m_writesCounter << " currBlock=" << write_rep.latest_block); } } addBlock(verUpdates, merkleUpdates); } bool InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize, - const SimpleCondWriteRequest &request, + const uint8_t *request, size_t maxReplySize, uint32_t &outReplySize) const { - if (requestSize < sizeof(SimpleCondWriteRequest)) { - LOG_ERROR(m_logger, - "The message is too small: requestSize is " << requestSize << ", required size is " - << sizeof(SimpleCondWriteRequest)); + SKVBCRequest deserialized_request; + try { + deserialize(request, request + requestSize, deserialized_request); + } catch (const runtime_error &e) { + LOG_ERROR(m_logger, "Failed to deserialize SKVBCRequest: " << e.what()); return false; } - if (requestSize < sizeof(request)) { - LOG_ERROR(m_logger, - "The message is too small: requestSize is " << requestSize << ", required size is " << sizeof(request)); + if (!holds_alternative(deserialized_request.request)) { + LOG_ERROR(m_logger, "Received an SKVBCRequest other than an SKVBCWriteRequest but not marked as read-only."); return false; } + if (maxReplySize < outReplySize) { LOG_ERROR(m_logger, "replySize is too big: replySize=" << outReplySize << ", maxReplySize=" << maxReplySize); return false; @@ -249,14 +269,20 @@ bool InternalCommandsHandler::verifyWriteCommand(uint32_t requestSize, return true; } -void InternalCommandsHandler::addKeys(SimpleCondWriteRequest *writeReq, +void InternalCommandsHandler::addKeys(const SKVBCWriteRequest &writeReq, uint64_t sequenceNum, VersionedUpdates &verUpdates, BlockMerkleUpdates &merkleUpdates) { - SimpleKV *keyValArray = writeReq->keyValueArray(); - for (size_t i = 0; i < writeReq->numOfWrites; i++) { - add(std::string(keyValArray[i].simpleKey.key, KV_LEN), - std::string(keyValArray[i].simpleValue.value, KV_LEN), + for (size_t i = 0; i < writeReq.writeset.size(); i++) { + static_assert( + (sizeof(*(writeReq.writeset[i].first.data())) == sizeof(string::value_type)) && + (sizeof(*(writeReq.writeset[i].second.data())) == sizeof(string::value_type)), + "Byte pointer type used by concord::kvbc::categorization::VersionedUpdates and/or " + "concord::kvbc::categorization::BlockMerkleUpdates is incompatible with byte pointer type used by CMF."); + add(string(reinterpret_cast(writeReq.writeset[i].first.data()), + writeReq.writeset[i].first.size()), + string(reinterpret_cast(writeReq.writeset[i].second.data()), + writeReq.writeset[i].second.size()), verUpdates, merkleUpdates); } @@ -299,37 +325,56 @@ bool InternalCommandsHandler::executeWriteCommand(uint32_t requestSize, bool isBlockAccumulationEnabled, VersionedUpdates &blockAccumulatedVerUpdates, BlockMerkleUpdates &blockAccumulatedMerkleUpdates) { - auto *writeReq = (SimpleCondWriteRequest *)request; - LOG_INFO(m_logger, - "Execute WRITE command:" - << " type=" << writeReq->header.type << " seqNum=" << sequenceNum - << " numOfWrites=" << writeReq->numOfWrites << " numOfKeysInReadSet=" << writeReq->numOfKeysInReadSet - << " readVersion=" << writeReq->readVersion - << " READ_ONLY_FLAG=" << ((flags & MsgFlag::READ_ONLY_FLAG) != 0 ? "true" : "false") - << " PRE_PROCESS_FLAG=" << ((flags & MsgFlag::PRE_PROCESS_FLAG) != 0 ? "true" : "false") - << " HAS_PRE_PROCESSED_FLAG=" << ((flags & MsgFlag::HAS_PRE_PROCESSED_FLAG) != 0 ? "true" : "false") - << " BLOCK_ACCUMULATION_ENABLED=" << isBlockAccumulationEnabled); - + static_assert(sizeof(*request) == sizeof(uint8_t), + "Byte pointer type used by bftEngine::IRequestsHandler::ExecutionRequest is incompatible with byte " + "pointer type used by CMF."); + const uint8_t *request_buffer_as_uint8 = reinterpret_cast(request); if (!(flags & MsgFlag::HAS_PRE_PROCESSED_FLAG)) { - bool result = verifyWriteCommand(requestSize, *writeReq, maxReplySize, outReplySize); + bool result = verifyWriteCommand(requestSize, request_buffer_as_uint8, maxReplySize, outReplySize); if (!result) ConcordAssert(0); if (flags & MsgFlag::PRE_PROCESS_FLAG) { - if (writeReq->header.type == LONG_EXEC_COND_WRITE) sleep(LONG_EXEC_CMD_TIME_IN_SEC); + SKVBCRequest deserialized_request; + deserialize(request_buffer_as_uint8, request_buffer_as_uint8 + requestSize, deserialized_request); + const SKVBCWriteRequest &write_req = std::get(deserialized_request.request); + LOG_INFO(m_logger, + "Execute WRITE command:" + << " type=SKVBCWriteRequest seqNum=" << sequenceNum << " numOfWrites=" << write_req.writeset.size() + << " numOfKeysInReadSet=" << write_req.readset.size() << " readVersion=" << write_req.read_version + << " READ_ONLY_FLAG=" << ((flags & MsgFlag::READ_ONLY_FLAG) != 0 ? "true" : "false") + << " PRE_PROCESS_FLAG=" << ((flags & MsgFlag::PRE_PROCESS_FLAG) != 0 ? "true" : "false") + << " HAS_PRE_PROCESSED_FLAG=" << ((flags & MsgFlag::HAS_PRE_PROCESSED_FLAG) != 0 ? "true" : "false") + << " BLOCK_ACCUMULATION_ENABLED=" << isBlockAccumulationEnabled); + if (write_req.long_exec) sleep(LONG_EXEC_CMD_TIME_IN_SEC); outReplySize = requestSize; memcpy(outReply, request, requestSize); return result; } } + SKVBCRequest deserialized_request; + deserialize(request_buffer_as_uint8, request_buffer_as_uint8 + requestSize, deserialized_request); + const SKVBCWriteRequest &write_req = std::get(deserialized_request.request); + LOG_INFO(m_logger, + "Execute WRITE command:" + << " type=SKVBCWriteRequest seqNum=" << sequenceNum << " numOfWrites=" << write_req.writeset.size() + << " numOfKeysInReadSet=" << write_req.readset.size() << " readVersion=" << write_req.read_version + << " READ_ONLY_FLAG=" << ((flags & MsgFlag::READ_ONLY_FLAG) != 0 ? "true" : "false") + << " PRE_PROCESS_FLAG=" << ((flags & MsgFlag::PRE_PROCESS_FLAG) != 0 ? "true" : "false") + << " HAS_PRE_PROCESSED_FLAG=" << ((flags & MsgFlag::HAS_PRE_PROCESSED_FLAG) != 0 ? "true" : "false") + << " BLOCK_ACCUMULATION_ENABLED=" << isBlockAccumulationEnabled); - SimpleKey *readSetArray = writeReq->readSetArray(); BlockId currBlock = m_storage->getLastBlockId(); // Look for conflicts bool hasConflict = false; - for (size_t i = 0; !hasConflict && i < writeReq->numOfKeysInReadSet; i++) { - const auto key = std::string(readSetArray[i].key, KV_LEN); + for (size_t i = 0; !hasConflict && i < write_req.readset.size(); i++) { + static_assert( + sizeof(*(write_req.readset[i].data())) == sizeof(string::value_type), + "Byte pointer type used by concord::kvbc::IReader, concord::kvbc::categorization::VersionedUpdates, and/or " + "concord::kvbc::categorization::BlockMerkleUpdatesis incompatible with byte pointer type used by CMF."); + const string key = + string(reinterpret_cast(write_req.readset[i].data()), write_req.readset[i].size()); const auto latest_ver = getLatestVersion(key); - hasConflict = (latest_ver && latest_ver > writeReq->readVersion); + hasConflict = (latest_ver && latest_ver > write_req.read_version); if (isBlockAccumulationEnabled && !hasConflict) { if (hasConflictInBlockAccumulatedRequests(key, blockAccumulatedVerUpdates, blockAccumulatedMerkleUpdates)) { hasConflict = true; @@ -340,48 +385,46 @@ bool InternalCommandsHandler::executeWriteCommand(uint32_t requestSize, if (!hasConflict) { if (isBlockAccumulationEnabled) { // If Block Accumulation is enabled then blocks are added after all requests are processed - addKeys(writeReq, sequenceNum, blockAccumulatedVerUpdates, blockAccumulatedMerkleUpdates); + addKeys(write_req, sequenceNum, blockAccumulatedVerUpdates, blockAccumulatedMerkleUpdates); } else { // If Block Accumulation is not enabled then blocks are added after all requests are processed VersionedUpdates verUpdates; BlockMerkleUpdates merkleUpdates; - addKeys(writeReq, sequenceNum, verUpdates, merkleUpdates); + addKeys(write_req, sequenceNum, verUpdates, merkleUpdates); addBlock(verUpdates, merkleUpdates); } } - ConcordAssert(sizeof(SimpleReply_ConditionalWrite) <= maxReplySize); - auto *reply = (SimpleReply_ConditionalWrite *)outReply; - reply->header.type = COND_WRITE; - reply->success = (!hasConflict); + SKVBCReply reply; + reply.reply = SKVBCWriteReply(); + SKVBCWriteReply &write_rep = std::get(reply.reply); + write_rep.success = (!hasConflict); if (!hasConflict) - reply->latestBlock = currBlock + 1; + write_rep.latest_block = currBlock + 1; else - reply->latestBlock = currBlock; + write_rep.latest_block = currBlock; - outReplySize = sizeof(SimpleReply_ConditionalWrite); + vector serialized_reply; + serialize(serialized_reply, reply); + ConcordAssert(serialized_reply.size() <= maxReplySize); + copy(serialized_reply.begin(), serialized_reply.end(), outReply); + outReplySize = serialized_reply.size(); ++m_writesCounter; if (!isBlockAccumulationEnabled) - LOG_INFO( - m_logger, - "ConditionalWrite message handled; writesCounter=" << m_writesCounter << " currBlock=" << reply->latestBlock); + LOG_INFO(m_logger, + "ConditionalWrite message handled; writesCounter=" << m_writesCounter + << " currBlock=" << write_rep.latest_block); return true; } -bool InternalCommandsHandler::executeGetBlockDataCommand( - uint32_t requestSize, const char *request, size_t maxReplySize, char *outReply, uint32_t &outReplySize) { - auto *req = (SimpleGetBlockDataRequest *)request; - LOG_INFO(m_logger, "Execute GET_BLOCK_DATA command: type=" << req->h.type << ", BlockId=" << req->block_id); - - auto minRequestSize = std::max(sizeof(SimpleGetBlockDataRequest), req->size()); - if (requestSize < minRequestSize) { - LOG_ERROR(m_logger, - "The message is too small: requestSize=" << requestSize << ", minRequestSize=" << minRequestSize); - return false; - } +bool InternalCommandsHandler::executeGetBlockDataCommand(const SKVBCGetBlockDataRequest &request, + size_t maxReplySize, + char *outReply, + uint32_t &outReplySize) { + LOG_INFO(m_logger, "Execute GET_BLOCK_DATA command: type=SKVBCGetBlockDataRequest, BlockId=" << request.block_id); - auto block_id = req->block_id; + auto block_id = request.block_id; const auto updates = getBlockUpdates(block_id); if (!updates) { LOG_ERROR(m_logger, "GetBlockData: Failed to retrieve block ID " << block_id); @@ -391,102 +434,95 @@ bool InternalCommandsHandler::executeGetBlockDataCommand( // Each block contains a single metadata key holding the sequence number const int numMetadataKeys = 1; auto numOfElements = updates->size() - numMetadataKeys; - size_t replySize = SimpleReply_Read::getSize(numOfElements); LOG_INFO(m_logger, "NUM OF ELEMENTS IN BLOCK = " << numOfElements); - if (maxReplySize < replySize) { - LOG_ERROR(m_logger, "replySize is too big: replySize=" << replySize << ", maxReplySize=" << maxReplySize); - return false; - } - - SimpleReply_Read *pReply = (SimpleReply_Read *)(outReply); - outReplySize = replySize; - memset(pReply, 0, replySize); - pReply->header.type = READ; - pReply->numOfItems = numOfElements; - auto i = 0; + SKVBCReply reply; + reply.reply = SKVBCReadReply(); + SKVBCReadReply &read_rep = std::get(reply.reply); + read_rep.reads.resize(numOfElements); + size_t i = 0; for (const auto &[key, value] : *updates) { if (key != concord::kvbc::IBlockMetadata::kBlockMetadataKeyStr) { - memcpy(pReply->items[i].simpleKey.key, key.data(), KV_LEN); - memcpy(pReply->items[i].simpleValue.value, value.data(), KV_LEN); + read_rep.reads[i].first.assign(key.begin(), key.end()); + read_rep.reads[i].second.assign(value.begin(), value.end()); ++i; } } - return true; -} - -bool InternalCommandsHandler::executeReadCommand( - uint32_t requestSize, const char *request, size_t maxReplySize, char *outReply, uint32_t &outReplySize) { - auto *readReq = (SimpleReadRequest *)request; - LOG_INFO(m_logger, - "Execute READ command: type=" << readReq->header.type << ", numberOfKeysToRead=" - << readReq->numberOfKeysToRead << ", readVersion=" << readReq->readVersion); - auto minRequestSize = std::max(sizeof(SimpleReadRequest), readReq->getSize()); - if (requestSize < minRequestSize) { + vector serialized_reply; + serialize(serialized_reply, reply); + if (maxReplySize < serialized_reply.size()) { LOG_ERROR(m_logger, - "The message is too small: requestSize=" << requestSize << ", minRequestSize=" << minRequestSize); - return false; - } - - size_t numOfItems = readReq->numberOfKeysToRead; - size_t replySize = SimpleReply_Read::getSize(numOfItems); - - if (maxReplySize < replySize) { - LOG_ERROR(m_logger, "replySize is too big: replySize=" << replySize << ", maxReplySize=" << maxReplySize); + "replySize is too big: replySize=" << serialized_reply.size() << ", maxReplySize=" << maxReplySize); return false; } + copy(serialized_reply.begin(), serialized_reply.end(), outReply); + outReplySize = serialized_reply.size(); + return true; +} - auto *reply = (SimpleReply_Read *)(outReply); - outReplySize = replySize; - reply->header.type = READ; - reply->numOfItems = numOfItems; - - SimpleKey *readKeys = readReq->keys; - SimpleKV *replyItems = reply->items; - for (size_t i = 0; i < numOfItems; i++) { - memcpy(replyItems->simpleKey.key, readKeys->key, KV_LEN); - auto value = std::string(KV_LEN, '\0'); - if (readReq->readVersion > m_storage->getLastBlockId()) { - value = getLatest(std::string(readKeys->key, KV_LEN)); +bool InternalCommandsHandler::executeReadCommand(const SKVBCReadRequest &request, + size_t maxReplySize, + char *outReply, + uint32_t &outReplySize) { + LOG_INFO(m_logger, + "Execute READ command: type=SKVBCReadRequest, numberOfKeysToRead=" << request.keys.size() << ", readVersion=" + << request.read_version); + + SKVBCReply reply; + reply.reply = SKVBCReadReply(); + SKVBCReadReply &read_rep = std::get(reply.reply); + read_rep.reads.resize(request.keys.size()); + for (size_t i = 0; i < request.keys.size(); i++) { + read_rep.reads[i].first = request.keys[i]; + string value = ""; + static_assert( + sizeof(*(request.keys[i].data())) == sizeof(string::value_type), + "Byte pointer type used by concord::kvbc::IReader is incompatible with byte pointer type used by CMF."); + string key(reinterpret_cast(request.keys[i].data()), request.keys[i].size()); + if (request.read_version > m_storage->getLastBlockId()) { + value = getLatest(key); } else { - value = getAtMost(std::string(readKeys->key, KV_LEN), readReq->readVersion); + value = getAtMost(key, request.read_version); } - memcpy(replyItems->simpleValue.value, value.data(), KV_LEN); - ++readKeys; - ++replyItems; + read_rep.reads[i].second.assign(value.begin(), value.end()); + } + + vector serialized_reply; + serialize(serialized_reply, reply); + if (maxReplySize < serialized_reply.size()) { + LOG_ERROR(m_logger, + "replySize is too big: replySize=" << serialized_reply.size() << ", maxReplySize=" << maxReplySize); + return false; } + copy(serialized_reply.begin(), serialized_reply.end(), outReply); + outReplySize = serialized_reply.size(); ++m_readsCounter; LOG_INFO(m_logger, "READ message handled; readsCounter=" << m_readsCounter); return true; } -bool InternalCommandsHandler::executeGetLastBlockCommand(uint32_t requestSize, - size_t maxReplySize, - char *outReply, - uint32_t &outReplySize) { +bool InternalCommandsHandler::executeGetLastBlockCommand(size_t maxReplySize, char *outReply, uint32_t &outReplySize) { LOG_INFO(m_logger, "GET LAST BLOCK!!!"); - if (requestSize < sizeof(SimpleGetLastBlockRequest)) { - LOG_ERROR(m_logger, - "The message is too small: requestSize is " << requestSize << ", required size is " - << sizeof(SimpleGetLastBlockRequest)); - return false; - } + SKVBCReply reply; + reply.reply = SKVBCGetLastBlockReply(); + SKVBCGetLastBlockReply &glb_rep = std::get(reply.reply); + glb_rep.latest_block = m_storage->getLastBlockId(); - outReplySize = sizeof(SimpleReply_GetLastBlock); - if (maxReplySize < outReplySize) { - LOG_ERROR(m_logger, "maxReplySize is too small: replySize=" << outReplySize << ", maxReplySize=" << maxReplySize); + vector serialized_reply; + serialize(serialized_reply, reply); + if (maxReplySize < serialized_reply.size()) { + LOG_ERROR(m_logger, + "maxReplySize is too small: replySize=" << serialized_reply.size() << ", maxReplySize=" << maxReplySize); return false; } - - auto *reply = (SimpleReply_GetLastBlock *)(outReply); - reply->header.type = GET_LAST_BLOCK; - reply->latestBlock = m_storage->getLastBlockId(); + copy(serialized_reply.begin(), serialized_reply.end(), outReply); + outReplySize = serialized_reply.size(); ++m_getLastBlockCounter; LOG_INFO(m_logger, "GetLastBlock message handled; getLastBlockCounter=" << m_getLastBlockCounter - << ", latestBlock=" << reply->latestBlock); + << ", latestBlock=" << glb_rep.latest_block); return true; } @@ -496,16 +532,29 @@ bool InternalCommandsHandler::executeReadOnlyCommand(uint32_t requestSize, char *outReply, uint32_t &outReplySize, uint32_t &specificReplicaInfoOutReplySize) { - auto *requestHeader = (SimpleRequest *)request; - if (requestHeader->type == READ) { - return executeReadCommand(requestSize, request, maxReplySize, outReply, outReplySize); - } else if (requestHeader->type == GET_LAST_BLOCK) { - return executeGetLastBlockCommand(requestSize, maxReplySize, outReply, outReplySize); - } else if (requestHeader->type == GET_BLOCK_DATA) { - return executeGetBlockDataCommand(requestSize, request, maxReplySize, outReply, outReplySize); + SKVBCRequest deserialized_request; + try { + static_assert(sizeof(*request) == sizeof(uint8_t), + "Byte pointer type used by bftEngine::IRequestsHandler::ExecutionRequest is incompatible with byte " + "pointer type used by CMF."); + const uint8_t *request_buffer_as_uint8 = reinterpret_cast(request); + deserialize(request_buffer_as_uint8, request_buffer_as_uint8 + requestSize, deserialized_request); + } catch (const runtime_error &e) { + outReplySize = 0; + LOG_ERROR(m_logger, "Failed to deserialize SKVBCRequest: " << e.what()); + return false; + } + if (holds_alternative(deserialized_request.request)) { + return executeReadCommand( + std::get(deserialized_request.request), maxReplySize, outReply, outReplySize); + } else if (holds_alternative(deserialized_request.request)) { + return executeGetLastBlockCommand(maxReplySize, outReply, outReplySize); + } else if (holds_alternative(deserialized_request.request)) { + return executeGetBlockDataCommand( + std::get(deserialized_request.request), maxReplySize, outReply, outReplySize); } else { outReplySize = 0; - LOG_ERROR(m_logger, "Illegal message received: requestHeader->type=" << requestHeader->type); + LOG_ERROR(m_logger, "Received read-only request of unrecognized message type."); return false; } } diff --git a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp index 4a35730f80..c6d56e9a71 100644 --- a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp +++ b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018-2019 VMware, Inc. All Rights Reserved. +// Copyright (c) 2018-2021 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -63,22 +63,25 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler { uint32_t &specificReplicaInfoOutReplySize); bool verifyWriteCommand(uint32_t requestSize, - const BasicRandomTests::SimpleCondWriteRequest &request, + const uint8_t *request, size_t maxReplySize, uint32_t &outReplySize) const; - bool executeReadCommand( - uint32_t requestSize, const char *request, size_t maxReplySize, char *outReply, uint32_t &outReplySize); + bool executeReadCommand(const skvbc::messages::SKVBCReadRequest &request, + size_t maxReplySize, + char *outReply, + uint32_t &outReplySize); - bool executeGetBlockDataCommand( - uint32_t requestSize, const char *request, size_t maxReplySize, char *outReply, uint32_t &outReplySize); + bool executeGetBlockDataCommand(const skvbc::messages::SKVBCGetBlockDataRequest &request, + size_t maxReplySize, + char *outReply, + uint32_t &outReplySize); - bool executeGetLastBlockCommand(uint32_t requestSize, size_t maxReplySize, char *outReply, uint32_t &outReplySize); + bool executeGetLastBlockCommand(size_t maxReplySize, char *outReply, uint32_t &outReplySize); void addMetadataKeyValue(concord::kvbc::categorization::VersionedUpdates &updates, uint64_t sequenceNum) const; private: - static concordUtils::Sliver buildSliverFromStaticBuf(char *buf); std::optional get(const std::string &key, concord::kvbc::BlockId blockId) const; std::string getAtMost(const std::string &key, concord::kvbc::BlockId blockId) const; std::string getLatest(const std::string &key) const; @@ -89,7 +92,7 @@ class InternalCommandsHandler : public concord::kvbc::ICommandsHandler { concord::kvbc::categorization::BlockMerkleUpdates &merkleUpdates); void addBlock(concord::kvbc::categorization::VersionedUpdates &verUpdates, concord::kvbc::categorization::BlockMerkleUpdates &merkleUpdates); - void addKeys(BasicRandomTests::SimpleCondWriteRequest *writeReq, + void addKeys(const skvbc::messages::SKVBCWriteRequest &writeReq, uint64_t sequenceNum, concord::kvbc::categorization::VersionedUpdates &verUpdates, concord::kvbc::categorization::BlockMerkleUpdates &merkleUpdates); diff --git a/tests/simpleKVBC/basicRandomTestsRunner.cpp b/tests/simpleKVBC/basicRandomTestsRunner.cpp index d910dffc18..9dd0e99b96 100644 --- a/tests/simpleKVBC/basicRandomTestsRunner.cpp +++ b/tests/simpleKVBC/basicRandomTestsRunner.cpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018-2020 VMware, Inc. All Rights Reserved. +// Copyright (c) 2018-2021 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -14,18 +14,22 @@ #include "basicRandomTestsRunner.hpp" #include "assertUtils.hpp" #include - -#ifndef _WIN32 #include -#endif using std::chrono::seconds; +using std::holds_alternative; +using std::list; using concord::kvbc::IClient; +using skvbc::messages::SKVBCGetLastBlockRequest; +using skvbc::messages::SKVBCReadRequest; +using skvbc::messages::SKVBCReply; +using skvbc::messages::SKVBCRequest; +using skvbc::messages::SKVBCWriteRequest; namespace BasicRandomTests { -BasicRandomTestsRunner::BasicRandomTestsRunner(logging::Logger &logger, IClient &client, size_t numOfOperations) +BasicRandomTestsRunner::BasicRandomTestsRunner(logging::Logger& logger, IClient& client, size_t numOfOperations) : logger_(logger), client_(client), numOfOperations_(numOfOperations) { // We have to start the client here, since construction of the TestsBuilder // uses the client. @@ -35,69 +39,71 @@ BasicRandomTestsRunner::BasicRandomTestsRunner(logging::Logger &logger, IClient } void BasicRandomTestsRunner::sleep(int ops) { -#ifndef _WIN32 if (ops % 100 == 0) usleep(100 * 1000); -#endif } void BasicRandomTestsRunner::run() { testsBuilder_->createRandomTest(numOfOperations_, 1111); - RequestsList requests = testsBuilder_->getRequests(); - RepliesList expectedReplies = testsBuilder_->getReplies(); + list requests = testsBuilder_->getRequests(); + list expectedReplies = testsBuilder_->getReplies(); ConcordAssert(requests.size() == expectedReplies.size()); int ops = 0; while (!requests.empty()) { sleep(ops); - SimpleRequest *request = requests.front(); - SimpleReply *expectedReply = expectedReplies.front(); + SKVBCRequest request = requests.front(); + SKVBCReply expectedReply = expectedReplies.front(); requests.pop_front(); expectedReplies.pop_front(); - bool readOnly = (request->type != COND_WRITE); - size_t requestSize = TestsBuilder::sizeOfRequest(request); - size_t expectedReplySize = TestsBuilder::sizeOfReply(expectedReply); + bool readOnly = !holds_alternative(request.request); + vector serialized_request; + vector serialized_reply; + serialize(serialized_request, request); + serialize(serialized_reply, expectedReply); + size_t expectedReplySize = serialized_reply.size(); uint32_t actualReplySize = 0; - std::vector reply(expectedReplySize); - - auto res = client_.invokeCommandSynch( - (char *)request, requestSize, readOnly, seconds(0), expectedReplySize, reply.data(), &actualReplySize); + static_assert( + (sizeof(*(serialized_request.data())) == sizeof(char)) && (sizeof(*(serialized_reply.data())) == sizeof(char)), + "Byte pointer type used by concord::kvbc::IClient interface is incompatible with byte pointer type used by " + "CMF."); + auto res = client_.invokeCommandSynch(reinterpret_cast(serialized_request.data()), + serialized_request.size(), + readOnly, + seconds(0), + expectedReplySize, + reinterpret_cast(serialized_reply.data()), + &actualReplySize); ConcordAssert(res.isOK()); - if (isReplyCorrect(request->type, expectedReply, reply.data(), expectedReplySize, actualReplySize)) ops++; + if (isReplyCorrect(request, expectedReply, serialized_reply, expectedReplySize, actualReplySize)) ops++; } sleep(1); LOG_INFO(logger_, "\n*** Test completed. " << ops << " messages have been handled."); client_.stop(); } -bool BasicRandomTestsRunner::isReplyCorrect(RequestType requestType, - const SimpleReply *expectedReply, - const char *reply, +bool BasicRandomTestsRunner::isReplyCorrect(const SKVBCRequest& request, + const SKVBCReply& expectedReply, + const vector& serialized_reply, size_t expectedReplySize, uint32_t actualReplySize) { if (actualReplySize != expectedReplySize) { LOG_ERROR(logger_, "*** Test failed: actual reply size != expected"); ConcordAssert(0); } - std::ostringstream error; - switch (requestType) { - case COND_WRITE: - if (((SimpleReply_ConditionalWrite *)expectedReply)->isEquiv(*(SimpleReply_ConditionalWrite *)reply, error)) - return true; - break; - case READ: - if (((SimpleReply_Read *)expectedReply)->isEquiv(*(SimpleReply_Read *)reply, error)) return true; - break; - case GET_LAST_BLOCK: - if (((SimpleReply_GetLastBlock *)expectedReply)->isEquiv(*(SimpleReply_GetLastBlock *)reply, error)) return true; - break; - default:; + SKVBCReply actual_reply; + deserialize(serialized_reply, actual_reply); + if (holds_alternative(request.request) || holds_alternative(request.request) || + holds_alternative(request.request)) { + if (expectedReply == actual_reply) { + return true; + } } - LOG_ERROR(logger_, "*** Test failed: actual reply != expected; error: " << error.str()); + LOG_ERROR(logger_, "*** Test failed: actual reply != expected."); ConcordAssert(0); return false; } diff --git a/tests/simpleKVBC/basicRandomTestsRunner.hpp b/tests/simpleKVBC/basicRandomTestsRunner.hpp index 425de1cd26..c46d6bcc9f 100644 --- a/tests/simpleKVBC/basicRandomTestsRunner.hpp +++ b/tests/simpleKVBC/basicRandomTestsRunner.hpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2019 VMware, Inc. All Rights Reserved. +// Copyright (c) 2019-2021 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -27,9 +27,9 @@ class BasicRandomTestsRunner { private: static void sleep(int ops); - bool isReplyCorrect(RequestType requestType, - const SimpleReply *expectedReply, - const char *reply, + bool isReplyCorrect(const skvbc::messages::SKVBCRequest &request, + const skvbc::messages::SKVBCReply &expectedReply, + const vector &serialized_reply, size_t expectedReplySize, uint32_t actualReplySize); diff --git a/tests/simpleKVBC/simpleKVBTestsBuilder.cpp b/tests/simpleKVBC/simpleKVBTestsBuilder.cpp index 95ece66260..64d479fbca 100644 --- a/tests/simpleKVBC/simpleKVBTestsBuilder.cpp +++ b/tests/simpleKVBC/simpleKVBTestsBuilder.cpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018-2020 VMware, Inc. All Rights Reserved. +// Copyright (c) 2018-2021 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -11,6 +11,7 @@ // terms and conditions of the subcomponent's license, as noted in the LICENSE // file. +#include "boost/detail/endian.hpp" #include "assertUtils.hpp" #include #include @@ -21,15 +22,19 @@ #include "storage/db_types.h" #include "block_metadata.hpp" -#ifndef _WIN32 -#include -#endif - using std::set; using std::chrono::seconds; using concord::kvbc::BlockId; using concord::kvbc::IClient; +using skvbc::messages::SKVBCGetLastBlockReply; +using skvbc::messages::SKVBCGetLastBlockRequest; +using skvbc::messages::SKVBCReadReply; +using skvbc::messages::SKVBCReadRequest; +using skvbc::messages::SKVBCReply; +using skvbc::messages::SKVBCRequest; +using skvbc::messages::SKVBCWriteReply; +using skvbc::messages::SKVBCWriteRequest; const int NUMBER_OF_KEYS = 200; const int CONFLICT_DISTANCE = 49; @@ -37,45 +42,56 @@ const int MAX_WRITES_IN_REQ = 12; const int MAX_READ_SET_SIZE_IN_REQ = 10; const int MAX_READS_IN_REQ = 12; +const size_t kMaxKVSizeToUse = sizeof(uint64_t); + +static_assert( + sizeof(uint8_t) == 1, + "code in this file for packing data of integer types into byte strings may assume uint8_t is a 1-byte type"); + namespace BasicRandomTests { -TestsBuilder::TestsBuilder(logging::Logger &logger, IClient &client) : logger_(logger), client_(client) { +TestsBuilder::TestsBuilder(logging::Logger& logger, IClient& client) : logger_(logger), client_(client) { prevLastBlockId_ = getInitialLastBlockId(); lastBlockId_ = prevLastBlockId_; LOG_INFO(logger, "TestsBuilder: initialBlockId_=" << prevLastBlockId_); } -TestsBuilder::~TestsBuilder() { - LOG_INFO(logger_, "TestsBuilder: The last DB block is " << lastBlockId_); - for (auto elem : requests_) delete[] elem; - for (auto elem : replies_) delete[] elem; -} +TestsBuilder::~TestsBuilder() { LOG_INFO(logger_, "TestsBuilder: The last DB block is " << lastBlockId_); } // When working with persistent KVB, we need to retrieve current last block-id // and all written keys before starting. BlockId TestsBuilder::getInitialLastBlockId() { - auto *request = SimpleGetLastBlockRequest::alloc(); - request->header.type = GET_LAST_BLOCK; - - size_t expectedReplySize = sizeof(SimpleReply_GetLastBlock); - std::vector reply(expectedReplySize); - uint32_t actualReplySize = 0; - - auto res = client_.invokeCommandSynch((char *)request, - sizeof(SimpleGetLastBlockRequest), + SKVBCRequest request; + request.request = SKVBCGetLastBlockRequest(); + vector serialized_request; + serialize(serialized_request, request); + + SKVBCReply reply; + reply.reply = SKVBCGetLastBlockReply(); + vector serialized_reply; + serialize(serialized_reply, reply); + size_t expected_reply_size = serialized_reply.size(); + uint32_t actual_reply_size = 0; + + static_assert( + (sizeof(*(serialized_request.data())) == sizeof(char)) && (sizeof(*(serialized_reply.data())) == sizeof(char)), + "Byte pointer type used by concord::kvbc::IClient interface is not compatible with byte pointer type used by " + "CMF."); + auto res = client_.invokeCommandSynch(reinterpret_cast(serialized_request.data()), + serialized_request.size(), true, seconds(5), - expectedReplySize, - reply.data(), - &actualReplySize); + expected_reply_size, + reinterpret_cast(serialized_reply.data()), + &actual_reply_size); ConcordAssert(res.isOK()); - auto *replyObj = (SimpleReply_GetLastBlock *)reply.data(); - LOG_INFO(logger_, "Actual reply size = " << actualReplySize << ", expected reply size = " << expectedReplySize); - ConcordAssert(actualReplySize == expectedReplySize); - ConcordAssert(replyObj->header.type == GET_LAST_BLOCK); - SimpleGetLastBlockRequest::free(request); - return replyObj->latestBlock; + LOG_INFO(logger_, "Actual reply size = " << actual_reply_size << ", expected reply size = " << expected_reply_size); + ConcordAssert(actual_reply_size == expected_reply_size); + deserialize(serialized_reply, reply); + ConcordAssert(holds_alternative(reply.reply)); + + return (get(reply.reply)).latest_block; } void TestsBuilder::retrieveExistingBlocksFromKVB() { @@ -84,41 +100,81 @@ void TestsBuilder::retrieveExistingBlocksFromKVB() { return; // KVB is not empty. Read existing blocks and save in the memory. - auto *request = SimpleReadRequest::alloc(NUMBER_OF_KEYS); - request->header.type = READ; - request->readVersion = prevLastBlockId_; - request->numberOfKeysToRead = NUMBER_OF_KEYS; - SimpleKey *requestKeys = request->keys; - - size_t expectedReplySize = SimpleReply_Read::getSize(NUMBER_OF_KEYS); - std::vector reply(expectedReplySize); + SKVBCRequest request; + request.request = SKVBCReadRequest(); + SKVBCReadRequest& read_req = get(request.request); + read_req.read_version = prevLastBlockId_; + read_req.keys.resize(NUMBER_OF_KEYS); + + SKVBCReply reply; + + // Note we make the assumption that the serialization size for a CMF message is not dependent on the value of any + // fixed-length data within the message, and that the serialization size will never decrease as the length of a vector + // within the message increases. + SKVBCReadReply maximally_sized_reply; + maximally_sized_reply.reads.resize(NUMBER_OF_KEYS); + for (auto& kvp : maximally_sized_reply.reads) { + kvp.first.resize(kMaxKVSizeToUse); + kvp.second.resize(kMaxKVSizeToUse); + } + reply.reply = maximally_sized_reply; + vector serialized_reply; + serialize(serialized_reply, reply); + size_t expected_max_reply_size = serialized_reply.size(); uint32_t actualReplySize = 0; - for (int key = 0; key < NUMBER_OF_KEYS; key++) memcpy(requestKeys[key].key, &key, sizeof(key)); + for (uint64_t key = 0; key < NUMBER_OF_KEYS; ++key) { + read_req.keys[key].resize(kMaxKVSizeToUse); + + uint8_t* key_first_byte = reinterpret_cast(&key); + uint8_t* key_last_byte = key_first_byte + sizeof(key); + +#ifdef BOOST_BIG_ENDIAN + copy(key_first_byte, key_last_byte, read_req.keys[key].data()); +#else // BOOST_BIG_ENDIAN not defined +#ifdef BOOST_LITTLE_ENDIAN + reverse_copy(key_first_byte, key_last_byte, read_req.keys[key].data()); +#else // BOOST_LITTLE_ENDIAN not defined + static_assert(false, "failed to determine the endianness being compiled for"); +#endif // if BOOST_LITTLE_ENDIAN defined/else +#endif // if BOOST_BIG_ENDIAN defined/else + } + + vector serialized_request; + serialize(serialized_request, request); + static_assert( + (sizeof(*(serialized_request.data())) == sizeof(char)) && (sizeof(*(serialized_reply.data())) == sizeof(char)), + "Byte pointer type used by concord::kvbc::IClient interface is not compatible with byte pointer type used by " + "CMF."); // Infinite timeout - auto res = client_.invokeCommandSynch( - (char *)request, request->getSize(), true, seconds(0), expectedReplySize, reply.data(), &actualReplySize); + auto res = client_.invokeCommandSynch(reinterpret_cast(serialized_request.data()), + serialized_request.size(), + true, + seconds(0), + expected_max_reply_size, + reinterpret_cast(serialized_reply.data()), + &actualReplySize); ConcordAssert(res.isOK()); - auto *replyObj = (SimpleReply_Read *)reply.data(); - __attribute__((unused)) size_t numOfItems = replyObj->numOfItems; - ConcordAssert(actualReplySize == expectedReplySize); - ConcordAssert(replyObj->header.type == READ); - ConcordAssert(numOfItems == NUMBER_OF_KEYS); + ConcordAssert(actualReplySize <= expected_max_reply_size); + serialized_reply.resize(actualReplySize); + deserialize(serialized_reply, reply); + ConcordAssert(holds_alternative(reply.reply)); + const SKVBCReadReply& read_rep = get(reply.reply); + ConcordAssert(read_rep.reads.size() == NUMBER_OF_KEYS); for (int key = 0; key < NUMBER_OF_KEYS; key++) { - SimpleKeyBlockIdPair simpleKIDPair(replyObj->items[key].simpleKey, request->readVersion); - allKeysToValueMap_[simpleKIDPair] = replyObj->items[key].simpleValue; + pair, uint64_t> simpleKIDPair(read_rep.reads[key].first, read_req.read_version); + allKeysToValueMap_[simpleKIDPair] = read_rep.reads[key].second; } - SimpleReadRequest::free(request); } void TestsBuilder::createRandomTest(size_t numOfRequests, size_t seed) { retrieveExistingBlocksFromKVB(); create(numOfRequests, seed); for (auto elem : internalBlockchain_) { - free(elem.second); + elem.second = SimpleBlock(); } } @@ -136,66 +192,58 @@ void TestsBuilder::create(size_t numOfRequests, size_t seed) { ConcordAssert(0); } - for (__attribute__((unused)) auto elem : internalBlockchain_) { - __attribute__((unused)) BlockId blockId = elem.first; - __attribute__((unused)) SimpleBlock *block = elem.second; - ConcordAssert(blockId == block->id); + for (const auto& elem : internalBlockchain_) { + BlockId blockId = elem.first; + const SimpleBlock& block = elem.second; + ConcordAssert(blockId == block.id); } } void TestsBuilder::addExpectedWriteReply(bool foundConflict) { - auto *reply = SimpleReply_ConditionalWrite::alloc(); + SKVBCReply reply; + reply.reply = SKVBCWriteReply(); + SKVBCWriteReply& write_rep = get(reply.reply); if (foundConflict) { - reply->success = false; - reply->latestBlock = lastBlockId_; + write_rep.success = false; + write_rep.latest_block = lastBlockId_; } else { - reply->success = true; - reply->latestBlock = lastBlockId_ + 1; + write_rep.success = true; + write_rep.latest_block = lastBlockId_ + 1; } - reply->header.type = COND_WRITE; - replies_.push_back((SimpleReply *)reply); + replies_.push_back(reply); } -bool TestsBuilder::lookForConflicts(BlockId readVersion, size_t numOfKeysInReadSet, SimpleKey *readKeysArray) { +bool TestsBuilder::lookForConflicts(uint64_t readVersion, const vector>& readKeysArray) { bool foundConflict = false; - BlockId i; + uint64_t i; for (i = readVersion + 1; (i <= lastBlockId_) && !foundConflict; i++) { - SimpleBlock *currBlock = internalBlockchain_[i]; - for (size_t a = 0; (a < numOfKeysInReadSet) && !foundConflict; a++) { - SimpleKV *items = currBlock->items; - for (size_t b = 0; (b < currBlock->numOfItems) && !foundConflict; b++) { - if (memcmp(readKeysArray[a].key, items[b].simpleKey.key, KV_LEN) == 0) foundConflict = true; + const SimpleBlock& currBlock = internalBlockchain_[i]; + for (size_t a = 0; (a < readKeysArray.size()) && !foundConflict; a++) { + const vector, vector>>& items = currBlock.items; + for (size_t b = 0; (b < currBlock.items.size()) && !foundConflict; b++) { + if (readKeysArray[a] == items[b].first) foundConflict = true; } } } return foundConflict; } -void TestsBuilder::addNewBlock(size_t numOfWrites, SimpleKV *writesKVArray) { +void TestsBuilder::addNewBlock(const vector, vector>>& writesKVArray) { ++lastBlockId_; - auto *newBlock = SimpleBlock::alloc(numOfWrites); - - newBlock->id = lastBlockId_; - newBlock->numOfItems = numOfWrites; - - SimpleKV *items = newBlock->items; - for (size_t i = 0; i < numOfWrites; i++) { - items[i] = writesKVArray[i]; + SimpleBlock new_block; + new_block.items = writesKVArray; - SimpleKey simpleKey; - memcpy(simpleKey.key, writesKVArray[i].simpleKey.key, KV_LEN); + new_block.id = lastBlockId_; - SimpleValue simpleValue; - memcpy(simpleValue.value, writesKVArray[i].simpleValue.value, KV_LEN); - - allKeysToValueMap_[SimpleKeyBlockIdPair(simpleKey, lastBlockId_)] = simpleValue; + for (size_t i = 0; i < writesKVArray.size(); i++) { + allKeysToValueMap_[pair, uint64_t>(writesKVArray[i].first, lastBlockId_)] = writesKVArray[i].second; } - internalBlockchain_[lastBlockId_] = newBlock; + internalBlockchain_[lastBlockId_] = new_block; } void TestsBuilder::createAndInsertRandomConditionalWrite() { // Create request - BlockId readVersion = lastBlockId_; + uint64_t readVersion = lastBlockId_; if (lastBlockId_ > prevLastBlockId_ + CONFLICT_DISTANCE) { readVersion = 0; while (readVersion < prevLastBlockId_) readVersion = lastBlockId_ - (rand() % CONFLICT_DISTANCE); @@ -204,141 +252,162 @@ void TestsBuilder::createAndInsertRandomConditionalWrite() { size_t numOfWrites = (rand() % (MAX_WRITES_IN_REQ - 1)) + 1; size_t numOfKeysInReadSet = (rand() % MAX_READ_SET_SIZE_IN_REQ); - auto *request = SimpleCondWriteRequest::alloc(numOfKeysInReadSet, numOfWrites); - request->header.type = COND_WRITE; - request->readVersion = readVersion; - request->numOfKeysInReadSet = numOfKeysInReadSet; - request->numOfWrites = numOfWrites; - SimpleKey *readKeysArray = request->readSetArray(); - SimpleKV *writesKVArray = request->keyValueArray(); + SKVBCRequest request; + request.request = SKVBCWriteRequest(); + SKVBCWriteRequest& write_req = get(request.request); + write_req.readset.resize(numOfKeysInReadSet); + write_req.writeset.resize(numOfWrites); + write_req.read_version = readVersion; for (size_t i = 0; i < numOfKeysInReadSet; i++) { - size_t key = 0; + uint64_t key = 0; do { key = rand() % NUMBER_OF_KEYS; } while (key == concord::kvbc::IBlockMetadata::kBlockMetadataKey); - memcpy(readKeysArray[i].key, &key, sizeof(key)); + write_req.readset[i].resize(kMaxKVSizeToUse); + + uint8_t* key_first_byte = reinterpret_cast(&key); + uint8_t* key_last_byte = key_first_byte + sizeof(key); + +#ifdef BOOST_BIG_ENDIAN + copy(key_first_byte, key_last_byte, write_req.readset[i].data()); +#else // BOOST_BIG_ENDIAN not defined +#ifdef BOOST_LITTLE_ENDIAN + reverse_copy(key_first_byte, key_last_byte, write_req.readset[i].data()); +#else // BOOST_LITTLE_ENDIAN not defined + static_assert(false, "failed to determine the endianness being compiled for"); +#endif // if BOOST_LITTLE_ENDIAN defined/else +#endif // if BOOST_BIG_ENDIAN defined/else } - std::set usedKeys; + std::set usedKeys; for (size_t i = 0; i < numOfWrites; i++) { - size_t key = 0; + uint64_t key = 0; do { // Avoid duplications key = rand() % NUMBER_OF_KEYS; } while (usedKeys.count(key) > 0 || key == concord::kvbc::IBlockMetadata::kBlockMetadataKey); usedKeys.insert(key); - size_t value = rand(); - memcpy(writesKVArray[i].simpleKey.key, &key, sizeof(key)); - memcpy(writesKVArray[i].simpleValue.value, &value, sizeof(value)); + uint64_t value = rand(); + + write_req.writeset[i].first.resize(kMaxKVSizeToUse); + uint8_t* key_first_byte = reinterpret_cast(&key); + uint8_t* key_last_byte = key_first_byte + sizeof(key); +#ifdef BOOST_BIG_ENDIAN + copy(key_first_byte, key_last_byte, write_req.writeset[i].first.data()); +#else // BOOST_BIG_ENDIAN not defined +#ifdef BOOST_LITTLE_ENDIAN + reverse_copy(key_first_byte, key_last_byte, write_req.writeset[i].first.data()); +#else // BOOST_LITTLE_ENDIAN not defined + static_assert(false, "failed to determine the endianness being compiled for"); +#endif // if BOOST_LITTLE_ENDIAN defined/else +#endif // if BOOST_BIG_ENDIAN defined/else + + write_req.writeset[i].second.resize(kMaxKVSizeToUse); + uint8_t* value_first_byte = reinterpret_cast(&value); + uint8_t* value_last_byte = value_first_byte + sizeof(value); +#ifdef BOOST_BIG_ENDIAN + copy(value_first_byte, value_last_byte, write_req.writeset[i].second.data()); +#else // BOOST_BIG_ENDIAN not defined +#ifdef BOOST_LITTLE_ENDIAN + reverse_copy(value_first_byte, value_last_byte, write_req.writeset[i].second.data()); +#else // BOOST_LITTLE_ENDIAN not defined + static_assert(false, "failed to determine the endianness being compiled for"); +#endif // if BOOST_LITTLE_ENDIAN defined/else +#endif // if BOOST_BIG_ENDIAN defined/else } // Add request to m_requests - requests_.push_back((SimpleRequest *)request); + requests_.push_back(request); - bool foundConflict = lookForConflicts(readVersion, numOfKeysInReadSet, readKeysArray); + bool foundConflict = lookForConflicts(readVersion, write_req.readset); // Add expected reply to m_replies addExpectedWriteReply(foundConflict); // If needed, add new block into the blockchain if (!foundConflict) { - addNewBlock(request->numOfWrites, writesKVArray); + addNewBlock(write_req.writeset); } } void TestsBuilder::createAndInsertRandomRead() { // Create request - BlockId readVersion = 0; + uint64_t readVersion = 0; if (prevLastBlockId_ == lastBlockId_) { readVersion = lastBlockId_; } else { // New blocks have been written to the DB during this run. while (readVersion <= prevLastBlockId_) readVersion = (rand() % (lastBlockId_ + 1)); } size_t numberOfKeysToRead = (rand() % (MAX_READS_IN_REQ - 1)) + 1; - auto *request = SimpleReadRequest::alloc(numberOfKeysToRead); - request->header.type = READ; - request->readVersion = readVersion; - request->numberOfKeysToRead = numberOfKeysToRead; + SKVBCRequest request; + request.request = SKVBCReadRequest(); + SKVBCReadRequest& read_req = get(request.request); + read_req.keys.resize(numberOfKeysToRead); + read_req.read_version = readVersion; - SimpleKey *requestKeys = request->keys; for (size_t i = 0; i < numberOfKeysToRead; i++) { - size_t key = 0; + uint64_t key = 0; do { key = rand() % NUMBER_OF_KEYS; } while (key == concord::kvbc::IBlockMetadata::kBlockMetadataKey); - memcpy(requestKeys[i].key, &key, sizeof(key)); + read_req.keys[i].resize(kMaxKVSizeToUse); + + uint8_t* key_first_byte = reinterpret_cast(&key); + uint8_t* key_last_byte = key_first_byte + sizeof(key); + +#ifdef BOOST_BIG_ENDIAN + copy(key_first_byte, key_last_byte, read_req.keys[i].data()); +#else // BOOST_BIG_ENDIAN not defined +#ifdef BOOST_LITTLE_ENDIAN + reverse_copy(key_first_byte, key_last_byte, read_req.keys[i].data()); +#else // BOOST_LITTLE_ENDIAN not defined + static_assert(false, "failed to determine the endianness being compiled for"); +#endif // if BOOST_LITTLE_ENDIAN defined/else +#endif // if BOOST_BIG_ENDIAN defined/else } // Add request to m_requests - requests_.push_back((SimpleRequest *)request); + requests_.push_back(request); // Compute expected reply - auto *reply = SimpleReply_Read::alloc(numberOfKeysToRead); - reply->header.type = READ; - reply->numOfItems = numberOfKeysToRead; + SKVBCReply reply; + reply.reply = SKVBCReadReply(); + SKVBCReadReply& read_rep = get(reply.reply); + read_rep.reads.resize(numberOfKeysToRead); - SimpleKV *replyItems = reply->items; for (size_t i = 0; i < numberOfKeysToRead; i++) { - memcpy(replyItems[i].simpleKey.key, requestKeys[i].key, KV_LEN); - SimpleKeyBlockIdPair simpleKIDPair(requestKeys[i], request->readVersion); + read_rep.reads[i].first = read_req.keys[i]; + pair, uint64_t> simpleKIDPair(read_req.keys[i], read_req.read_version); KeyBlockIdToValueMap::const_iterator it = allKeysToValueMap_.lower_bound(simpleKIDPair); - if (it != allKeysToValueMap_.end() && (request->readVersion >= it->first.blockId) && - (memcmp(it->first.key.key, requestKeys[i].key, KV_LEN) == 0)) { - memcpy(replyItems[i].simpleValue.value, it->second.value, KV_LEN); + if (it != allKeysToValueMap_.end() && (read_req.read_version >= it->first.second) && + (it->first.first == read_req.keys[i])) { + read_rep.reads[i].second = it->second; } else { - // Fill value by zeroes for a non-found key. - memset(replyItems[i].simpleValue.value, 0, KV_LEN); + // Make the value an empty bytestring for a non-found key. + read_rep.reads[i].second.clear(); } } // Add reply to m_replies - replies_.push_back((SimpleReply *)reply); + replies_.push_back(reply); } void TestsBuilder::createAndInsertGetLastBlock() { // Create request - auto *request = SimpleGetLastBlockRequest::alloc(); - request->header.type = GET_LAST_BLOCK; + SKVBCRequest request; + request.request = SKVBCGetLastBlockRequest(); // Add request to m_requests - requests_.push_back((SimpleRequest *)request); + requests_.push_back(request); // compute expected reply - auto *reply = SimpleReply_GetLastBlock::alloc(); - reply->header.type = GET_LAST_BLOCK; - reply->latestBlock = lastBlockId_; + SKVBCReply reply; + reply.reply = SKVBCGetLastBlockReply(); + (get(reply.reply)).latest_block = lastBlockId_; // Add reply to m_replies - replies_.push_back((SimpleReply *)reply); -} - -size_t TestsBuilder::sizeOfRequest(SimpleRequest *request) { - switch (request->type) { - case COND_WRITE: - return ((SimpleCondWriteRequest *)request)->getSize(); - case READ: - return ((SimpleReadRequest *)request)->getSize(); - case GET_LAST_BLOCK: - return sizeof(SimpleRequest); - default: - ConcordAssert(0); - } - return 0; -} - -size_t TestsBuilder::sizeOfReply(SimpleReply *reply) { - switch (reply->type) { - case COND_WRITE: - return sizeof(SimpleReply_ConditionalWrite); - case READ: - return ((SimpleReply_Read *)reply)->getSize(); - case GET_LAST_BLOCK: - return sizeof(SimpleReply_GetLastBlock); - default: - ConcordAssert(0); - } - return 0; + replies_.push_back(reply); } } // namespace BasicRandomTests diff --git a/tests/simpleKVBC/simpleKVBTestsBuilder.hpp b/tests/simpleKVBC/simpleKVBTestsBuilder.hpp index 0bad4699ec..f6daef022a 100644 --- a/tests/simpleKVBC/simpleKVBTestsBuilder.hpp +++ b/tests/simpleKVBC/simpleKVBTestsBuilder.hpp @@ -1,6 +1,6 @@ // Concord // -// Copyright (c) 2018-2019 VMware, Inc. All Rights Reserved. +// Copyright (c) 2018-2021 VMware, Inc. All Rights Reserved. // // This product is licensed to you under the Apache 2.0 license (the "License"). // You may not use this product except in compliance with the Apache 2.0 @@ -15,314 +15,28 @@ #include #include -#include -#include #include "kv_types.hpp" #include "KVBCInterfaces.h" #include "Logger.hpp" +#include "skvbc_messages.cmf.hpp" namespace BasicRandomTests { -const int KV_LEN = 21; - -#pragma pack(push, 1) - -struct SimpleKey { - char key[KV_LEN]; -}; - -struct SimpleValue { - char value[KV_LEN]; -}; - -struct SimpleKV { - SimpleKey simpleKey; - SimpleValue simpleValue; -}; - struct SimpleBlock { - concord::kvbc::BlockId id = 0; - size_t numOfItems = 0; - SimpleKV items[1]; - - static SimpleBlock* alloc(size_t items) { - size_t blockSize = sizeof(SimpleBlock) + sizeof(SimpleKV) * (items - 1); - char* buf = new char[blockSize]; - memset(buf, 0, blockSize); - return (SimpleBlock*)buf; - } - - static void free(SimpleBlock* buf) { delete[] buf; } -}; - -enum RequestType : char { - NONE = 0, - READ = 1, - COND_WRITE = 2, - GET_LAST_BLOCK = 3, - GET_BLOCK_DATA = 4, - LONG_EXEC_COND_WRITE = 5, -}; - -struct SimpleRequest { - RequestType type = {NONE}; -}; - -struct SimpleGetLastBlockRequest { - static SimpleGetLastBlockRequest* alloc() { - size_t reqSize = sizeof(SimpleGetLastBlockRequest); - char* buf = new char[reqSize]; - memset(buf, 0, reqSize); - return (SimpleGetLastBlockRequest*)buf; - } - - static void free(SimpleGetLastBlockRequest* buf) { delete[] buf; } - - SimpleRequest header; + uint64_t id = 0; + std::vector, std::vector>> items; }; -// A SimpleGetBlockDataRequest returns a read response, except -// all keys are for the specific block requested. -struct SimpleGetBlockDataRequest { - static SimpleGetBlockDataRequest* alloc() { - size_t size = sizeof(SimpleGetBlockDataRequest); - char* pBuf = new char[size]; - memset(pBuf, 0, size); - return (SimpleGetBlockDataRequest*)(pBuf); - } - static void free(SimpleGetBlockDataRequest* p) { delete[] p; } - static size_t size() { return sizeof(SimpleGetBlockDataRequest); } - - SimpleRequest h; - concord::kvbc::BlockId block_id; -}; - -struct SimpleCondWriteRequest { - static SimpleCondWriteRequest* alloc(size_t numOfKeysInReadSet, size_t numOfWrites) { - size_t reqSize = getSize(numOfKeysInReadSet, numOfWrites); - char* buf = new char[reqSize]; - memset(buf, 0, reqSize); - return (SimpleCondWriteRequest*)buf; - } - - static void free(SimpleCondWriteRequest* buf) { delete[] buf; } - - static size_t getSize(size_t numOfKeysInReadSet, size_t numOfWrites) { - return sizeof(SimpleCondWriteRequest) + numOfKeysInReadSet * sizeof(SimpleKey) + numOfWrites * sizeof(SimpleKV); - } - - size_t getSize() { return getSize(numOfKeysInReadSet, numOfWrites); } - - SimpleKey* readSetArray() { return (SimpleKey*)(((char*)this) + sizeof(SimpleCondWriteRequest)); } - - SimpleKV* keyValueArray() { - return (SimpleKV*)(((char*)this) + sizeof(SimpleCondWriteRequest) + numOfKeysInReadSet * sizeof(SimpleKey)); - } - - SimpleRequest header; - concord::kvbc::BlockId readVersion = 0; - size_t numOfKeysInReadSet = 0; - size_t numOfWrites = 0; -}; - -struct SimpleReadRequest { - static SimpleReadRequest* alloc(size_t numOfKeysToRead) { - size_t reqSize = sizeof(SimpleReadRequest) + (sizeof(SimpleKey) * (numOfKeysToRead - 1)); - char* buf = new char[reqSize]; - memset(buf, 0, reqSize); - return (SimpleReadRequest*)buf; - } - - static void free(SimpleReadRequest* buf) { delete[] buf; } - - static size_t getSize(size_t numOfKeysToRead) { - return sizeof(SimpleReadRequest) + (numOfKeysToRead - 1) * sizeof(SimpleKey); - } - - size_t getSize() { return getSize(numberOfKeysToRead); } - SimpleKey* keysArray() { return ((SimpleKey*)keys); } - - SimpleRequest header; - concord::kvbc::BlockId readVersion = 0; // If 0, read from the latest version - size_t numberOfKeysToRead = 0; - SimpleKey keys[1]; -}; - -struct SimpleReply { - RequestType type = {NONE}; -}; - -struct SimpleReply_ConditionalWrite { - static SimpleReply_ConditionalWrite* alloc() { - size_t repSize = sizeof(SimpleReply_ConditionalWrite); - char* buf = new char[repSize]; - memset(buf, 0, repSize); - return (SimpleReply_ConditionalWrite*)buf; - } - - bool isEquiv(SimpleReply_ConditionalWrite& other, std::ostringstream& error) { - if (header.type != other.header.type) { - error << "*** Write: Wrong message type: " << other.header.type; - } else if (latestBlock != other.latestBlock) { - error << "*** Write: Wrong latestBlock: " << other.latestBlock << ", expected: " << latestBlock; - } else if (success != other.success) { - error << "*** Write: Wrong result: " << other.success; - } else { - return true; - } - return false; - } - - static void free(SimpleReply_ConditionalWrite* buf) { delete[] buf; } - - SimpleReply header; - bool success = false; - concord::kvbc::BlockId latestBlock = 0; -}; - -struct SimpleReply_Read { - static size_t getSize(size_t numOfItems) { - size_t size = sizeof(SimpleReply_Read) + (sizeof(SimpleKV) * (numOfItems - 1)); - return size; - } - - size_t getSize() { return getSize(numOfItems); } - - static SimpleReply_Read* alloc(size_t numOfItems) { - size_t size = sizeof(SimpleReply_Read) + (sizeof(SimpleKV) * (numOfItems - 1)); - char* buf = new char[size]; - memset(buf, 0, size); - return (SimpleReply_Read*)buf; - } - - bool isEquiv(SimpleReply_Read& other, std::ostringstream& error) { - if (header.type != other.header.type) { - error << "*** READ: Wrong message type: " << other.header.type; - return false; - } - if (numOfItems != other.numOfItems) { - error << "*** READ: Wrong numOfItems: " << other.numOfItems << ", expected: " << numOfItems; - return false; - } - SimpleKV* itemPtr = items; - SimpleKV* otherItemPtr = other.items; - for (size_t i = 0; i < numOfItems; i++) { - if (memcmp(itemPtr->simpleKey.key, otherItemPtr->simpleKey.key, sizeof(itemPtr->simpleKey.key)) != 0) { - error << "*** READ: Key for item number " << i << " is wrong"; - return false; - } - if (memcmp(itemPtr->simpleValue.value, otherItemPtr->simpleValue.value, sizeof(itemPtr->simpleValue.value)) != - 0) { - error << "*** READ: Value for item number " << i << " is wrong"; - return false; - } - ++itemPtr; - ++otherItemPtr; - } - return true; - } - - static void free(SimpleReply_Read* buf) { delete[] buf; } - - SimpleReply header; - size_t numOfItems = 0; - SimpleKV items[1]; -}; - -struct SimpleReply_GetLastBlock { - static SimpleReply_GetLastBlock* alloc() { - size_t repSize = sizeof(SimpleReply_GetLastBlock); - char* buf = new char[repSize]; - memset(buf, 0, repSize); - return (SimpleReply_GetLastBlock*)buf; - } - - bool isEquiv(SimpleReply_GetLastBlock& other, std::ostringstream& error) { - if (header.type != other.header.type) { - error << "*** GetLastBlock: Wrong message type: " << other.header.type; - } else if (latestBlock != other.latestBlock) { - error << "*** GetLastBlock: Wrong latestBlock: " << other.latestBlock << ", expected: " << latestBlock; - } else { - return true; - } - return false; - } - - static void free(SimpleReply_GetLastBlock* buf) { delete[] buf; } - - SimpleReply header; - concord::kvbc::BlockId latestBlock = 0; -}; - -struct SimpleReply_HaveYouStopped { - size_t getSize() { return sizeof(SimpleReply_HaveYouStopped); } - - static SimpleReply_Read* alloc(size_t numOfItems) { - size_t size = sizeof(SimpleReply_Read); - char* buf = new char[size]; - memset(buf, 0, size); - return (SimpleReply_Read*)buf; - } - - bool isEquiv(SimpleReply_HaveYouStopped& other, std::ostringstream& error) { - if (header.type != other.header.type) { - error << "*** WEDGE: Wrong message type: " << other.header.type; - return false; - } - if (stopped != other.stopped) { - error << "*** WEDGE: Wrong stopeed indication: " << other.stopped; - return false; - } - - return true; - } - - static void free(SimpleReply_HaveYouStopped* buf) { delete[] buf; } - - SimpleReply header; - int64_t stopped; -}; - -#pragma pack(pop) - -class SimpleKeyBlockIdPair // Represents -{ - public: - const SimpleKey key; - const concord::kvbc::BlockId blockId; - - SimpleKeyBlockIdPair(const SimpleKey& simpleKey, concord::kvbc::BlockId bId) : key(simpleKey), blockId(bId) {} - - bool operator<(const SimpleKeyBlockIdPair& other) const { - int c = memcmp((char*)&this->key, (char*)&other.key, sizeof(SimpleKey)); - if (c == 0) - return this->blockId > other.blockId; - else - return (c < 0); - } - - bool operator==(const SimpleKeyBlockIdPair& other) const { - if (this->blockId != other.blockId) return false; - int c = memcmp((char*)&this->key, (char*)&other.key, sizeof(SimpleKey)); - return (c == 0); - } -}; - -typedef std::map KeyBlockIdToValueMap; -typedef std::list RequestsList; -typedef std::list RepliesList; +typedef std::map, uint64_t>, std::vector> KeyBlockIdToValueMap; class TestsBuilder { public: explicit TestsBuilder(logging::Logger& logger, concord::kvbc::IClient& client); ~TestsBuilder(); - static size_t sizeOfRequest(SimpleRequest* req); - static size_t sizeOfReply(SimpleReply* rep); - void createRandomTest(size_t numOfRequests, size_t seed); - RequestsList getRequests() { return requests_; } - RepliesList getReplies() { return replies_; } + std::list getRequests() { return requests_; } + std::list getReplies() { return replies_; } private: void create(size_t numOfRequests, size_t seed); @@ -330,20 +44,20 @@ class TestsBuilder { void createAndInsertRandomRead(); void createAndInsertGetLastBlock(); void addExpectedWriteReply(bool foundConflict); - bool lookForConflicts(concord::kvbc::BlockId readVersion, size_t numOfKeysInReadSet, SimpleKey* readKeysArray); - void addNewBlock(size_t numOfWrites, SimpleKV* writesKVArray); + bool lookForConflicts(uint64_t readVersion, const std::vector>& readKeysArray); + void addNewBlock(const std::vector, std::vector>>& writesKVArray); void retrieveExistingBlocksFromKVB(); - concord::kvbc::BlockId getInitialLastBlockId(); + uint64_t getInitialLastBlockId(); private: logging::Logger& logger_; concord::kvbc::IClient& client_; - RequestsList requests_; - RepliesList replies_; - std::map internalBlockchain_; + std::list requests_; + std::list replies_; + std::map internalBlockchain_; KeyBlockIdToValueMap allKeysToValueMap_; - concord::kvbc::BlockId prevLastBlockId_ = 0; - concord::kvbc::BlockId lastBlockId_ = 0; + uint64_t prevLastBlockId_ = 0; + uint64_t lastBlockId_ = 0; }; } // namespace BasicRandomTests