Skip to content

Commit

Permalink
Enable returning errors in reply messages to the client. (#2136)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuliasherman authored Dec 16, 2021
1 parent eeced05 commit b4ba1fc
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 35 deletions.
2 changes: 1 addition & 1 deletion bftclient/include/bftclient/bft_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Client {
Reply send(const ReadConfig& config, Msg&& request);
SeqNumToReplyMap sendBatch(std::deque<WriteRequest>& write_requests, const std::string& cid);

// Return true if the client has at least num_replicas_required active replica connecions.
// Return true if the client has at least num_replicas_required active replica connections.
bool isServing(int num_replicas, int num_replicas_required) const;

// Useful for testing. Shouldn't be relied on in production.
Expand Down
1 change: 1 addition & 0 deletions bftengine/include/bftengine/ClientMsgs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct ClientReplyMsgHeader {
uint32_t spanContextSize = 0u;
uint16_t currentPrimaryId;
uint64_t reqSeqNum;
uint32_t result; // Request execution result

// Reply length is the total length of the reply, including any replica specific info.
uint32_t replyLength;
Expand Down
2 changes: 1 addition & 1 deletion bftengine/include/bftengine/IRequestHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class IRequestsHandler {
uint64_t requestSequenceNum = executionSequenceNum;
uint32_t outActualReplySize = 0;
uint32_t outReplicaSpecificInfoSize = 0;
int outExecutionStatus = 1;
uint32_t outExecutionStatus = 1;
uint64_t blockId = 0;
};

Expand Down
10 changes: 5 additions & 5 deletions bftengine/include/bftengine/SimpleClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ enum ClientMsgFlag : uint8_t {
EMPTY_CLIENT_REQ = 0x10,
};

enum OperationResult : int8_t { SUCCESS, NOT_READY, TIMEOUT, BUFFER_TOO_SMALL, INVALID_REQUEST };
// Call back for request - at this point we know for sure that a client is handling the request so we can assure that we
// will have reply. This callback will be attached to the client reply struct and whenever we get the reply from, the
// bft client we will activate the callback.
typedef std::variant<int, bft::client::Reply> SendResult;
enum OperationResult : uint32_t { SUCCESS, INVALID_REQUEST, NOT_READY, TIMEOUT, BUFFER_TOO_SMALL, EMPTY_EXEC_RESULT };
// Call back for request - at this point we know for sure that a client is handling the request, so we can assure that
// we will have reply. This callback will be attached to the client reply struct and whenever we get the reply from,
// the bft client we will activate the callback.
typedef std::variant<uint32_t, bft::client::Reply> SendResult;
typedef std::function<void(SendResult&&)> RequestCallBack;

struct ClientRequest {
Expand Down
29 changes: 16 additions & 13 deletions bftengine/src/bftengine/messages/ClientReplyMsg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,33 @@ namespace impl {

ClientReplyMsg::ClientReplyMsg(ReplicaId primaryId, ReqId reqSeqNum, ReplicaId replicaId)
: MessageBase(replicaId, MsgCode::ClientReply, ReplicaConfig::instance().getmaxExternalMessageSize()) {
b()->reqSeqNum = reqSeqNum;
b()->currentPrimaryId = primaryId;
b()->replyLength = 0;
b()->replicaSpecificInfoLength = 0;
setHeaderParameters(primaryId, reqSeqNum, 0, 0);
setMsgSize(sizeof(ClientReplyMsgHeader));
}

ClientReplyMsg::ClientReplyMsg(ReplicaId replicaId, ReqId reqSeqNum, char* reply, uint32_t replyLength)
: MessageBase(replicaId, MsgCode::ClientReply, sizeof(ClientReplyMsgHeader) + replyLength) {
b()->reqSeqNum = reqSeqNum;
b()->currentPrimaryId = 0;
b()->replyLength = replyLength;

setHeaderParameters(0, reqSeqNum, replyLength, 0);
memcpy(body() + sizeof(ClientReplyMsgHeader), reply, replyLength);
setMsgSize(sizeof(ClientReplyMsgHeader) + replyLength);
}

ClientReplyMsg::ClientReplyMsg(ReplicaId replicaId, uint32_t replyLength)
: MessageBase(replicaId, MsgCode::ClientReply, sizeof(ClientReplyMsgHeader) + replyLength) {
b()->reqSeqNum = 0;
b()->currentPrimaryId = 0;
b()->replyLength = replyLength;
setHeaderParameters(0, 0, replyLength, 0);
}

setMsgSize(sizeof(ClientReplyMsgHeader) + replyLength);
// Reply with no data; returns an error to the client
ClientReplyMsg::ClientReplyMsg(ReplicaId primaryId, ReqId reqSeqNum, ReplicaId replicaId, uint32_t result)
: MessageBase(replicaId, MsgCode::ClientReply, sizeof(ClientReplyMsgHeader)) {
setHeaderParameters(primaryId, reqSeqNum, 0, result);
}

void ClientReplyMsg::setHeaderParameters(ReplicaId primaryId, ReqId reqSeqNum, uint32_t replyLength, uint32_t result) {
b()->currentPrimaryId = primaryId;
b()->reqSeqNum = reqSeqNum;
b()->replyLength = replyLength;
b()->result = result;
b()->replicaSpecificInfoLength = 0;
}

void ClientReplyMsg::setReplyLength(uint32_t replyLength) {
Expand Down
9 changes: 8 additions & 1 deletion bftengine/src/bftengine/messages/ClientReplyMsg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ class ClientReplyMsg : public MessageBase {
static_assert(sizeof(ClientReplyMsgHeader::msgType) == sizeof(MessageBase::Header::msgType), "");
static_assert(sizeof(ClientReplyMsgHeader::reqSeqNum) == sizeof(ReqId), "");
static_assert(sizeof(ClientReplyMsgHeader::currentPrimaryId) == sizeof(ReplicaId), "");
static_assert(sizeof(ClientReplyMsgHeader) == 24, "ClientRequestMsgHeader is 24B");
static_assert(sizeof(ClientReplyMsgHeader::result) == sizeof(uint32_t), "");
static_assert(sizeof(ClientReplyMsgHeader) == 28, "ClientRequestMsgHeader is 28B");

public:
ClientReplyMsg(ReplicaId primaryId, ReqId reqSeqNum, ReplicaId replicaId);

ClientReplyMsg(ReplicaId replicaId, ReqId reqSeqNum, char* reply, uint32_t replyLength);

ClientReplyMsg(ReplicaId primaryId, ReqId reqSeqNum, ReplicaId replicaId, uint32_t result);

ClientReplyMsg(ReplicaId replicaId, uint32_t replyLength);

uint32_t maxReplyLength() const { return internalStorageSize() - sizeof(ClientReplyMsgHeader); }
Expand All @@ -56,6 +59,10 @@ class ClientReplyMsg : public MessageBase {
void setMsgSize(MsgSize size) { MessageBase::setMsgSize(size); }

ClientReplyMsgHeader* b() const { return (ClientReplyMsgHeader*)msgBody_; }

private:
void setHeaderParameters(ReplicaId primaryId, ReqId reqSeqNum, uint32_t replyLength, uint32_t result);
};

} // namespace impl
} // namespace bftEngine
3 changes: 0 additions & 3 deletions bftengine/src/bftengine/messages/MessageBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,6 @@ void MessageBase::setMsgSize(MsgSize size) {
ConcordAssert((msgBody_ != nullptr));
ConcordAssert(size <= storageSize_);

// TODO(GG): do we need to reset memory here?
if (storageSize_ > size) memset(body() + size, 0, (storageSize_ - size));

msgSize_ = size;
}

Expand Down
2 changes: 1 addition & 1 deletion client/clientservice/src/request_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Status RequestServiceImpl::Send(ServerContext* context, const Request* proto_req
auto callback = [&](cc::SendResult&& send_result) {
if (not std::holds_alternative<bft::client::Reply>(send_result)) {
LOG_INFO(logger_, "Send returned error");
switch (std::get<int>(send_result)) {
switch (std::get<uint32_t>(send_result)) {
case (concord_client_pool::Overloaded):
status.set_value(grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, "All clients occupied"));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ struct SendError {
};
ErrorType type;
};
typedef std::variant<int, bft::client::Reply> SendResult;

typedef std::variant<uint32_t, bft::client::Reply> SendResult;

struct ReplicaInfo {
bft::client::ReplicaId id;
Expand Down
2 changes: 2 additions & 0 deletions kvbc/src/ClientImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ Status ClientImp::invokeCommandSynch(const char* request,
return Status::InvalidArgument("Specified output buffer is too small");
case INVALID_REQUEST:
return Status::InvalidArgument("Request is invalid");
case EMPTY_EXEC_RESULT:
return Status::GeneralError("Execution result is empty");
}
return Status::GeneralError("Unknown error");
}
Expand Down
8 changes: 4 additions & 4 deletions util/pyclient/bft_msgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, message):
# Little Endian format with no padding
# We don't include the msg type here, since we have to read it first to
# understand what message is incoming.
REPLY_HEADER_FMT = "<LHQLL"
REPLY_HEADER_FMT = "<LHQLLL"
REPLY_HEADER_SIZE = struct.calcsize(REPLY_HEADER_FMT)

RequestHeader = namedtuple('RequestHeader', ['span_context_size', 'client_id', 'flags',
Expand All @@ -61,7 +61,7 @@ def __init__(self, message):
#
# Replica specific information is not used yet, so rsi_length is always 0
ReplyHeader = namedtuple('ReplyHeader', ['span_context_size', 'primary_id',
'req_seq_num', 'length', 'rsi_length'])
'req_seq_num', 'result', 'length', 'rsi_length'])

def pack_request(client_id, req_seq_num, read_only, timeout_milli, cid, msg, pre_process=False, reconfiguration=False,
span_context=b'', signature=b''):
Expand Down Expand Up @@ -125,13 +125,13 @@ def unpack_request_header(data):
return RequestHeader._make(struct.unpack(REQUEST_HEADER_FMT,
data[MSG_TYPE_SIZE:end]))

def pack_reply(primary_id, req_seq_num, msg, rsi_length=0):
def pack_reply(primary_id, req_seq_num, msg, result=0, rsi_length=0):
"""
Take message information and a message and return a construct a buffer
containing a serialized reply header and message.
"""

header = ReplyHeader(0, primary_id, req_seq_num, len(msg), rsi_length)
header = ReplyHeader(0, primary_id, req_seq_num, result, len(msg), rsi_length)
return b''.join([pack_reply_header(header), msg])

def pack_reply_header(header):
Expand Down
14 changes: 9 additions & 5 deletions util/pyclient/test_msgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

class TestRepliesManager(unittest.TestCase):

def _build_msg(self, msg, primary_id=0, req_seq_num=1, rsi_length=0):
return bft_msgs.pack_reply(primary_id, req_seq_num, msg, rsi_length)
def _build_msg(self, msg, primary_id=0, req_seq_num=1, result=0, rsi_length=0):
return bft_msgs.pack_reply(primary_id, req_seq_num, msg, result, rsi_length)

def test_add_message_to_manager(self):
replies_manager = rsi.RepliesManager()
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_create_empty_rsi_message(self):
msg = b'hello'
primary_id = 0
req_seq_num = 1
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg, 0)
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg, 0, 0)
rsi_reply = rsi.MsgWithReplicaSpecificInfo(packed, 0)
self.assertEqual(rsi_reply.sender_id, 0)
common_header, common_data = rsi_reply.get_common_reply()
Expand All @@ -116,7 +116,7 @@ def test_create_non_empty_rsi_message(self):
msg = b'hellorsi'
primary_id = 0
req_seq_num = 1
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg, 3)
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg, 0, 3)
rsi_reply = rsi.MsgWithReplicaSpecificInfo(packed, 0)
self.assertEqual(rsi_reply.sender_id, 0)
common_header, common_data = rsi_reply.get_common_reply()
Expand All @@ -131,10 +131,14 @@ def test_unpack_reply(self):
msg = b'hello'
primary_id = 0
req_seq_num = 1
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg)
result = 1
rsi_length = 5
packed = bft_msgs.pack_reply(primary_id, req_seq_num, msg, result, rsi_length)
(header, unpacked_msg) = bft_msgs.unpack_reply(packed)
self.assertEqual(primary_id, header.primary_id)
self.assertEqual(req_seq_num, header.req_seq_num)
self.assertEqual(result, header.result)
self.assertEqual(rsi_length, header.rsi_length)
self.assertEqual(msg, unpacked_msg)

def test_unpack_request(self):
Expand Down

0 comments on commit b4ba1fc

Please sign in to comment.