diff --git a/CHANGES b/CHANGES index 60f1195c..a191676f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +2.8.0-dev.154 | 2024-12-06 15:14:02 -0800 + + * Fix potential connector infinite loop in read_result::stop scenarios (Christian Kreibich, Corelight) + + * Add reason strings to a peering's force_disconnect() (Christian Kreibich, Corelight) + + * Revert part of 4d9aa3bd that currently breaks Zeek build (Christian Kreibich, Corelight) + 2.8.0-dev.150 | 2024-12-06 00:02:47 -0800 * CI: add Ubuntu 24.10 (Christian Kreibich, Corelight) diff --git a/VERSION b/VERSION index b92c9227..3bc4a226 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.8.0-dev.150 +2.8.0-dev.154 diff --git a/libbroker/broker/expected.hh b/libbroker/broker/expected.hh index d507d22b..b8295147 100644 --- a/libbroker/broker/expected.hh +++ b/libbroker/broker/expected.hh @@ -40,6 +40,12 @@ public: // -- constructors, destructors, and assignment operators -------------------- + template + expected(U x, std::enable_if_t>* = nullptr) + : engaged_(true) { + new (std::addressof(value_)) T(std::move(x)); + } + expected(T&& x) noexcept(nothrow_move) : engaged_(true) { new (std::addressof(value_)) T(std::move(x)); } diff --git a/libbroker/broker/internal/connector.cc b/libbroker/broker/internal/connector.cc index 603e1d5c..d0d4b4c5 100644 --- a/libbroker/broker/internal/connector.cc +++ b/libbroker/broker/internal/connector.cc @@ -1944,7 +1944,8 @@ void connector::run_impl(listener* sub, shared_filter_type* filter) { } else { mgr.abort(*i); } - while ((i->revents & read_mask) && mgr.must_read_more(*i)) + while ((i->revents & read_mask) && (i->events & read_mask) + && mgr.must_read_more(*i)) mgr.continue_reading(*i); } } while (--presult > 0 && advance()); diff --git a/libbroker/broker/internal/core_actor.cc b/libbroker/broker/internal/core_actor.cc index 41b37cf7..c7a04772 100644 --- a/libbroker/broker/internal/core_actor.cc +++ b/libbroker/broker/internal/core_actor.cc @@ -564,7 +564,7 @@ void core_actor_state::shutdown(shutdown_options options) { void core_actor_state::finalize_shutdown() { // Drop any remaining state of peers. for (auto& kvp : peers) - kvp.second->force_disconnect(); + kvp.second->force_disconnect("shutting down"); peers.clear(); // Close the shared state for all peers. peer_statuses->close(); @@ -873,7 +873,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, .on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy()) .do_on_error([this, ptr, peer_id](const caf::error& what) { BROKER_INFO("remove peer" << peer_id << "due to:" << what); - ptr->force_disconnect(); + ptr->force_disconnect(to_string(what)); }) .as_observable()); // Push messages received from the peer into the central merge point. diff --git a/libbroker/broker/internal/peering.cc b/libbroker/broker/internal/peering.cc index dbd43a81..404a5aaa 100644 --- a/libbroker/broker/internal/peering.cc +++ b/libbroker/broker/internal/peering.cc @@ -11,6 +11,8 @@ #include +using namespace std::literals; + namespace broker::internal { namespace { @@ -94,9 +96,14 @@ class suffix_generator : public affix_generator { node_message first() override { if (ptr_->removed()) { + auto msg = "removed connection to remote peer"s; + if (const auto& reason = ptr_->removed_reason(); !reason.empty()) { + msg += " ("; + msg += reason; + msg += ')'; + } return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()}, - sc_constant(), - "removed connection to remote peer"); + sc_constant(), msg.c_str()); } else { return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()}, sc_constant(), @@ -119,9 +126,10 @@ void peering::on_bye_ack() { bye_timeout_.dispose(); } -void peering::force_disconnect() { +void peering::force_disconnect(std::string reason) { if (!removed_) { removed_ = true; + removed_reason_ = std::move(reason); } on_bye_ack(); } @@ -129,8 +137,9 @@ void peering::force_disconnect() { void peering::schedule_bye_timeout(caf::scheduled_actor* self) { bye_timeout_.dispose(); bye_timeout_ = - self->run_delayed(defaults::unpeer_timeout, - [ptr = shared_from_this()] { ptr->force_disconnect(); }); + self->run_delayed(defaults::unpeer_timeout, [ptr = shared_from_this()] { + ptr->force_disconnect("timeout during graceful disconnect"); + }); } void peering::assign_bye_token(std::array& buf) { diff --git a/libbroker/broker/internal/peering.hh b/libbroker/broker/internal/peering.hh index 0bbfdf2e..623b2258 100644 --- a/libbroker/broker/internal/peering.hh +++ b/libbroker/broker/internal/peering.hh @@ -38,7 +38,7 @@ public: /// Forces the peering to shut down its connection without performing the BYE /// handshake. - void force_disconnect(); + void force_disconnect(std::string reason); void schedule_bye_timeout(caf::scheduled_actor* self); @@ -63,6 +63,11 @@ public: return removed_; } + /// Returns the removal reason, which may be empty. + const std::string& removed_reason() const noexcept { + return removed_reason_; + } + /// Tag this peering as removed and send a BYE message on the `snk` for a /// graceful shutdown. void remove(caf::scheduled_actor* self, @@ -117,6 +122,8 @@ private: /// BYE message to the peer. bool removed_ = false; + std::string removed_reason_; + /// Network address as reported from the transport (usually TCP). network_info addr_;