Skip to content

Commit

Permalink
Removes resp3::async_read.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Sep 2, 2023
1 parent d8cf431 commit 1ed8e01
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 761 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ if (BOOST_REDIS_TESTS)
make_test(test_conn_exec_error 17)
make_test(test_request 17)
make_test(test_run 17)
make_test(test_low_level_sync 17)
make_test(test_low_level_sync_sans_io 17)
make_test(test_conn_check_health 17)

Expand All @@ -172,7 +171,6 @@ if (BOOST_REDIS_TESTS)
make_test(test_conn_exec_cancel 20)
make_test(test_conn_exec_cancel2 20)
make_test(test_conn_echo_stress 20)
make_test(test_low_level_async 20)
make_test(test_conn_run_cancel 20)
make_test(test_issue_50 20)
endif()
Expand Down
6 changes: 3 additions & 3 deletions examples/cpp17_intro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <boost/asio/detached.hpp>
#include <iostream>

namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
Expand All @@ -29,10 +29,10 @@ auto main(int argc, char * argv[]) -> int

response<std::string> resp;

net::io_context ioc;
asio::io_context ioc;
connection conn{ioc};

conn.async_run(cfg, {}, net::detached);
conn.async_run(cfg, {}, asio::detached);

conn.async_exec(req, resp, [&](auto ec, auto) {
if (!ec)
Expand Down
1 change: 0 additions & 1 deletion examples/cpp17_intro_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <string>
#include <iostream>

namespace net = boost::asio;
using boost::redis::sync_connection;
using boost::redis::request;
using boost::redis::response;
Expand Down
43 changes: 25 additions & 18 deletions examples/cpp20_chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

namespace net = boost::asio;
using stream_descriptor = net::deferred_t::as_default_on_t<net::posix::stream_descriptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::generic_response;
namespace asio = boost::asio;
using stream_descriptor = asio::deferred_t::as_default_on_t<asio::posix::stream_descriptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using boost::asio::async_read_until;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::consign;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_response;
using boost::redis::ignore;
using net::redirect_error;
using net::use_awaitable;
using boost::redis::request;
using boost::system::error_code;
using namespace std::chrono_literals;

// Chat over Redis pubsub. To test, run this program from multiple
// terminals and type messages to stdin.

auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
receiver(std::shared_ptr<connection> conn) -> awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");
Expand All @@ -45,7 +52,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {

// Subscribe to channels.
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);

// Loop reading Redis push messages.
for (error_code ec;;) {
Expand All @@ -63,27 +70,27 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
}

// Publishes stdin messages to a Redis channel.
auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection> conn) -> net::awaitable<void>
auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection> conn) -> awaitable<void>
{
for (std::string msg;;) {
auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n");
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);
msg.erase(0, n);
}
}

// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto stream = std::make_shared<stream_descriptor>(ex, ::dup(STDIN_FILENO));

net::co_spawn(ex, receiver(conn), net::detached);
net::co_spawn(ex, publisher(stream, conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
co_spawn(ex, receiver(conn), detached);
co_spawn(ex, publisher(stream, conn), detached);
conn->async_run(cfg, {}, consign(detached, conn));

signal_set sig_set{ex, SIGINT, SIGTERM};
co_await sig_set.async_wait();
Expand All @@ -92,7 +99,7 @@ auto co_main(config cfg) -> net::awaitable<void>
}

#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
auto co_main(config const&) -> net::awaitable<void>
auto co_main(config const&) -> awaitable<void>
{
std::cout << "Requires support for posix streams." << std::endl;
co_return;
Expand Down
24 changes: 14 additions & 10 deletions examples/cpp20_containers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::ignore;
using boost::redis::config;
using boost::redis::connection;
using boost::asio::awaitable;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::consign;

void print(std::map<std::string, std::string> const& cont)
{
Expand All @@ -35,7 +39,7 @@ void print(std::vector<int> const& cont)
}

// Stores the content of some STL containers in Redis.
auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto store(std::shared_ptr<connection> conn) -> awaitable<void>
{
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
Expand All @@ -47,10 +51,10 @@ auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);

co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);
}

auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto hgetall(std::shared_ptr<connection> conn) -> awaitable<void>
{
// A request contains multiple commands.
request req;
Expand All @@ -60,13 +64,13 @@ auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::map<std::string, std::string>> resp;

// Executes the request and reads the response.
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, deferred);

print(std::get<0>(resp).value());
}

// Retrieves in a transaction.
auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto transaction(std::shared_ptr<connection> conn) -> awaitable<void>
{
request req;
req.push("MULTI");
Expand All @@ -81,17 +85,17 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;

co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, deferred);

print(std::get<0>(std::get<3>(resp).value()).value().value());
print(std::get<1>(std::get<3>(resp).value()).value().value());
}

// Called from the main function (see main.cpp)
net::awaitable<void> co_main(config cfg)
awaitable<void> co_main(config cfg)
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, consign(detached, conn));

co_await store(conn);
co_await transaction(conn);
Expand Down
32 changes: 16 additions & 16 deletions examples/cpp20_echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,53 @@

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace net = boost::asio;
using tcp_socket = net::deferred_t::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::deferred_t::as_default_on_t<net::ip::tcp::acceptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
namespace asio = boost::asio;
using tcp_socket = asio::deferred_t::as_default_on_t<asio::ip::tcp::socket>;
using tcp_acceptor = asio::deferred_t::as_default_on_t<asio::ip::tcp::acceptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using namespace std::chrono_literals;

auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
response<std::string> resp;

for (std::string buffer;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
auto n = co_await asio::async_read_until(socket, asio::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await conn->async_exec(req, resp, net::deferred);
co_await net::async_write(socket, net::buffer(std::get<0>(resp).value()));
co_await conn->async_exec(req, resp, asio::deferred);
co_await asio::async_write(socket, asio::buffer(std::get<0>(resp).value()));
std::get<0>(resp).value().clear();
req.clear();
buffer.erase(0, n);
}
}

// Listens for tcp connections.
auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto listener(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
try {
auto ex = co_await net::this_coro::executor;
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
auto ex = co_await asio::this_coro::executor;
tcp_acceptor acc(ex, {asio::ip::tcp::v4(), 55555});
for (;;)
net::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), net::detached);
asio::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), asio::detached);
} catch (std::exception const& e) {
std::clog << "Listener: " << e.what() << std::endl;
}
}

// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, listener(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
asio::co_spawn(ex, listener(conn), asio::detached);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();
Expand Down
10 changes: 5 additions & 5 deletions examples/cpp20_intro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::connection;

// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

// A request containing only a ping command.
request req;
Expand All @@ -33,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<std::string> resp;

// Executes the request.
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();

std::cout << "PING: " << std::get<0>(resp).value() << std::endl;
Expand Down
14 changes: 7 additions & 7 deletions examples/cpp20_intro_tls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,39 @@

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using boost::redis::connection;

auto verify_certificate(bool, net::ssl::verify_context&) -> bool
auto verify_certificate(bool, asio::ssl::verify_context&) -> bool
{
std::cout << "set_verify_callback" << std::endl;
return true;
}

auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
cfg.use_ssl = true;
cfg.username = "aedis";
cfg.password = "aedis";
cfg.addr.host = "db.occase.de";
cfg.addr.port = "6380";

auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

request req;
req.push("PING");

response<std::string> resp;

conn->next_layer().set_verify_mode(net::ssl::verify_peer);
conn->next_layer().set_verify_mode(asio::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);

co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();

std::cout << "Response: " << std::get<0>(resp).value() << std::endl;
Expand Down
10 changes: 5 additions & 5 deletions examples/cpp20_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <boost/redis/resp3/serialization.hpp>
#include <boost/json/src.hpp>

namespace net = boost::asio;
namespace asio = boost::asio;
using namespace boost::describe;
using boost::redis::request;
using boost::redis::response;
Expand All @@ -48,11 +48,11 @@ void boost_redis_to_bulk(std::string& to, user const& u)
void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_code&)
{ u = boost::json::value_to<user>(boost::json::parse(sv)); }

auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));

// user object that will be stored in Redis in json format.
user const u{"Joao", "58", "Brazil"};
Expand All @@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable<void>

response<ignore_t, user> resp;

co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();

// Prints the first ping
Expand Down
Loading

0 comments on commit 1ed8e01

Please sign in to comment.