Skip to content

Commit

Permalink
use a single strand per client to avoid writing to tcp out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
lionkor committed Jan 25, 2024
1 parent ee74d62 commit 7d813f7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
4 changes: 4 additions & 0 deletions include/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ using VehicleID = uint16_t;
using namespace boost::asio;

struct Client {
using StrandPtr = std::shared_ptr<boost::asio::strand<ip::tcp::socket::executor_type>>;
using Ptr = std::shared_ptr<Client>;

ClientID id;
Sync<bmp::State> state { bmp::State::None };

Expand Down Expand Up @@ -93,6 +95,8 @@ struct Client {
ip::udp::endpoint m_udp_endpoint;

class Network& m_network;

StrandPtr m_tcp_strand;
};

struct Vehicle {
Expand Down
42 changes: 27 additions & 15 deletions src/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@

/// Boost::asio + strands + timer magic to make writes timeout after some time.
template <typename HandlerFn>
static void async_write_timeout(ip::tcp::socket& stream, const_buffer&& sequence, boost::posix_time::milliseconds timeout_ms, HandlerFn&& handler) {
static void async_write_timeout(ip::tcp::socket& stream, const_buffer&& sequence, boost::posix_time::milliseconds timeout_ms, HandlerFn&& handler, Client::StrandPtr strand) {
struct TimeoutHelper : std::enable_shared_from_this<TimeoutHelper> {
/// Given a socket (stream), buffer and a completion handler, constructs a state machine.
TimeoutHelper(ip::tcp::socket& stream, const_buffer buffer, HandlerFn handler)
TimeoutHelper(ip::tcp::socket& stream, const_buffer buffer, HandlerFn handler, Client::StrandPtr strand)
: m_stream(stream)
, m_buffer(std::move(buffer))
, m_handler_fn(std::move(handler)) { }
, m_handler_fn(std::move(handler))
, m_strand(std::move(strand)) { }
/// Kicks off the timer and async_write, which race to cancel each other.
/// Whichever completes first gets to cancel the other one.
/// Effectively, the timer will finish before the write if the write is "timing out",
Expand All @@ -53,12 +54,12 @@ static void async_write_timeout(ip::tcp::socket& stream, const_buffer&& sequence
// as a copy so that it can call the timeout handler on this object.
// the whole thing is wrapped in a strand to avoid this happening on two separate thwrites at the same time,
// i.e. the timer and write finish at the same time on separate thwrites, or other goofy stuff.
m_timer.async_wait(bind_executor(m_strand, [self = this->shared_from_this()](auto&& ec) {
m_timer.async_wait(bind_executor(*m_strand, [self = this->shared_from_this()](auto&& ec) {
self->handle_timeout(ec);
}));
// start the write on the same strand, again giving a copy of a shared_ptr to ourselves so the handler can be
// called.
boost::asio::async_write(m_stream, m_buffer, bind_executor(m_strand, [self = this->shared_from_this()](auto&& ec, auto size) {
boost::asio::async_write(m_stream, m_buffer, bind_executor(*m_strand, [self = this->shared_from_this()](auto&& ec, auto size) {
self->handle_write(ec, size);
}));
}
Expand Down Expand Up @@ -91,14 +92,15 @@ static void async_write_timeout(ip::tcp::socket& stream, const_buffer&& sequence
ip::tcp::socket& m_stream;
const_buffer m_buffer;
HandlerFn m_handler_fn;
boost::asio::strand<ip::tcp::socket::executor_type> m_strand { m_stream.get_executor() };
Client::StrandPtr m_strand;
boost::asio::deadline_timer m_timer { m_stream.get_executor() };
bool m_completed = false;
};

auto helper = std::make_shared<TimeoutHelper>(stream,
std::forward<const_buffer>(sequence),
std::forward<HandlerFn>(handler));
std::forward<HandlerFn>(handler),
strand);
helper->start(timeout_ms);
}

Expand Down Expand Up @@ -195,12 +197,14 @@ void Client::tcp_write(bmp::Packet& packet) {
auto timeout = boost::posix_time::milliseconds(std::max(size_t(500), size_t(std::ceil(double(data->size()) * m_write_byte_timeout))));
beammp_tracef("Packet of size {} B given a timeout of {}ms ({}s)", data->size(), timeout.total_milliseconds(), timeout.seconds());
// write header and packet data
async_write_timeout(m_tcp_socket, buffer(*data), timeout, [data, this](const boost::system::error_code& ec, size_t) {
if (ec && ec.value() == boost::system::errc::operation_canceled) {
// write timeout is fatal
m_network.disconnect(id, "Write timeout");
}
});
async_write_timeout(
m_tcp_socket, buffer(*data), timeout, [data, this](const boost::system::error_code& ec, size_t) {
if (ec && ec.value() == boost::system::errc::operation_canceled) {
// write timeout is fatal
m_network.disconnect(id, "Write timeout");
}
},
m_tcp_strand);
}

void Client::tcp_write_file_raw(const std::filesystem::path& path) {
Expand Down Expand Up @@ -237,7 +241,8 @@ Client::Client(ClientID id, Network& network, ip::tcp::socket&& tcp_socket)
: id(id)
, udp_magic(id ^ (uint64_t(std::rand()) << 32) ^ uint64_t(this))
, m_tcp_socket(std::forward<ip::tcp::socket&&>(tcp_socket))
, m_network(network) {
, m_network(network)
, m_tcp_strand(std::make_shared<StrandPtr::element_type>(m_tcp_socket.get_executor())) {
beammp_debugf("Client {} created", id);
}

Expand Down Expand Up @@ -879,6 +884,13 @@ void Network::handle_mod_download(ClientID id, const bmp::Packet& packet, std::s
switch (packet.purpose) {
case bmp::Purpose::ModsSyncDone: {
beammp_debugf("Client {} is done with mods sync", id);
beammp_debugf("Sending map info to client {}", id);
bmp::Packet map_info {
.purpose = bmp::Purpose::MapInfo,
.raw_data = std::vector<uint8_t>(Application::Settings.MapName.begin(), Application::Settings.MapName.end()),
};
client->tcp_write(map_info);
beammp_debugf("Client {} entering session setup", id);
bmp::Packet state_change {
.purpose = bmp::Purpose::StateChangeSessionSetup,
};
Expand Down Expand Up @@ -1066,7 +1078,7 @@ void Vehicle::refresh_cache(std::unique_lock<std::recursive_mutex>& lock) {
}

TEST_CASE("Vehicle position parse, cache, access") {
Vehicle veh {};
Vehicle veh { {} };
std::string str = R"({"rvel":[0.034001241344458,0.016966195008928,-0.0032029844877877],"rot":[-0.0012675799979579,0.0014056711767528,0.94126306518056,0.3376688606555],"tim":66.978502945043,"vel":[-18.80228647297,22.830758602197,0.0011466381380035],"pos":[562.68027268429,-379.27891669179,160.40605946989],"ping":0.032000000871718})";
veh.update_status(std::vector<uint8_t>(str.begin(), str.end()));
auto status = veh.get_status();
Expand Down

0 comments on commit 7d813f7

Please sign in to comment.