Skip to content

Commit

Permalink
C++11 msghub_category error codes
Browse files Browse the repository at this point in the history
The old situation had easy-to-ignore boolean return values.

Ihe new interface has tich information, are harder to ignore, but
convenient to use correctly (not providing error_code will raise
system_error exception)
  • Loading branch information
sehe committed Jan 26, 2021
1 parent 73bbaa1 commit a8f6c3b
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 152 deletions.
24 changes: 10 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ Examples
boost::asio::io_context io;
// Create hub to listen on 0xbee port
mh::msghub hub(io.get_executor());
if (!hub.create(0xbee)) {
std::cerr << "Couldn't create hub\n";
} else {
// Subscribe on "any topic"
/* bool ok = */hub.subscribe("any topic", on_message);
hub.create(0xbee);

// Current or any another client
/* bool ok = */hub.publish("any topic", "new message");
}
// Subscribe on "any topic"
hub.subscribe("any topic", on_message);

// Current or any another client
hub.publish("any topic", "new message");

io.run(); // keep server active, if created
}
Expand All @@ -54,9 +52,8 @@ Examples
{
boost::asio::io_context io;
mh::msghub hub(io.get_executor());
if (hub.connect("localhost", 0xbee)) {
/*bool ok = */hub.publish("any topic", "new message");
}
hub.connect("localhost", 0xbee);
hub.publish("any topic", "new message");

hub.stop();
io.run();
Expand All @@ -72,9 +69,8 @@ Examples
{
boost::asio::thread_pool io(5); // count optional
mh::msghub hub(io.get_executor());
if (hub.connect("localhost", 0xbee)) {
/*bool ok = */hub.publish("any topic", "new message");
}
hub.connect("localhost", 0xbee);
hub.publish("any topic", "new message");

hub.stop();
io.join();
Expand Down
45 changes: 45 additions & 0 deletions pub/hub_error.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once
#include <boost/system/error_code.hpp>
#include <boost/system/is_error_code_enum.hpp>
#include <boost/system/system_error.hpp>
#include <system_error>
#include <type_traits>

namespace msghublib {
using boost::system::error_code;
using boost::system::error_category;
using boost::system::system_error;

enum hub_errc {
hub_connection_failed = 1,
hub_creation_failed = 2,
hub_not_connected = 3,
};
}

template <> struct boost::system::is_error_code_enum<msghublib::hub_errc>
: std::true_type {};

namespace msghublib {
struct msghub_category : error_category {
virtual char const* name() const noexcept override { return "msghub"; }
virtual std::string message(int ev) const override {
switch (ev) {
case hub_errc::hub_connection_failed: return "hub_connection_failed";
case hub_errc::hub_creation_failed: return "hub_creation_failed";
case hub_errc::hub_not_connected: return "hub_not_connected";
}
return "unknown";
}
};

static inline error_category const& msghub_category() {
static constexpr struct msghub_category s_msghub_category_instance;
return s_msghub_category_instance;
};

[[maybe_unused]] static inline error_code make_error_code(hub_errc e) {
return {e, msghub_category()};
}
}

33 changes: 26 additions & 7 deletions pub/msghub.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once

#include <boost/system/detail/error_code.hpp>
#include <hub_error.h>
#include <memory>
#include <boost/asio.hpp>
#include <string>
#include <string_view>
#include "span.h"
#include "hub_error.h"

namespace msghublib {

Expand All @@ -17,21 +20,37 @@ namespace msghublib {
explicit msghub(boost::asio::any_io_executor);
~msghub();

bool connect(const std::string& hostip, uint16_t port);
bool create(uint16_t port);
//void connect(const std::string& hostip, uint16_t port);
void connect(const std::string& hostip, uint16_t port, error_code& ec);
void create(uint16_t port, error_code& ec);

bool unsubscribe(const std::string& topic);
bool subscribe(const std::string& topic, onmessage handler);
bool publish(std::string_view topic, span<char const> message);
void unsubscribe(const std::string& topic, error_code& ec);
void subscribe(const std::string& topic, onmessage handler, error_code& ec);
void publish(std::string_view topic, span<char const> message, error_code& ec);

// Treat string literals specially, not including the terminating NUL
template <size_t N>
bool publish(std::string_view topic, char const (&literal)[N]) {
void publish(std::string_view topic, char const (&literal)[N], error_code& ec) {
static_assert(N>0);
return publish(topic, span<char const>{literal, N-1});
publish(topic, span<char const>{literal, N-1}, ec);
}

void stop();

// convenience throwing wrappers
void connect(const std::string& hostip, uint16_t port);
void create(uint16_t port);
void unsubscribe(const std::string& topic);
void subscribe(const std::string& topic, onmessage handler);
void publish(std::string_view topic, span<char const> message);

// Treat string literals specially, not including the terminating NUL
template <size_t N>
void publish(std::string_view topic, char const (&literal)[N]) {
error_code ec;
publish(topic, literal, ec);
if (ec) throw system_error(ec);
}

private:
class impl;
Expand Down
2 changes: 1 addition & 1 deletion src/hubclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace msghublib::detail {
});
}

void hubclient::write(const hubmessage& msg)
void hubclient::send(const hubmessage& msg)
{
post(socket_.get_executor(), [this, msg, self=shared_from_this()] () mutable {
bool write_in_progress = !outmsg_queue_.empty();
Expand Down
2 changes: 1 addition & 1 deletion src/hubclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class hubclient : public std::enable_shared_from_this<hubclient>
tcp::socket& socket();
void start();
void stop();
void write(const hubmessage& msg);
void send(const hubmessage& msg);
private:
using error_code = boost::system::error_code;
void handle_read_header(error_code /*error*/);
Expand Down
110 changes: 47 additions & 63 deletions src/hubconnection.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "hubconnection.h"
#include <boost/system/detail/error_code.hpp>
#include <hub_error.h>

namespace msghublib::detail {
using boost::asio::ip::tcp;

auto hubconnection::bind(void (hubconnection::*handler)(error_code)) {
#pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture
Expand All @@ -10,100 +11,83 @@ namespace msghublib::detail {
};
}

bool hubconnection::init(const std::string& host, uint16_t port)
{
try
{
void hubconnection::init(const std::string& host, uint16_t port, error_code& ec) {
try {
error_code ec;
tcp::resolver resolver(socket_.get_executor());
tcp::resolver::results_type results = resolver.resolve(host, std::to_string(port));
tcp::resolver::results_type results =
resolver.resolve(host, std::to_string(port), ec);

// Do blocking connect (connection is more important than subscription here)
connect(socket_, results);
// Do blocking connect (connection is more important than
// subscription here)
if (!ec)
connect(socket_, results, ec);

// Schedule packet read
async_read(socket_, inmsg_.header_buf(), bind(&hubconnection::handle_read_header));
}
catch (std::exception&)
{
return false;
if (!ec)
async_read(socket_, inmsg_.header_buf(),
bind(&hubconnection::handle_read_header));
} catch (system_error const& se) {
ec = se.code();
} catch (...) {
ec = hub_errc::hub_connection_failed;
}

return true;
}

bool hubconnection::write(const hubmessage& msg, bool wait)
{
try
{
if (wait)
{
boost::asio::write(socket_, msg.on_the_wire());
}
else
{
void hubconnection::async_send(const hubmessage& msg) {
#pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture
post(socket_.get_executor(), [=, self=shared_from_this()] () mutable
{ do_write(std::move(msg)); });
}
return true;
}
catch (std::exception&)
{
return false;
}
post(socket_.get_executor(), [=, self = shared_from_this()]() mutable {
do_send(std::move(msg));
});
}

void hubconnection::close(bool forced)
{
void hubconnection::send(const hubmessage& msg, error_code& ec) {
write(socket_, msg.on_the_wire(), ec);
}

void hubconnection::close(bool forced) {
#pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture
post(socket_.get_executor(), [=, self=shared_from_this()]
{ do_close(forced); });
post(socket_.get_executor(),
[=, self = shared_from_this()] { do_close(forced); });
}

void hubconnection::handle_read_header(error_code error)
{
if (!error && inmsg_.verify())
{
async_read(socket_, inmsg_.payload_area(), bind(&hubconnection::handle_read_body));
}
else
{
void hubconnection::handle_read_header(error_code error) {
if (!error && inmsg_.verify()) {
async_read(socket_, inmsg_.payload_area(),
bind(&hubconnection::handle_read_body));
} else {
do_close(true);
}
}

void hubconnection::handle_read_body(error_code error)
{
if (!error)
{
void hubconnection::handle_read_body(error_code error) {
if (!error) {
courier_.deliver(inmsg_);
async_read(socket_, inmsg_.header_buf(), bind(&hubconnection::handle_read_header));
}
else
{
async_read(socket_, inmsg_.header_buf(),
bind(&hubconnection::handle_read_header));
} else {
do_close(true);
}
}

void hubconnection::do_write(hubmessage msg)
{
if (outmsg_queue_.push_back(std::move(msg)); 1 == outmsg_queue_.size())
void hubconnection::do_send(hubmessage msg) {
if (outmsg_queue_.push_back(std::move(msg));
1 == outmsg_queue_.size())
{
async_write(socket_,
outmsg_queue_.front().on_the_wire(),
bind(&hubconnection::handle_write));
async_write(socket_, outmsg_queue_.front().on_the_wire(),
bind(&hubconnection::handle_send));
}
}

void hubconnection::handle_write(error_code error)
void hubconnection::handle_send(error_code error)
{
if (!error)
{
if (outmsg_queue_.pop_front(); !outmsg_queue_.empty())
{
async_write(socket_,
outmsg_queue_.front().on_the_wire(),
bind(&hubconnection::handle_write));
bind(&hubconnection::handle_send));
} else if (is_closing) {
do_close(false);
}
Expand All @@ -122,7 +106,7 @@ namespace msghublib::detail {

if (forced || outmsg_queue_.empty()) {
if (socket_.is_open()) {
boost::system::error_code ec;
error_code ec;
socket_.close(ec);
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/hubconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include "ihub.h"
#include "hubmessage.h"
#include "hub_error.h"

#include <boost/system/detail/error_code.hpp>
#include <string>
#include <memory>
#include <functional>
Expand All @@ -23,25 +25,25 @@ class hubconnection : public std::enable_shared_from_this<hubconnection>
, is_closing(false)
{}

bool init(const std::string& host, uint16_t port);
bool write(const hubmessage& msg, bool wait = false);
void init(const std::string& host, uint16_t port, error_code& ec);
void async_send(const hubmessage& msg);
void send(const hubmessage& msg, error_code& ec);
void close(bool forced);

private:
using error_code = boost::system::error_code;
auto bind(void (hubconnection::* /*handler*/)(error_code));

void handle_read_header(error_code error);
void handle_read_body(error_code error);
void do_write(hubmessage msg);
void handle_write(error_code error);
void do_send(hubmessage msg);
void handle_send(error_code error);
void do_close(bool forced);

tcp::socket socket_;
ihub& courier_;
hubmessage inmsg_;
hubmessage_queue outmsg_queue_;
boost::atomic_bool is_closing;
std::atomic_bool is_closing;
};

} // namespace detail
Expand Down
Loading

0 comments on commit a8f6c3b

Please sign in to comment.