From 5bcc42fab4793085270c3661d811c88b3d13459a Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 2 Dec 2024 15:53:15 -0800 Subject: [PATCH 1/3] Revert part of 4d9aa3bd that currently breaks Zeek build --- libbroker/broker/expected.hh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libbroker/broker/expected.hh b/libbroker/broker/expected.hh index d507d22b..b8295147 100644 --- a/libbroker/broker/expected.hh +++ b/libbroker/broker/expected.hh @@ -40,6 +40,12 @@ public: // -- constructors, destructors, and assignment operators -------------------- + template <class U> + expected(U x, std::enable_if_t<std::is_convertible_v<U, T>>* = nullptr) + : engaged_(true) { + new (std::addressof(value_)) T(std::move(x)); + } + expected(T&& x) noexcept(nothrow_move) : engaged_(true) { new (std::addressof(value_)) T(std::move(x)); } From 96aaa1471a4557840d854c177dd48cf9513a4019 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Thu, 10 Oct 2024 13:05:09 -0700 Subject: [PATCH 2/3] Add reason strings to a peering's force_disconnect() This includes providing a reason for the new backpressure overflow, so Zeek can pick up on them. --- libbroker/broker/internal/core_actor.cc | 4 ++-- libbroker/broker/internal/peering.cc | 19 ++++++++++++++----- libbroker/broker/internal/peering.hh | 9 ++++++++- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/libbroker/broker/internal/core_actor.cc b/libbroker/broker/internal/core_actor.cc index 41b37cf7..c7a04772 100644 --- a/libbroker/broker/internal/core_actor.cc +++ b/libbroker/broker/internal/core_actor.cc @@ -564,7 +564,7 @@ void core_actor_state::shutdown(shutdown_options options) { void core_actor_state::finalize_shutdown() { // Drop any remaining state of peers. for (auto& kvp : peers) - kvp.second->force_disconnect(); + kvp.second->force_disconnect("shutting down"); peers.clear(); // Close the shared state for all peers.
peer_statuses->close(); @@ -873,7 +873,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, .on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy()) .do_on_error([this, ptr, peer_id](const caf::error& what) { BROKER_INFO("remove peer" << peer_id << "due to:" << what); - ptr->force_disconnect(); + ptr->force_disconnect(to_string(what)); }) .as_observable()); // Push messages received from the peer into the central merge point. diff --git a/libbroker/broker/internal/peering.cc b/libbroker/broker/internal/peering.cc index dbd43a81..404a5aaa 100644 --- a/libbroker/broker/internal/peering.cc +++ b/libbroker/broker/internal/peering.cc @@ -11,6 +11,8 @@ #include +using namespace std::literals; + namespace broker::internal { namespace { @@ -94,9 +96,14 @@ class suffix_generator : public affix_generator { node_message first() override { if (ptr_->removed()) { + auto msg = "removed connection to remote peer"s; + if (const auto& reason = ptr_->removed_reason(); !reason.empty()) { + msg += " ("; + msg += reason; + msg += ')'; + } return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()}, - sc_constant(), - "removed connection to remote peer"); + sc_constant(), msg.c_str()); } else { return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()}, sc_constant(), @@ -119,9 +126,10 @@ void peering::on_bye_ack() { bye_timeout_.dispose(); } -void peering::force_disconnect() { +void peering::force_disconnect(std::string reason) { if (!removed_) { removed_ = true; + removed_reason_ = std::move(reason); } on_bye_ack(); } @@ -129,8 +137,9 @@ void peering::force_disconnect() { void peering::schedule_bye_timeout(caf::scheduled_actor* self) { bye_timeout_.dispose(); bye_timeout_ = - self->run_delayed(defaults::unpeer_timeout, - [ptr = shared_from_this()] { ptr->force_disconnect(); }); + self->run_delayed(defaults::unpeer_timeout, [ptr = shared_from_this()] { + ptr->force_disconnect("timeout during graceful disconnect"); + }); } void 
peering::assign_bye_token(std::array& buf) { diff --git a/libbroker/broker/internal/peering.hh b/libbroker/broker/internal/peering.hh index 0bbfdf2e..623b2258 100644 --- a/libbroker/broker/internal/peering.hh +++ b/libbroker/broker/internal/peering.hh @@ -38,7 +38,7 @@ public: /// Forces the peering to shut down its connection without performing the BYE /// handshake. - void force_disconnect(); + void force_disconnect(std::string reason); void schedule_bye_timeout(caf::scheduled_actor* self); @@ -63,6 +63,11 @@ public: return removed_; } + /// Returns the removal reason, which may be empty. + const std::string& removed_reason() const noexcept { + return removed_reason_; + } + /// Tag this peering as removed and send a BYE message on the `snk` for a /// graceful shutdown. void remove(caf::scheduled_actor* self, @@ -117,6 +122,8 @@ private: /// BYE message to the peer. bool removed_ = false; + std::string removed_reason_; + /// Network address as reported from the transport (usually TCP). network_info addr_; From e2d1d413b838b121206c085642e1b45fcdfa76f3 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 2 Dec 2024 14:39:14 -0800 Subject: [PATCH 3/3] Fix potential connector infinite loop in read_result::stop scenarios When connect_manager::continue_reading() encounters a read_result::stop, it clears the read mask, indicating no more reads should happen, but this flag isn't checked in connector::run_impl()'s while loop. This could lead to infinite 100% CPU utilization in a tight loop. We now simply consult the read mask in addition to the other checks. 
--- libbroker/broker/internal/connector.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbroker/broker/internal/connector.cc b/libbroker/broker/internal/connector.cc index 603e1d5c..d0d4b4c5 100644 --- a/libbroker/broker/internal/connector.cc +++ b/libbroker/broker/internal/connector.cc @@ -1944,7 +1944,8 @@ void connector::run_impl(listener* sub, shared_filter_type* filter) { } else { mgr.abort(*i); } - while ((i->revents & read_mask) && mgr.must_read_more(*i)) + while ((i->revents & read_mask) && (i->events & read_mask) + && mgr.must_read_more(*i)) mgr.continue_reading(*i); } } while (--presult > 0 && advance());