Skip to content

Commit

Permalink
modernize/tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
sehe committed Jan 25, 2021
1 parent 9f7eda9 commit 73bbaa1
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 60 deletions.
13 changes: 7 additions & 6 deletions src/hubclient.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "hubclient.h"

namespace msghublib { namespace detail {
namespace msghublib::detail {
using boost::asio::ip::tcp;

auto hubclient::bind(void (hubclient::*handler)(error_code)) {
Expand Down Expand Up @@ -29,7 +29,7 @@ namespace msghublib { namespace detail {

void hubclient::write(const hubmessage& msg)
{
post(socket_.get_executor(), [this, msg, self=shared_from_this()]{
post(socket_.get_executor(), [this, msg, self=shared_from_this()] () mutable {
bool write_in_progress = !outmsg_queue_.empty();
outmsg_queue_.push_back(std::move(msg));
if (!write_in_progress)
Expand All @@ -48,7 +48,7 @@ namespace msghublib { namespace detail {
async_read(socket_, inmsg_.payload_area(), bind(&hubclient::handle_read_body));
}

// TODO handle invalid headers (connection reset?)
// TODO(sehe): handle invalid headers (connection reset?)
}

void hubclient::handle_read_body(error_code error)
Expand All @@ -59,7 +59,7 @@ namespace msghublib { namespace detail {
// Get next
async_read(socket_, inmsg_.header_buf(), bind(&hubclient::handle_read_header));
}
// TODO handle IO failure
// TODO(sehe): handle IO failure
}

void hubclient::handle_write(error_code error)
Expand All @@ -68,7 +68,8 @@ namespace msghublib { namespace detail {
outmsg_queue_.pop_front();

if (!outmsg_queue_.empty()) {
// Write next from queue // TODO remove duplication
// Write next from queue
// TODO(sehe) remove duplication
async_write(socket_,
outmsg_queue_.front().on_the_wire(),
bind(&hubclient::handle_write));
Expand All @@ -77,4 +78,4 @@ namespace msghublib { namespace detail {
// error TODO handling
}

} }
} // namespace msghublib::detail
13 changes: 7 additions & 6 deletions src/hubclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ class hubclient : public std::enable_shared_from_this<hubclient>
void write(const hubmessage& msg);
private:
using error_code = boost::system::error_code;
void handle_read_header(error_code);
void handle_read_body(error_code);
void handle_write(error_code);
void handle_read_header(error_code /*error*/);
void handle_read_body(error_code /*error*/);
void handle_write(error_code /*error*/);

auto bind(void (hubclient::*)(error_code));
private:
auto bind(void (hubclient::* /*handler*/)(error_code));

tcp::socket socket_;
ihub& distributor_;
hubmessage inmsg_;
hubmessage_queue outmsg_queue_;
};

} }
} // namespace detail
} // namespace msghublib
8 changes: 4 additions & 4 deletions src/hubconnection.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "hubconnection.h"

namespace msghublib { namespace detail {
namespace msghublib::detail {
using boost::asio::ip::tcp;

auto hubconnection::bind(void (hubconnection::*handler)(error_code)) {
Expand Down Expand Up @@ -42,7 +42,7 @@ namespace msghublib { namespace detail {
else
{
#pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture
post(socket_.get_executor(), [=, self=shared_from_this()]
post(socket_.get_executor(), [=, self=shared_from_this()] () mutable
{ do_write(std::move(msg)); });
}
return true;
Expand Down Expand Up @@ -118,7 +118,7 @@ namespace msghublib { namespace detail {
{
is_closing = true; // atomic

// TODO: Unsubscribe?
// TODO(sehe): Unsubscribe?

if (forced || outmsg_queue_.empty()) {
if (socket_.is_open()) {
Expand All @@ -128,4 +128,4 @@ namespace msghublib { namespace detail {
}
}

} }
} // namespace msghublib::detail
6 changes: 3 additions & 3 deletions src/hubconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ class hubconnection : public std::enable_shared_from_this<hubconnection>

private:
using error_code = boost::system::error_code;
auto bind(void (hubconnection::*)(error_code));
auto bind(void (hubconnection::* /*handler*/)(error_code));

void handle_read_header(error_code error);
void handle_read_body(error_code error);
void do_write(hubmessage msg);
void handle_write(error_code error);
void do_close(bool forced);

private:
tcp::socket socket_;
ihub& courier_;
hubmessage inmsg_;
hubmessage_queue outmsg_queue_;
boost::atomic_bool is_closing;
};

} }
} // namespace detail
} // namespace msghublib
4 changes: 2 additions & 2 deletions src/hubmessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace msghublib {
headers_.msgaction = action_;
headers_.magic = cookie;

auto out = payload_.data();
auto *out = payload_.data();
out = std::copy_n(topic.data(), topic.size(), out);
out = std::copy_n(msg.data(), msg.size(), out);
}
Expand All @@ -41,4 +41,4 @@ namespace msghublib {
.substr(headers_.topiclen, headers_.bodylen);
}

}
} // namespace msghublib
23 changes: 13 additions & 10 deletions src/hubmessage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <boost/asio/buffer.hpp>
#include <functional>
#include <string_view>

Expand All @@ -12,18 +13,20 @@ namespace msghublib {
class hubmessage
{
public:
// the following affect on-the-wire compatiblity
enum action : char { subscribe, unsubscribe, publish };
enum { version = 0x1 };
enum { cookie = 0xF00D ^ (version << 8) };
enum { messagesize = 0x2000 };
// the following does NOT affect on-the-wire compatiblity
enum { preallocated = 196 };

hubmessage(action a={}, std::string_view topic={}, span<char const> msg = {});

bool verify() const;

action get_action() const;
std::string_view topic() const;
span<char const> body() const;
[[nodiscard]] bool verify() const;
[[nodiscard]] action get_action() const;
[[nodiscard]] std::string_view topic() const;
[[nodiscard]] span<char const> body() const;

private:
#pragma pack(push, 1)
Expand All @@ -36,7 +39,7 @@ class hubmessage
#pragma pack(pop)

headers_t headers_;
boost::container::small_vector<char, 242> payload_;
boost::container::small_vector<char, preallocated> payload_;

public:
// input buffer views
Expand All @@ -50,14 +53,14 @@ class hubmessage
}

// output buffer views
auto on_the_wire() const {
return std::vector {
[[nodiscard]] auto on_the_wire() const {
return std::array<boost::asio::const_buffer, 2> {
boost::asio::buffer(&headers_, sizeof(headers_)),
boost::asio::buffer(payload_.data(), payload_.size())
};
}
};

typedef std::deque<hubmessage> hubmessage_queue;
using hubmessage_queue = std::deque<hubmessage>;

}
} // namespace msghublib
2 changes: 1 addition & 1 deletion src/ihub.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ namespace msghublib {
virtual void deliver(hubmessage const& msg) = 0;
};
}
}
} // namespace msghublib
49 changes: 24 additions & 25 deletions src/msghub.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "msghub.h"

#include <utility>

#include "hubclient.h"
#include "hubconnection.h"
#include "ihub.h"
Expand Down Expand Up @@ -28,33 +30,28 @@ namespace msghublib {
std::multimap<std::string, std::weak_ptr<hubclient>> remote_subs_;

public:
impl(boost::asio::any_io_executor executor)
explicit impl(boost::asio::any_io_executor const& executor)
: executor_(executor)
, acceptor_(make_strand(executor))
{}

void stop() {
{
std::shared_ptr<hubconnection> rhub;
if (auto p = std::atomic_exchange(&remote_hub_, rhub))
if (auto p = std::atomic_exchange(&remote_hub_, rhub)) {
p->close(false);
}
}

if (!weak_from_this().expired()) {
post(acceptor_.get_executor(), [this, self = shared_from_this()] {
if (acceptor_.is_open())
acceptor_.cancel();
});
} else {
if (acceptor_.is_open())
acceptor_.cancel();
}

if (0) {
std::lock_guard lk(mutex_);
for (auto& [_, client] : remote_subs_)
if (auto alive = client.lock())
alive->stop();
post(acceptor_.get_executor(),
[this, self = shared_from_this()] {
if (acceptor_.is_open()) {
acceptor_.cancel();
}
});
} else if (acceptor_.is_open()) {
acceptor_.cancel();
}

work_.reset();
Expand Down Expand Up @@ -114,13 +111,13 @@ namespace msghublib {
return false;
}

bool subscribe(const std::string& topic, msghub::onmessage handler) {
bool subscribe(const std::string& topic, const msghub::onmessage& handler) {
std::unique_lock lk(mutex_);
if (auto [it, ins] = local_subs_.emplace(topic, handler); ins) {
lk.unlock();
if (auto p = atomic_load(&remote_hub_)) {
return p->write({ hubmessage::action::subscribe, topic }, true);
// TODO: wait feedback from server here?
// TODO(sehe): wait feedback from server here?
}
} else {
// just update the handler
Expand All @@ -133,15 +130,15 @@ namespace msghublib {

private:
msghub::onmessage const& lookup_handler(std::string_view topic) const {
static const msghub::onmessage no_handler = [](auto...) {};
static const msghub::onmessage no_handler = [](auto... /*unused*/) {};

std::lock_guard lk(mutex_);
auto it = local_subs_.find(std::string(topic));
return (it == local_subs_.end()) ? no_handler : it->second;
}

void distribute(std::shared_ptr<hubclient> const& subscriber,
hubmessage const& msg) {
hubmessage const& msg) override {
std::string topic(msg.topic());
std::unique_lock lk(mutex_);
auto range = remote_subs_.equal_range(topic);
Expand All @@ -152,8 +149,9 @@ namespace msghublib {
if (auto alive = it->second.lock()) {
alive->write(msg);
++it;
} else
} else {
it = remote_subs_.erase(it);
}
}
break;

Expand All @@ -168,10 +166,11 @@ namespace msghublib {
case hubmessage::action::unsubscribe:
for (auto it = range.first; it != range.second;) {
if (auto alive = it->second.lock();
!alive || alive == subscriber)
!alive || alive == subscriber) {
it = remote_subs_.erase(it);
else
} else {
++it;
}
}
break;

Expand All @@ -180,7 +179,7 @@ namespace msghublib {
}
}

void deliver(hubmessage const& msg) {
void deliver(hubmessage const& msg) override {
lookup_handler(msg.topic())(msg.topic(), msg.body());
}

Expand Down Expand Up @@ -221,7 +220,7 @@ namespace msghublib {
bool msghub::connect(const std::string& hostip, uint16_t port) { return pimpl->connect(hostip, port); }
bool msghub::create(uint16_t port) { return pimpl->create(port); }
bool msghub::unsubscribe(const std::string& topic) { return pimpl->unsubscribe(topic); }
bool msghub::subscribe(const std::string& topic, onmessage handler) { return pimpl->subscribe(topic, handler); }
bool msghub::subscribe(const std::string& topic, onmessage handler) { return pimpl->subscribe(topic, std::move(handler)); }
bool msghub::publish(std::string_view topic, span<char const> message) { return pimpl->publish(topic, message); }

} // namespace msghublib
4 changes: 2 additions & 2 deletions test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ void test_server_oncleintfailure();
void test_toobigmsg();
void test_emptymsg();

test_suite* init_unit_test_suite(int, char**)
test_suite* init_unit_test_suite(int /*unused*/, char** /*unused*/)
{
test_suite *test = BOOST_TEST_SUITE("messagehub test");
auto *test = BOOST_TEST_SUITE("messagehub test");

test->add(BOOST_TEST_CASE(&test_create));
test->add(BOOST_TEST_CASE(&test_connect));
Expand Down
2 changes: 1 addition & 1 deletion test/subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace {
BOOST_TEST(expected == message, boost::test_tools::per_element());
newmessage.notify_one();
}
}
} // namespace

void test_subscribe()
{
Expand Down

0 comments on commit 73bbaa1

Please sign in to comment.