diff --git a/CMakeLists.txt b/CMakeLists.txt index 5c4347d5..fb9655db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -162,7 +162,6 @@ if (BOOST_REDIS_TESTS) make_test(test_conn_exec_error 17) make_test(test_request 17) make_test(test_run 17) - make_test(test_low_level_sync 17) make_test(test_low_level_sync_sans_io 17) make_test(test_conn_check_health 17) @@ -172,7 +171,6 @@ if (BOOST_REDIS_TESTS) make_test(test_conn_exec_cancel 20) make_test(test_conn_exec_cancel2 20) make_test(test_conn_echo_stress 20) - make_test(test_low_level_async 20) make_test(test_conn_run_cancel 20) make_test(test_issue_50 20) endif() diff --git a/examples/cpp17_intro.cpp b/examples/cpp17_intro.cpp index 13a303a0..d0a98b4e 100644 --- a/examples/cpp17_intro.cpp +++ b/examples/cpp17_intro.cpp @@ -8,7 +8,7 @@ #include #include -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::connection; using boost::redis::request; using boost::redis::response; @@ -29,10 +29,10 @@ auto main(int argc, char * argv[]) -> int response resp; - net::io_context ioc; + asio::io_context ioc; connection conn{ioc}; - conn.async_run(cfg, {}, net::detached); + conn.async_run(cfg, {}, asio::detached); conn.async_exec(req, resp, [&](auto ec, auto) { if (!ec) diff --git a/examples/cpp17_intro_sync.cpp b/examples/cpp17_intro_sync.cpp index e9a4627d..1ed9a7d6 100644 --- a/examples/cpp17_intro_sync.cpp +++ b/examples/cpp17_intro_sync.cpp @@ -9,7 +9,6 @@ #include #include -namespace net = boost::asio; using boost::redis::sync_connection; using boost::redis::request; using boost::redis::response; diff --git a/examples/cpp20_chat_room.cpp b/examples/cpp20_chat_room.cpp index bc9a73e7..167f59e1 100644 --- a/examples/cpp20_chat_room.cpp +++ b/examples/cpp20_chat_room.cpp @@ -17,16 +17,23 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) -namespace net = boost::asio; -using stream_descriptor = net::deferred_t::as_default_on_t; -using signal_set = net::deferred_t::as_default_on_t; -using boost::redis::request; -using boost::redis::generic_response; +namespace asio = boost::asio; +using stream_descriptor = asio::deferred_t::as_default_on_t; +using signal_set = asio::deferred_t::as_default_on_t; +using boost::asio::async_read_until; +using boost::asio::awaitable; +using boost::asio::co_spawn; +using boost::asio::consign; +using boost::asio::deferred; +using boost::asio::detached; +using boost::asio::dynamic_buffer; +using boost::asio::redirect_error; +using boost::asio::use_awaitable; using boost::redis::config; using boost::redis::connection; +using boost::redis::generic_response; using boost::redis::ignore; -using net::redirect_error; -using net::use_awaitable; +using boost::redis::request; using boost::system::error_code; using namespace std::chrono_literals; @@ -34,7 +41,7 @@ using namespace std::chrono_literals; // terminals and type messages to stdin. auto -receiver(std::shared_ptr conn) -> net::awaitable +receiver(std::shared_ptr conn) -> awaitable { request req; req.push("SUBSCRIBE", "channel"); @@ -45,7 +52,7 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Subscribe to channels. - co_await conn->async_exec(req, ignore, net::deferred); + co_await conn->async_exec(req, ignore, deferred); // Loop reading Redis push messages. for (error_code ec;;) { @@ -63,27 +70,27 @@ receiver(std::shared_ptr conn) -> net::awaitable } // Publishes stdin messages to a Redis channel. -auto publisher(std::shared_ptr in, std::shared_ptr conn) -> net::awaitable +auto publisher(std::shared_ptr in, std::shared_ptr conn) -> awaitable { for (std::string msg;;) { - auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n"); + auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n"); request req; req.push("PUBLISH", "channel", msg); - co_await conn->async_exec(req, ignore, net::deferred); + co_await conn->async_exec(req, ignore, deferred); msg.erase(0, n); } } // Called from the main function (see main.cpp) -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> awaitable { - auto ex = co_await net::this_coro::executor; + auto ex = co_await asio::this_coro::executor; auto conn = std::make_shared(ex); auto stream = std::make_shared(ex, ::dup(STDIN_FILENO)); - net::co_spawn(ex, receiver(conn), net::detached); - net::co_spawn(ex, publisher(stream, conn), net::detached); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + co_spawn(ex, receiver(conn), detached); + co_spawn(ex, publisher(stream, conn), detached); + conn->async_run(cfg, {}, consign(detached, conn)); signal_set sig_set{ex, SIGINT, SIGTERM}; co_await sig_set.async_wait(); @@ -92,7 +99,7 @@ auto co_main(config cfg) -> net::awaitable } #else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) -auto co_main(config const&) -> net::awaitable +auto co_main(config const&) -> awaitable { std::cout << "Requires support for posix streams." << std::endl; co_return; diff --git a/examples/cpp20_containers.cpp b/examples/cpp20_containers.cpp index dfedd82e..66a7e942 100644 --- a/examples/cpp20_containers.cpp +++ b/examples/cpp20_containers.cpp @@ -14,13 +14,17 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; using boost::redis::ignore; using boost::redis::config; using boost::redis::connection; +using boost::asio::awaitable; +using boost::asio::deferred; +using boost::asio::detached; +using boost::asio::consign; void print(std::map const& cont) { @@ -35,7 +39,7 @@ void print(std::vector const& cont) } // Stores the content of some STL containers in Redis. -auto store(std::shared_ptr conn) -> net::awaitable +auto store(std::shared_ptr conn) -> awaitable { std::vector vec {1, 2, 3, 4, 5, 6}; @@ -47,10 +51,10 @@ auto store(std::shared_ptr conn) -> net::awaitable req.push_range("RPUSH", "rpush-key", vec); req.push_range("HSET", "hset-key", map); - co_await conn->async_exec(req, ignore, net::deferred); + co_await conn->async_exec(req, ignore, deferred); } -auto hgetall(std::shared_ptr conn) -> net::awaitable +auto hgetall(std::shared_ptr conn) -> awaitable { // A request contains multiple commands. request req; @@ -60,13 +64,13 @@ auto hgetall(std::shared_ptr conn) -> net::awaitable response> resp; // Executes the request and reads the response. - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, deferred); print(std::get<0>(resp).value()); } // Retrieves in a transaction. -auto transaction(std::shared_ptr conn) -> net::awaitable +auto transaction(std::shared_ptr conn) -> awaitable { request req; req.push("MULTI"); @@ -81,17 +85,17 @@ auto transaction(std::shared_ptr conn) -> net::awaitable response>, std::optional>> // exec > resp; - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, deferred); print(std::get<0>(std::get<3>(resp).value()).value().value()); print(std::get<1>(std::get<3>(resp).value()).value().value()); } // Called from the main function (see main.cpp) -net::awaitable co_main(config cfg) +awaitable co_main(config cfg) { - auto conn = std::make_shared(co_await net::this_coro::executor); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->async_run(cfg, {}, consign(detached, conn)); co_await store(conn); co_await transaction(conn); diff --git a/examples/cpp20_echo_server.cpp b/examples/cpp20_echo_server.cpp index 9b637240..eba908a2 100644 --- a/examples/cpp20_echo_server.cpp +++ b/examples/cpp20_echo_server.cpp @@ -14,10 +14,10 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; -using tcp_socket = net::deferred_t::as_default_on_t; -using tcp_acceptor = net::deferred_t::as_default_on_t; -using signal_set = net::deferred_t::as_default_on_t; +namespace asio = boost::asio; +using tcp_socket = asio::deferred_t::as_default_on_t; +using tcp_acceptor = asio::deferred_t::as_default_on_t; +using signal_set = asio::deferred_t::as_default_on_t; using boost::redis::request; using boost::redis::response; using boost::redis::config; @@ -25,16 +25,16 @@ using boost::system::error_code; using boost::redis::connection; using namespace std::chrono_literals; -auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> net::awaitable +auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> asio::awaitable { request req; response resp; for (std::string buffer;;) { - auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n"); + auto n = co_await asio::async_read_until(socket, asio::dynamic_buffer(buffer, 1024), "\n"); req.push("PING", buffer); - co_await conn->async_exec(req, resp, net::deferred); - co_await net::async_write(socket, net::buffer(std::get<0>(resp).value())); + co_await conn->async_exec(req, resp, asio::deferred); + co_await asio::async_write(socket, asio::buffer(std::get<0>(resp).value())); std::get<0>(resp).value().clear(); req.clear(); buffer.erase(0, n); @@ -42,25 +42,25 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> } // Listens for tcp connections. -auto listener(std::shared_ptr conn) -> net::awaitable +auto listener(std::shared_ptr conn) -> asio::awaitable { try { - auto ex = co_await net::this_coro::executor; - tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555}); + auto ex = co_await asio::this_coro::executor; + tcp_acceptor acc(ex, {asio::ip::tcp::v4(), 55555}); for (;;) - net::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), net::detached); + asio::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), asio::detached); } catch (std::exception const& e) { std::clog << "Listener: " << e.what() << std::endl; } } // Called from the main function (see main.cpp) -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { - auto ex = co_await net::this_coro::executor; + auto ex = co_await asio::this_coro::executor; auto conn = std::make_shared(ex); - net::co_spawn(ex, listener(conn), net::detached); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + asio::co_spawn(ex, listener(conn), asio::detached); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); signal_set sig_set(ex, SIGINT, SIGTERM); co_await sig_set.async_wait(); diff --git a/examples/cpp20_intro.cpp b/examples/cpp20_intro.cpp index 195122cf..b2154d31 100644 --- a/examples/cpp20_intro.cpp +++ b/examples/cpp20_intro.cpp @@ -13,17 +13,17 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::connection; // Called from the main function (see main.cpp) -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { - auto conn = std::make_shared(co_await net::this_coro::executor); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); // A request containing only a ping command. request req; @@ -33,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable response resp; // Executes the request. - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); std::cout << "PING: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_intro_tls.cpp b/examples/cpp20_intro_tls.cpp index b911af27..b98028ce 100644 --- a/examples/cpp20_intro_tls.cpp +++ b/examples/cpp20_intro_tls.cpp @@ -13,20 +13,20 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::logger; using boost::redis::connection; -auto verify_certificate(bool, net::ssl::verify_context&) -> bool +auto verify_certificate(bool, asio::ssl::verify_context&) -> bool { std::cout << "set_verify_callback" << std::endl; return true; } -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { cfg.use_ssl = true; cfg.username = "aedis"; @@ -34,18 +34,18 @@ auto co_main(config cfg) -> net::awaitable cfg.addr.host = "db.occase.de"; cfg.addr.port = "6380"; - auto conn = std::make_shared(co_await net::this_coro::executor); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + auto conn = std::make_shared(co_await asio::this_coro::executor); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); request req; req.push("PING"); response resp; - conn->next_layer().set_verify_mode(net::ssl::verify_peer); + conn->next_layer().set_verify_mode(asio::ssl::verify_peer); conn->next_layer().set_verify_callback(verify_certificate); - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); std::cout << "Response: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_json.cpp b/examples/cpp20_json.cpp index d0c6423c..8a18e1d7 100644 --- a/examples/cpp20_json.cpp +++ b/examples/cpp20_json.cpp @@ -23,7 +23,7 @@ #include #include -namespace net = boost::asio; +namespace asio = boost::asio; using namespace boost::describe; using boost::redis::request; using boost::redis::response; @@ -48,11 +48,11 @@ void boost_redis_to_bulk(std::string& to, user const& u) void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_code&) { u = boost::json::value_to(boost::json::parse(sv)); } -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { - auto ex = co_await net::this_coro::executor; + auto ex = co_await asio::this_coro::executor; auto conn = std::make_shared(ex); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); // user object that will be stored in Redis in json format. user const u{"Joao", "58", "Brazil"}; @@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable response resp; - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); // Prints the first ping diff --git a/examples/cpp20_protobuf.cpp b/examples/cpp20_protobuf.cpp index 75eb8fd2..f8ab5494 100644 --- a/examples/cpp20_protobuf.cpp +++ b/examples/cpp20_protobuf.cpp @@ -19,7 +19,7 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::operation; @@ -58,11 +58,11 @@ void boost_redis_from_bulk(person& u, std::string_view sv, boost::system::error_ using tutorial::boost_redis_to_bulk; using tutorial::boost_redis_from_bulk; -net::awaitable co_main(config cfg) +asio::awaitable co_main(config cfg) { - auto ex = co_await net::this_coro::executor; + auto ex = co_await asio::this_coro::executor; auto conn = std::make_shared(ex); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); person p; p.set_name("Louis"); @@ -76,7 +76,7 @@ net::awaitable co_main(config cfg) response resp; // Sends the request and receives the response. - co_await conn->async_exec(req, resp, net::deferred); + co_await conn->async_exec(req, resp, asio::deferred); conn->cancel(); std::cout diff --git a/examples/cpp20_resolve_with_sentinel.cpp b/examples/cpp20_resolve_with_sentinel.cpp index 8401cd1e..cca77ded 100644 --- a/examples/cpp20_resolve_with_sentinel.cpp +++ b/examples/cpp20_resolve_with_sentinel.cpp @@ -12,8 +12,8 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; -using endpoints = net::ip::tcp::resolver::results_type; +namespace asio = boost::asio; +using endpoints = asio::ip::tcp::resolver::results_type; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; @@ -22,18 +22,18 @@ using boost::redis::address; using boost::redis::connection; auto redir(boost::system::error_code& ec) - { return net::redirect_error(net::use_awaitable, ec); } + { return asio::redirect_error(asio::use_awaitable, ec); } // For more info see // - https://redis.io/docs/manual/sentinel. // - https://redis.io/docs/reference/sentinel-clients. -auto resolve_master_address(std::vector
const& addresses) -> net::awaitable
+auto resolve_master_address(std::vector
const& addresses) -> asio::awaitable
{ request req; req.push("SENTINEL", "get-master-addr-by-name", "mymaster"); req.push("QUIT"); - auto conn = std::make_shared(co_await net::this_coro::executor); + auto conn = std::make_shared(co_await asio::this_coro::executor); response>, ignore_t> resp; for (auto addr : addresses) { @@ -43,7 +43,7 @@ auto resolve_master_address(std::vector
const& addresses) -> net::await // TODO: async_run and async_exec should be lauched in // parallel here so we can wait for async_run completion // before eventually calling it again. - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); co_await conn->async_exec(req, resp, redir(ec)); conn->cancel(); conn->reset_stream(); @@ -54,7 +54,7 @@ auto resolve_master_address(std::vector
const& addresses) -> net::await co_return address{}; } -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { // A list of sentinel addresses from which only one is responsive. // This simulates sentinels that are down. diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index 30dcb308..ac1cc884 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -18,7 +18,7 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -namespace net = boost::asio; +namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; using boost::redis::generic_response; @@ -27,7 +27,7 @@ using boost::redis::config; using boost::redis::ignore; using boost::system::error_code; using boost::redis::connection; -using signal_set = net::deferred_t::as_default_on_t; +using signal_set = asio::deferred_t::as_default_on_t; /* This example will subscribe and read pushes indefinitely. * @@ -47,7 +47,7 @@ using signal_set = net::deferred_t::as_default_on_t; // Receives server pushes. auto -receiver(std::shared_ptr conn) -> net::awaitable +receiver(std::shared_ptr conn) -> asio::awaitable { request req; req.push("SUBSCRIBE", "channel"); @@ -59,11 +59,11 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req, ignore, net::deferred); + co_await conn->async_exec(req, ignore, asio::deferred); // Loop reading Redis pushs messages. for (error_code ec;;) { - co_await conn->async_receive(net::redirect_error(net::use_awaitable, ec)); + co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. std::cout @@ -76,12 +76,12 @@ receiver(std::shared_ptr conn) -> net::awaitable } } -auto co_main(config cfg) -> net::awaitable +auto co_main(config cfg) -> asio::awaitable { - auto ex = co_await net::this_coro::executor; + auto ex = co_await asio::this_coro::executor; auto conn = std::make_shared(ex); - net::co_spawn(ex, receiver(conn), net::detached); - conn->async_run(cfg, {}, net::consign(net::detached, conn)); + asio::co_spawn(ex, receiver(conn), asio::detached); + conn->async_run(cfg, {}, asio::consign(asio::detached, conn)); signal_set sig_set(ex, SIGINT, SIGTERM); co_await sig_set.async_wait(); diff --git a/examples/main.cpp b/examples/main.cpp index f2d79213..0bef3ccc 100644 --- a/examples/main.cpp +++ b/examples/main.cpp @@ -11,13 +11,13 @@ #include #include -namespace net = boost::asio; +namespace asio = boost::asio; using boost::redis::config; using boost::redis::logger; #if defined(BOOST_ASIO_HAS_CO_AWAIT) -extern net::awaitable co_main(config); +extern asio::awaitable co_main(config); auto main(int argc, char * argv[]) -> int { @@ -29,8 +29,8 @@ auto main(int argc, char * argv[]) -> int cfg.addr.port = argv[2]; } - net::io_context ioc; - net::co_spawn(ioc, co_main(cfg), [](std::exception_ptr p) { + asio::io_context ioc; + asio::co_spawn(ioc, co_main(cfg), [](std::exception_ptr p) { if (p) std::rethrow_exception(p); }); diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index 78ee229e..1c4ff578 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -9,7 +9,6 @@ #include #include -#include #include #include #include @@ -44,6 +43,66 @@ namespace boost::redis::detail { +template +std::string_view buffer_view(DynamicBuffer buf) noexcept +{ + char const* start = static_cast(buf.data(0, buf.size()).data()); + return std::string_view{start, std::size(buf)}; +} + +template +class append_some_op { +private: + AsyncReadStream& stream_; + DynamicBuffer buf_; + std::size_t size_ = 0; + std::size_t tmp_ = 0; + asio::coroutine coro_{}; + +public: + append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size) + : stream_ {stream} + , buf_ {std::move(buf)} + , size_{size} + { } + + template + void operator()( Self& self + , system::error_code ec = {} + , std::size_t n = 0) + { + BOOST_ASIO_CORO_REENTER (coro_) + { + tmp_ = buf_.size(); + buf_.grow(size_); + + BOOST_ASIO_CORO_YIELD + stream_.async_read_some(buf_.data(tmp_, size_), std::move(self)); + if (ec) { + self.complete(ec, 0); + return; + } + + buf_.shrink(buf_.size() - tmp_ - n); + self.complete({}, n); + } + } +}; + +template +auto +async_append_some( + AsyncReadStream& stream, + DynamicBuffer buffer, + std::size_t size, + CompletionToken&& token) +{ + return asio::async_compose + < CompletionToken + , void(system::error_code, std::size_t) + >(append_some_op {stream, buffer, size}, token, stream); +} + template struct exec_op { using req_info_type = typename Conn::req_info; @@ -128,9 +187,7 @@ struct run_op { { BOOST_ASIO_CORO_REENTER (coro) { - conn->write_buffer_.clear(); - conn->read_buffer_.clear(); - conn->parser_.reset(); + conn->reset(); BOOST_ASIO_CORO_YIELD asio::experimental::make_parallel_group( @@ -331,8 +388,6 @@ class connection_base { using clock_traits_type = asio::wait_traits; using timer_type = asio::basic_waitable_timer; - using receiver_adapter_type = std::function const&, system::error_code&)>; - using this_type = connection_base; /// Constructs from an executor. @@ -438,6 +493,7 @@ class connection_base { using receive_channel_type = asio::experimental::channel; using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; + using receiver_adapter_type = std::function const&, system::error_code&)>; auto use_ssl() const noexcept { return runner_.get_config().use_ssl;} @@ -726,7 +782,6 @@ class connection_base { void close() { if (stream_->next_layer().is_open()) { - // TODO: Communicate the error to the caller. system::error_code ec; stream_->next_layer().close(ec); } @@ -845,6 +900,14 @@ class connection_base { return on_finish_parsing(parse_result::resp); } + void reset() + { + write_buffer_.clear(); + read_buffer_.clear(); + parser_.reset(); + on_push_ = false; + } + asio::ssl::context ctx_; std::unique_ptr stream_; diff --git a/include/boost/redis/detail/read.hpp b/include/boost/redis/detail/read.hpp deleted file mode 100644 index 9a74a498..00000000 --- a/include/boost/redis/detail/read.hpp +++ /dev/null @@ -1,291 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef BOOST_REDIS_READ_HPP -#define BOOST_REDIS_READ_HPP - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace boost::redis::detail { - -template -std::string_view buffer_view(DynamicBuffer buf) noexcept -{ - char const* start = static_cast(buf.data(0, buf.size()).data()); - return std::string_view{start, std::size(buf)}; -} - -template -class append_some_op { -private: - AsyncReadStream& stream_; - DynamicBuffer buf_; - std::size_t size_ = 0; - std::size_t tmp_ = 0; - asio::coroutine coro_{}; - -public: - append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size) - : stream_ {stream} - , buf_ {std::move(buf)} - , size_{size} - { } - - template - void operator()( Self& self - , system::error_code ec = {} - , std::size_t n = 0) - { - BOOST_ASIO_CORO_REENTER (coro_) - { - tmp_ = buf_.size(); - buf_.grow(size_); - - BOOST_ASIO_CORO_YIELD - stream_.async_read_some(buf_.data(tmp_, size_), std::move(self)); - if (ec) { - self.complete(ec, 0); - return; - } - - buf_.shrink(buf_.size() - tmp_ - n); - self.complete({}, n); - } - } -}; - -template -auto -async_append_some( - AsyncReadStream& stream, - DynamicBuffer buffer, - std::size_t size, - CompletionToken&& token) -{ - return asio::async_compose - < CompletionToken - , void(system::error_code, std::size_t) - >(append_some_op {stream, buffer, size}, token, stream); -} - -template < - class AsyncReadStream, - class DynamicBuffer, - class ResponseAdapter> -class parse_op { -private: - AsyncReadStream& stream_; - DynamicBuffer buf_; - resp3::parser parser_; - ResponseAdapter adapter_; - bool needs_rescheduling_ = true; - system::error_code ec_; - asio::coroutine coro_{}; - - static std::size_t const growth = 1024; - -public: - parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter) - : stream_ {stream} - , buf_ {std::move(buf)} - , adapter_ {std::move(adapter)} - { } - - template - void operator()( Self& self - , system::error_code ec = {} - , std::size_t = 0) - { - BOOST_ASIO_CORO_REENTER (coro_) - { - while (!resp3::parse(parser_, buffer_view(buf_), adapter_, ec)) { - needs_rescheduling_ = false; - BOOST_ASIO_CORO_YIELD - async_append_some( - stream_, buf_, parser_.get_suggested_buffer_growth(growth), - std::move(self)); - if (ec) { - self.complete(ec, 0); - return; - } - } - - ec_ = ec; - if (needs_rescheduling_) { - BOOST_ASIO_CORO_YIELD - asio::post(std::move(self)); - } - - self.complete(ec_, parser_.get_consumed()); - } - } -}; - -/** \brief Reads a complete response to a command sychronously. - * - * This function reads a complete response to a command or a - * server push synchronously. For example - * - * @code - * int resp; - * std::string buffer; - * resp3::read(socket, dynamic_buffer(buffer), adapt(resp)); - * @endcode - * - * For a complete example see examples/intro_sync.cpp. This function - * is implemented in terms of one or more calls to @c - * asio::read_until and @c asio::read functions, and is known as a @a - * composed @a operation. Furthermore, the implementation may read - * additional bytes from the stream that lie past the end of the - * message being read. These additional bytes are stored in the - * dynamic buffer, which must be preserved for subsequent reads. - * - * \param stream The stream from which to read e.g. a tcp socket. - * \param buf Dynamic buffer (version 2). - * \param adapter The response adapter. - * \param ec If an error occurs, it will be assigned to this paramter. - * \returns The number of bytes that have been consumed from the dynamic buffer. - * - * \remark This function calls buf.consume() in each chunk of data - * after it has been passed to the adapter. Users must not consume - * the bytes after it returns. - */ -template < - class SyncReadStream, - class DynamicBuffer, - class ResponseAdapter - > -auto -read( - SyncReadStream& stream, - DynamicBuffer buf, - ResponseAdapter adapter, - system::error_code& ec) -> std::size_t -{ - static std::size_t const growth = 1024; - - resp3::parser parser; - while (!parser.done()) { - auto const res = parser.consume(detail::buffer_view(buf), ec); - if (ec) - return 0UL; - - if (!res.has_value()) { - auto const size_before = buf.size(); - buf.grow(parser.get_suggested_buffer_growth(growth)); - auto const n = - stream.read_some( - buf.data(size_before, parser.get_suggested_buffer_growth(growth)), - ec); - if (ec) - return 0UL; - - buf.shrink(buf.size() - size_before - n); - continue; - } - - adapter(res.value(), ec); - if (ec) - return 0UL; - } - - return parser.get_consumed(); -} - -/** \brief Reads a complete response to a command sychronously. - * - * Same as the error_code overload but throws on error. - */ -template< - class SyncReadStream, - class DynamicBuffer, - class ResponseAdapter = adapter::ignore> -auto -read( - SyncReadStream& stream, - DynamicBuffer buf, - ResponseAdapter adapter = ResponseAdapter{}) -{ - system::error_code ec; - auto const n = redis::detail::read(stream, buf, adapter, ec); - - if (ec) - BOOST_THROW_EXCEPTION(system::system_error{ec}); - - return n; -} - -/** \brief Reads a complete response to a Redis command asynchronously. - * - * This function reads a complete response to a command or a - * server push asynchronously. For example - * - * @code - * std::string buffer; - * std::set resp; - * co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp)); - * @endcode - * - * For a complete example see examples/transaction.cpp. This function - * is implemented in terms of one or more calls to @c - * asio::async_read_until and @c asio::async_read functions, and is - * known as a @a composed @a operation. Furthermore, the - * implementation may read additional bytes from the stream that lie - * past the end of the message being read. These additional bytes are - * stored in the dynamic buffer, which must be preserved for - * subsequent reads. - * - * \param stream The stream from which to read e.g. a tcp socket. - * \param buffer Dynamic buffer (version 2). - * \param adapter The response adapter. - * \param token The completion token. - * - * The completion handler will receive as a parameter the total - * number of bytes transferred from the stream and must have the - * following signature - * - * @code - * void(system::error_code, std::size_t); - * @endcode - * - * \remark This function calls buf.consume() in each chunk of data - * after it has been passed to the adapter. Users must not consume - * the bytes after it returns. - */ -template < - class AsyncReadStream, - class DynamicBuffer, - class ResponseAdapter = adapter::ignore, - class CompletionToken = asio::default_completion_token_t - > -auto async_read( - AsyncReadStream& stream, - DynamicBuffer buffer, - ResponseAdapter adapter = ResponseAdapter{}, - CompletionToken&& token = - asio::default_completion_token_t{}) -{ - return asio::async_compose - < CompletionToken - , void(system::error_code, std::size_t) - >(parse_op {stream, buffer, adapter}, - token, - stream); -} - -} // boost::redis::detail - -#endif // BOOST_REDIS_READ_HPP diff --git a/tests/test_low_level.cpp b/tests/test_low_level.cpp index 0fb5b2e3..44419386 100644 --- a/tests/test_low_level.cpp +++ b/tests/test_low_level.cpp @@ -7,16 +7,11 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include + #define BOOST_TEST_MODULE low level #include + #include #include #include @@ -30,16 +25,17 @@ auto operator==(boost::redis::ignore_t, boost::redis::ignore_t) noexcept {return auto operator!=(boost::redis::ignore_t, boost::redis::ignore_t) noexcept {return false;} } -namespace net = boost::asio; namespace redis = boost::redis; namespace resp3 = boost::redis::resp3; +using boost::system::error_code; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::adapter::result; +using boost::redis::resp3::parser; +using boost::redis::resp3::parse; -using test_stream = boost::beast::test::stream; using boost::redis::adapter::adapt2; using node_type = result; using vec_node_type = result>; @@ -82,95 +78,51 @@ template struct expect { std::string in; Result expected; - boost::system::error_code ec{}; + error_code ec{}; resp3::type error_type = resp3::type::invalid; }; template -auto make_expected(std::string in, Result expected, boost::system::error_code ec = {}, resp3::type error_type = resp3::type::invalid) +auto make_expected(std::string in, Result expected, error_code ec = {}, resp3::type error_type = resp3::type::invalid) { return expect{in, expected, ec, error_type}; } template -void test_sync(net::any_io_executor ex, expect e) +void test_sync(expect e) { - std::string rbuffer; - test_stream ts {ex}; - ts.append(e.in); + parser p; Result result; - boost::system::error_code ec; - auto dbuf = net::dynamic_buffer(rbuffer); - auto const consumed = redis::detail::read(ts, dbuf, adapt2(result), ec); - if (e.ec) { + auto adapter = adapt2(result); + error_code ec; + auto const res = parse(p, e.in, adapter, ec); + + BOOST_TEST(res); // None of these tests need more data. + + if (ec) { BOOST_CHECK_EQUAL(ec, e.ec); return; } - dbuf.consume(consumed); - - BOOST_TEST(!ec); - BOOST_TEST(rbuffer.empty()); - if (result.has_value()) { - auto const res = result == e.expected; - BOOST_TEST(res); + BOOST_TEST(bool(result == e.expected)); + BOOST_CHECK_EQUAL(e.in.size(), p.get_consumed()); } else { - BOOST_TEST(result.has_error()); BOOST_CHECK_EQUAL(result.error().data_type, e.error_type); } } template -class async_test: public std::enable_shared_from_this> { -private: - std::string rbuffer_; - test_stream ts_; - expect data_; - Result result_; - -public: - async_test(net::any_io_executor ex, expect e) - : ts_{ex} - , data_{e} - { - ts_.append(e.in); - } - - void run() - { - auto self = this->shared_from_this(); - auto f = [self](auto ec, auto) - { - if (self->data_.ec) { - BOOST_CHECK_EQUAL(ec, self->data_.ec); - return; - } - - BOOST_TEST(!ec); - //BOOST_TEST(self->rbuffer_.empty()); - - if (self->result_.has_value()) { - auto const res = self->result_ == self->data_.expected; - BOOST_TEST(res); - } else { - BOOST_TEST(self->result_.has_error()); - BOOST_CHECK_EQUAL(self->result_.error().data_type, self->data_.error_type); - } - }; - - redis::detail::async_read( - ts_, - net::dynamic_buffer(rbuffer_), - adapt2(result_), - f); - } -}; - -template -void test_async(net::any_io_executor ex, expect e) +void test_sync2(expect e) { - std::make_shared>(ex, e)->run(); + parser p; + Result result; + auto adapter = adapt2(result); + error_code ec; + auto const res = parse(p, e.in, adapter, ec); + + BOOST_TEST(res); // None of these tests need more data. + BOOST_CHECK_EQUAL(ec, e.ec); } auto make_blob() @@ -397,173 +349,136 @@ vec_node_type const attr_e1b #define S18d "$0\r\n\r\n" #define NUMBER_TEST_CONDITIONS(test) \ - test(ex, make_expected(S01a, result>{}, boost::redis::error::unexpected_bool_value)); \ - test(ex, make_expected(S01b, result{{false}})); \ - test(ex, make_expected(S01b, node_type{{resp3::type::boolean, 1UL, 0UL, {"f"}}})); \ - test(ex, make_expected(S01c, result{{true}})); \ - test(ex, make_expected(S01c, node_type{{resp3::type::boolean, 1UL, 0UL, {"t"}}})); \ - test(ex, make_expected(S01c, op_bool_ok)); \ - test(ex, make_expected(S01c, result>{}, boost::redis::error::expects_resp3_map)); \ - test(ex, make_expected(S01c, result>{}, boost::redis::error::expects_resp3_set)); \ - test(ex, make_expected(S01c, result>{}, boost::redis::error::expects_resp3_map)); \ - test(ex, make_expected(S01c, result>{}, boost::redis::error::expects_resp3_set)); \ - test(ex, make_expected(S02a, streamed_string_e2)); \ - test(ex, make_expected(S03a, result{}, boost::redis::error::expects_resp3_simple_type));\ - test(ex, make_expected(S03a, result>{}, boost::redis::error::expects_resp3_simple_type));; \ - test(ex, make_expected(S02b, result{}, boost::redis::error::not_a_number)); \ - test(ex, make_expected(S02b, result{std::string{"Hello word"}})); \ - test(ex, make_expected(S02b, streamed_string_e1)); \ - test(ex, make_expected(S02c, result{}, boost::redis::error::not_a_number)); \ - test(ex, make_expected(S05a, node_type{{resp3::type::number, 1UL, 0UL, {"-3"}}})); \ - test(ex, make_expected(S05b, result{11})); \ - test(ex, make_expected(S05b, op_int_ok)); \ - test(ex, make_expected(S05b, result>{}, boost::redis::error::expects_resp3_aggregate)); \ - test(ex, make_expected(S05b, result>{}, boost::redis::error::expects_resp3_map)); \ - test(ex, make_expected(S05b, result>{}, boost::redis::error::expects_resp3_set)); \ - test(ex, make_expected(S05b, result>{}, boost::redis::error::expects_resp3_map)); \ - test(ex, make_expected(S05b, result>{}, boost::redis::error::expects_resp3_set)); \ - test(ex, make_expected(s05c, array_type2{}, boost::redis::error::expects_resp3_aggregate));\ - test(ex, make_expected(s05c, node_type{{resp3::type::number, 1UL, 0UL, {"3"}}}));\ - test(ex, make_expected(S06a, op_type_01{})); \ - test(ex, make_expected(S06a, op_type_02{}));\ - test(ex, make_expected(S06a, op_type_03{}));\ - test(ex, make_expected(S06a, op_type_04{}));\ - test(ex, make_expected(S06a, op_type_05{}));\ - test(ex, make_expected(S06a, op_type_06{}));\ - test(ex, make_expected(S06a, op_type_07{}));\ - test(ex, make_expected(S06a, op_type_08{}));\ - test(ex, make_expected(S06a, op_type_09{}));\ - test(ex, make_expected(S07a, push_e1a)); \ - test(ex, make_expected(S07b, push_e1b)); \ - test(ex, make_expected(S04b, map_type{}, boost::redis::error::expects_resp3_map));\ - test(ex, make_expected(S03b, map_e1f));\ - test(ex, make_expected(S03b, map_e1g));\ - test(ex, make_expected(S03b, map_e1k));\ - test(ex, make_expected(S03b, map_expected_1a));\ - test(ex, make_expected(S03b, map_expected_1b));\ - test(ex, make_expected(S03b, map_expected_1c));\ - test(ex, make_expected(S03b, map_expected_1d));\ - test(ex, make_expected(S03b, map_expected_1e));\ - test(ex, make_expected(S08a, attr_e1a)); \ - test(ex, make_expected(S08b, attr_e1b)); \ - test(ex, make_expected(S04e, array_e1a));\ - test(ex, make_expected(S04e, array_e1b));\ - test(ex, make_expected(S04e, array_e1c));\ - test(ex, make_expected(S04e, array_e1f));\ - test(ex, make_expected(S04e, array_e1g));\ - test(ex, make_expected(S04e, array_e1h));\ - test(ex, make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\ - test(ex, make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\ - test(ex, make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\ - test(ex, make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\ - test(ex, make_expected(S04h, array_e1d));\ - test(ex, make_expected(S04h, array_e1e));\ - test(ex, make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \ - test(ex, make_expected(S09a, set_e1c)); \ - test(ex, make_expected(S09a, set_e1d)); \ - test(ex, make_expected(S09a, set_e1f)); \ - test(ex, make_expected(S09a, set_e1g)); \ - test(ex, make_expected(S09a, set_expected1a)); \ - test(ex, make_expected(S09a, set_expected_1e)); \ - test(ex, make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \ - test(ex, make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \ - test(ex, make_expected(S03c, map_type{}));\ - test(ex, make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\ - test(ex, make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\ - test(ex, make_expected(S11c, node_type{{resp3::type::doublean, 1UL, 0UL, {"-inf"}}}));\ - test(ex, make_expected(S11d, result{{1.23}}));\ - test(ex, make_expected(S11e, result{{0}}, boost::redis::error::not_a_double));\ - test(ex, make_expected(S13a, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}}}));\ - test(ex, make_expected(S13b, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {}}}));\ - test(ex, make_expected(S14a, node_type{{resp3::type::big_number, 1UL, 0UL, {"3492890328409238509324850943850943825024385"}}}));\ - test(ex, make_expected(S14b, result{}, boost::redis::error::empty_field));\ - test(ex, make_expected(S15a, result>{{"OK"}}));\ - test(ex, make_expected(S15a, result{{"OK"}}));\ - test(ex, make_expected(S15b, result>{""}));\ - test(ex, make_expected(S15b, result{{""}}));\ - test(ex, make_expected(S16a, result{}, boost::redis::error::invalid_data_type));\ - test(ex, make_expected(S05d, result{11}, boost::redis::error::not_a_number));\ - test(ex, make_expected(S03d, map_type{}, boost::redis::error::not_a_number));\ - test(ex, make_expected(S02d, result{}, boost::redis::error::not_a_number));\ - test(ex, make_expected(S17a, result{}, boost::redis::error::not_a_number));\ - test(ex, make_expected(S05e, result{}, boost::redis::error::empty_field));\ - test(ex, make_expected(S01d, result>{}, boost::redis::error::empty_field));\ - test(ex, make_expected(S11f, result{}, boost::redis::error::empty_field));\ - test(ex, make_expected(S17b, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hh"}}}));\ - test(ex, make_expected(S18c, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}}}));\ - test(ex, make_expected(S18d, node_type{{resp3::type::blob_string, 1UL, 0UL, {}}}));\ - test(ex, make_expected(make_blob_string(blob), node_type{{resp3::type::blob_string, 1UL, 0UL, {blob}}}));\ - test(ex, make_expected(S04a, result>{{11}})); \ - test(ex, make_expected(S04d, result>>{response>{{set_e1c}}})); \ - test(ex, make_expected(S04c, result>>{response>{{map_expected_1b}}}));\ - test(ex, make_expected(S03b, map_e1l));\ - test(ex, make_expected(S06a, result{0}, {}, resp3::type::null)); \ - test(ex, make_expected(S06a, map_type{}, {}, resp3::type::null));\ - test(ex, make_expected(S06a, array_type{}, {}, resp3::type::null));\ - test(ex, make_expected(S06a, result>{}, {}, resp3::type::null));\ - test(ex, make_expected(S06a, result>{}, {}, resp3::type::null));\ - test(ex, make_expected(S10a, result{}, boost::redis::error::resp3_simple_error)); \ - test(ex, make_expected(S10a, node_type{{resp3::type::simple_error, 1UL, 0UL, {"Error"}}}, {}, resp3::type::simple_error)); \ - test(ex, make_expected(S10b, node_type{{resp3::type::simple_error, 1UL, 0UL, {""}}}, {}, resp3::type::simple_error)); \ - test(ex, make_expected(S12a, node_type{{resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}}}, {}, resp3::type::blob_error));\ - test(ex, make_expected(S12b, node_type{{resp3::type::blob_error, 1UL, 0UL, {}}}, {}, resp3::type::blob_error));\ - test(ex, make_expected(S12c, result{}, boost::redis::error::resp3_blob_error));\ - -BOOST_AUTO_TEST_CASE(parser) + test(make_expected(S01a, result>{}, boost::redis::error::unexpected_bool_value)); \ + test(make_expected(S01b, result{{false}})); \ + test(make_expected(S01b, node_type{{resp3::type::boolean, 1UL, 0UL, {"f"}}})); \ + test(make_expected(S01c, result{{true}})); \ + test(make_expected(S01c, node_type{{resp3::type::boolean, 1UL, 0UL, {"t"}}})); \ + test(make_expected(S01c, op_bool_ok)); \ + test(make_expected(S01c, result>{}, boost::redis::error::expects_resp3_map)); \ + test(make_expected(S01c, result>{}, boost::redis::error::expects_resp3_set)); \ + test(make_expected(S01c, result>{}, boost::redis::error::expects_resp3_map)); \ + test(make_expected(S01c, result>{}, boost::redis::error::expects_resp3_set)); \ + test(make_expected(S02a, streamed_string_e2)); \ + test(make_expected(S03a, result{}, boost::redis::error::expects_resp3_simple_type));\ + test(make_expected(S03a, result>{}, boost::redis::error::expects_resp3_simple_type));; \ + test(make_expected(S02b, result{}, boost::redis::error::not_a_number)); \ + test(make_expected(S02b, result{std::string{"Hello word"}})); \ + test(make_expected(S02b, streamed_string_e1)); \ + test(make_expected(S02c, result{}, boost::redis::error::not_a_number)); \ + test(make_expected(S05a, node_type{{resp3::type::number, 1UL, 0UL, {"-3"}}})); \ + test(make_expected(S05b, result{11})); \ + test(make_expected(S05b, op_int_ok)); \ + test(make_expected(S05b, result>{}, boost::redis::error::expects_resp3_aggregate)); \ + test(make_expected(S05b, result>{}, boost::redis::error::expects_resp3_map)); \ + test(make_expected(S05b, result>{}, boost::redis::error::expects_resp3_set)); \ + test(make_expected(S05b, result>{}, boost::redis::error::expects_resp3_map)); \ + test(make_expected(S05b, result>{}, boost::redis::error::expects_resp3_set)); \ + test(make_expected(s05c, array_type2{}, boost::redis::error::expects_resp3_aggregate));\ + test(make_expected(s05c, node_type{{resp3::type::number, 1UL, 0UL, {"3"}}}));\ + test(make_expected(S06a, op_type_01{})); \ + test(make_expected(S06a, op_type_02{}));\ + test(make_expected(S06a, op_type_03{}));\ + test(make_expected(S06a, op_type_04{}));\ + test(make_expected(S06a, op_type_05{}));\ + test(make_expected(S06a, op_type_06{}));\ + test(make_expected(S06a, op_type_07{}));\ + test(make_expected(S06a, op_type_08{}));\ + test(make_expected(S06a, op_type_09{}));\ + test(make_expected(S07a, push_e1a)); \ + test(make_expected(S07b, push_e1b)); \ + test(make_expected(S04b, map_type{}, boost::redis::error::expects_resp3_map));\ + test(make_expected(S03b, map_e1f));\ + test(make_expected(S03b, map_e1g));\ + test(make_expected(S03b, map_e1k));\ + test(make_expected(S03b, map_expected_1a));\ + test(make_expected(S03b, map_expected_1b));\ + test(make_expected(S03b, map_expected_1c));\ + test(make_expected(S03b, map_expected_1d));\ + test(make_expected(S03b, map_expected_1e));\ + test(make_expected(S08a, attr_e1a)); \ + test(make_expected(S08b, attr_e1b)); \ + test(make_expected(S04e, array_e1a));\ + test(make_expected(S04e, array_e1b));\ + test(make_expected(S04e, array_e1c));\ + test(make_expected(S04e, array_e1f));\ + test(make_expected(S04e, array_e1g));\ + test(make_expected(S04e, array_e1h));\ + test(make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\ + test(make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\ + test(make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\ + test(make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\ + test(make_expected(S04h, array_e1d));\ + test(make_expected(S04h, array_e1e));\ + test(make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \ + test(make_expected(S09a, set_e1c)); \ + test(make_expected(S09a, set_e1d)); \ + test(make_expected(S09a, set_e1f)); \ + test(make_expected(S09a, set_e1g)); \ + test(make_expected(S09a, set_expected1a)); \ + test(make_expected(S09a, set_expected_1e)); \ + test(make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \ + test(make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \ + test(make_expected(S03c, map_type{}));\ + test(make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\ + test(make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\ + test(make_expected(S11c, node_type{{resp3::type::doublean, 1UL, 0UL, {"-inf"}}}));\ + test(make_expected(S11d, result{{1.23}}));\ + test(make_expected(S11e, result{{0}}, boost::redis::error::not_a_double));\ + test(make_expected(S13a, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}}}));\ + test(make_expected(S13b, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {}}}));\ + test(make_expected(S14a, node_type{{resp3::type::big_number, 1UL, 0UL, {"3492890328409238509324850943850943825024385"}}}));\ + test(make_expected(S14b, result{}, boost::redis::error::empty_field));\ + test(make_expected(S15a, result>{{"OK"}}));\ + test(make_expected(S15a, result{{"OK"}}));\ + test(make_expected(S15b, result>{""}));\ + test(make_expected(S15b, result{{""}}));\ + test(make_expected(S16a, result{}, boost::redis::error::invalid_data_type));\ + test(make_expected(S05d, result{11}, boost::redis::error::not_a_number));\ + test(make_expected(S03d, map_type{}, boost::redis::error::not_a_number));\ + test(make_expected(S02d, result{}, boost::redis::error::not_a_number));\ + test(make_expected(S17a, result{}, boost::redis::error::not_a_number));\ + test(make_expected(S05e, result{}, boost::redis::error::empty_field));\ + test(make_expected(S01d, result>{}, boost::redis::error::empty_field));\ + test(make_expected(S11f, result{}, boost::redis::error::empty_field));\ + test(make_expected(S17b, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hh"}}}));\ + test(make_expected(S18c, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}}}));\ + test(make_expected(S18d, node_type{{resp3::type::blob_string, 1UL, 0UL, {}}}));\ + test(make_expected(make_blob_string(blob), node_type{{resp3::type::blob_string, 1UL, 0UL, {blob}}}));\ + test(make_expected(S04a, result>{{11}})); \ + test(make_expected(S04d, result>>{response>{{set_e1c}}})); \ + test(make_expected(S04c, result>>{response>{{map_expected_1b}}}));\ + test(make_expected(S03b, map_e1l));\ + test(make_expected(S06a, result{0}, {}, resp3::type::null)); \ + test(make_expected(S06a, map_type{}, {}, resp3::type::null));\ + test(make_expected(S06a, array_type{}, {}, resp3::type::null));\ + test(make_expected(S06a, result>{}, {}, resp3::type::null));\ + test(make_expected(S06a, result>{}, {}, resp3::type::null));\ + test(make_expected(S10a, result{}, boost::redis::error::resp3_simple_error)); \ + test(make_expected(S10a, node_type{{resp3::type::simple_error, 1UL, 0UL, {"Error"}}}, {}, resp3::type::simple_error)); \ + test(make_expected(S10b, node_type{{resp3::type::simple_error, 1UL, 0UL, {""}}}, {}, resp3::type::simple_error)); \ + test(make_expected(S12a, node_type{{resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}}}, {}, resp3::type::blob_error));\ + test(make_expected(S12b, node_type{{resp3::type::blob_error, 1UL, 0UL, {}}}, {}, resp3::type::blob_error));\ + test(make_expected(S12c, result{}, boost::redis::error::resp3_blob_error));\ + +BOOST_AUTO_TEST_CASE(sansio) { - net::io_context ioc; - - auto ex = ioc.get_executor(); - -#define TEST test_sync - NUMBER_TEST_CONDITIONS(TEST) -#undef TEST - -#define TEST test_async - NUMBER_TEST_CONDITIONS(TEST) -#undef TEST - - ioc.run(); + NUMBER_TEST_CONDITIONS(test_sync) } BOOST_AUTO_TEST_CASE(ignore_adapter_simple_error) { - net::io_context ioc; - std::string rbuffer; - - boost::system::error_code ec; - - test_stream ts {ioc}; - ts.append(S10a); - redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec); - BOOST_CHECK_EQUAL(ec, boost::redis::error::resp3_simple_error); - BOOST_TEST(!rbuffer.empty()); + test_sync2(make_expected(S10a, ignore, boost::redis::error::resp3_simple_error)); } BOOST_AUTO_TEST_CASE(ignore_adapter_blob_error) { - net::io_context ioc; - std::string rbuffer; - boost::system::error_code ec; - - test_stream ts {ioc}; - ts.append(S12a); - redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec); - BOOST_CHECK_EQUAL(ec, boost::redis::error::resp3_blob_error); - BOOST_TEST(!rbuffer.empty()); + test_sync2(make_expected(S12a, ignore, boost::redis::error::resp3_blob_error)); } BOOST_AUTO_TEST_CASE(ignore_adapter_no_error) { - net::io_context ioc; - std::string rbuffer; - boost::system::error_code ec; - - test_stream ts {ioc}; - ts.append(S05b); - auto const consumed = redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec); - BOOST_TEST(!ec); - BOOST_CHECK_EQUAL(rbuffer.size(), consumed); + test_sync2(make_expected(S05b, ignore)); } //----------------------------------------------------------------------------------- @@ -660,7 +575,7 @@ BOOST_AUTO_TEST_CASE(adapter) using boost::redis::adapter::boost_redis_adapt; using resp3::type; - boost::system::error_code ec; + error_code ec; response resp; @@ -674,4 +589,3 @@ BOOST_AUTO_TEST_CASE(adapter) BOOST_CHECK_EQUAL(std::get<1>(resp).value(), 42); BOOST_TEST(!ec); } - diff --git a/tests/test_low_level_async.cpp b/tests/test_low_level_async.cpp deleted file mode 100644 index b8b47d14..00000000 --- a/tests/test_low_level_async.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#define BOOST_TEST_MODULE conn-tls -#include -#if defined(BOOST_ASIO_HAS_CO_AWAIT) - -namespace net = boost::asio; -namespace redis = boost::redis; -using resolver = net::use_awaitable_t<>::as_default_on_t; -using tcp_socket = net::use_awaitable_t<>::as_default_on_t; -using boost::redis::adapter::adapt2; -using net::ip::tcp; -using boost::redis::request; -using boost::redis::adapter::result; -using redis::config; - -auto co_main(config cfg) -> net::awaitable -{ - auto ex = co_await net::this_coro::executor; - - resolver resv{ex}; - auto const addrs = co_await resv.async_resolve(cfg.addr.host, cfg.addr.port); - tcp_socket socket{ex}; - co_await net::async_connect(socket, addrs); - - // Creates the request and writes to the socket. - request req; - req.push("HELLO", 3); - req.push("PING", "Hello world"); - req.push("QUIT"); - co_await redis::detail::async_write(socket, req); - - // Responses - std::string buffer; - result resp; - - std::size_t consumed = 0; - // Reads the responses to all commands in the request. - auto dbuf = net::dynamic_buffer(buffer); - consumed = co_await redis::detail::async_read(socket, dbuf); - dbuf.consume(consumed); - consumed = co_await redis::detail::async_read(socket, dbuf, adapt2(resp)); - dbuf.consume(consumed); - consumed = co_await redis::detail::async_read(socket, dbuf); - dbuf.consume(consumed); - - std::cout << "Ping: " << resp.value() << std::endl; -} - -BOOST_AUTO_TEST_CASE(low_level_async) -{ - net::io_context ioc; - net::co_spawn(ioc, co_main({}), net::detached); - ioc.run(); -} - -#else // defined(BOOST_ASIO_HAS_CO_AWAIT) - -BOOST_AUTO_TEST_CASE(low_level_async) -{ -} - -#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/tests/test_low_level_sync.cpp b/tests/test_low_level_sync.cpp deleted file mode 100644 index 2349fbee..00000000 --- a/tests/test_low_level_sync.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#include -#include -#include -#include -#include -#define BOOST_TEST_MODULE conn-quit -#include -#include -#include - -namespace net = boost::asio; -namespace redis = boost::redis; -using boost::redis::adapter::adapt2; -using boost::redis::request; -using boost::redis::adapter::result; - -BOOST_AUTO_TEST_CASE(low_level_sync) -{ - try { - std::string const host = "127.0.0.1"; - std::string const port = "6379"; - - net::io_context ioc; - net::ip::tcp::resolver resv{ioc}; - auto const res = resv.resolve(host, port); - net::ip::tcp::socket socket{ioc}; - net::connect(socket, res); - - // Creates the request and writes to the socket. - request req; - req.push("HELLO", 3); - req.push("PING", "Hello world"); - req.push("QUIT"); - redis::detail::write(socket, req); - - std::string buffer; - result resp; - - std::size_t consumed = 0; - // Reads the responses to all commands in the request. - auto dbuf = net::dynamic_buffer(buffer); - consumed = redis::detail::read(socket, dbuf); - dbuf.consume(consumed); - consumed = redis::detail::read(socket, dbuf, adapt2(resp)); - dbuf.consume(consumed); - consumed = redis::detail::read(socket, dbuf); - dbuf.consume(consumed); - - std::cout << "Ping: " << resp.value() << std::endl; - - } catch (std::exception const& e) { - std::cerr << e.what() << std::endl; - exit(EXIT_FAILURE); - } -}