Skip to content

Commit

Permalink
Removes run_op from runner and renames runner to resp3_handshaker.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Nov 24, 2024
1 parent d5de174 commit f6e14c2
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 290 deletions.
169 changes: 152 additions & 17 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,42 @@
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connector.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/resolver.hpp>
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/resolver.hpp>
#include <boost/redis/detail/connector.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/usage.hpp>

#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/assert.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/system.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <functional>
#include <memory>
#include <string_view>
#include <type_traits>
#include <functional>

namespace boost::redis::detail
{
Expand Down Expand Up @@ -318,6 +321,135 @@ struct reader_op {
}
};

template <class Conn, class Logger>
class run_op {
private:
Conn* conn_ = nullptr;
Logger logger_;
asio::coroutine coro_{};

using order_t = std::array<std::size_t, 5>;

public:
run_op(Conn* conn, Logger l)
: conn_{conn}
, logger_{l}
{}

template <class Self>
void operator()( Self& self
, order_t order = {}
, system::error_code ec0 = {}
, system::error_code ec1 = {}
, system::error_code ec2 = {}
, system::error_code ec3 = {}
, system::error_code ec4 = {})
{
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
{
BOOST_ASIO_CORO_YIELD
conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));

logger_.on_resolve(ec0, conn_->resv_.results());

if (ec0) {
self.complete(ec0);
return;
}

BOOST_ASIO_CORO_YIELD
conn_->ctor_.async_connect(
conn_->next_layer().next_layer(),
conn_->resv_.results(),
asio::prepend(std::move(self), order_t {}));

logger_.on_connect(ec0, conn_->ctor_.endpoint());

if (ec0) {
self.complete(ec0);
return;
}

if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
conn_->next_layer().async_handshake(
asio::ssl::stream_base::client,
asio::prepend(
asio::cancel_after(
conn_->cfg_.ssl_handshake_timeout,
std::move(self)
),
order_t {}
)
);

logger_.on_ssl_handshake(ec0);

if (ec0) {
self.complete(ec0);
return;
}
}

conn_->reset();

// Note: Oder is important here because the writer might
// trigger an async_write before the async_hello thereby
// causing an authentication problem.
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); },
[this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
[this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
[this](auto token) { return conn_->reader(logger_, token);},
[this](auto token) { return conn_->writer(logger_, token);}
).async_wait(
asio::experimental::wait_for_one_error(),
std::move(self));

if (order[0] == 0 && !!ec0) {
self.complete(ec0);
return;
}

if (order[0] == 2 && ec2 == error::pong_timeout) {
self.complete(ec1);
return;
}

// The receive operation must be cancelled because channel
// subscription does not survive a reconnection but requires
// re-subscription.
conn_->cancel(operation::receive);

if (!conn_->will_reconnect()) {
conn_->cancel(operation::reconnection);
self.complete(ec3);
return;
}

// It is safe to use the writer timer here because we are not
// connected.
conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);

BOOST_ASIO_CORO_YIELD
conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
if (ec0) {
self.complete(ec0);
return;
}

if (!conn_->will_reconnect()) {
self.complete(asio::error::operation_aborted);
return;
}

conn_->reset_stream();
}
}
};


/** @brief Base class for high level Redis asynchronous connections.
* @ingroup high-level-api
*
Expand Down Expand Up @@ -350,7 +482,6 @@ class connection_base {
, receive_channel_{ex, 256}
, resv_{ex}
, health_checker_{ex}
, runner_{ex, {}}
, dbuf_{read_buffer_, max_read_size}
{
set_receive_response(ignore);
Expand Down Expand Up @@ -464,9 +595,13 @@ class connection_base {
resv_.set_config(cfg);
ctor_.set_config(cfg);
health_checker_.set_config(cfg);
runner_.set_config(cfg);
handshaker_.set_config(cfg);
l.set_prefix(cfg.log_prefix);
return runner_.async_run(*this, l, std::move(token));

return asio::async_compose
< CompletionToken
, void(system::error_code)
>(run_op<this_type, Logger>{this, l}, token, writer_timer_);
}

template <class Response>
Expand All @@ -490,7 +625,7 @@ class connection_base {
using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
using resolver_type = resolver<Executor>;
using health_checker_type = health_checker<Executor>;
using runner_type = runner<executor_type>;
using resp3_handshaker_type = resp3_handshaker<executor_type>;
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
using exec_notifier_type = receive_channel_type;
Expand Down Expand Up @@ -669,7 +804,7 @@ class connection_base {
template <class, class> friend struct reader_op;
template <class, class> friend struct writer_op;
template <class> friend struct exec_op;
template <class, class, class> friend struct runner_op;
template <class, class> friend class run_op;

void cancel_push_requests()
{
Expand Down Expand Up @@ -899,7 +1034,7 @@ class connection_base {
resolver_type resv_;
connector ctor_;
health_checker_type health_checker_;
runner_type runner_;
resp3_handshaker_type handshaker_;
receiver_adapter_type receive_adapter_;

using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
Expand Down
1 change: 1 addition & 0 deletions include/boost/redis/detail/health_checker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/operation.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/consign.hpp>
Expand Down
116 changes: 116 additions & 0 deletions include/boost/redis/detail/resp3_handshaker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_RUNNER_HPP
#define BOOST_REDIS_RUNNER_HPP

#include <boost/redis/config.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/operation.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
//#include <boost/asio/ip/tcp.hpp>
#include <string>
#include <memory>
#include <chrono>

namespace boost::redis::detail
{

void push_hello(config const& cfg, request& req);

// TODO: Can we avoid this whole function whose only purpose is to
// check for an error in the hello response and complete with an error
// so that the parallel group that starts it can exit?
template <class Handshaker, class Connection, class Logger>
struct hello_op {
Handshaker* handshaker_ = nullptr;
Connection* conn_ = nullptr;
Logger logger_;
asio::coroutine coro_{};

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
handshaker_->add_hello();

BOOST_ASIO_CORO_YIELD
conn_->async_exec(handshaker_->hello_req_, handshaker_->hello_resp_, std::move(self));
logger_.on_hello(ec, handshaker_->hello_resp_);

if (ec) {
conn_->cancel(operation::run);
self.complete(ec);
return;
}

if (handshaker_->has_error_in_response()) {
conn_->cancel(operation::run);
self.complete(error::resp3_hello);
return;
}

self.complete({});
}
}
};

template <class Executor>
class resp3_handshaker {
public:
void set_config(config const& cfg)
{ cfg_ = cfg; }

template <class Connection, class Logger, class CompletionToken>
auto async_hello(Connection& conn, Logger l, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(hello_op<resp3_handshaker, Connection, Logger>{this, &conn, l}, token, conn);
}

private:
template <class, class, class> friend struct hello_op;

void add_hello()
{
hello_req_.clear();
if (hello_resp_.has_value())
hello_resp_.value().clear();
push_hello(cfg_, hello_req_);
}

bool has_error_in_response() const noexcept
{
if (!hello_resp_.has_value())
return true;

auto f = [](auto const& e)
{
switch (e.data_type) {
case resp3::type::simple_error:
case resp3::type::blob_error: return true;
default: return false;
}
};

return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
}

request hello_req_;
generic_response hello_resp_;
config cfg_;
};

} // boost::redis::detail

#endif // BOOST_REDIS_RUNNER_HPP
Loading

0 comments on commit f6e14c2

Please sign in to comment.