Skip to content

Commit

Permalink
feat: refactor domain objects deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fpelliccioni committed Oct 18, 2024
1 parent 9bde3e9 commit 106452d
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 62 deletions.
81 changes: 68 additions & 13 deletions include/kth/network/message_subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,43 @@ class BCT_API message_subscriber : noncopyable {
* @param[in] subscriber The subscriber for the message type.
* @return Returns error::bad_stream if failed.
*/
// template <typename Message, typename Subscriber>
// code relay(std::istream& stream, uint32_t version, Subscriber& subscriber) const {
// // auto const message = std::make_shared<Message>();

// // // Subscribers are invoked only with stop and success codes.
// // if ( ! domain::entity_from_data(*message, stream, version)) {
// // return error::bad_stream;
// // }

// auto msg = Message::from_data(stream, version);
// if ( ! msg) {
// return error::bad_stream;
// }
// auto const msg_ptr = std::make_shared<Message>(std::move(*msg));

// ////auto const const_ptr = std::const_pointer_cast<const Message>(msg_ptr);
// subscriber->relay(error::success, msg_ptr);
// return error::success;
// }

template <typename Message, typename Subscriber>
code relay(std::istream& stream, uint32_t version, Subscriber& subscriber) const {
auto const message = std::make_shared<Message>();
code relay(byte_reader& reader, uint32_t version, Subscriber& subscriber) const {
// auto const message = std::make_shared<Message>();

// // Subscribers are invoked only with stop and success codes.
// if ( ! domain::entity_from_data(*message, stream, version)) {
// return error::bad_stream;
// }

// Subscribers are invoked only with stop and success codes.
if ( ! domain::entity_from_data(*message, stream, version)) {
auto msg = Message::from_data(reader, version);
if ( ! msg) {
return error::bad_stream;
}
auto const msg_ptr = std::make_shared<Message>(std::move(*msg));

////auto const const_ptr = std::const_pointer_cast<const Message>(message);
subscriber->relay(error::success, message);
////auto const const_ptr = std::const_pointer_cast<const Message>(msg_ptr);
subscriber->relay(error::success, msg_ptr);
return error::success;
}

Expand All @@ -114,20 +140,47 @@ class BCT_API message_subscriber : noncopyable {
* @param[in] subscriber The subscriber for the message type.
* @return Returns error::bad_stream if failed.
*/
// template <typename Message, typename Subscriber>
// code handle(std::istream& stream, uint32_t version, Subscriber& subscriber) const {
// // auto const message = std::make_shared<Message>();

// // // Subscribers are invoked only with stop and success codes.
// // if ( ! domain::entity_from_data(*message, stream, version)) {
// // return error::bad_stream;
// // }

// auto msg = Message::from_data(stream, version);
// if ( ! msg) {
// return error::bad_stream;
// }
// auto const msg_ptr = std::make_shared<Message>(std::move(*msg));

// ////auto const const_ptr = std::const_pointer_cast<const Message>(msg_ptr);
// subscriber->invoke(error::success, msg_ptr);
// return error::success;
// }

template <typename Message, typename Subscriber>
code handle(std::istream& stream, uint32_t version, Subscriber& subscriber) const {
auto const message = std::make_shared<Message>();
code handle(byte_reader& reader, uint32_t version, Subscriber& subscriber) const {
// auto const message = std::make_shared<Message>();

// Subscribers are invoked only with stop and success codes.
if ( ! domain::entity_from_data(*message, stream, version)) {
// // Subscribers are invoked only with stop and success codes.
// if ( ! domain::entity_from_data(*message, stream, version)) {
// return error::bad_stream;
// }

auto msg = Message::from_data(reader, version);
if ( ! msg) {
return error::bad_stream;
}
auto const msg_ptr = std::make_shared<Message>(std::move(*msg));

////auto const const_ptr = std::const_pointer_cast<const Message>(message);
subscriber->invoke(error::success, message);
////auto const const_ptr = std::const_pointer_cast<const Message>(msg_ptr);
subscriber->invoke(error::success, msg_ptr);
return error::success;
}


/**
* Broadcast a default message instance with the specified error code.
* @param[in] ec The error code to broadcast.
Expand All @@ -143,7 +196,9 @@ class BCT_API message_subscriber : noncopyable {
* @param[in] stream The stream from which to load the message.
* @return Returns error::bad_stream if failed.
*/
virtual code load(domain::message::message_type type, uint32_t version, std::istream& stream) const;
// virtual code load(domain::message::message_type type, uint32_t version, std::istream& stream) const;
virtual code load(domain::message::message_type type, uint32_t version, byte_reader& reader) const;


/**
* Start all subscribers so that they accept subscription.
Expand Down
108 changes: 73 additions & 35 deletions src/message_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
#define RELAY_CODE(code, value) value##_subscriber_->relay(code, {})

// This allows us to block the peer while handling the message.
#define CASE_HANDLE_MESSAGE(stream, version, value) \
#define CASE_HANDLE_MESSAGE(reader, version, value) \
case message_type::value: \
return handle<domain::message::value>(stream, version, value##_subscriber_)
return handle<domain::message::value>(reader, version, value##_subscriber_)

#define CASE_RELAY_MESSAGE(stream, version, value) \
#define CASE_RELAY_MESSAGE(reader, version, value) \
case message_type::value: \
return relay<domain::message::value>(stream, version, value##_subscriber_)
return relay<domain::message::value>(reader, version, value##_subscriber_)

#define START_SUBSCRIBER(value) value##_subscriber_->start()

Expand Down Expand Up @@ -95,38 +95,76 @@ void message_subscriber::broadcast(code const& ec) {
// RELAY_CODE(ec, xverack);
}

code message_subscriber::load(message_type type, uint32_t version, std::istream& stream) const {
// code message_subscriber::load(message_type type, uint32_t version, std::istream& stream) const {
// switch (type) {
// CASE_RELAY_MESSAGE(stream, version, address);
// CASE_RELAY_MESSAGE(stream, version, alert);
// CASE_HANDLE_MESSAGE(stream, version, block);
// CASE_RELAY_MESSAGE(stream, version, block_transactions);
// CASE_RELAY_MESSAGE(stream, version, compact_block);
// CASE_RELAY_MESSAGE(stream, version, double_spend_proof);
// CASE_RELAY_MESSAGE(stream, version, fee_filter);
// CASE_RELAY_MESSAGE(stream, version, filter_add);
// CASE_RELAY_MESSAGE(stream, version, filter_clear);
// CASE_RELAY_MESSAGE(stream, version, filter_load);
// CASE_RELAY_MESSAGE(stream, version, get_address);
// CASE_RELAY_MESSAGE(stream, version, get_blocks);
// CASE_RELAY_MESSAGE(stream, version, get_block_transactions);
// CASE_RELAY_MESSAGE(stream, version, get_data);
// CASE_RELAY_MESSAGE(stream, version, get_headers);
// CASE_RELAY_MESSAGE(stream, version, headers);
// CASE_RELAY_MESSAGE(stream, version, inventory);
// CASE_RELAY_MESSAGE(stream, version, memory_pool);
// CASE_RELAY_MESSAGE(stream, version, merkle_block);
// CASE_RELAY_MESSAGE(stream, version, not_found);
// CASE_RELAY_MESSAGE(stream, version, ping);
// CASE_RELAY_MESSAGE(stream, version, pong);
// CASE_RELAY_MESSAGE(stream, version, reject);
// CASE_RELAY_MESSAGE(stream, version, send_compact);
// CASE_RELAY_MESSAGE(stream, version, send_headers);
// CASE_HANDLE_MESSAGE(stream, version, transaction);
// CASE_HANDLE_MESSAGE(stream, version, verack);
// CASE_HANDLE_MESSAGE(stream, version, version);
// CASE_HANDLE_MESSAGE(stream, version, xversion);
// // CASE_HANDLE_MESSAGE(stream, version, xverack);
// case message_type::unknown:
// default:
// return error::not_found;
// }
// }

code message_subscriber::load(message_type type, uint32_t version, byte_reader& reader) const {
switch (type) {
CASE_RELAY_MESSAGE(stream, version, address);
CASE_RELAY_MESSAGE(stream, version, alert);
CASE_HANDLE_MESSAGE(stream, version, block);
CASE_RELAY_MESSAGE(stream, version, block_transactions);
CASE_RELAY_MESSAGE(stream, version, compact_block);
CASE_RELAY_MESSAGE(stream, version, double_spend_proof);
CASE_RELAY_MESSAGE(stream, version, fee_filter);
CASE_RELAY_MESSAGE(stream, version, filter_add);
CASE_RELAY_MESSAGE(stream, version, filter_clear);
CASE_RELAY_MESSAGE(stream, version, filter_load);
CASE_RELAY_MESSAGE(stream, version, get_address);
CASE_RELAY_MESSAGE(stream, version, get_blocks);
CASE_RELAY_MESSAGE(stream, version, get_block_transactions);
CASE_RELAY_MESSAGE(stream, version, get_data);
CASE_RELAY_MESSAGE(stream, version, get_headers);
CASE_RELAY_MESSAGE(stream, version, headers);
CASE_RELAY_MESSAGE(stream, version, inventory);
CASE_RELAY_MESSAGE(stream, version, memory_pool);
CASE_RELAY_MESSAGE(stream, version, merkle_block);
CASE_RELAY_MESSAGE(stream, version, not_found);
CASE_RELAY_MESSAGE(stream, version, ping);
CASE_RELAY_MESSAGE(stream, version, pong);
CASE_RELAY_MESSAGE(stream, version, reject);
CASE_RELAY_MESSAGE(stream, version, send_compact);
CASE_RELAY_MESSAGE(stream, version, send_headers);
CASE_HANDLE_MESSAGE(stream, version, transaction);
CASE_HANDLE_MESSAGE(stream, version, verack);
CASE_HANDLE_MESSAGE(stream, version, version);
CASE_HANDLE_MESSAGE(stream, version, xversion);
// CASE_HANDLE_MESSAGE(stream, version, xverack);
CASE_RELAY_MESSAGE(reader, version, address);
CASE_RELAY_MESSAGE(reader, version, alert);
CASE_HANDLE_MESSAGE(reader, version, block);
CASE_RELAY_MESSAGE(reader, version, block_transactions);
CASE_RELAY_MESSAGE(reader, version, compact_block);
CASE_RELAY_MESSAGE(reader, version, double_spend_proof);
CASE_RELAY_MESSAGE(reader, version, fee_filter);
CASE_RELAY_MESSAGE(reader, version, filter_add);
CASE_RELAY_MESSAGE(reader, version, filter_clear);
CASE_RELAY_MESSAGE(reader, version, filter_load);
CASE_RELAY_MESSAGE(reader, version, get_address);
CASE_RELAY_MESSAGE(reader, version, get_blocks);
CASE_RELAY_MESSAGE(reader, version, get_block_transactions);
CASE_RELAY_MESSAGE(reader, version, get_data);
CASE_RELAY_MESSAGE(reader, version, get_headers);
CASE_RELAY_MESSAGE(reader, version, headers);
CASE_RELAY_MESSAGE(reader, version, inventory);
CASE_RELAY_MESSAGE(reader, version, memory_pool);
CASE_RELAY_MESSAGE(reader, version, merkle_block);
CASE_RELAY_MESSAGE(reader, version, not_found);
CASE_RELAY_MESSAGE(reader, version, ping);
CASE_RELAY_MESSAGE(reader, version, pong);
CASE_RELAY_MESSAGE(reader, version, reject);
CASE_RELAY_MESSAGE(reader, version, send_compact);
CASE_RELAY_MESSAGE(reader, version, send_headers);
CASE_HANDLE_MESSAGE(reader, version, transaction);
CASE_HANDLE_MESSAGE(reader, version, verack);
CASE_HANDLE_MESSAGE(reader, version, version);
CASE_HANDLE_MESSAGE(reader, version, xversion);
// CASE_HANDLE_MESSAGE(reader, version, xverack);
case message_type::unknown:
default:
return error::not_found;
Expand Down
80 changes: 71 additions & 9 deletions src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ static size_t const invalid_payload_dump_size = 1024;
proxy::proxy(threadpool& pool, socket::ptr socket, settings const& settings)
: authority_(socket->authority())
, heading_buffer_(heading::maximum_size())
, payload_buffer_(heading::maximum_payload_size(settings.protocol_maximum, false, settings.identifier, settings.inbound_port == 48333))
, maximum_payload_(heading::maximum_payload_size(settings.protocol_maximum, (settings.services & version::service::node_witness) != 0, settings.identifier, settings.inbound_port == 48333))
, payload_buffer_(heading::maximum_payload_size(settings.protocol_maximum, settings.identifier, settings.inbound_port == 48333))
, maximum_payload_(heading::maximum_payload_size(settings.protocol_maximum, settings.identifier, settings.inbound_port == 48333))
, socket_(socket)
, stopped_(true)
, protocol_magic_(settings.identifier)
Expand Down Expand Up @@ -118,7 +118,7 @@ void proxy::handle_read_heading(boost_code const& ec, size_t) {
return;
}

auto const head = domain::create<heading>(heading_buffer_);
auto const head = domain::create_old<heading>(heading_buffer_, 0);

if ( ! head.is_valid()) {
LOG_WARNING(LOG_NETWORK, "Invalid heading from [", authority(), "]");
Expand Down Expand Up @@ -154,7 +154,7 @@ void proxy::handle_read_heading(boost_code const& ec, size_t) {
read_payload(head);
}

void proxy::read_payload(const heading& head) {
void proxy::read_payload(heading const& head) {
if (stopped()) {
return;
}
Expand All @@ -165,7 +165,68 @@ void proxy::read_payload(const heading& head) {
async_read(socket_->get(), buffer(payload_buffer_), std::bind(&proxy::handle_read_payload, shared_from_this(), _1, _2, head));
}

void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, const heading& head) {
// void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, heading const& head) {
// //LOG_INFO(LOG_NETWORK, "proxy::handle_read_payload()");
// if (stopped()) return;

// if (ec) {
// LOG_DEBUG(LOG_NETWORK
// , "Payload read failure [", authority(), "] "
// , code(error::boost_to_error_code(ec)).message());
// stop(ec);
// return;
// }

// // This is a pointless test but we allow it as an option for completeness.
// if (validate_checksum_ && head.checksum() != bitcoin_checksum(payload_buffer_)) {
// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] bad checksum.");
// stop(error::bad_stream);
// return;
// }

// LOG_DEBUG(LOG_NETWORK
// , "Read ", head.command(), " from [", authority()
// , "] (", payload_size, " bytes). Now parsing ...");

// // Notify subscribers of the new message.
// // payload_source source(payload_buffer_);
// // payload_stream istream(source);
// byte_reader reader(payload_buffer_);

// // Failures are not forwarded to subscribers and channel is stopped below.
// auto const code = message_subscriber_.load(head.type(), version_, istream);
// auto const consumed = istream.peek() == std::istream::traits_type::eof();

// if (verbose_ && code) {
// auto const size = std::min(payload_size, invalid_payload_dump_size);
// auto const begin = payload_buffer_.begin();

// LOG_VERBOSE(LOG_NETWORK, "Invalid payload from [", authority(), "] ", encode_base16(data_chunk{ begin, begin + size }));
// stop(code);
// return;
// }

// if (code) {
// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] ", code.message());
// stop(code);
// return;
// }

// if ( ! consumed) {
// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] trailing bytes.");
// stop(error::bad_stream);
// return;
// }

// LOG_DEBUG(LOG_NETWORK
// , "Received ", head.command(), " from [", authority()
// , "] (", payload_size, " bytes)");

// signal_activity();
// read_heading();
// }

void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, heading const& head) {
//LOG_INFO(LOG_NETWORK, "proxy::handle_read_payload()");
if (stopped()) return;

Expand All @@ -189,12 +250,13 @@ void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, const
, "] (", payload_size, " bytes). Now parsing ...");

// Notify subscribers of the new message.
payload_source source(payload_buffer_);
payload_stream istream(source);
// payload_source source(payload_buffer_);
// payload_stream istream(source);
byte_reader reader(payload_buffer_);

// Failures are not forwarded to subscribers and channel is stopped below.
auto const code = message_subscriber_.load(head.type(), version_, istream);
auto const consumed = istream.peek() == std::istream::traits_type::eof();
auto const code = message_subscriber_.load(head.type(), version_, reader);
auto const consumed = reader.is_exhausted();

if (verbose_ && code) {
auto const size = std::min(payload_size, invalid_payload_dump_size);
Expand Down
5 changes: 0 additions & 5 deletions src/sessions/session_outbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ void session_outbound::attach_handshake_protocols(channel::ptr channel, result_h
auto const invalid_services = settings_.invalid_services;
auto const minimum_version = settings_.protocol_minimum;

#if defined(KTH_CURRENCY_BCH)
auto const minimum_services = serve::node_network;
#else
// Require peer to serve network (and witness if configured on self).
auto const minimum_services = (own_services & serve::node_witness) | serve::node_network;
#endif

// Reject messages are not handled until bip61 (70002).
// The negotiated_version is initialized to the configured maximum.
Expand Down

0 comments on commit 106452d

Please sign in to comment.