Skip to content

Commit

Permalink
Merge branch 'issue/gh-426'
Browse files Browse the repository at this point in the history
* issue/gh-426:
  Fix log statement when disconnecting stalled peers
  Disconnect slow peers and WS clients by default
  Pick up backport for on_backpressure_buffer
  • Loading branch information
ckreibich committed Dec 3, 2024
2 parents d357955 + f848eb4 commit 7df7358
Show file tree
Hide file tree
Showing 14 changed files with 372 additions and 33 deletions.
8 changes: 8 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
2.8.0-dev.144 | 2024-12-02 23:59:25 -0800

* Fix log statement when disconnecting stalled peers (Dominik Charousset)

* Disconnect slow peers and WS clients by default (Dominik Charousset)

* Pick up backport for on_backpressure_buffer (Dominik Charousset)

2.8.0-dev.140 | 2024-11-25 19:00:28 +0100

* Fix invalid range clang-tidy findings (Dominik Charousset, Corelight)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.8.0-dev.140
2.8.0-dev.144
1 change: 1 addition & 0 deletions libbroker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ set(BROKER_SRC
broker/internal_command.cc
broker/mailbox.cc
broker/network_info.cc
broker/overflow_policy.cc
broker/p2p_message_type.cc
broker/peer_status.cc
broker/ping_envelope.cc
Expand Down
13 changes: 13 additions & 0 deletions libbroker/broker/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "broker/address.hh"
#include "broker/alm/multipath.hh"
#include "broker/config.hh"
#include "broker/convert.hh"
#include "broker/data.hh"
#include "broker/endpoint.hh"
#include "broker/internal/configuration_access.hh"
Expand Down Expand Up @@ -113,6 +114,10 @@ struct configuration::impl : public caf::actor_system_config {
.add(options.disable_forwarding, "disable-forwarding",
"disables forwarding of incoming data to peers")
.add(options.ttl, "ttl", "drop messages after traversing TTL hops")
.add(options.peer_buffer_size, "peer-buffer-size",
"maximum number of items we buffer per peer before dropping it")
.add(options.web_socket_buffer_size, "web_socket-buffer-size",
"maximum number of items we buffer per web_socket")
.add<string>("recording-directory",
"path for storing recorded meta information")
.add<size_t>(
Expand Down Expand Up @@ -174,6 +179,8 @@ struct configuration::impl : public caf::actor_system_config {
auto& grp = result["broker"].as_dictionary();
put_missing(grp, "disable-ssl", options.disable_ssl);
put_missing(grp, "ttl", options.ttl);
put_missing(grp, "peer-buffer-size", options.peer_buffer_size);
put_missing(grp, "web_socket-buffer-size", options.web_socket_buffer_size);
put_missing(grp, "disable-forwarding", options.disable_forwarding);
if (auto path = get_as<std::string>(content, "broker.recording-directory"))
put_missing(grp, "recording-directory", std::move(*path));
Expand All @@ -199,6 +206,12 @@ configuration::configuration(skip_init_t) {
configuration::configuration(broker_options opts) : configuration(skip_init) {
impl_->options = opts;
impl_->set("broker.ttl", opts.ttl);
impl_->set("broker.peer-buffer-size", opts.peer_buffer_size);
caf::put(impl_->content, "broker.peer-overflow-policy",
broker::to_string(opts.peer_overflow_policy));
impl_->set("broker.web_socket-buffer-size", opts.web_socket_buffer_size);
caf::put(impl_->content, "broker.web_socket-overflow-policy",
broker::to_string(opts.web_socket_overflow_policy));
caf::put(impl_->content, "disable-forwarding", opts.disable_forwarding);
init(0, nullptr);
impl_->config_file_path = "broker.conf";
Expand Down
17 changes: 17 additions & 0 deletions libbroker/broker/configuration.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "broker/defaults.hh"
#include "broker/overflow_policy.hh"

#include <cstdint>
#include <memory>
Expand Down Expand Up @@ -46,6 +47,22 @@ struct broker_options {
/// How many hops we forward at the most before dropping a message.
uint16_t ttl = defaults::ttl;

/// Configures how many items we buffer at most per peer before considering
/// it unresponsive and dropping the connection.
size_t peer_buffer_size = defaults::peer_buffer_size;

/// Configures how Broker responds to peers that cannot keep up with the
/// incoming message rate.
overflow_policy peer_overflow_policy = overflow_policy::disconnect;

/// Configures how many items we buffer at most per web_socket client before
/// considering it unresponsive and dropping the connection.
size_t web_socket_buffer_size = defaults::web_socket_buffer_size;

/// Configures how Broker responds to web_sockets that cannot keep up with the
/// incoming message rate.
overflow_policy web_socket_overflow_policy = overflow_policy::disconnect;

broker_options() = default;

broker_options(const broker_options&) = default;
Expand Down
17 changes: 17 additions & 0 deletions libbroker/broker/defaults.hh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "broker/overflow_policy.hh"
#include "broker/time.hh"

#include <chrono>
Expand All @@ -22,6 +23,22 @@ constexpr timespan await_peer_timeout = std::chrono::seconds{10};
/// Configures the default timeout for unpeering from another node.
constexpr timespan unpeer_timeout = std::chrono::seconds{3};

/// Configures how many items we buffer at most per peer before considering it
/// unresponsive and dropping the connection.
constexpr size_t peer_buffer_size = 2048;

/// Configures how Broker responds to peers that cannot keep up with the
/// incoming message rate.
constexpr auto peer_overflow_policy = overflow_policy::disconnect;

/// Configures how many items we buffer at most per web_socket client before
/// considering it unresponsive and dropping the connection.
constexpr size_t web_socket_buffer_size = 512;

/// Configures how Broker responds to web_sockets that cannot keep up with the
/// incoming message rate.
constexpr auto web_socket_overflow_policy = overflow_policy::disconnect;

} // namespace broker::defaults

namespace broker::defaults::subscriber {
Expand Down
150 changes: 122 additions & 28 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ caf::behavior core_actor_state::make_behavior() {
},
// -- interface for publishers ---------------------------------------------
[this](data_consumer_res src) {
auto consumer_id = endpoint_id::random();
auto [in, sub] =
self
->make_observable() //
Expand All @@ -409,10 +410,16 @@ caf::behavior core_actor_state::make_behavior() {
.map([this](const data_message& msg) { return node_message{msg}; })
.compose(local_publisher_scope_adder())
.compose(add_killswitch_t{});
flow_inputs.push(in);
flow_inputs.push(in.do_finally([this, consumer_id] {
auto i = subscriptions.find(consumer_id);
if (i != subscriptions.end()) {
subscriptions.erase(i);
}
})
.as_observable());
// TODO: the next line seems to trigger a false positive, but maybe there's
// something we can do upstream to avoid the alert.
subscriptions.push_back(sub); // NOLINT
subscriptions[consumer_id].push_back(sub); // NOLINT
},
// -- data store management ------------------------------------------------
[this](atom::data_store, atom::clone, atom::attach, const std::string& name,
Expand Down Expand Up @@ -513,8 +520,11 @@ void core_actor_state::shutdown(shutdown_options options) {
// We no longer add new input flows.
flow_inputs.close();
// Cancel all subscriptions to local publishers.
for (auto& sub : subscriptions)
sub.dispose();
for (auto& [id, subs] : subscriptions) {
for (auto& sub : subs) {
sub.dispose();
}
}
subscriptions.clear();
// Inform our clients that we no longer wait for any peer.
BROKER_DEBUG("cancel" << awaited_peers.size()
Expand Down Expand Up @@ -720,10 +730,28 @@ void core_actor_state::client_added(endpoint_id client_id,

void core_actor_state::client_removed(endpoint_id client_id,
const network_info& addr,
const std::string& type) {
const std::string& type,
const caf::error& reason, bool removed) {
BROKER_TRACE(BROKER_ARG(client_id) << BROKER_ARG(addr) << BROKER_ARG(type));
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_lost>(),
"lost connection to client");
auto i = subscriptions.find(client_id);
if (i == subscriptions.end()) {
return;
}
disposable_list subs;
i->second.swap(subs);
subscriptions.erase(i);
for (auto& sub : subs) {
sub.dispose();
}
metrics.web_socket_connections->Decrement();
if (removed) {
auto msg = "client removed: " + to_string(reason);
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_removed>(),
msg.c_str());
} else {
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_lost>(),
"lost connection to client");
}
emit(endpoint_info{client_id, std::nullopt, type},
sc_constant<sc::endpoint_unreachable>(), "lost the last path");
}
Expand Down Expand Up @@ -841,6 +869,12 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id,
return msg;
return msg->with(id, msg->receiver());
})
// Disconnect unresponsive peers.
.on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy())
.do_on_error([this, ptr, peer_id](const caf::error& what) {
BROKER_INFO("remove peer" << peer_id << "due to:" << what);
ptr->force_disconnect();
})
.as_observable());
// Push messages received from the peer into the central merge point.
flow_inputs.push( //
Expand Down Expand Up @@ -939,33 +973,44 @@ caf::error core_actor_state::init_new_client(const network_info& addr,
client_added(client_id, addr, type);
// Hook into the central merge point for forwarding the data to the client.
if (out_res) {
auto sub = central_merge
// Select by subscription.
.filter([this, filt = std::move(filter),
client_id](const node_message& msg) {
if (get_type(msg) != packed_message_type::data
|| get_sender(msg) == client_id)
return false;
detail::prefix_matcher f;
return f(filt, get_topic(msg));
})
// Deserialize payload and wrap it into a data message.
.map([this](const node_message& msg) { //
return msg->as_data();
})
// Emit values to the producer resource.
.subscribe(std::move(out_res));
subscriptions.emplace_back(sub);
auto sub =
central_merge
// Select by subscription.
.filter(
[this, filt = std::move(filter), client_id](const node_message& msg) {
if (get_type(msg) != packed_message_type::data
|| get_sender(msg) == client_id)
return false;
detail::prefix_matcher f;
return f(filt, get_topic(msg));
})
// Deserialize payload and wrap it into a data message.
.map([this](const node_message& msg) { //
return msg->as_data();
})
// Disconnect unresponsive clients.
.on_backpressure_buffer(web_socket_buffer_size(),
web_socket_overflow_policy())
.do_on_error([this, client_id, addr, type](const caf::error& reason) {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type, reason, true);
})
// Emit values to the producer resource.
.subscribe(std::move(out_res));
subscriptions[client_id].emplace_back(sub);
}
// Push messages received from the client into the central merge point.
auto [in, ks] =
self->make_observable()
.from_resource(std::move(in_res))
// If the client closes this buffer, we assume a disconnect.
.do_finally([this, client_id, addr, type] {
.do_on_complete([this, client_id, addr, type] {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type);
metrics.web_socket_connections->Decrement();
client_removed(client_id, addr, type, caf::error{}, false);
})
.do_on_error([this, client_id, addr, type](const caf::error& reason) {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type, reason, false);
})
.map([this, client_id](const data_message& msg) {
metrics_for(packed_message_type::data).buffered->Increment();
Expand All @@ -980,7 +1025,7 @@ caf::error core_actor_state::init_new_client(const network_info& addr,
.on_error_complete()
.compose(add_killswitch_t{});
flow_inputs.push(in);
subscriptions.emplace_back(ks);
subscriptions[client_id].emplace_back(ks);
return caf::none;
}

Expand Down Expand Up @@ -1167,4 +1212,53 @@ bool core_actor_state::shutting_down() {
return !self->has_behavior();
}

// -- properties ---------------------------------------------------------------

namespace {

/// Translates the textual name of an overflow policy into the matching CAF
/// backpressure strategy.
/// @param str Pointer to the configured policy name; may be null when no value
///            was configured.
/// @param fallback Policy to use when `str` is null or holds an unknown name.
/// @returns `drop_newest` / `drop_oldest` for the names of the same spelling,
///          `fail` for "disconnect", and the converted `fallback` otherwise.
caf::flow::backpressure_overflow_strategy
overflow_policy_from_string(const std::string* str, overflow_policy fallback) {
  using strategy = caf::flow::backpressure_overflow_strategy;
  // Note: overflow_policy and backpressure_overflow_strategy have the same
  // values. Hence, casting one to the other is safe.
  const auto from_fallback = static_cast<strategy>(fallback);
  // Nothing configured: there is no name to parse.
  if (str == nullptr)
    return from_fallback;
  const std::string& name = *str;
  if (name == "drop_newest")
    return strategy::drop_newest;
  if (name == "drop_oldest")
    return strategy::drop_oldest;
  if (name == "disconnect")
    return strategy::fail;
  // Unknown names are treated like a missing value.
  return from_fallback;
}

} // namespace

/// Returns the per-peer buffer limit from the actor's configuration
/// (`broker.peer-buffer-size`), falling back to `defaults::peer_buffer_size`
/// when the option is not set.
size_t core_actor_state::peer_buffer_size() {
  const auto& cfg = self->config();
  const auto limit = caf::get_or(cfg, "broker.peer-buffer-size",
                                 defaults::peer_buffer_size);
  return limit;
}

/// Returns the backpressure strategy for peers. Looks up the policy name
/// stored under `broker.peer-overflow-policy` and converts it; if the option
/// is absent or unrecognized, `defaults::peer_overflow_policy` applies.
caf::flow::backpressure_overflow_strategy
core_actor_state::peer_overflow_policy() {
  const auto& cfg = self->config();
  // Yields a null pointer when the option holds no string value.
  auto* policy_name = caf::get_if<std::string>(&cfg,
                                               "broker.peer-overflow-policy");
  return overflow_policy_from_string(policy_name,
                                     defaults::peer_overflow_policy);
}

/// Returns the per-client buffer limit for web socket clients.
///
/// The option is looked up under two spellings: the hyphenated key
/// `broker.web-socket-buffer-size` and `broker.web_socket-buffer-size`, which
/// is the key that configuration.cc registers and writes. Reading only the
/// hyphenated key silently ignored values stored under the other spelling.
/// The hyphenated key wins if both are present; without either, the result is
/// `defaults::web_socket_buffer_size`.
size_t core_actor_state::web_socket_buffer_size() {
  const auto& cfg = self->config();
  // Innermost fallback is the compiled-in default, then the underscore key.
  const auto from_underscore_key = caf::get_or(cfg,
                                               "broker.web_socket-buffer-size",
                                               defaults::web_socket_buffer_size);
  return caf::get_or(cfg, "broker.web-socket-buffer-size",
                     from_underscore_key);
}

/// Returns the backpressure strategy for web socket clients.
///
/// The policy name is looked up under two spellings: the hyphenated key
/// `broker.web-socket-overflow-policy` and
/// `broker.web_socket-overflow-policy`, which is the key that configuration.cc
/// writes. Reading only the hyphenated key silently ignored values stored
/// under the other spelling. If neither key holds a recognized name,
/// `defaults::web_socket_overflow_policy` applies.
caf::flow::backpressure_overflow_strategy
core_actor_state::web_socket_overflow_policy() {
  const auto& cfg = self->config();
  auto* str = caf::get_if<std::string>(&cfg,
                                       "broker.web-socket-overflow-policy");
  if (str == nullptr) {
    // Fall back to the spelling used when the option is stored.
    str = caf::get_if<std::string>(&cfg, "broker.web_socket-overflow-policy");
  }
  return overflow_policy_from_string(str, defaults::web_socket_overflow_policy);
}

} // namespace broker::internal
15 changes: 13 additions & 2 deletions libbroker/broker/internal/core_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public:

/// Called whenever a client disconnected.
void client_removed(endpoint_id client_id, const network_info& addr,
const std::string& type);
const std::string& type, const caf::error& reason,
bool removed);

// -- connection management --------------------------------------------------

Expand Down Expand Up @@ -213,6 +214,14 @@ public:

// -- properties -------------------------------------------------------------

/// Returns the maximum number of items buffered per peer, as configured via
/// `broker.peer-buffer-size` (falls back to `defaults::peer_buffer_size`).
size_t peer_buffer_size();

/// Returns the strategy applied when a peer's buffer overflows, derived from
/// the `broker.peer-overflow-policy` option (falls back to
/// `defaults::peer_overflow_policy`).
caf::flow::backpressure_overflow_strategy peer_overflow_policy();

/// Returns the maximum number of items buffered per web socket client, taken
/// from the actor's configuration (falls back to
/// `defaults::web_socket_buffer_size`).
size_t web_socket_buffer_size();

/// Returns the strategy applied when a web socket client's buffer overflows,
/// taken from the actor's configuration (falls back to
/// `defaults::web_socket_overflow_policy`).
caf::flow::backpressure_overflow_strategy web_socket_overflow_policy();

/// Points to the actor itself.
caf::event_based_actor* self;

Expand Down Expand Up @@ -285,8 +294,10 @@ public:
/// memory regions over and over again.
caf::byte_buffer buf;

using disposable_list = std::vector<caf::disposable>;

/// Stores the subscriptions for our input sources to allow us to cancel them.
std::vector<caf::disposable> subscriptions;
std::map<endpoint_id, disposable_list> subscriptions;

/// Bundles state for a subscriber that does not integrate into the flows.
struct legacy_subscriber {
Expand Down
4 changes: 3 additions & 1 deletion libbroker/broker/internal/peering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ void peering::on_bye_ack() {
}

/// Tears down the peering without waiting for the regular BYE handshake.
///
/// Callers may invoke this on a peering that has not been marked as removed
/// yet (e.g., when dropping a peer whose buffer overflowed). Hence, this
/// function sets the flag itself instead of asserting it: a leftover
/// `assert(removed_)` ahead of the guard would abort debug builds on that path
/// and make the guard unreachable.
void peering::force_disconnect() {
  if (!removed_) {
    removed_ = true;
  }
  // Proceed as if the remote side had acknowledged our BYE.
  on_bye_ack();
}

Expand Down
Loading

0 comments on commit 7df7358

Please sign in to comment.