From 5bd9891de5d25cf0dcfa9de131419856c47561b5 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Mon, 23 Sep 2024 17:05:14 +0200 Subject: [PATCH 1/3] Pick up backport for on_backpressure_buffer --- caf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caf b/caf index 442fd107..10afbbc5 160000 --- a/caf +++ b/caf @@ -1 +1 @@ -Subproject commit 442fd10703e56a8300480eebb3f43a4971028227 +Subproject commit 10afbbc5ee40263b258b7cf3f0e5abb436f79e89 From e2959877bf6c00ae94096a080bd804f0c66c81e8 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Mon, 23 Sep 2024 17:38:04 +0200 Subject: [PATCH 2/3] Disconnect slow peers and WS clients by default --- libbroker/CMakeLists.txt | 1 + libbroker/broker/configuration.cc | 13 ++ libbroker/broker/configuration.hh | 17 ++ libbroker/broker/defaults.hh | 17 ++ libbroker/broker/internal/core_actor.cc | 150 ++++++++++++++---- libbroker/broker/internal/core_actor.hh | 15 +- libbroker/broker/internal/peering.cc | 4 +- libbroker/broker/overflow_policy.cc | 34 ++++ libbroker/broker/overflow_policy.hh | 22 +++ tests/btest/CMakeLists.txt | 1 + tests/btest/peering/disconnect-on-overload.cc | 119 ++++++++++++++ 11 files changed, 362 insertions(+), 31 deletions(-) create mode 100644 libbroker/broker/overflow_policy.cc create mode 100644 libbroker/broker/overflow_policy.hh create mode 100644 tests/btest/peering/disconnect-on-overload.cc diff --git a/libbroker/CMakeLists.txt b/libbroker/CMakeLists.txt index 225273f1..214a989d 100644 --- a/libbroker/CMakeLists.txt +++ b/libbroker/CMakeLists.txt @@ -66,6 +66,7 @@ set(BROKER_SRC broker/internal_command.cc broker/mailbox.cc broker/network_info.cc + broker/overflow_policy.cc broker/p2p_message_type.cc broker/peer_status.cc broker/ping_envelope.cc diff --git a/libbroker/broker/configuration.cc b/libbroker/broker/configuration.cc index 9c0410b5..3fa3adb9 100644 --- a/libbroker/broker/configuration.cc +++ b/libbroker/broker/configuration.cc @@ -20,6 +20,7 @@ #include
"broker/address.hh" #include "broker/alm/multipath.hh" #include "broker/config.hh" +#include "broker/convert.hh" #include "broker/data.hh" #include "broker/endpoint.hh" #include "broker/internal/configuration_access.hh" @@ -113,6 +114,10 @@ struct configuration::impl : public caf::actor_system_config { .add(options.disable_forwarding, "disable-forwarding", "disables forwarding of incoming data to peers") .add(options.ttl, "ttl", "drop messages after traversing TTL hops") + .add(options.peer_buffer_size, "peer-buffer-size", + "maximum number of items we buffer per peer before dropping it") + .add(options.web_socket_buffer_size, "web-socket-buffer-size", + "maximum number of items we buffer per web_socket") .add("recording-directory", "path for storing recorded meta information") .add( @@ -174,6 +179,8 @@ struct configuration::impl : public caf::actor_system_config { auto& grp = result["broker"].as_dictionary(); put_missing(grp, "disable-ssl", options.disable_ssl); put_missing(grp, "ttl", options.ttl); + put_missing(grp, "peer-buffer-size", options.peer_buffer_size); + put_missing(grp, "web-socket-buffer-size", options.web_socket_buffer_size); put_missing(grp, "disable-forwarding", options.disable_forwarding); if (auto path = get_as(content, "broker.recording-directory")) put_missing(grp, "recording-directory", std::move(*path)); @@ -199,6 +206,12 @@ configuration::configuration(skip_init_t) { configuration::configuration(broker_options opts) : configuration(skip_init) { impl_->options = opts; impl_->set("broker.ttl", opts.ttl); + impl_->set("broker.peer-buffer-size", opts.peer_buffer_size); + caf::put(impl_->content, "broker.peer-overflow-policy", + broker::to_string(opts.peer_overflow_policy)); + impl_->set("broker.web-socket-buffer-size", opts.web_socket_buffer_size); + caf::put(impl_->content, "broker.web-socket-overflow-policy", + broker::to_string(opts.web_socket_overflow_policy)); caf::put(impl_->content, "disable-forwarding", opts.disable_forwarding); init(0,
nullptr); impl_->config_file_path = "broker.conf"; diff --git a/libbroker/broker/configuration.hh b/libbroker/broker/configuration.hh index 437f52f9..9da70546 100644 --- a/libbroker/broker/configuration.hh +++ b/libbroker/broker/configuration.hh @@ -1,6 +1,7 @@ #pragma once #include "broker/defaults.hh" +#include "broker/overflow_policy.hh" #include #include @@ -46,6 +47,22 @@ struct broker_options { /// How many hops we forward at the most before dropping a message. uint16_t ttl = defaults::ttl; + /// Configures how many items we buffer at most per peer before considering + /// it unresponsive and dropping the connection. + size_t peer_buffer_size = defaults::peer_buffer_size; + + /// Configures how Broker responds to peers that cannot keep up with the + /// incoming message rate. + overflow_policy peer_overflow_policy = overflow_policy::disconnect; + + /// Configures how many items we buffer at most per web_socket client before + /// considering it unresponsive and dropping the connection. + size_t web_socket_buffer_size = defaults::web_socket_buffer_size; + + /// Configures how Broker responds to web_sockets that cannot keep up with the + /// incoming message rate. + overflow_policy web_socket_overflow_policy = overflow_policy::disconnect; + broker_options() = default; broker_options(const broker_options&) = default; diff --git a/libbroker/broker/defaults.hh b/libbroker/broker/defaults.hh index aa66048c..706bfbcb 100644 --- a/libbroker/broker/defaults.hh +++ b/libbroker/broker/defaults.hh @@ -1,5 +1,6 @@ #pragma once +#include "broker/overflow_policy.hh" #include "broker/time.hh" #include @@ -22,6 +23,22 @@ constexpr timespan await_peer_timeout = std::chrono::seconds{10}; /// Configures the default timeout for unpeering from another node. constexpr timespan unpeer_timeout = std::chrono::seconds{3}; +/// Configures how many items we buffer at most per peer before considering it +/// unresponsive and dropping the connection.
+constexpr size_t peer_buffer_size = 2048; + +/// Configures how Broker responds to peers that cannot keep up with the +/// incoming message rate. +constexpr auto peer_overflow_policy = overflow_policy::disconnect; + +/// Configures how many items we buffer at most per web_socket client before +/// considering it unresponsive and dropping the connection. +constexpr size_t web_socket_buffer_size = 512; + +/// Configures how Broker responds to web_sockets that cannot keep up with the +/// incoming message rate. +constexpr auto web_socket_overflow_policy = overflow_policy::disconnect; + } // namespace broker::defaults namespace broker::defaults::subscriber { diff --git a/libbroker/broker/internal/core_actor.cc b/libbroker/broker/internal/core_actor.cc index f377e5fa..f10520e6 100644 --- a/libbroker/broker/internal/core_actor.cc +++ b/libbroker/broker/internal/core_actor.cc @@ -399,6 +399,7 @@ caf::behavior core_actor_state::make_behavior() { }, // -- interface for publishers --------------------------------------------- [this](data_consumer_res src) { + auto consumer_id = endpoint_id::random(); auto [in, sub] = self ->make_observable() // @@ -409,10 +410,16 @@ caf::behavior core_actor_state::make_behavior() { .map([this](const data_message& msg) { return node_message{msg}; }) .compose(local_publisher_scope_adder()) .compose(add_killswitch_t{}); - flow_inputs.push(in); + flow_inputs.push(in.do_finally([this, consumer_id] { + auto i = subscriptions.find(consumer_id); + if (i != subscriptions.end()) { + subscriptions.erase(i); + } + }) + .as_observable()); // TODO: next lines seems to be a false positive, but maybe there's // something we can do upstream to avoid the alert.
- subscriptions.push_back(sub); // NOLINT + subscriptions[consumer_id].push_back(sub); // NOLINT }, // -- data store management ------------------------------------------------ [this](atom::data_store, atom::clone, atom::attach, const std::string& name, @@ -513,8 +520,11 @@ void core_actor_state::shutdown(shutdown_options options) { // We no longer add new input flows. flow_inputs.close(); // Cancel all subscriptions to local publishers. - for (auto& sub : subscriptions) - sub.dispose(); + for (auto& [id, subs] : subscriptions) { + for (auto& sub : subs) { + sub.dispose(); + } + } subscriptions.clear(); // Inform our clients that we no longer wait for any peer. BROKER_DEBUG("cancel" << awaited_peers.size() @@ -720,10 +730,28 @@ void core_actor_state::client_added(endpoint_id client_id, void core_actor_state::client_removed(endpoint_id client_id, const network_info& addr, - const std::string& type) { + const std::string& type, + const caf::error& reason, bool removed) { BROKER_TRACE(BROKER_ARG(client_id) << BROKER_ARG(addr) << BROKER_ARG(type)); - emit(endpoint_info{client_id, addr, type}, sc_constant(), - "lost connection to client"); + auto i = subscriptions.find(client_id); + if (i == subscriptions.end()) { + return; + } + disposable_list subs; + i->second.swap(subs); + subscriptions.erase(i); + for (auto& sub : subs) { + sub.dispose(); + } + metrics.web_socket_connections->Decrement(); + if (removed) { + auto msg = "client removed: " + to_string(reason); + emit(endpoint_info{client_id, addr, type}, sc_constant(), + msg.c_str()); + } else { + emit(endpoint_info{client_id, addr, type}, sc_constant(), + "lost connection to client"); + } emit(endpoint_info{client_id, std::nullopt, type}, sc_constant(), "lost the last path"); } @@ -841,6 +869,12 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, return msg; return msg->with(id, msg->receiver()); }) + // Disconnect unresponsive peers. 
+ .on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy()) + .do_on_error([this, ptr](const caf::error& what) { + CAF_LOG_INFO("remove peer" << peer_id << "due to:" << what); + ptr->force_disconnect(); + }) .as_observable()); // Push messages received from the peer into the central merge point. flow_inputs.push( // @@ -939,33 +973,44 @@ caf::error core_actor_state::init_new_client(const network_info& addr, client_added(client_id, addr, type); // Hook into the central merge point for forwarding the data to the client. if (out_res) { - auto sub = central_merge - // Select by subscription. - .filter([this, filt = std::move(filter), - client_id](const node_message& msg) { - if (get_type(msg) != packed_message_type::data - || get_sender(msg) == client_id) - return false; - detail::prefix_matcher f; - return f(filt, get_topic(msg)); - }) - // Deserialize payload and wrap it into a data message. - .map([this](const node_message& msg) { // - return msg->as_data(); - }) - // Emit values to the producer resource. - .subscribe(std::move(out_res)); - subscriptions.emplace_back(sub); + auto sub = + central_merge + // Select by subscription. + .filter( + [this, filt = std::move(filter), client_id](const node_message& msg) { + if (get_type(msg) != packed_message_type::data + || get_sender(msg) == client_id) + return false; + detail::prefix_matcher f; + return f(filt, get_topic(msg)); + }) + // Deserialize payload and wrap it into a data message. + .map([this](const node_message& msg) { // + return msg->as_data(); + }) + // Disconnect unresponsive clients. + .on_backpressure_buffer(web_socket_buffer_size(), + web_socket_overflow_policy()) + .do_on_error([this, client_id, addr, type](const caf::error& reason) { + BROKER_DEBUG("client" << addr << "disconnected"); + client_removed(client_id, addr, type, reason, true); + }) + // Emit values to the producer resource. 
+ .subscribe(std::move(out_res)); + subscriptions[client_id].emplace_back(sub); } // Push messages received from the client into the central merge point. auto [in, ks] = self->make_observable() .from_resource(std::move(in_res)) // If the client closes this buffer, we assume a disconnect. - .do_finally([this, client_id, addr, type] { + .do_on_complete([this, client_id, addr, type] { BROKER_DEBUG("client" << addr << "disconnected"); - client_removed(client_id, addr, type); - metrics.web_socket_connections->Decrement(); + client_removed(client_id, addr, type, caf::error{}, false); + }) + .do_on_error([this, client_id, addr, type](const caf::error& reason) { + BROKER_DEBUG("client" << addr << "disconnected"); + client_removed(client_id, addr, type, reason, false); }) .map([this, client_id](const data_message& msg) { metrics_for(packed_message_type::data).buffered->Increment(); @@ -980,7 +1025,7 @@ caf::error core_actor_state::init_new_client(const network_info& addr, .on_error_complete() .compose(add_killswitch_t{}); flow_inputs.push(in); - subscriptions.emplace_back(ks); + subscriptions[client_id].emplace_back(ks); return caf::none; } @@ -1167,4 +1212,53 @@ bool core_actor_state::shutting_down() { return !self->has_behavior(); } +// -- properties --------------------------------------------------------------- + +namespace { + +caf::flow::backpressure_overflow_strategy +overflow_policy_from_string(const std::string* str, overflow_policy fallback) { + using caf::flow::backpressure_overflow_strategy; + if (str != nullptr) { + if (*str == "drop_newest") { + return backpressure_overflow_strategy::drop_newest; + } + if (*str == "drop_oldest") { + return backpressure_overflow_strategy::drop_oldest; + } + if (*str == "disconnect") { + return backpressure_overflow_strategy::fail; + } + } + // Note: overflow_policy and backpressure_overflow_strategy have the same + // values. Hence, casting one to the other is safe. 
+ return static_cast(fallback); +} + +} // namespace + +size_t core_actor_state::peer_buffer_size() { + return caf::get_or(self->config(), "broker.peer-buffer-size", + defaults::peer_buffer_size); +} + +caf::flow::backpressure_overflow_strategy +core_actor_state::peer_overflow_policy() { + auto* str = caf::get_if(&self->config(), + "broker.peer-overflow-policy"); + return overflow_policy_from_string(str, defaults::peer_overflow_policy); +} + +size_t core_actor_state::web_socket_buffer_size() { + return caf::get_or(self->config(), "broker.web-socket-buffer-size", + defaults::web_socket_buffer_size); +} + +caf::flow::backpressure_overflow_strategy +core_actor_state::web_socket_overflow_policy() { + auto* str = caf::get_if(&self->config(), + "broker.web-socket-overflow-policy"); + return overflow_policy_from_string(str, defaults::web_socket_overflow_policy); +} + } // namespace broker::internal diff --git a/libbroker/broker/internal/core_actor.hh b/libbroker/broker/internal/core_actor.hh index 4af9ef2f..7245ebd5 100644 --- a/libbroker/broker/internal/core_actor.hh +++ b/libbroker/broker/internal/core_actor.hh @@ -140,7 +140,8 @@ public: /// Called whenever a client disconnected. void client_removed(endpoint_id client_id, const network_info& addr, - const std::string& type); + const std::string& type, const caf::error& reason, + bool removed); // -- connection management -------------------------------------------------- @@ -213,6 +214,14 @@ public: // -- properties ------------------------------------------------------------- + size_t peer_buffer_size(); + + caf::flow::backpressure_overflow_strategy peer_overflow_policy(); + + size_t web_socket_buffer_size(); + + caf::flow::backpressure_overflow_strategy web_socket_overflow_policy(); + /// Points to the actor itself. caf::event_based_actor* self; @@ -285,8 +294,10 @@ public: /// memory regions over and over again. 
caf::byte_buffer buf; + using disposable_list = std::vector; + /// Stores the subscriptions for our input sources to allow us to cancel them. - std::vector subscriptions; + std::map subscriptions; /// Bundles state for a subscriber that does not integrate into the flows. struct legacy_subscriber { diff --git a/libbroker/broker/internal/peering.cc b/libbroker/broker/internal/peering.cc index de4c1ca7..dbd43a81 100644 --- a/libbroker/broker/internal/peering.cc +++ b/libbroker/broker/internal/peering.cc @@ -120,7 +120,9 @@ void peering::on_bye_ack() { } void peering::force_disconnect() { - assert(removed_); + if (!removed_) { + removed_ = true; + } on_bye_ack(); } diff --git a/libbroker/broker/overflow_policy.cc b/libbroker/broker/overflow_policy.cc new file mode 100644 index 00000000..573aa543 --- /dev/null +++ b/libbroker/broker/overflow_policy.cc @@ -0,0 +1,34 @@ +#include "broker/overflow_policy.hh" + +#include +#include + +namespace broker { + +void convert(overflow_policy src, std::string& dst) { + switch (src) { + case overflow_policy::disconnect: + dst = "disconnect"; + return; + case overflow_policy::drop_newest: + dst = "drop_newest"; + return; + case overflow_policy::drop_oldest: + dst = "drop_oldest"; + return; + } + dst = "invalid"; +} + +bool convert(const std::string& src, overflow_policy& dst) { + std::string_view values[] = {"drop_newest", "drop_oldest", "disconnect"}; + for (size_t index = 0; index < std::size(values); ++index) { + if (src == values[index]) { + dst = static_cast(index); // index == enumerator value + return true; + } + } + return false; +} + +} // namespace broker diff --git a/libbroker/broker/overflow_policy.hh b/libbroker/broker/overflow_policy.hh new file mode 100644 index 00000000..d794d252 --- /dev/null +++ b/libbroker/broker/overflow_policy.hh @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +namespace broker { + +/// Configures how Broker handles message overflow in its peer buffers.
+enum class overflow_policy { + /// Drops the newest item when the buffer is full. + drop_newest, + /// Drops the oldest item when the buffer is full. + drop_oldest, + /// Disconnects the peer when its buffer is full. + disconnect, +}; + +void convert(overflow_policy src, std::string& dst); + +bool convert(const std::string& src, overflow_policy& dst); + +} // namespace broker diff --git a/tests/btest/CMakeLists.txt b/tests/btest/CMakeLists.txt index 4105462a..385a9c09 100644 --- a/tests/btest/CMakeLists.txt +++ b/tests/btest/CMakeLists.txt @@ -4,6 +4,7 @@ function(add_btest_executable target source) target_link_libraries(btest-${target} PRIVATE ${BROKER_LIBRARY} ${ARGN}) endfunction() +add_btest_executable(disconnect-on-overload peering/disconnect-on-overload.cc) add_btest_executable(put-unique store/put-unique.cc CAF::core CAF::net) add_btest_executable(sqlite-driver store/sqlite-driver.cc CAF::core CAF::net) add_btest_executable(peers endpoint/peers.cc CAF::core CAF::net) diff --git a/tests/btest/peering/disconnect-on-overload.cc b/tests/btest/peering/disconnect-on-overload.cc new file mode 100644 index 00000000..b5b73d08 --- /dev/null +++ b/tests/btest/peering/disconnect-on-overload.cc @@ -0,0 +1,119 @@ +// Driver for the disconnect-on-overload test. One process subscribes to topic +// /test/data and another one publishes messages to it. The subscriber will +// consume messages at a very slow rate, causing the publisher to buffer them +// and eventually disconnect due to overload. +// +// Startup: +// - receiver listens on the given port and reads one message from /test/data +// roughly every 50ms +// - sender peers with the receiver and publishes to /test/data in bursts of +// 50 messages +// +// Both processes terminate in response to the disconnect event.
+ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace std::literals; + +struct invalid_usage : public std::runtime_error { + using std::runtime_error::runtime_error; +}; + +struct done_predicate { + bool operator()(broker::none) const { + return false; + } + + bool operator()(const broker::error& err) const { + std::cerr << "Error: " << broker::to_string(err) << '\n'; + return true; + } + + bool operator()(const broker::status& st) const { + std::cerr << "Status: " << broker::to_string(st) << '\n'; + return st == broker::sc::peer_removed || st == broker::sc::peer_lost; + } +}; + +int run_sender(broker::endpoint& ep, uint16_t port) { + auto ssub = ep.make_status_subscriber(true); + auto ok = ep.peer("localhost", port, 0s); + if (!ok) { + throw std::runtime_error{"failed to peer with receiver"}; + } + auto value = broker::count{0}; + for (;;) { + // Check every 50 messages whether the receiver disconnected. + for (auto i = 0; i < 50; ++i) { + ep.publish("/test/data", broker::data{value}); + ++value; + } + if (std::visit(done_predicate{}, ssub.get(10ms))) { + return EXIT_SUCCESS; + } + } +} + +int run_receiver(broker::endpoint& ep, uint16_t port) { + auto ssub = ep.make_status_subscriber(true); + auto vsub = ep.make_subscriber({"/test/data"}); + auto used_port = ep.listen({}, port); + if (used_port != port) { + throw std::runtime_error{"failed to listen on port " + + std::to_string(port)}; + } + puts(""); + // Read a message every 50ms until the peer disconnects. + for (;;) { + std::this_thread::sleep_for(50ms); + auto val = vsub.get(0s); + if (val) { + std::cout << "\rreceived: " << broker::to_string(*val) << std::flush; + } + if (std::visit(done_predicate{}, ssub.get(10ms))) { + return EXIT_SUCCESS; + } + } +} + +int main(int argc, char** argv) { + setvbuf(stdout, nullptr, _IOLBF, 0); // Always line-buffer stdout. 
+ try { + if (argc != 3) { + throw invalid_usage{"missing role / port"}; + } + auto role = std::string{argv[1]}; + auto port = static_cast(std::stoi(argv[2])); + broker::broker_options opts; + opts.peer_buffer_size = 128; + broker::endpoint ep{broker::configuration{opts}}; + if (role == "sender") { + return run_sender(ep, port); + } else if (role == "receiver") { + return run_receiver(ep, port); + } else { + throw invalid_usage{"invalid role"}; + } + } catch (const invalid_usage&) { + std::cerr << "Usage:\n" + << "- disconnect-on-overload sender \n" + << "- disconnect-on-overload receiver \n"; + return EXIT_FAILURE; + } catch (const std::exception& ex) { + std::cerr << "Error: " << ex.what() << '\n'; + return EXIT_FAILURE; + } catch (...) { + std::cerr << "Unknown error\n"; + return EXIT_FAILURE; + } +} From f848eb48a8cebe30f183dacf9433658681a5093f Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Thu, 10 Oct 2024 16:53:20 +0200 Subject: [PATCH 3/3] Fix log statement when disconnecting stalled peers --- libbroker/broker/internal/core_actor.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbroker/broker/internal/core_actor.cc b/libbroker/broker/internal/core_actor.cc index f10520e6..41b37cf7 100644 --- a/libbroker/broker/internal/core_actor.cc +++ b/libbroker/broker/internal/core_actor.cc @@ -871,8 +871,8 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, }) // Disconnect unresponsive peers. .on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy()) - .do_on_error([this, ptr](const caf::error& what) { - CAF_LOG_INFO("remove peer" << peer_id << "due to:" << what); + .do_on_error([this, ptr, peer_id](const caf::error& what) { + BROKER_INFO("remove peer" << peer_id << "due to:" << what); ptr->force_disconnect(); }) .as_observable());