Skip to content

Commit

Permalink
Implemented Backup domains in v0 protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold committed May 3, 2024
1 parent a0f6f2d commit c480870
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 71 deletions.
3 changes: 1 addition & 2 deletions ecal/service/ecal_service/src/client_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ namespace eCAL
{
if (protocol_version == 0)
{
// TODO: Enable V0 protocol again
//impl_ = ClientSessionV0::create(io_context, address, port, event_callback, logger);
impl_ = ClientSessionV0::create(io_context, server_list, event_callback, logger);
}
else
{
Expand Down
111 changes: 81 additions & 30 deletions ecal/service/ecal_service/src/client_session_impl_v0.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

namespace eCAL
Expand All @@ -38,27 +39,24 @@ namespace eCAL
/////////////////////////////////////
// Constructor, Destructor, Create
/////////////////////////////////////
std::shared_ptr<ClientSessionV0> ClientSessionV0::create(const std::shared_ptr<asio::io_context>& io_context
, const std::string& address
, std::uint16_t port
, const EventCallbackT& event_callback
, const LoggerT& logger)
std::shared_ptr<ClientSessionV0> ClientSessionV0::create(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, std::uint16_t>>& server_list
, const EventCallbackT& event_callback
, const LoggerT& logger)
{
std::shared_ptr<ClientSessionV0> instance(new ClientSessionV0(io_context, address, port, event_callback, logger));
std::shared_ptr<ClientSessionV0> instance(new ClientSessionV0(io_context, server_list, event_callback, logger));

instance->resolve_endpoint();
instance->resolve_endpoint(0); // TODO: Write a test that checks what happens when the server_list is empty

return instance;
}

ClientSessionV0::ClientSessionV0(const std::shared_ptr<asio::io_context>& io_context
, const std::string& address
, std::uint16_t port
, const EventCallbackT& event_callback
, const LoggerT& logger)
ClientSessionV0::ClientSessionV0(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, std::uint16_t>>& server_list
, const EventCallbackT& event_callback
, const LoggerT& logger)
: ClientSessionBase(io_context, event_callback)
, address_ (address)
, port_ (port)
, server_list_ (server_list)
, service_call_queue_strand_(*io_context)
, resolver_ (*io_context)
, logger_ (logger)
Expand All @@ -78,42 +76,65 @@ namespace eCAL
//////////////////////////////////////
// Connection establishement
//////////////////////////////////////
void ClientSessionV0::resolve_endpoint()
void ClientSessionV0::resolve_endpoint(size_t server_list_index)
{
ECAL_SERVICE_LOG_DEBUG(logger_, "Resolving endpoint [" + address_ + ":" + std::to_string(port_) + "]...");
ECAL_SERVICE_LOG_DEBUG(logger_, "Resolving endpoint [" + server_list_[server_list_index].first + ":" + std::to_string(server_list_[server_list_index].second) + "]...");

const asio::ip::tcp::resolver::query query(address_, std::to_string(port_));
const asio::ip::tcp::resolver::query query(server_list_[server_list_index].first, std::to_string(server_list_[server_list_index].second));

resolver_.async_resolve(query
, service_call_queue_strand_.wrap([me = enable_shared_from_this<ClientSessionV0>::shared_from_this()]
, service_call_queue_strand_.wrap([me = enable_shared_from_this<ClientSessionV0>::shared_from_this(), server_list_index]
(asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints)
{
if (ec)
{
const std::string message = "Failed resolving endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
#if ECAL_SERVICE_LOG_DEBUG_ENABLED
{
const std::string message = "Failed resolving endpoint [" + me->server_list_[server_list_index].first + ":" + std::to_string(me->server_list_[server_list_index].second) + "]: " + ec.message();
ECAL_SERVICE_LOG_DEBUG(me->logger_, message);
}
#endif

if (server_list_index + 1 < me->server_list_.size())
{
// Try next possible endpoint
me->resolve_endpoint(server_list_index + 1);
}
else
{
std::string message = "Failed resolving any endpoint: ";
for (size_t j = 0; j < me->server_list_.size(); ++j)
{
message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second);
if (j + 1 < me->server_list_.size())
{
message += ", ";
}
}
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
}
return;
}
else
{
#if ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED
// Verbose-debug log of all endpoints
{
std::string endpoints_str = "Resolved endpoints for " + me->address_ + ": ";
std::string endpoints_str = "Resolved endpoints for " + me->server_list_[server_list_index].first + ": ";
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
{
endpoints_str += endpoint_to_string(*it) + ", ";
}
ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, endpoints_str);
}
#endif //ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED
me->connect_to_endpoint(resolved_endpoints);
me->connect_to_endpoint(resolved_endpoints, server_list_index);
}
}));
}

void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints)
void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index)
{
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
Expand All @@ -126,14 +147,36 @@ namespace eCAL
const std::lock_guard<std::mutex> socket_lock(socket_mutex_);
asio::async_connect(socket_
, *endpoint_sequence
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence, server_list_index](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
{
(void)endpoint;
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
{
// Log an error
const std::string message = "Failed to connect to endpoint [" + me->chosen_endpoint_.first + ":" + std::to_string(me->chosen_endpoint_.second) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
}

// If there are more servers available, try the next one
if (server_list_index + 1 < me->server_list_.size())
{
me->resolve_endpoint(server_list_index + 1);
}
else
{
std::string message = "Failed to connect to any endpoint: ";
for (size_t j = 0; j < me->server_list_.size(); ++j)
{
message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second);
if (j + 1 < me->server_list_.size())
{
message += ", ";
}
}
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
}
return;
}
else
Expand All @@ -156,6 +199,12 @@ namespace eCAL
}
}

