From f6e14c229173d68be05c63d13b8bb970ecece3c3 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 24 Nov 2024 14:37:49 +0100 Subject: [PATCH] Removes run_op from runner and renames runner to resp3_handshaker. --- .../boost/redis/detail/connection_base.hpp | 169 +++++++++-- include/boost/redis/detail/health_checker.hpp | 1 + .../boost/redis/detail/resp3_handshaker.hpp | 116 ++++++++ include/boost/redis/detail/runner.hpp | 269 ------------------ .../impl/{runner.ipp => resp3_handshaker.ipp} | 2 +- include/boost/redis/src.hpp | 2 +- test/test_low_level_sync_sans_io.cpp | 4 +- 7 files changed, 273 insertions(+), 290 deletions(-) create mode 100644 include/boost/redis/detail/resp3_handshaker.hpp delete mode 100644 include/boost/redis/detail/runner.hpp rename include/boost/redis/impl/{runner.ipp => resp3_handshaker.ipp} (94%) diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index fbfd4337..7771f703 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -8,39 +8,42 @@ #define BOOST_REDIS_CONNECTION_BASE_HPP #include +#include +#include +#include #include +#include +#include #include #include #include #include -#include -#include -#include -#include -#include #include -#include #include #include +#include +#include +#include +#include #include +#include +#include +#include #include #include #include #include -#include -#include -#include -#include +#include #include #include #include #include +#include #include #include #include -#include namespace boost::redis::detail { @@ -318,6 +321,135 @@ struct reader_op { } }; +template +class run_op { +private: + Conn* conn_ = nullptr; + Logger logger_; + asio::coroutine coro_{}; + + using order_t = std::array; + +public: + run_op(Conn* conn, Logger l) + : conn_{conn} + , logger_{l} + {} + + template + void operator()( Self& self + , order_t order = {} + , system::error_code ec0 = {} + , system::error_code ec1 = {} + , system::error_code ec2 = {} + , system::error_code ec3 = {} + , system::error_code ec4 = {}) + { + BOOST_ASIO_CORO_REENTER (coro_) for (;;) + { + BOOST_ASIO_CORO_YIELD + conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {})); + + logger_.on_resolve(ec0, conn_->resv_.results()); + + if (ec0) { + self.complete(ec0); + return; + } + + BOOST_ASIO_CORO_YIELD + conn_->ctor_.async_connect( + conn_->next_layer().next_layer(), + conn_->resv_.results(), + asio::prepend(std::move(self), order_t {})); + + logger_.on_connect(ec0, conn_->ctor_.endpoint()); + + if (ec0) { + self.complete(ec0); + return; + } + + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + conn_->next_layer().async_handshake( + asio::ssl::stream_base::client, + asio::prepend( + asio::cancel_after( + conn_->cfg_.ssl_handshake_timeout, + std::move(self) + ), + order_t {} + ) + ); + + logger_.on_ssl_handshake(ec0); + + if (ec0) { + self.complete(ec0); + return; + } + } + + conn_->reset(); + + // Note: Oder is important here because the writer might + // trigger an async_write before the async_hello thereby + // causing an authentication problem. + BOOST_ASIO_CORO_YIELD + asio::experimental::make_parallel_group( + [this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); }, + [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); }, + [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);}, + [this](auto token) { return conn_->reader(logger_, token);}, + [this](auto token) { return conn_->writer(logger_, token);} + ).async_wait( + asio::experimental::wait_for_one_error(), + std::move(self)); + + if (order[0] == 0 && !!ec0) { + self.complete(ec0); + return; + } + + if (order[0] == 2 && ec2 == error::pong_timeout) { + self.complete(ec1); + return; + } + + // The receive operation must be cancelled because channel + // subscription does not survive a reconnection but requires + // re-subscription. + conn_->cancel(operation::receive); + + if (!conn_->will_reconnect()) { + conn_->cancel(operation::reconnection); + self.complete(ec3); + return; + } + + // It is safe to use the writer timer here because we are not + // connected. + conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval); + + BOOST_ASIO_CORO_YIELD + conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {})); + if (ec0) { + self.complete(ec0); + return; + } + + if (!conn_->will_reconnect()) { + self.complete(asio::error::operation_aborted); + return; + } + + conn_->reset_stream(); + } + } +}; + + /** @brief Base class for high level Redis asynchronous connections. * @ingroup high-level-api * @@ -350,7 +482,6 @@ class connection_base { , receive_channel_{ex, 256} , resv_{ex} , health_checker_{ex} - , runner_{ex, {}} , dbuf_{read_buffer_, max_read_size} { set_receive_response(ignore); @@ -464,9 +595,13 @@ class connection_base { resv_.set_config(cfg); ctor_.set_config(cfg); health_checker_.set_config(cfg); - runner_.set_config(cfg); + handshaker_.set_config(cfg); l.set_prefix(cfg.log_prefix); - return runner_.async_run(*this, l, std::move(token)); + + return asio::async_compose + < CompletionToken + , void(system::error_code) + >(run_op{this, l}, token, writer_timer_); } template @@ -490,7 +625,7 @@ class connection_base { using receive_channel_type = asio::experimental::channel; using resolver_type = resolver; using health_checker_type = health_checker; - using runner_type = runner; + using resp3_handshaker_type = resp3_handshaker; using adapter_type = std::function const&, system::error_code&)>; using receiver_adapter_type = std::function const&, system::error_code&)>; using exec_notifier_type = receive_channel_type; @@ -669,7 +804,7 @@ class connection_base { template friend struct reader_op; template friend struct writer_op; template friend struct exec_op; - template friend struct runner_op; + template friend class run_op; void cancel_push_requests() { @@ -899,7 +1034,7 @@ class connection_base { resolver_type resv_; connector ctor_; health_checker_type health_checker_; - runner_type runner_; + resp3_handshaker_type handshaker_; receiver_adapter_type receive_adapter_; using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; diff --git a/include/boost/redis/detail/health_checker.hpp b/include/boost/redis/detail/health_checker.hpp index ddc7b940..378547d9 100644 --- a/include/boost/redis/detail/health_checker.hpp +++ b/include/boost/redis/detail/health_checker.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/include/boost/redis/detail/resp3_handshaker.hpp b/include/boost/redis/detail/resp3_handshaker.hpp new file mode 100644 index 00000000..d7e65743 --- /dev/null +++ b/include/boost/redis/detail/resp3_handshaker.hpp @@ -0,0 +1,116 @@ +/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_RUNNER_HPP +#define BOOST_REDIS_RUNNER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +//#include +#include +#include +#include + +namespace boost::redis::detail +{ + +void push_hello(config const& cfg, request& req); + +// TODO: Can we avoid this whole function whose only purpose is to +// check for an error in the hello response and complete with an error +// so that the parallel group that starts it can exit? +template +struct hello_op { + Handshaker* handshaker_ = nullptr; + Connection* conn_ = nullptr; + Logger logger_; + asio::coroutine coro_{}; + + template + void operator()(Self& self, system::error_code ec = {}, std::size_t = 0) + { + BOOST_ASIO_CORO_REENTER (coro_) + { + handshaker_->add_hello(); + + BOOST_ASIO_CORO_YIELD + conn_->async_exec(handshaker_->hello_req_, handshaker_->hello_resp_, std::move(self)); + logger_.on_hello(ec, handshaker_->hello_resp_); + + if (ec) { + conn_->cancel(operation::run); + self.complete(ec); + return; + } + + if (handshaker_->has_error_in_response()) { + conn_->cancel(operation::run); + self.complete(error::resp3_hello); + return; + } + + self.complete({}); + } + } +}; + +template +class resp3_handshaker { +public: + void set_config(config const& cfg) + { cfg_ = cfg; } + + template + auto async_hello(Connection& conn, Logger l, CompletionToken token) + { + return asio::async_compose + < CompletionToken + , void(system::error_code) + >(hello_op{this, &conn, l}, token, conn); + } + +private: + template friend struct hello_op; + + void add_hello() + { + hello_req_.clear(); + if (hello_resp_.has_value()) + hello_resp_.value().clear(); + push_hello(cfg_, hello_req_); + } + + bool has_error_in_response() const noexcept + { + if (!hello_resp_.has_value()) + return true; + + auto f = [](auto const& e) + { + switch (e.data_type) { + case resp3::type::simple_error: + case resp3::type::blob_error: return true; + default: return false; + } + }; + + return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f); + } + + request hello_req_; + generic_response hello_resp_; + config cfg_; +}; + +} // boost::redis::detail + +#endif // BOOST_REDIS_RUNNER_HPP diff --git a/include/boost/redis/detail/runner.hpp b/include/boost/redis/detail/runner.hpp deleted file mode 100644 index 42553bd3..00000000 --- a/include/boost/redis/detail/runner.hpp +++ /dev/null @@ -1,269 +0,0 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef BOOST_REDIS_RUNNER_HPP -#define BOOST_REDIS_RUNNER_HPP - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace boost::redis::detail -{ - -void push_hello(config const& cfg, request& req); - -// TODO: Can we avoid this whole function whose only purpose is to -// check for an error in the hello response and complete with an error -// so that the parallel group that starts it can exit? -template -struct hello_op { - Runner* runner_ = nullptr; - Connection* conn_ = nullptr; - Logger logger_; - asio::coroutine coro_{}; - - template - void operator()(Self& self, system::error_code ec = {}, std::size_t = 0) - { - BOOST_ASIO_CORO_REENTER (coro_) - { - runner_->add_hello(); - - BOOST_ASIO_CORO_YIELD - conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self)); - logger_.on_hello(ec, runner_->hello_resp_); - - if (ec) { - conn_->cancel(operation::run); - self.complete(ec); - return; - } - - if (runner_->has_error_in_response()) { - conn_->cancel(operation::run); - self.complete(error::resp3_hello); - return; - } - - self.complete({}); - } - } -}; - -template -class runner_op { -private: - Runner* runner_ = nullptr; - Connection* conn_ = nullptr; - Logger logger_; - asio::coroutine coro_{}; - - using order_t = std::array; - -public: - runner_op(Runner* runner, Connection* conn, Logger l) - : runner_{runner} - , conn_{conn} - , logger_{l} - {} - - template - void operator()( Self& self - , order_t order = {} - , system::error_code ec0 = {} - , system::error_code ec1 = {} - , system::error_code ec2 = {} - , system::error_code ec3 = {} - , system::error_code ec4 = {}) - { - BOOST_ASIO_CORO_REENTER (coro_) for (;;) - { - BOOST_ASIO_CORO_YIELD - conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {})); - - logger_.on_resolve(ec0, conn_->resv_.results()); - - if (ec0) { - self.complete(ec0); - return; - } - - BOOST_ASIO_CORO_YIELD - conn_->ctor_.async_connect( - conn_->next_layer().next_layer(), - conn_->resv_.results(), - asio::prepend(std::move(self), order_t {})); - - logger_.on_connect(ec0, conn_->ctor_.endpoint()); - - if (ec0) { - self.complete(ec0); - return; - } - - if (conn_->use_ssl()) { - BOOST_ASIO_CORO_YIELD - conn_->next_layer().async_handshake( - asio::ssl::stream_base::client, - asio::prepend( - asio::cancel_after( - runner_->cfg_.ssl_handshake_timeout, - std::move(self) - ), - order_t {} - ) - ); - - logger_.on_ssl_handshake(ec0); - - if (ec0) { - self.complete(ec0); - return; - } - } - - conn_->reset(); - - // Note: Oder is important here because the writer might - // trigger an async_write before the async_hello thereby - // causing an authentication problem. - BOOST_ASIO_CORO_YIELD - asio::experimental::make_parallel_group( - [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }, - [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); }, - [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);}, - [this](auto token) { return conn_->reader(logger_, token);}, - [this](auto token) { return conn_->writer(logger_, token);} - ).async_wait( - asio::experimental::wait_for_one_error(), - std::move(self)); - - if (order[0] == 0 && !!ec0) { - self.complete(ec0); - return; - } - - if (order[0] == 2 && ec2 == error::pong_timeout) { - self.complete(ec1); - return; - } - - // The receive operation must be cancelled because channel - // subscription does not survive a reconnection but requires - // re-subscription. - conn_->cancel(operation::receive); - - if (!conn_->will_reconnect()) { - conn_->cancel(operation::reconnection); - self.complete(ec3); - return; - } - - // It is safe to use the writer timer here because we are not - // connected. - conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval); - - BOOST_ASIO_CORO_YIELD - conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {})); - if (ec0) { - self.complete(ec0); - return; - } - - if (!conn_->will_reconnect()) { - self.complete(asio::error::operation_aborted); - return; - } - - conn_->reset_stream(); - } - } -}; - -template -class runner { -public: - runner(Executor ex, config cfg) - : cfg_{cfg} - { } - - void set_config(config const& cfg) - { - cfg_ = cfg; - } - - template - auto async_run(Connection& conn, Logger l, CompletionToken token) - { - return asio::async_compose - < CompletionToken - , void(system::error_code) - >(runner_op{this, &conn, l}, token, conn); - } - -private: - - template friend class runner_op; - template friend struct hello_op; - - template - auto async_hello(Connection& conn, Logger l, CompletionToken token) - { - return asio::async_compose - < CompletionToken - , void(system::error_code) - >(hello_op{this, &conn, l}, token, conn); - } - - void add_hello() - { - hello_req_.clear(); - if (hello_resp_.has_value()) - hello_resp_.value().clear(); - push_hello(cfg_, hello_req_); - } - - bool has_error_in_response() const noexcept - { - if (!hello_resp_.has_value()) - return true; - - auto f = [](auto const& e) - { - switch (e.data_type) { - case resp3::type::simple_error: - case resp3::type::blob_error: return true; - default: return false; - } - }; - - return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f); - } - - request hello_req_; - generic_response hello_resp_; - config cfg_; -}; - -} // boost::redis::detail - -#endif // BOOST_REDIS_RUNNER_HPP diff --git a/include/boost/redis/impl/runner.ipp b/include/boost/redis/impl/resp3_handshaker.ipp similarity index 94% rename from include/boost/redis/impl/runner.ipp rename to include/boost/redis/impl/resp3_handshaker.ipp index 293ad92e..e18bf928 100644 --- a/include/boost/redis/impl/runner.ipp +++ b/include/boost/redis/impl/resp3_handshaker.ipp @@ -4,7 +4,7 @@ * accompanying file LICENSE.txt) */ -#include +#include namespace boost::redis::detail { diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 2d5cac76..5ba662e0 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 6f6acfe0..d36eccde 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -1,10 +1,10 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) */ -#include +#include #include #include #define BOOST_TEST_MODULE conn-quit