Skip to content

Commit

Permalink
Merge pull request #1841 from upshaw-alex/migrate-skvbc-to-cmf-messages
Browse files Browse the repository at this point in the history
Migrate SKVBC to use CMF messages
  • Loading branch information
upshaw-alex authored Aug 25, 2021
2 parents 8ccfdbd + 558817d commit 48c2d26
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 739 deletions.
136 changes: 54 additions & 82 deletions tests/apollo/util/skvbc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Concord
#
# Copyright (c) 2019 VMware, Inc. All Rights Reserved.
# Copyright (c) 2019-2021 VMware, Inc. All Rights Reserved.
#
# This product is licensed to you under the Apache 2.0 license (the "License").
# You may not use this product except in compliance with the Apache 2.0 License.
Expand All @@ -10,7 +10,6 @@
# terms and conditions of the subcomponent's license, as noted in the LICENSE
# file.

import struct
import copy
import random
import trio
Expand All @@ -22,21 +21,26 @@
from util import bft
from enum import Enum

import os.path
import sys
sys.path.append(os.path.abspath("../../build/tests/apollo/util/"))
import skvbc_messages

WriteReply = namedtuple('WriteReply', ['success', 'last_block_id'])

class ExitPolicy(Enum):
COUNT = 0
TIME = 1

class SimpleKVBCProtocol:
KV_LEN = 21 ## SimpleKVBC requies fixed size keys and values right now
READ_LATEST = 0xFFFFFFFFFFFFFFFF
# Length to use for randomly generated key and value strings; this length is
# effectively arbitrary as the SimpleKVBC protocol permits key and value
# bytestrings of any length (usually up to some practical
# implementation-imposed upper length limits).
LENGTH_FOR_RANDOM_KVS = 21

READ = 1
WRITE = 2
GET_LAST_BLOCK = 3
GET_BLOCK_DATA = 4
LONG_EXEC_WRITE = 5
# Maximum value for a 64-bit unsigned integer.
READ_LATEST = 0xFFFFFFFFFFFFFFFF

"""
An implementation of the wire protocol for SimpleKVBC requests.
Expand All @@ -55,93 +59,61 @@ def __init__(self, bft_network, tracker = None, pre_process=False):

@classmethod
def write_req(cls, readset, writeset, block_id, long_exec=False):
data = bytearray()
# A conditional write request type
if long_exec is True:
data.append(cls.LONG_EXEC_WRITE)
else:
data.append(cls.WRITE)
# SimpleConditionalWriteHeader
data.extend(
struct.pack("<QQQ", block_id, len(readset), len(writeset)))
# SimpleKey[numberOfKeysInReadSet]
for r in readset:
data.extend(r)
# SimpleKV[numberOfWrites]
for kv in writeset:
data.extend(kv[0])
data.extend(kv[1])

return data
write_request = skvbc_messages.SKVBCWriteRequest()
write_request.read_version = block_id
write_request.long_exec = long_exec
write_request.readset = list(readset)
write_request.writeset = writeset
request = skvbc_messages.SKVBCRequest()
request.request = write_request
return request.serialize()

@classmethod
def read_req(cls, readset, block_id=READ_LATEST):
data = bytearray()
data.append(cls.READ)
# SimpleReadHeader
data.extend(struct.pack("<QQ", block_id, len(readset)))
# SimpleKey[numberOfKeysToRead]
for r in readset:
data.extend(r)
return data
read_request = skvbc_messages.SKVBCReadRequest()
read_request.read_version = block_id
read_request.keys = list(readset)
request = skvbc_messages.SKVBCRequest()
request.request = read_request
return request.serialize()

@classmethod
def get_last_block_req(cls):
data = bytearray()
data.append(cls.GET_LAST_BLOCK)
return data
request = skvbc_messages.SKVBCRequest()
request.request = skvbc_messages.SKVBCGetLastBlockRequest()
return request.serialize()

@classmethod
def get_block_data_req(cls, block_id):
data = bytearray()
data.append(cls.GET_BLOCK_DATA)
data.extend(struct.pack("<Q", block_id))
return data
get_block_data_request = skvbc_messages.SKVBCGetBlockDataRequest()
get_block_data_request.block_id = block_id
request = skvbc_messages.SKVBCRequest()
request.request = get_block_data_request
return request.serialize()

@classmethod
def parse_reply(cls, data):
reply_type = data[0]
if reply_type == cls.WRITE:
return cls.parse_write_reply(data[1:])
elif reply_type == cls.READ:
return cls.parse_read_reply(data[1:])
elif reply_type == cls.GET_LAST_BLOCK:
return cls.parse_get_last_block_reply(data[1:])
(reply, offset) = skvbc_messages.SKVBCReply.deserialize(data)
if isinstance(reply.reply, skvbc_messages.SKVBCWriteReply):
return WriteReply(reply.reply.success, reply.reply.latest_block)
elif isinstance(reply.reply, skvbc_messages.SKVBCReadReply):
return dict(reply.reply.reads)
elif isinstance(reply.reply, skvbc_messages.SKVBCGetLastBlockReply):
return reply.reply.latest_block
else:
raise BadReplyError

@staticmethod
def parse_write_reply(data):
return WriteReply._make(struct.unpack("<?Q", data))

@classmethod
def parse_read_reply(cls, data):
num_kv_pairs = struct.unpack("<Q", data[0:8])[0]
data = data[8:]
kv_pairs = {}
for i in range(num_kv_pairs):
kv_pairs[data[0:cls.KV_LEN]] = data[cls.KV_LEN:2 * cls.KV_LEN]
if i + 1 != num_kv_pairs:
data = data[2 * cls.KV_LEN:]
return kv_pairs

@staticmethod
def parse_get_last_block_reply(data):
return struct.unpack("<Q", data)[0]

@staticmethod
def parse_have_you_stopped_reply(data):
with log.start_action(action_type="parse_have_you_stopped_reply"):
return struct.unpack("<q", data)[0]

def initial_state(self):
"""Return a dict with KV_LEN zero byte values for all keys"""
"""
Return a dict with empty byte strings (which is the value SKVBC will
report for any key that has never been written to) for all keys
"""
with log.start_action(action_type="initial_state"):
all_zeros = b''.join([b'\x00' for _ in range(0, self.KV_LEN)])
return dict([(k, all_zeros) for k in self.keys])
empty_byte_string = b''
return dict([(k, empty_byte_string) for k in self.keys])

def random_value(self):
return bytes(random.sample(self.alphanum, self.KV_LEN))
return bytes(random.sample(self.alphanum, self.LENGTH_FOR_RANDOM_KVS))

def random_values(self, n):
return [self.random_value() for _ in range(0, n)]
Expand All @@ -159,15 +131,15 @@ def unique_random_key(self):
from a list of pre-generated keys. Use a prefix of '1' so that every key
is different than keys pre-generated by _create_keys().
"""
unique_random = bytes(random.sample(self.alphanum, self.KV_LEN - 1))
unique_random = bytes(random.sample(self.alphanum, self.LENGTH_FOR_RANDOM_KVS - 1))
return b'1' + unique_random