{
// Set the chosen endpoint
std::lock_guard<std::mutex> chosen_endpoint_lock(me->chosen_endpoint_mutex_);
me->chosen_endpoint_ = me->server_list_[server_list_index];
}

const std::string message = "Connected to server. Using protocol version 0.";
me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message);

Expand Down Expand Up @@ -356,12 +405,14 @@ namespace eCAL

std::string ClientSessionV0::get_address() const
{
return address_;
std::lock_guard<std::mutex> chosen_endpoint_lock(chosen_endpoint_mutex_);
return chosen_endpoint_.first;
}

std::uint16_t ClientSessionV0::get_port() const
{
return port_;
std::lock_guard<std::mutex> chosen_endpoint_lock(chosen_endpoint_mutex_);
return chosen_endpoint_.second;
}

State ClientSessionV0::get_state() const
Expand Down
33 changes: 18 additions & 15 deletions ecal/service/ecal_service/src/client_session_impl_v0.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
#pragma once

#include "client_session_impl_base.h"
#include <cstdint>

#include <ecal/service/logger.h>

#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

namespace eCAL
{
Expand All @@ -50,18 +53,16 @@ namespace eCAL
// Constructor, Destructor, Create
/////////////////////////////////////
public:
static std::shared_ptr<ClientSessionV0> create(const std::shared_ptr<asio::io_context>& io_context
, const std::string& address
, std::uint16_t port
, const EventCallbackT& event_callback
, const LoggerT& logger = default_logger("Service Client V1"));
static std::shared_ptr<ClientSessionV0> create(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, std::uint16_t>>& server_list
, const EventCallbackT& event_callback
, const LoggerT& logger = default_logger("Service Client V1"));

protected:
ClientSessionV0(const std::shared_ptr<asio::io_context>& io_context
, const std::string& address
, std::uint16_t port
, const EventCallbackT& event_callback
, const LoggerT& logger);
ClientSessionV0(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, std::uint16_t>>& server_list
, const EventCallbackT& event_callback
, const LoggerT& logger);

public:
// Delete copy / move constructor and assignment operator
Expand All @@ -77,8 +78,8 @@ namespace eCAL
// Connection establishement
//////////////////////////////////////
private:
void resolve_endpoint();
void connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints);
void resolve_endpoint(size_t server_list_index);
void connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index);

//////////////////////////////////////
// Service calls
Expand Down Expand Up @@ -137,8 +138,10 @@ namespace eCAL
//////////////////////////////////////
private:

const std::string address_; //!< The original address that this client was created with.
const std::uint16_t port_; //!< The original port that this client was created with.
const std::vector<std::pair<std::string, std::uint16_t>> server_list_; //!< The list of servers that this client was created with. They will be tried in order.

mutable std::mutex chosen_endpoint_mutex_; //!< Protects the chosen_endpoint_ variable.
std::pair<std::string, std::uint16_t> chosen_endpoint_; //!< The endpoint that the client is currently connected to. Protected by chosen_endpoint_mutex_.

asio::io_context::strand service_call_queue_strand_;
asio::ip::tcp::resolver resolver_;
Expand Down
Loading

0 comments on commit c480870

Please sign in to comment.