diff --git a/ecal/service/ecal_service/src/client_session.cpp b/ecal/service/ecal_service/src/client_session.cpp index a624a92569..535895e678 100644 --- a/ecal/service/ecal_service/src/client_session.cpp +++ b/ecal/service/ecal_service/src/client_session.cpp @@ -73,8 +73,7 @@ namespace eCAL { if (protocol_version == 0) { - // TODO: Enable V0 protocol again - //impl_ = ClientSessionV0::create(io_context, address, port, event_callback, logger); + impl_ = ClientSessionV0::create(io_context, server_list, event_callback, logger); } else { diff --git a/ecal/service/ecal_service/src/client_session_impl_v0.cpp b/ecal/service/ecal_service/src/client_session_impl_v0.cpp index 739e14b5de..cb51837167 100644 --- a/ecal/service/ecal_service/src/client_session_impl_v0.cpp +++ b/ecal/service/ecal_service/src/client_session_impl_v0.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace eCAL @@ -38,27 +39,24 @@ namespace eCAL ///////////////////////////////////// // Constructor, Destructor, Create ///////////////////////////////////// - std::shared_ptr ClientSessionV0::create(const std::shared_ptr& io_context - , const std::string& address - , std::uint16_t port - , const EventCallbackT& event_callback - , const LoggerT& logger) + std::shared_ptr ClientSessionV0::create(const std::shared_ptr& io_context + , const std::vector>& server_list + , const EventCallbackT& event_callback + , const LoggerT& logger) { - std::shared_ptr instance(new ClientSessionV0(io_context, address, port, event_callback, logger)); + std::shared_ptr instance(new ClientSessionV0(io_context, server_list, event_callback, logger)); - instance->resolve_endpoint(); + instance->resolve_endpoint(0); // TODO: Write a test that checks what happens when the server_list is empty return instance; } - ClientSessionV0::ClientSessionV0(const std::shared_ptr& io_context - , const std::string& address - , std::uint16_t port - , const EventCallbackT& event_callback - , const LoggerT& logger) + ClientSessionV0::ClientSessionV0(const std::shared_ptr& io_context + , const std::vector>& server_list + , const EventCallbackT& event_callback + , const LoggerT& logger) : ClientSessionBase(io_context, event_callback) - , address_ (address) - , port_ (port) + , server_list_ (server_list) , service_call_queue_strand_(*io_context) , resolver_ (*io_context) , logger_ (logger) @@ -78,21 +76,44 @@ namespace eCAL ////////////////////////////////////// // Connection establishement ////////////////////////////////////// - void ClientSessionV0::resolve_endpoint() + void ClientSessionV0::resolve_endpoint(size_t server_list_index) { - ECAL_SERVICE_LOG_DEBUG(logger_, "Resolving endpoint [" + address_ + ":" + std::to_string(port_) + "]..."); + ECAL_SERVICE_LOG_DEBUG(logger_, "Resolving endpoint [" + server_list_[server_list_index].first + ":" + std::to_string(server_list_[server_list_index].second) + "]..."); - const asio::ip::tcp::resolver::query query(address_, std::to_string(port_)); + const asio::ip::tcp::resolver::query query(server_list_[server_list_index].first, std::to_string(server_list_[server_list_index].second)); resolver_.async_resolve(query - , service_call_queue_strand_.wrap([me = enable_shared_from_this::shared_from_this()] + , service_call_queue_strand_.wrap([me = enable_shared_from_this::shared_from_this(), server_list_index] (asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints) { if (ec) { - const std::string message = "Failed resolving endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message(); - me->logger_(LogLevel::Error, message); - me->handle_connection_loss_error(message); +#if ECAL_SERVICE_LOG_DEBUG_ENABLED + { + const std::string message = "Failed resolving endpoint [" + me->server_list_[server_list_index].first + ":" + std::to_string(me->server_list_[server_list_index].second) + "]: " + ec.message(); + ECAL_SERVICE_LOG_DEBUG(me->logger_, message); + } +#endif + + if (server_list_index + 1 < me->server_list_.size()) + { + // Try next possible endpoint + me->resolve_endpoint(server_list_index + 1); + } + else + { + std::string message = "Failed resolving any endpoint: "; + for (size_t j = 0; j < me->server_list_.size(); ++j) + { + message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second); + if (j + 1 < me->server_list_.size()) + { + message += ", "; + } + } + me->logger_(LogLevel::Error, message); + me->handle_connection_loss_error(message); + } return; } else @@ -100,7 +121,7 @@ namespace eCAL #if ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED // Verbose-debug log of all endpoints { - std::string endpoints_str = "Resolved endpoints for " + me->address_ + ": "; + std::string endpoints_str = "Resolved endpoints for " + me->server_list_[server_list_index].first + ": "; for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it) { endpoints_str += endpoint_to_string(*it) + ", "; @@ -108,12 +129,12 @@ namespace eCAL ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, endpoints_str); } #endif //ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED - me->connect_to_endpoint(resolved_endpoints); + me->connect_to_endpoint(resolved_endpoints, server_list_index); } })); } - void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints) + void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index) { // Convert the resolved_endpoints iterator to an endpoint sequence // (i.e. a vector of endpoints) @@ -126,14 +147,36 @@ namespace eCAL const std::lock_guard socket_lock(socket_mutex_); asio::async_connect(socket_ , *endpoint_sequence - , service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint) + , service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence, server_list_index](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint) { (void)endpoint; if (ec) { - const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message(); - me->logger_(LogLevel::Error, message); - me->handle_connection_loss_error(message); + { + // Log an error + const std::string message = "Failed to connect to endpoint [" + me->chosen_endpoint_.first + ":" + std::to_string(me->chosen_endpoint_.second) + "]: " + ec.message(); + me->logger_(LogLevel::Error, message); + } + + // If there are more servers available, try the next one + if (server_list_index + 1 < me->server_list_.size()) + { + me->resolve_endpoint(server_list_index + 1); + } + else + { + std::string message = "Failed to connect to any endpoint: "; + for (size_t j = 0; j < me->server_list_.size(); ++j) + { + message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second); + if (j + 1 < me->server_list_.size()) + { + message += ", "; + } + } + me->logger_(LogLevel::Error, message); + me->handle_connection_loss_error(message); + } return; } else @@ -156,6 +199,12 @@ namespace eCAL } } + { + // Set the chosen endpoint + std::lock_guard chosen_endpoint_lock(me->chosen_endpoint_mutex_); + me->chosen_endpoint_ = me->server_list_[server_list_index]; + } + const std::string message = "Connected to server. Using protocol version 0."; me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message); @@ -356,12 +405,14 @@ namespace eCAL std::string ClientSessionV0::get_address() const { - return address_; + std::lock_guard chosen_endpoint_lock(chosen_endpoint_mutex_); + return chosen_endpoint_.first; } std::uint16_t ClientSessionV0::get_port() const { - return port_; + std::lock_guard chosen_endpoint_lock(chosen_endpoint_mutex_); + return chosen_endpoint_.second; } State ClientSessionV0::get_state() const diff --git a/ecal/service/ecal_service/src/client_session_impl_v0.h b/ecal/service/ecal_service/src/client_session_impl_v0.h index 96017f1be3..a6309da8a3 100644 --- a/ecal/service/ecal_service/src/client_session_impl_v0.h +++ b/ecal/service/ecal_service/src/client_session_impl_v0.h @@ -20,13 +20,16 @@ #pragma once #include "client_session_impl_base.h" -#include + #include +#include #include #include #include #include +#include +#include namespace eCAL { @@ -50,18 +53,16 @@ namespace eCAL // Constructor, Destructor, Create ///////////////////////////////////// public: - static std::shared_ptr create(const std::shared_ptr& io_context - , const std::string& address - , std::uint16_t port - , const EventCallbackT& event_callback - , const LoggerT& logger = default_logger("Service Client V1")); + static std::shared_ptr create(const std::shared_ptr& io_context + , const std::vector>& server_list + , const EventCallbackT& event_callback + , const LoggerT& logger = default_logger("Service Client V1")); protected: - ClientSessionV0(const std::shared_ptr& io_context - , const std::string& address - , std::uint16_t port - , const EventCallbackT& event_callback - , const LoggerT& logger); + ClientSessionV0(const std::shared_ptr& io_context + , const std::vector>& server_list + , const EventCallbackT& event_callback + , const LoggerT& logger); public: // Delete copy / move constructor and assignment operator @@ -77,8 +78,8 @@ namespace eCAL // Connection establishement ////////////////////////////////////// private: - void resolve_endpoint(); - void connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints); + void resolve_endpoint(size_t server_list_index); + void connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index); ////////////////////////////////////// // Service calls @@ -137,8 +138,10 @@ namespace eCAL ////////////////////////////////////// private: - const std::string address_; //!< The original address that this client was created with. - const std::uint16_t port_; //!< The original port that this client was created with. + const std::vector> server_list_; //!< The list of servers that this client was created with. They will be tried in order. + + mutable std::mutex chosen_endpoint_mutex_; //!< Protects the chosen_endpoint_ variable. + std::pair chosen_endpoint_; //!< The endpoint that the client is currently connected to. Protected by chosen_endpoint_mutex_. asio::io_context::strand service_call_queue_strand_; asio::ip::tcp::resolver resolver_; diff --git a/ecal/service/test/src/ecal_tcp_service_test.cpp b/ecal/service/test/src/ecal_tcp_service_test.cpp index 1084bc609b..a44f4ae963 100644 --- a/ecal/service/test/src/ecal_tcp_service_test.cpp +++ b/ecal/service/test/src/ecal_tcp_service_test.cpp @@ -61,7 +61,7 @@ constexpr std::uint8_t max_protocol_version = 1; // TODO: Add test for the new multi-host feature, where the second host is tried if the connection to the first one fails -#if 0 +#if 1 TEST(ecal_service, RAII_TcpServiceServer) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -112,7 +112,7 @@ TEST(ecal_service, RAII_TcpServiceServer) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, RAII_TcpServiceClient) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -143,7 +143,7 @@ TEST(ecal_service, RAII_TcpServiceClient) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, RAII_TcpServiceServerAndClient) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -230,7 +230,7 @@ TEST(ecal_service, RAII_TcpServiceServerAndClient) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, RAII_StopDuringServiceCall) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -309,7 +309,7 @@ TEST(ecal_service, RAII_StopDuringServiceCall) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, Communication_SlowCommunication) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -419,7 +419,7 @@ TEST(ecal_service, Communication_SlowCommunication) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, CallbacksConnectDisconnect_ClientDisconnectsFirst) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -526,7 +526,7 @@ TEST(ecal_service, CallbacksConnectDisconnect_ClientDisconnectsFirst) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, CommunicationAndCallbacks_ClientsDisconnectFirst) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -719,7 +719,7 @@ TEST(ecal_service, CommunicationAndCallbacks_ClientsDisconnectFirst) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, CommunicationAndCallbacks_ServerDisconnectsFirst) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -867,7 +867,7 @@ TEST(ecal_service, CommunicationAndCallbacks_ServerDisconnectsFirst) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunication) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1022,7 +1022,7 @@ TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunication) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunicationNoParallelCalls) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1149,7 +1149,7 @@ TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunicationNoParallelCal } #endif -#if 0 +#if 1 TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunicationMassivePayload) // NOLINT { // This test does not work for Protocol version 0 and there is no way to fix that (which is the reason why we invented protocol version 1) @@ -1306,7 +1306,7 @@ TEST(ecal_service, CommunicationAndCallbacks_StressfulCommunicationMassivePayloa } #endif -#if 0 +#if 1 TEST(ecal_service, Callback_ServerAndClientManagers) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1441,7 +1441,7 @@ TEST(ecal_service, Callback_ServerAndClientManagers) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, Callback_ServiceCallFromCallback) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1511,7 +1511,7 @@ TEST(ecal_service, Callback_ServiceCallFromCallback) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, Callback_SerializedServiceCallbacks) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1605,7 +1605,7 @@ TEST(ecal_service, Callback_SerializedServiceCallbacks) // NOLINT } #endif -#if 0 +#if 1 // Call different eCAL Service API functions from within the callbacks TEST(ecal_service, Callback_ApiCallsFromCallbacks) // NOLINT { @@ -1750,7 +1750,7 @@ TEST(ecal_service, Callback_ApiCallsFromCallbacks) // NOLINT } #endif -#if 2 +#if 1 // Connect to a list of hosts of which the first one does not exist, so the next one is used and connected to TEST(ecal_service, BackupHost) { @@ -1846,7 +1846,7 @@ TEST(ecal_service, BackupHost) } #endif -#if 0 +#if 1 TEST(ecal_service, ErrorCallback_ErrorCallbackNoServer) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -1898,7 +1898,7 @@ TEST(ecal_service, ErrorCallback_ErrorCallbackNoServer) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, ErrorCallback_ErrorCallbackServerHasDisconnected) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2065,7 +2065,7 @@ TEST(ecal_service, ErrorCallback_ErrorCallbackServerHasDisconnected) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, ErrorCallback_ErrorCallbackClientDisconnects) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2164,7 +2164,7 @@ TEST(ecal_service, ErrorCallback_ErrorCallbackClientDisconnects) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, ErrorCallback_StressfulErrorsHalfwayThrough) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2347,7 +2347,7 @@ TEST(ecal_service, ErrorCallback_StressfulErrorsHalfwayThrough) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, ErrorCallback_StressfulErrorsHalfwayThroughWithManagers) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2529,7 +2529,7 @@ TEST(ecal_service, ErrorCallback_StressfulErrorsHalfwayThroughWithManagers) // N } #endif -#if 0 +#if 1 TEST(ecal_service, BlockingCall_RegularBlockingCall) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2600,7 +2600,7 @@ TEST(ecal_service, BlockingCall_RegularBlockingCall) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, BlockingCall_BlockingCallWithErrorHalfwayThrough) // NOLINT { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) @@ -2734,7 +2734,7 @@ TEST(ecal_service, BlockingCall_BlockingCallWithErrorHalfwayThrough) // NOLINT } #endif -#if 0 +#if 1 TEST(ecal_service, BlockingCall_Stopped) // NOLINT // This test shows the proper way to stop everything. I should adapt all other tests, too { for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++)