From 106452d444188f5c2e56ba963f77105496a1bb59 Mon Sep 17 00:00:00 2001 From: Fernando Pelliccioni Date: Fri, 18 Oct 2024 13:21:09 +0200 Subject: [PATCH] feat: refactor domain objects deserialization --- include/kth/network/message_subscriber.hpp | 81 +++++++++++++--- src/message_subscriber.cpp | 108 ++++++++++++++------- src/proxy.cpp | 80 +++++++++++++-- src/sessions/session_outbound.cpp | 5 - 4 files changed, 212 insertions(+), 62 deletions(-) diff --git a/include/kth/network/message_subscriber.hpp b/include/kth/network/message_subscriber.hpp index 55988236..aee76e82 100644 --- a/include/kth/network/message_subscriber.hpp +++ b/include/kth/network/message_subscriber.hpp @@ -93,17 +93,43 @@ class BCT_API message_subscriber : noncopyable { * @param[in] subscriber The subscriber for the message type. * @return Returns error::bad_stream if failed. */ + // template + // code relay(std::istream& stream, uint32_t version, Subscriber& subscriber) const { + // // auto const message = std::make_shared(); + + // // // Subscribers are invoked only with stop and success codes. + // // if ( ! domain::entity_from_data(*message, stream, version)) { + // // return error::bad_stream; + // // } + + // auto msg = Message::from_data(stream, version); + // if ( ! msg) { + // return error::bad_stream; + // } + // auto const msg_ptr = std::make_shared(std::move(*msg)); + + // ////auto const const_ptr = std::const_pointer_cast(msg_ptr); + // subscriber->relay(error::success, msg_ptr); + // return error::success; + // } + template - code relay(std::istream& stream, uint32_t version, Subscriber& subscriber) const { - auto const message = std::make_shared(); + code relay(byte_reader& reader, uint32_t version, Subscriber& subscriber) const { + // auto const message = std::make_shared(); + + // // Subscribers are invoked only with stop and success codes. + // if ( ! domain::entity_from_data(*message, stream, version)) { + // return error::bad_stream; + // } - // Subscribers are invoked only with stop and success codes. - if ( ! domain::entity_from_data(*message, stream, version)) { + auto msg = Message::from_data(reader, version); + if ( ! msg) { return error::bad_stream; } + auto const msg_ptr = std::make_shared(std::move(*msg)); - ////auto const const_ptr = std::const_pointer_cast(message); - subscriber->relay(error::success, message); + ////auto const const_ptr = std::const_pointer_cast(msg_ptr); + subscriber->relay(error::success, msg_ptr); return error::success; } @@ -114,20 +140,47 @@ class BCT_API message_subscriber : noncopyable { * @param[in] subscriber The subscriber for the message type. * @return Returns error::bad_stream if failed. */ + // template + // code handle(std::istream& stream, uint32_t version, Subscriber& subscriber) const { + // // auto const message = std::make_shared(); + + // // // Subscribers are invoked only with stop and success codes. + // // if ( ! domain::entity_from_data(*message, stream, version)) { + // // return error::bad_stream; + // // } + + // auto msg = Message::from_data(stream, version); + // if ( ! msg) { + // return error::bad_stream; + // } + // auto const msg_ptr = std::make_shared(std::move(*msg)); + + // ////auto const const_ptr = std::const_pointer_cast(msg_ptr); + // subscriber->invoke(error::success, msg_ptr); + // return error::success; + // } + template - code handle(std::istream& stream, uint32_t version, Subscriber& subscriber) const { - auto const message = std::make_shared(); + code handle(byte_reader& reader, uint32_t version, Subscriber& subscriber) const { + // auto const message = std::make_shared(); - // Subscribers are invoked only with stop and success codes. - if ( ! domain::entity_from_data(*message, stream, version)) { + // // Subscribers are invoked only with stop and success codes. + // if ( ! domain::entity_from_data(*message, stream, version)) { + // return error::bad_stream; + // } + + auto msg = Message::from_data(reader, version); + if ( ! msg) { return error::bad_stream; } + auto const msg_ptr = std::make_shared(std::move(*msg)); - ////auto const const_ptr = std::const_pointer_cast(message); - subscriber->invoke(error::success, message); + ////auto const const_ptr = std::const_pointer_cast(msg_ptr); + subscriber->invoke(error::success, msg_ptr); return error::success; } + /** * Broadcast a default message instance with the specified error code. * @param[in] ec The error code to broadcast. @@ -143,7 +196,9 @@ class BCT_API message_subscriber : noncopyable { * @param[in] stream The stream from which to load the message. * @return Returns error::bad_stream if failed. */ - virtual code load(domain::message::message_type type, uint32_t version, std::istream& stream) const; + // virtual code load(domain::message::message_type type, uint32_t version, std::istream& stream) const; + virtual code load(domain::message::message_type type, uint32_t version, byte_reader& reader) const; + /** * Start all subscribers so that they accept subscription. diff --git a/src/message_subscriber.cpp b/src/message_subscriber.cpp index 2967200c..435b9095 100644 --- a/src/message_subscriber.cpp +++ b/src/message_subscriber.cpp @@ -13,13 +13,13 @@ #define RELAY_CODE(code, value) value##_subscriber_->relay(code, {}) // This allows us to block the peer while handling the message. -#define CASE_HANDLE_MESSAGE(stream, version, value) \ +#define CASE_HANDLE_MESSAGE(reader, version, value) \ case message_type::value: \ - return handle(stream, version, value##_subscriber_) + return handle(reader, version, value##_subscriber_) -#define CASE_RELAY_MESSAGE(stream, version, value) \ +#define CASE_RELAY_MESSAGE(reader, version, value) \ case message_type::value: \ - return relay(stream, version, value##_subscriber_) + return relay(reader, version, value##_subscriber_) #define START_SUBSCRIBER(value) value##_subscriber_->start() @@ -95,38 +95,76 @@ void message_subscriber::broadcast(code const& ec) { // RELAY_CODE(ec, xverack); } -code message_subscriber::load(message_type type, uint32_t version, std::istream& stream) const { +// code message_subscriber::load(message_type type, uint32_t version, std::istream& stream) const { +// switch (type) { +// CASE_RELAY_MESSAGE(stream, version, address); +// CASE_RELAY_MESSAGE(stream, version, alert); +// CASE_HANDLE_MESSAGE(stream, version, block); +// CASE_RELAY_MESSAGE(stream, version, block_transactions); +// CASE_RELAY_MESSAGE(stream, version, compact_block); +// CASE_RELAY_MESSAGE(stream, version, double_spend_proof); +// CASE_RELAY_MESSAGE(stream, version, fee_filter); +// CASE_RELAY_MESSAGE(stream, version, filter_add); +// CASE_RELAY_MESSAGE(stream, version, filter_clear); +// CASE_RELAY_MESSAGE(stream, version, filter_load); +// CASE_RELAY_MESSAGE(stream, version, get_address); +// CASE_RELAY_MESSAGE(stream, version, get_blocks); +// CASE_RELAY_MESSAGE(stream, version, get_block_transactions); +// CASE_RELAY_MESSAGE(stream, version, get_data); +// CASE_RELAY_MESSAGE(stream, version, get_headers); +// CASE_RELAY_MESSAGE(stream, version, headers); +// CASE_RELAY_MESSAGE(stream, version, inventory); +// CASE_RELAY_MESSAGE(stream, version, memory_pool); +// CASE_RELAY_MESSAGE(stream, version, merkle_block); +// CASE_RELAY_MESSAGE(stream, version, not_found); +// CASE_RELAY_MESSAGE(stream, version, ping); +// CASE_RELAY_MESSAGE(stream, version, pong); +// CASE_RELAY_MESSAGE(stream, version, reject); +// CASE_RELAY_MESSAGE(stream, version, send_compact); +// CASE_RELAY_MESSAGE(stream, version, send_headers); +// CASE_HANDLE_MESSAGE(stream, version, transaction); +// CASE_HANDLE_MESSAGE(stream, version, verack); +// CASE_HANDLE_MESSAGE(stream, version, version); +// CASE_HANDLE_MESSAGE(stream, version, xversion); +// // CASE_HANDLE_MESSAGE(stream, version, xverack); +// case message_type::unknown: +// default: +// return error::not_found; +// } +// } + +code message_subscriber::load(message_type type, uint32_t version, byte_reader& reader) const { switch (type) { - CASE_RELAY_MESSAGE(stream, version, address); - CASE_RELAY_MESSAGE(stream, version, alert); - CASE_HANDLE_MESSAGE(stream, version, block); - CASE_RELAY_MESSAGE(stream, version, block_transactions); - CASE_RELAY_MESSAGE(stream, version, compact_block); - CASE_RELAY_MESSAGE(stream, version, double_spend_proof); - CASE_RELAY_MESSAGE(stream, version, fee_filter); - CASE_RELAY_MESSAGE(stream, version, filter_add); - CASE_RELAY_MESSAGE(stream, version, filter_clear); - CASE_RELAY_MESSAGE(stream, version, filter_load); - CASE_RELAY_MESSAGE(stream, version, get_address); - CASE_RELAY_MESSAGE(stream, version, get_blocks); - CASE_RELAY_MESSAGE(stream, version, get_block_transactions); - CASE_RELAY_MESSAGE(stream, version, get_data); - CASE_RELAY_MESSAGE(stream, version, get_headers); - CASE_RELAY_MESSAGE(stream, version, headers); - CASE_RELAY_MESSAGE(stream, version, inventory); - CASE_RELAY_MESSAGE(stream, version, memory_pool); - CASE_RELAY_MESSAGE(stream, version, merkle_block); - CASE_RELAY_MESSAGE(stream, version, not_found); - CASE_RELAY_MESSAGE(stream, version, ping); - CASE_RELAY_MESSAGE(stream, version, pong); - CASE_RELAY_MESSAGE(stream, version, reject); - CASE_RELAY_MESSAGE(stream, version, send_compact); - CASE_RELAY_MESSAGE(stream, version, send_headers); - CASE_HANDLE_MESSAGE(stream, version, transaction); - CASE_HANDLE_MESSAGE(stream, version, verack); - CASE_HANDLE_MESSAGE(stream, version, version); - CASE_HANDLE_MESSAGE(stream, version, xversion); - // CASE_HANDLE_MESSAGE(stream, version, xverack); + CASE_RELAY_MESSAGE(reader, version, address); + CASE_RELAY_MESSAGE(reader, version, alert); + CASE_HANDLE_MESSAGE(reader, version, block); + CASE_RELAY_MESSAGE(reader, version, block_transactions); + CASE_RELAY_MESSAGE(reader, version, compact_block); + CASE_RELAY_MESSAGE(reader, version, double_spend_proof); + CASE_RELAY_MESSAGE(reader, version, fee_filter); + CASE_RELAY_MESSAGE(reader, version, filter_add); + CASE_RELAY_MESSAGE(reader, version, filter_clear); + CASE_RELAY_MESSAGE(reader, version, filter_load); + CASE_RELAY_MESSAGE(reader, version, get_address); + CASE_RELAY_MESSAGE(reader, version, get_blocks); + CASE_RELAY_MESSAGE(reader, version, get_block_transactions); + CASE_RELAY_MESSAGE(reader, version, get_data); + CASE_RELAY_MESSAGE(reader, version, get_headers); + CASE_RELAY_MESSAGE(reader, version, headers); + CASE_RELAY_MESSAGE(reader, version, inventory); + CASE_RELAY_MESSAGE(reader, version, memory_pool); + CASE_RELAY_MESSAGE(reader, version, merkle_block); + CASE_RELAY_MESSAGE(reader, version, not_found); + CASE_RELAY_MESSAGE(reader, version, ping); + CASE_RELAY_MESSAGE(reader, version, pong); + CASE_RELAY_MESSAGE(reader, version, reject); + CASE_RELAY_MESSAGE(reader, version, send_compact); + CASE_RELAY_MESSAGE(reader, version, send_headers); + CASE_HANDLE_MESSAGE(reader, version, transaction); + CASE_HANDLE_MESSAGE(reader, version, verack); + CASE_HANDLE_MESSAGE(reader, version, version); + CASE_HANDLE_MESSAGE(reader, version, xversion); + // CASE_HANDLE_MESSAGE(reader, version, xverack); case message_type::unknown: default: return error::not_found; diff --git a/src/proxy.cpp b/src/proxy.cpp index 13f181ef..8ae5ec0c 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -34,8 +34,8 @@ static size_t const invalid_payload_dump_size = 1024; proxy::proxy(threadpool& pool, socket::ptr socket, settings const& settings) : authority_(socket->authority()) , heading_buffer_(heading::maximum_size()) - , payload_buffer_(heading::maximum_payload_size(settings.protocol_maximum, false, settings.identifier, settings.inbound_port == 48333)) - , maximum_payload_(heading::maximum_payload_size(settings.protocol_maximum, (settings.services & version::service::node_witness) != 0, settings.identifier, settings.inbound_port == 48333)) + , payload_buffer_(heading::maximum_payload_size(settings.protocol_maximum, settings.identifier, settings.inbound_port == 48333)) + , maximum_payload_(heading::maximum_payload_size(settings.protocol_maximum, settings.identifier, settings.inbound_port == 48333)) , socket_(socket) , stopped_(true) , protocol_magic_(settings.identifier) @@ -118,7 +118,7 @@ void proxy::handle_read_heading(boost_code const& ec, size_t) { return; } - auto const head = domain::create(heading_buffer_); + auto const head = domain::create_old(heading_buffer_, 0); if ( ! head.is_valid()) { LOG_WARNING(LOG_NETWORK, "Invalid heading from [", authority(), "]"); @@ -154,7 +154,7 @@ void proxy::handle_read_heading(boost_code const& ec, size_t) { read_payload(head); } -void proxy::read_payload(const heading& head) { +void proxy::read_payload(heading const& head) { if (stopped()) { return; } @@ -165,7 +165,68 @@ void proxy::read_payload(const heading& head) { async_read(socket_->get(), buffer(payload_buffer_), std::bind(&proxy::handle_read_payload, shared_from_this(), _1, _2, head)); } -void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, const heading& head) { +// void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, heading const& head) { +// //LOG_INFO(LOG_NETWORK, "proxy::handle_read_payload()"); +// if (stopped()) return; + +// if (ec) { +// LOG_DEBUG(LOG_NETWORK +// , "Payload read failure [", authority(), "] " +// , code(error::boost_to_error_code(ec)).message()); +// stop(ec); +// return; +// } + +// // This is a pointless test but we allow it as an option for completeness. +// if (validate_checksum_ && head.checksum() != bitcoin_checksum(payload_buffer_)) { +// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] bad checksum."); +// stop(error::bad_stream); +// return; +// } + +// LOG_DEBUG(LOG_NETWORK +// , "Read ", head.command(), " from [", authority() +// , "] (", payload_size, " bytes). Now parsing ..."); + +// // Notify subscribers of the new message. +// // payload_source source(payload_buffer_); +// // payload_stream istream(source); +// byte_reader reader(payload_buffer_); + +// // Failures are not forwarded to subscribers and channel is stopped below. +// auto const code = message_subscriber_.load(head.type(), version_, istream); +// auto const consumed = istream.peek() == std::istream::traits_type::eof(); + +// if (verbose_ && code) { +// auto const size = std::min(payload_size, invalid_payload_dump_size); +// auto const begin = payload_buffer_.begin(); + +// LOG_VERBOSE(LOG_NETWORK, "Invalid payload from [", authority(), "] ", encode_base16(data_chunk{ begin, begin + size })); +// stop(code); +// return; +// } + +// if (code) { +// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] ", code.message()); +// stop(code); +// return; +// } + +// if ( ! consumed) { +// LOG_WARNING(LOG_NETWORK, "Invalid ", head.command(), " payload from [", authority(), "] trailing bytes."); +// stop(error::bad_stream); +// return; +// } + +// LOG_DEBUG(LOG_NETWORK +// , "Received ", head.command(), " from [", authority() +// , "] (", payload_size, " bytes)"); + +// signal_activity(); +// read_heading(); +// } + +void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, heading const& head) { //LOG_INFO(LOG_NETWORK, "proxy::handle_read_payload()"); if (stopped()) return; @@ -189,12 +250,13 @@ void proxy::handle_read_payload(boost_code const& ec, size_t payload_size, const , "] (", payload_size, " bytes). Now parsing ..."); // Notify subscribers of the new message. - payload_source source(payload_buffer_); - payload_stream istream(source); + // payload_source source(payload_buffer_); + // payload_stream istream(source); + byte_reader reader(payload_buffer_); // Failures are not forwarded to subscribers and channel is stopped below. - auto const code = message_subscriber_.load(head.type(), version_, istream); - auto const consumed = istream.peek() == std::istream::traits_type::eof(); + auto const code = message_subscriber_.load(head.type(), version_, reader); + auto const consumed = reader.is_exhausted(); if (verbose_ && code) { auto const size = std::min(payload_size, invalid_payload_dump_size); diff --git a/src/sessions/session_outbound.cpp b/src/sessions/session_outbound.cpp index d968c50e..b474132b 100644 --- a/src/sessions/session_outbound.cpp +++ b/src/sessions/session_outbound.cpp @@ -117,12 +117,7 @@ void session_outbound::attach_handshake_protocols(channel::ptr channel, result_h auto const invalid_services = settings_.invalid_services; auto const minimum_version = settings_.protocol_minimum; -#if defined(KTH_CURRENCY_BCH) auto const minimum_services = serve::node_network; -#else - // Require peer to serve network (and witness if configured on self). - auto const minimum_services = (own_services & serve::node_witness) | serve::node_network; -#endif // Reject messages are not handled until bip61 (70002). // The negotiated_version is initialized to the configured maximum.