@classmethod
def max_key(cls):
"""
Return the maximum possible key according to the schema in _create_keys.
"""
return b''.join([b'Z' for _ in range(0, cls.KV_LEN)])
return b''.join([b'Z' for _ in range(0, cls.LENGTH_FOR_RANDOM_KVS)])

async def send_indefinite_write_requests(self, client=None, delay=.1):
with log.start_action(action_type="send_indefinite_write_requests"):
Expand Down Expand Up @@ -361,7 +333,7 @@ def _create_keys(self):
value reaches 'Z', a new character is appended and the sequence starts
over again.
Since all keys must be KV_LEN bytes long, they are extended with '.'
All keys are extended with '.' to LENGTH_FOR_RANDOM_KVS bytes with '.'
characters.
"""
with log.start_action(action_type="_create_keys"):
Expand All @@ -377,8 +349,8 @@ def _create_keys(self):
else:
cur[-1] = end + 1
key = copy.deepcopy(cur)
# Extend the key to be KV_LEN bytes
key.extend([ord('.') for _ in range(self.KV_LEN - len(cur))])
# Extend the key to be LENGTH_FOR_RANDOM_KVS bytes
key.extend([ord('.') for _ in range(self.LENGTH_FOR_RANDOM_KVS - len(cur))])
keys.append(bytes(key))

return keys
Expand Down
2 changes: 1 addition & 1 deletion tests/simpleKVBC/TesterClient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ if(${USE_COMM_TLS_TCP})
target_compile_definitions(skvbc_client PUBLIC USE_COMM_TLS_TCP)
endif()

target_link_libraries(skvbc_client PUBLIC kvbc corebft threshsign util test_config_lib )
target_link_libraries(skvbc_client PUBLIC kvbc corebft threshsign util test_config_lib skvbc_messages_cmf)

target_include_directories(skvbc_client PUBLIC ..)
target_include_directories(skvbc_client PUBLIC ../..)
Expand Down
2 changes: 1 addition & 1 deletion tests/simpleKVBC/TesterReplica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ if(BUILD_ROCKSDB_STORAGE)
target_compile_definitions(skvbc_replica PUBLIC "USE_ROCKSDB=1")
endif()

target_link_libraries(skvbc_replica PUBLIC kvbc corebft threshsign util test_config_lib stdc++fs)
target_link_libraries(skvbc_replica PUBLIC kvbc corebft threshsign util test_config_lib stdc++fs skvbc_messages_cmf)

target_include_directories(skvbc_replica PUBLIC ..)
target_include_directories(skvbc_replica PUBLIC ../..)
Expand Down
Loading

0 comments on commit 48c2d26

Please sign in to comment.