Skip to content

Commit

Permalink
Merge branch 'topic/christian/disconnect-slow-peers'
Browse files Browse the repository at this point in the history
* topic/christian/disconnect-slow-peers:
  Fix potential connector infinite loop in read_result::stop scenarios
  Add reason strings to a peering's force_disconnect()
  Revert part of 4d9aa3b that currently breaks Zeek build
  • Loading branch information
ckreibich committed Dec 6, 2024
2 parents 5c9f59f + e2d1d41 commit 28cdb75
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 10 deletions.
8 changes: 8 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
2.8.0-dev.154 | 2024-12-06 15:14:02 -0800

* Fix potential connector infinite loop in read_result::stop scenarios (Christian Kreibich, Corelight)

* Add reason strings to a peering's force_disconnect() (Christian Kreibich, Corelight)

* Revert part of 4d9aa3bd that currently breaks Zeek build (Christian Kreibich, Corelight)

2.8.0-dev.150 | 2024-12-06 00:02:47 -0800

* CI: add Ubuntu 24.10 (Christian Kreibich, Corelight)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.8.0-dev.150
2.8.0-dev.154
6 changes: 6 additions & 0 deletions libbroker/broker/expected.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public:

// -- constructors, destructors, and assignment operators --------------------

/// Constructs an engaged `expected` from a value `x` of any type `U` that is
/// implicitly convertible to `T`. When `std::is_convertible_v<U, T>` is false,
/// the defaulted `enable_if_t` pointer parameter makes substitution fail and
/// removes this overload from overload resolution.
template <class U>
expected(U x, std::enable_if_t<std::is_convertible_v<U, T>>* = nullptr)
: engaged_(true) {
// Placement-new a `T` into the `value_` storage, moving from the by-value
// parameter `x`.
new (std::addressof(value_)) T(std::move(x));
}

/// Constructs an engaged `expected` by move-constructing a `T` from `x` into
/// the `value_` storage. The `noexcept` specification is conditional on
/// `nothrow_move`.
expected(T&& x) noexcept(nothrow_move) : engaged_(true) {
new (std::addressof(value_)) T(std::move(x));
}
Expand Down
3 changes: 2 additions & 1 deletion libbroker/broker/internal/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,8 @@ void connector::run_impl(listener* sub, shared_filter_type* filter) {
} else {
mgr.abort(*i);
}
while ((i->revents & read_mask) && mgr.must_read_more(*i))
while ((i->revents & read_mask) && (i->events & read_mask)
&& mgr.must_read_more(*i))
mgr.continue_reading(*i);
}
} while (--presult > 0 && advance());
Expand Down
4 changes: 2 additions & 2 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void core_actor_state::shutdown(shutdown_options options) {
void core_actor_state::finalize_shutdown() {
// Drop any remaining state of peers.
for (auto& kvp : peers)
kvp.second->force_disconnect();
kvp.second->force_disconnect("shutting down");
peers.clear();
// Close the shared state for all peers.
peer_statuses->close();
Expand Down Expand Up @@ -873,7 +873,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id,
.on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy())
.do_on_error([this, ptr, peer_id](const caf::error& what) {
BROKER_INFO("remove peer" << peer_id << "due to:" << what);
ptr->force_disconnect();
ptr->force_disconnect(to_string(what));
})
.as_observable());
// Push messages received from the peer into the central merge point.
Expand Down
19 changes: 14 additions & 5 deletions libbroker/broker/internal/peering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <caf/scheduled_actor/flow.hpp>

using namespace std::literals;

namespace broker::internal {

namespace {
Expand Down Expand Up @@ -94,9 +96,14 @@ class suffix_generator : public affix_generator {

node_message first() override {
if (ptr_->removed()) {
auto msg = "removed connection to remote peer"s;
if (const auto& reason = ptr_->removed_reason(); !reason.empty()) {
msg += " (";
msg += reason;
msg += ')';
}
return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()},
sc_constant<sc::peer_removed>(),
"removed connection to remote peer");
sc_constant<sc::peer_removed>(), msg.c_str());
} else {
return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()},
sc_constant<sc::peer_lost>(),
Expand All @@ -119,18 +126,20 @@ void peering::on_bye_ack() {
bye_timeout_.dispose();
}

// Flags this peering as removed and then calls `on_bye_ack()`. The `reason`
// string is stored in `removed_reason_` only if the peering was not already
// flagged as removed; `on_bye_ack()` runs unconditionally.
// NOTE(review): when `removed_` is already true on entry (the header describes
// `remove()` as tagging the peering as removed before the BYE timeout is
// scheduled), the `reason` passed here is discarded -- confirm this is intended
// for the "timeout during graceful disconnect" path.
void peering::force_disconnect() {
void peering::force_disconnect(std::string reason) {
if (!removed_) {
removed_ = true;
removed_reason_ = std::move(reason);
}
on_bye_ack();
}

// Disposes the currently stored `bye_timeout_` and replaces it with a new
// delayed action scheduled on `self` after `defaults::unpeer_timeout`. The
// delayed callback captures `shared_from_this()` as `ptr` and calls
// `force_disconnect()` on it.
void peering::schedule_bye_timeout(caf::scheduled_actor* self) {
bye_timeout_.dispose();
bye_timeout_ =
self->run_delayed(defaults::unpeer_timeout,
[ptr = shared_from_this()] { ptr->force_disconnect(); });
self->run_delayed(defaults::unpeer_timeout, [ptr = shared_from_this()] {
ptr->force_disconnect("timeout during graceful disconnect");
});
}

void peering::assign_bye_token(std::array<std::byte, bye_token_size>& buf) {
Expand Down
9 changes: 8 additions & 1 deletion libbroker/broker/internal/peering.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public:

/// Forces the peering to shut down its connection without performing the BYE
/// handshake.
void force_disconnect();
void force_disconnect(std::string reason);

void schedule_bye_timeout(caf::scheduled_actor* self);

Expand All @@ -63,6 +63,11 @@ public:
return removed_;
}

/// Returns the removal reason, which may be empty. The returned reference
/// refers to the `removed_reason_` member, which `force_disconnect()` assigns
/// when it is the call that flags the peering as removed.
const std::string& removed_reason() const noexcept {
return removed_reason_;
}

/// Tag this peering as removed and send a BYE message on the `snk` for a
/// graceful shutdown.
void remove(caf::scheduled_actor* self,
Expand Down Expand Up @@ -117,6 +122,8 @@ private:
/// BYE message to the peer.
bool removed_ = false;

/// Reason string recorded by `force_disconnect()` when it flags the peering
/// as removed; empty until then.
std::string removed_reason_;

/// Network address as reported from the transport (usually TCP).
network_info addr_;

Expand Down

0 comments on commit 28cdb75

Please sign in to comment.