diff --git a/include/fastdds/rtps/network/NetworkFactory.h b/include/fastdds/rtps/network/NetworkFactory.h index cd110475aa1..10c06ef8122 100644 --- a/include/fastdds/rtps/network/NetworkFactory.h +++ b/include/fastdds/rtps/network/NetworkFactory.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -216,6 +217,18 @@ class NetworkFactory */ void update_network_interfaces(); + /** + * Remove the given participants from the send resource list + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators List of locators associated to the remote participant. + * @param participant_initial_peers List of locators of the initial peers of the local participant. + */ + void remove_participant_associated_send_resources( + fastdds::rtps::SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const; + private: std::vector> mRegisteredTransports; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index d4a9f7a544d..02debbd6bd6 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -44,6 +44,8 @@ #include #include +#include + #include #include #include "fastrtps/utils/shared_mutex.hpp" @@ -1088,6 +1090,21 @@ bool PDP::remove_remote_participant( this->mp_mutex->lock(); + // Delete from sender resource list (TCP only) + LocatorList_t remote_participant_locators; + for (auto& remote_participant_default_locator : pdata->default_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_default_locator); + } + for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_metatraffic_locator); + } + if (!remote_participant_locators.empty()) + { + mp_RTPSParticipant->update_removed_participant(remote_participant_locators); + } + // Return reader proxy objects to pool for (auto pit : *pdata->m_readers) { @@ -1115,6 +1132,7 @@ bool PDP::remove_remote_participant( participant_proxies_pool_.push_back(pdata); this->mp_mutex->unlock(); + return true; } diff --git a/src/cpp/rtps/network/NetworkFactory.cpp b/src/cpp/rtps/network/NetworkFactory.cpp index a35a0f2ccd7..3524aeaa678 100644 --- a/src/cpp/rtps/network/NetworkFactory.cpp +++ b/src/cpp/rtps/network/NetworkFactory.cpp @@ -18,12 +18,13 @@ #include #include +#include #include #include #include #include -#include +#include using namespace std; using namespace eprosima::fastdds::rtps; @@ -440,6 +441,24 @@ void NetworkFactory::update_network_interfaces() } } +void NetworkFactory::remove_participant_associated_send_resources( + SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const +{ + for (auto& transport : mRegisteredTransports) + { + TCPTransportInterface* tcp_transport = dynamic_cast(transport.get()); + if (tcp_transport) + { + tcp_transport->CloseOutputChannel( + send_resource_list, + remote_participant_locators, + participant_initial_peers); + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index e8377321ea1..21dba7ca80c 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -50,6 +50,8 @@ #include #include +#include + #include #include #include @@ -2680,6 +2682,19 @@ bool RTPSParticipantImpl::unregister_in_reader( #endif // FASTDDS_STATISTICS +void RTPSParticipantImpl::update_removed_participant( + const LocatorList_t& remote_participant_locators) +{ + if (!remote_participant_locators.empty()) + { + std::lock_guard guard(m_send_resources_mutex_); + m_network_Factory.remove_participant_associated_send_resources( + send_resource_list_, + remote_participant_locators, + m_att.builtin.initialPeersList); + } +} + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 3ce0300a8eb..5a06b0a7139 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -49,6 +49,8 @@ #include #include +#include +#include #include #include @@ -1067,6 +1069,14 @@ class RTPSParticipantImpl #endif // FASTDDS_STATISTICS + /** + * Method called on participant removal with the set of locators associated to the participant. + * + * @param remote_participant_locators Set of locators associated to the participant removed. + */ + void update_removed_participant( + const LocatorList_t& remote_participant_locators); + }; } // namespace rtps } /* namespace rtps */ diff --git a/src/cpp/rtps/transport/TCPSenderResource.hpp b/src/cpp/rtps/transport/TCPSenderResource.hpp index 9eefdefd2f1..b236872976a 100644 --- a/src/cpp/rtps/transport/TCPSenderResource.hpp +++ b/src/cpp/rtps/transport/TCPSenderResource.hpp @@ -39,7 +39,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(locator_); + transport.SenderResourceHasBeenClosed(locator_); }; send_lambda_ = [this, &transport]( @@ -68,7 +68,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource } static TCPSenderResource* cast( - TransportInterface& transport, + const TransportInterface& transport, SenderResource* sender_resource) { TCPSenderResource* returned_resource = nullptr; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 74ca287eee1..ed1bd06be19 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -14,6 +14,7 @@ #include +#include #include #include #include @@ -568,11 +569,24 @@ bool TCPTransportInterface::transform_remote_locator( return true; } -void TCPTransportInterface::CloseOutputChannel( +void TCPTransportInterface::SenderResourceHasBeenClosed( fastrtps::rtps::Locator_t& locator) { - locator.set_Invalid_Address(); - locator.port = 0; + // The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction + // this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the + // socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already + // connected. + // If moving this unbind send with the respective channel disconnection to this point, the following problem arises: + // If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant + // isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has + // taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock). + // Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have + // already been destroyed and the listening thread had consequently finished). + // An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done + // via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from + // the client. + // The send resource locator is invalidated to prevent further use of associated channel. + LOCATOR_INVALID(locator); } bool TCPTransportInterface::CloseInputChannel( @@ -1133,7 +1147,6 @@ bool TCPTransportInterface::Receive( { std::shared_ptr rtcp_message_manager; if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status()) - { std::unique_lock lock(rtcp_message_manager_mutex_); rtcp_message_manager = rtcp_manager.lock(); @@ -1807,6 +1820,60 @@ void TCPTransportInterface::send_channel_pending_logical_ports( } } +void TCPTransportInterface::CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const +{ + // Since send resources handle physical locators, we need to convert the remote participant locators to physical + std::set remote_participant_physical_locators; + for (const Locator& remote_participant_locator : remote_participant_locators) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator)); + + // Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario, + //initial peer can also work with the WANtoLANLocator of the remote participant. + if (IPLocator::hasWan(remote_participant_locator)) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator( + remote_participant_locator))); + } + } + + // Exlude initial peers. + for (const auto& initial_peer : participant_initial_peers) + { + if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(), + IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end()) + { + remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer)); + } + } + + for (const auto& remote_participant_physical_locator : remote_participant_physical_locators) + { + if (!IsLocatorSupported(remote_participant_physical_locator)) + { + continue; + } + // Remove send resources for the associated remote participant locator + for (auto it = send_resource_list.begin(); it != send_resource_list.end();) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get()); + + if (tcp_sender_resource) + { + if (tcp_sender_resource->locator() == remote_participant_physical_locator) + { + it = send_resource_list.erase(it); + continue; + } + } + ++it; + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index cc09d73d557..8e23f55c1a4 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -235,7 +235,7 @@ class TCPTransportInterface : public TransportInterface const Locator&) override; //! Resets the locator bound to the sender resource. - void CloseOutputChannel( + void SenderResourceHasBeenClosed( fastrtps::rtps::Locator_t& locator); //! Reports whether Locators correspond to the same port. @@ -472,6 +472,18 @@ class TCPTransportInterface : public TransportInterface */ void send_channel_pending_logical_ports( std::shared_ptr& channel); + /** + * Close the output channel associated to the given remote participant but if its locators belong to the + * given list of initial peers. + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators Set of locators associated to the remote participant. + * @param participant_initial_peers List of locators associated to the initial peers of the local participant. + */ + void CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPSenderResource.hpp b/src/cpp/rtps/transport/UDPSenderResource.hpp index 6ed793c3f09..dd8c8332ceb 100644 --- a/src/cpp/rtps/transport/UDPSenderResource.hpp +++ b/src/cpp/rtps/transport/UDPSenderResource.hpp @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(socket_); + transport.SenderResourceHasBeenClosed(socket_); }; send_lambda_ = [this, &transport]( diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 0538d3c07ff..08742d823e8 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel( return true; } -void UDPTransportInterface::CloseOutputChannel( +void UDPTransportInterface::SenderResourceHasBeenClosed( eProsimaUDPSocket& socket) { socket.cancel(); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index 48cfd68a89c..7ab22b61291 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -49,7 +49,7 @@ class UDPTransportInterface : public TransportInterface const Locator&) override; //! Removes all outbound sockets on the given port. - void CloseOutputChannel( + void SenderResourceHasBeenClosed( eProsimaUDPSocket& socket); //! Reports whether Locators correspond to the same port. diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 50709b4176b..885f701b95f 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -28,6 +28,8 @@ #include #include +#include "DatagramInjectionTransport.hpp" + using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -879,6 +881,313 @@ TEST_P(TransportTCP, large_message_large_data_send_receive) reader.block_for_all(); } +// Test TCP send resource cleaning. This test matches a server with a client and then releases the +// client resources. After PDP unbind message, the server removes the client +// from the send resource list. +TEST_P(TransportTCP, send_resource_cleanup) +{ + +#if defined(__APPLE__) + if (use_ipv6) + { + GTEST_SKIP() << "macOS TCPv6 transport skipped"; + return; + } +#endif // if defined(__APPLE__) + + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubWriter( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Server + // Create a server with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. The low level transport of this chaining transport will be UDP. + // This will allow us to get send_resource_list_ from the server participant when UDP transport gets its OpenOutputChannel() + // method called. This should happen after TCP transports connection is established. We can then see how many TCP send + // resources exist. + // For the cleanup test we follow that same procedure. Firstly we destroy both participants and then instantiate a new + // UDP participant. The send resource list will get updated with no TCP send resource. + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | Empty <-------------------------------------------------- clean transports | + // | | | | | + // | 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 0 get_send_resource_list() | | | + // |__________________________________________________________| |_____________________| + // + uint16_t server_port = 10000; + test_transport_->add_listener_port(server_port); + auto low_level_transport = std::make_shared(); + auto server_chaining_transport = std::make_shared(low_level_transport); + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + server_chaining_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Client + auto initialize_client = [&](PubSubWriter* client) + { + std::shared_ptr client_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + client->disable_builtin_transport().add_user_transport_to_pparams(client_transport); + initialPeerLocator.port = server_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + client->init(); + }; + auto initialize_udp_participant = [&](PubSubWriter* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_client(client.get()); + ASSERT_TRUE(client->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(1, std::chrono::seconds(0)); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 2); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = server_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + client.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + server->wait_writer_undiscovery(); + + // Create new udp client. + udp_participant.reset(new PubSubWriter(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // Check that the send_resource_list has size 0. This means that the send resource + // for the client has been removed. + send_resource_list = server_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 0); + send_resource_list.clear(); +} + +// Test TCP send resource cleaning. In this case, since the send resource has been created from an initial_peer, +// the send resource should not be removed. +TEST_P(TransportTCP, send_resource_cleanup_initial_peer) +{ +#if defined(__APPLE__) + if (use_ipv6) + { + GTEST_SKIP() << "macOS TCPv6 transport skipped"; + return; + } +#endif // if defined(__APPLE__) + + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubReader( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Client + // Create a client with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. This will allow us to get send_resource_list_ + // from the client participant when its transport gets its OpenOutputChannel() method called. + + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | 1 TCP (initial peer) <-------------------------------------------------- clean transports | + // | | | | | + // | 1 TCP + 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | (initial peer) | | | + // |__________________________________________________________| |_____________________| + // + + uint16_t server_port = 10000; + LocatorList_t initial_peer_list; + Locator_t initialPeerLocator; + if (use_ipv6) + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + initialPeerLocator.port = server_port; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + + auto low_level_transport = std::make_shared(); + auto client_chaining_transport = std::make_shared(low_level_transport); + client->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + client_chaining_transport).init(); + ASSERT_TRUE(client->isInitialized()); + + // Server + auto initialize_server = [&](PubSubReader* server) + { + std::shared_ptr server_transport; + if (use_ipv6) + { + server_transport = std::make_shared(); + } + else + { + server_transport = std::make_shared(); + } + server_transport->add_listener_port(server_port); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport); + server->init(); + }; + auto initialize_udp_participant = [&](PubSubReader* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + client->wait_discovery(1, std::chrono::seconds(0)); + server->wait_discovery(std::chrono::seconds(0), 1); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(2, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = client_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + server.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + client->wait_reader_undiscovery(); + + // Create new client instances. + udp_participant.reset(new PubSubReader(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(1, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // Check that the send_resource_list has size 1. This means that the send resource + // for the first client hasn't been removed because it was created from an initial_peer. + send_resource_list = client_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + send_resource_list.clear(); + + // If relaunching the server, the client should connect again. + server.reset(new PubSubReader(TEST_TOPIC_NAME)); + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(2, std::chrono::seconds(0)); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/common/DatagramInjectionTransport.cpp b/test/blackbox/common/DatagramInjectionTransport.cpp index a4b74f43b3f..3b1259a0de0 100644 --- a/test/blackbox/common/DatagramInjectionTransport.cpp +++ b/test/blackbox/common/DatagramInjectionTransport.cpp @@ -42,6 +42,24 @@ std::set DatagramInjectionTransportDescriptor::get_ return receivers_; } +void DatagramInjectionTransportDescriptor::update_send_resource_list( + const SendResourceList& send_resource_list) +{ + std::lock_guard guard(mtx_); + + send_resource_list_.clear(); + for (const auto& resource : send_resource_list) + { + send_resource_list_.insert(resource.get()); + } +} + +std::set DatagramInjectionTransportDescriptor::get_send_resource_list() +{ + std::lock_guard guard(mtx_); + return send_resource_list_; +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/DatagramInjectionTransport.hpp b/test/blackbox/common/DatagramInjectionTransport.hpp index 3cfff45d6c9..98c149d97fa 100644 --- a/test/blackbox/common/DatagramInjectionTransport.hpp +++ b/test/blackbox/common/DatagramInjectionTransport.hpp @@ -17,6 +17,9 @@ #include #include +#include + +using SenderResource = eprosima::fastrtps::rtps::SenderResource; namespace eprosima { namespace fastdds { @@ -37,10 +40,16 @@ class DatagramInjectionTransportDescriptor : public ChainingTransportDescriptor std::set get_receivers(); + void update_send_resource_list( + const SendResourceList& send_resource_list); + + std::set get_send_resource_list(); + private: std::mutex mtx_; std::set receivers_; + std::set send_resource_list_; }; class DatagramInjectionTransport : public ChainingTransport @@ -60,23 +69,25 @@ class DatagramInjectionTransport : public ChainingTransport } bool send( - eprosima::fastrtps::rtps::SenderResource* /*low_sender_resource*/, - const eprosima::fastrtps::rtps::octet* /*send_buffer*/, - uint32_t /*send_buffer_size*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_begin*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_end*/, - const std::chrono::steady_clock::time_point& /*timeout*/) override + eprosima::fastrtps::rtps::SenderResource* low_sender_resource, + const eprosima::fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_begin, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_end, + const std::chrono::steady_clock::time_point& timeout) override { - return true; + return low_sender_resource->send(send_buffer, send_buffer_size, destination_locators_begin, + destination_locators_end, timeout); } void receive( - TransportReceiverInterface* /*next_receiver*/, - const eprosima::fastrtps::rtps::octet* /*receive_buffer*/, - uint32_t /*receive_buffer_size*/, - const eprosima::fastrtps::rtps::Locator_t& /*local_locator*/, - const eprosima::fastrtps::rtps::Locator_t& /*remote_locator*/) override + TransportReceiverInterface* next_receiver, + const eprosima::fastrtps::rtps::octet* receive_buffer, + uint32_t receive_buffer_size, + const eprosima::fastrtps::rtps::Locator_t& local_locator, + const eprosima::fastrtps::rtps::Locator_t& remote_locator) override { + next_receiver->OnDataReceived(receive_buffer, receive_buffer_size, local_locator, remote_locator); } bool OpenInputChannel( @@ -92,6 +103,15 @@ class DatagramInjectionTransport : public ChainingTransport return ret_val; } + bool OpenOutputChannel( + SendResourceList& send_resource_list, + const Locator& loc) override + { + bool ret_val = ChainingTransport::OpenOutputChannel(send_resource_list, loc); + parent_->update_send_resource_list(send_resource_list); + return ret_val; + } + private: DatagramInjectionTransportDescriptor* parent_ = nullptr; diff --git a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h index 23580d12ee0..1b8fa85ad10 100644 --- a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h +++ b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h @@ -26,6 +26,7 @@ #include #include #include +#include #if HAVE_SECURITY #include @@ -309,6 +310,8 @@ class RTPSParticipantImpl MOCK_METHOD(bool, ignore_participant, (const GuidPrefix_t&)); + MOCK_METHOD(bool, update_removed_participant, (rtps::LocatorList_t&)); + private: MockParticipantListener listener_; diff --git a/test/unittest/rtps/reader/CMakeLists.txt b/test/unittest/rtps/reader/CMakeLists.txt index 6ed21524fea..0ce96746967 100644 --- a/test/unittest/rtps/reader/CMakeLists.txt +++ b/test/unittest/rtps/reader/CMakeLists.txt @@ -64,6 +64,7 @@ set(WRITERPROXYACKNACKTESTS_SOURCE WriterProxyAcknackTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/TimedEventImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ) if(WIN32) @@ -72,11 +73,14 @@ endif() add_executable(WriterProxyAcknackTests ${WRITERPROXYACKNACKTESTS_SOURCE}) target_compile_definitions(WriterProxyAcknackTests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE $<$>,$>:__DEBUG> $<$:__INTERNALDEBUG> # Internal debug activated. RETURN_VALID_PARTICIPANT ) target_include_directories(WriterProxyAcknackTests PRIVATE + ${Asio_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader ${PROJECT_SOURCE_DIR}/test/mock/rtps/Endpoint ${PROJECT_SOURCE_DIR}/test/mock/rtps/ExternalLocatorsProcessor @@ -93,5 +97,6 @@ target_include_directories(WriterProxyAcknackTests PRIVATE ) target_link_libraries(WriterProxyAcknackTests foonathan_memory GTest::gmock - ${CMAKE_DL_LIBS}) + ${CMAKE_DL_LIBS} + ${THIRDPARTY_BOOST_LINK_LIBS}) add_gtest(WriterProxyAcknackTests SOURCES ${WRITERPROXYACKNACKTESTS_SOURCE}) diff --git a/test/unittest/rtps/security/CMakeLists.txt b/test/unittest/rtps/security/CMakeLists.txt index 9c222174dd3..2fa2d398c9c 100644 --- a/test/unittest/rtps/security/CMakeLists.txt +++ b/test/unittest/rtps/security/CMakeLists.txt @@ -39,6 +39,7 @@ set(SOURCES_SECURITY_TEST_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/security/SecurityManager.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/security/exceptions/SecurityException.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/TimedConditionVariable.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/security/cryptography/AESGCMGMAC_Types.cpp ${PROJECT_SOURCE_DIR}/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.cpp ) diff --git a/test/unittest/transport/CMakeLists.txt b/test/unittest/transport/CMakeLists.txt index 457203ef3bb..66aae99ee35 100644 --- a/test/unittest/transport/CMakeLists.txt +++ b/test/unittest/transport/CMakeLists.txt @@ -46,6 +46,24 @@ if(TLS_FOUND) # ${CMAKE_CURRENT_BINARY_DIR}/permissions_helloworld.smime COPYONLY) endif() +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + set(UDPV4TESTS_SOURCE UDPv4Tests.cpp mock/MockReceiverResource.cpp @@ -61,6 +79,7 @@ set(UDPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp ) @@ -81,6 +100,7 @@ set(UDPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv6Transport.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ) @@ -111,9 +131,6 @@ set(TCPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -154,9 +171,6 @@ set(TCPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv6Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -172,29 +186,6 @@ if(TLS_FOUND) ) endif() -set(TEST_UDPV4TESTS_SOURCE - test_UDPv4Tests.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/test_UDPv4Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp - ) - set(SHAREDMEMTESTS_SOURCE SharedMemTests.cpp mock/MockReceiverResource.cpp @@ -208,10 +199,8 @@ set(SHAREDMEMTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -233,7 +222,11 @@ target_include_directories(UDPv4Tests PRIVATE ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include ${PROJECT_SOURCE_DIR}/src/cpp ) -target_link_libraries(UDPv4Tests GTest::gtest ${MOCKS}) +target_link_libraries(UDPv4Tests + fastcdr + GTest::gtest + ${MOCKS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) if(MSVC OR MSVC_IDE) target_link_libraries(UDPv4Tests ${PRIVACY} iphlpapi Shlwapi ) endif() @@ -257,7 +250,12 @@ if(NOT DISABLE_UDPV6_TESTS) ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include ${PROJECT_SOURCE_DIR}/src/cpp ) - target_link_libraries(UDPv6Tests GTest::gtest ${MOCKS}) + target_link_libraries(UDPv6Tests + fastcdr + GTest::gtest + ${MOCKS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) + if(MSVC OR MSVC_IDE) target_link_libraries(UDPv6Tests ${PRIVACY} iphlpapi Shlwapi ) endif() @@ -291,31 +289,6 @@ if(NOT DISABLE_UDPV6_TESTS) set(TRANSPORT_XFAIL_LIST ${TRANSPORT_XFAIL_LIST} XFAIL_TCP6) endif() -add_executable(test_UDPv4Tests ${TEST_UDPV4TESTS_SOURCE}) -target_compile_definitions(test_UDPv4Tests PRIVATE - BOOST_ASIO_STANDALONE - ASIO_STANDALONE - $<$>,$>:__DEBUG> - $<$:__INTERNALDEBUG> # Internal debug activated. - ) -target_include_directories(test_UDPv4Tests PRIVATE - ${Asio_INCLUDE_DIR} - ${PROJECT_SOURCE_DIR}/test/mock/rtps/ParticipantProxyData - ${PROJECT_SOURCE_DIR}/test/mock/dds/QosPolicies - ${PROJECT_SOURCE_DIR}/test/mock/rtps/MessageReceiver - ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource - ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include - ${PROJECT_SOURCE_DIR}/src/cpp - ) -target_link_libraries(test_UDPv4Tests GTest::gtest ${MOCKS}) -if(MSVC OR MSVC_IDE) - target_link_libraries(test_UDPv4Tests ${PRIVACY} iphlpapi Shlwapi) -else() - target_link_libraries(test_UDPv4Tests ${PRIVACY}) -endif() -add_gtest(test_UDPv4Tests SOURCES ${TEST_UDPV4TESTS_SOURCE}) -set(TRANSPORT_XFAIL_LIST ${TRANSPORT_XFAIL_LIST} XFAIL_TEST_UDP4) - add_executable(TCPv4Tests ${TCPV4TESTS_SOURCE}) target_compile_definitions(TCPv4Tests PRIVATE BOOST_ASIO_STANDALONE diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index cd38f212d73..52a7e9e72f8 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -22,6 +22,7 @@ #include "mock/MockTCPv4Transport.h" #include #include +#include #include #include #include @@ -1025,96 +1026,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) sem.wait(); } } -/* - TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) - { - eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); - - using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; - - TCPv4TransportDescriptor recvDescriptor; - recvDescriptor.add_listener_port(g_default_port + 1); - recvDescriptor.apply_security = true; - recvDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - //recvDescriptor.tls_config.password = "testkey"; - //recvDescriptor.tls_config.password = "test"; - //recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; - //recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - recvDescriptor.tls_config.verify_file = "maincacert.pem"; // This CA only know about mainsub certificates - //recvDescriptor.tls_config.verify_file = "ca.pem"; - // Server doesn't accept clients without certs - recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT | TLSVerifyMode::VERIFY_PEER; - recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport receiveTransportUnderTest(recvDescriptor); - receiveTransportUnderTest.init(); - - Locator_t inputLocator; - inputLocator.kind = LOCATOR_KIND_TCPv4; - inputLocator.port = g_default_port + 1; - IPLocator::setIPv4(inputLocator, 127, 0, 0, 1); - IPLocator::setLogicalPort(inputLocator, 7410); - - Locator_t outputLocator; - outputLocator.kind = LOCATOR_KIND_TCPv4; - IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); - outputLocator.port = g_default_port + 1; - IPLocator::setLogicalPort(outputLocator, 7410); - - TCPv4TransportDescriptor sendDescriptor2; - sendDescriptor2.apply_security = true; - sendDescriptor2.tls_config.handshake_role = TLSHSRole::SERVER; - sendDescriptor2.tls_config.password = "test"; - sendDescriptor2.tls_config.cert_chain_file = "server.pem"; - sendDescriptor2.tls_config.private_key_file = "server.pem"; - //sendDescriptor2.tls_config.password = "testkey"; - //sendDescriptor2.tls_config.cert_chain_file = "mainsubcert.pem"; - //sendDescriptor2.tls_config.private_key_file = "mainsubkey.pem"; - sendDescriptor2.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; - sendDescriptor2.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport sendTransportUnderTest2(sendDescriptor2); - sendTransportUnderTest2.init(); - - { - MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); - - ASSERT_TRUE(sendTransportUnderTest2.OpenOutputChannel(outputLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - Semaphore sem; - std::function recCallback = [&]() - { - EXPECT_FALSE(true); // Should not receive - sem.post(); - }; - - msg_recv->setCallback(recCallback); - auto sendThreadFunction = [&]() - { - bool sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - int count = 0; - while (!sent && count < 30) - { - sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; - } - EXPECT_FALSE(sent); - sem.post(); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - } - ASSERT_TRUE(sendTransportUnderTest2.CloseOutputChannel(outputLocator)); - } - */ +// TODO(eduponz): TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) { @@ -2103,6 +2016,96 @@ TEST_F(TCPv4Tests, add_logical_port_on_send_resource_creation) } } +// This test verifies that the send resource list is correctly cleaned both in LAN and WAN cases. +TEST_F(TCPv4Tests, remove_from_send_resource_list) +{ + // Three scenarios are considered: LAN, WAN1 and WAN2 + // LAN: The remote locator is in the same LAN as the local locator + // WAN1: The remote locator is in a different LAN than the local locator, and initial peers have LAN and WAN remote addresses. + // WAN2: The remote locator is in a different LAN than the local locator, and initial peers have WANtoLANLocator ([0][WAN] address). + std::vector test_cases = { + "LAN", + "WAN1", + "WAN2" + }; + + for (const std::string& test_case : test_cases) + { + TCPv4TransportDescriptor send_descriptor; + + MockTCPv4Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); + + Locator_t discovery_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port, discovery_locator); + IPLocator::setLogicalPort(discovery_locator, 7410); + + Locator_t initial_peer_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 1, initial_peer_locator); + IPLocator::setLogicalPort(initial_peer_locator, 7410); + LocatorList_t initial_peer_list; + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(discovery_locator, g_test_wan_address); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + + if (test_case == "WAN2") + { + initial_peer_locator = IPLocator::WanToLanLocator(initial_peer_locator); + } + } + + initial_peer_list.push_back(initial_peer_locator); + + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, discovery_locator)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, initial_peer_locator)); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(wrong_output_locator, g_test_wan_address); + } + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2); + + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(discovery_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + if (test_case == "WAN2") + { + // In WAN2, the remote_participant_physical_locators are the real Locators, not the WANtoLANLocators. + IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + } + remote_participant_physical_locators.push_back(initial_peer_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + } +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 8ef93e69f6a..59249c33b51 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -133,9 +134,9 @@ TEST_F(TCPv6Tests, opening_and_closing_output_channel) ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.OpenOutputChannel(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_TRUE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_TRUE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_FALSE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_FALSE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); */ } @@ -549,6 +550,61 @@ TEST_F(TCPv6Tests, add_logical_port_on_send_resource_creation) } } +// This test verifies that the send resource list is correctly cleaned and the channel resource is removed +// from the channel_resources_map. +TEST_F(TCPv6Tests, remove_from_send_resource_list) +{ + TCPv6TransportDescriptor send_descriptor; + MockTCPv6Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); + + Locator_t output_locator_1; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port, output_locator_1); + IPLocator::setLogicalPort(output_locator_1, 7410); + + Locator_t output_locator_2; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 1, output_locator_2); + IPLocator::setLogicalPort(output_locator_2, 7410); + + LocatorList_t initial_peer_list; + initial_peer_list.push_back(output_locator_2); + + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_1)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_2)); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(output_locator_1); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + remote_participant_physical_locators.push_back(output_locator_2); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); +} + // TODO: TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) // TODO: TEST_F(TCPv6Tests, send_and_receive_between_ports) diff --git a/test/unittest/transport/test_UDPv4Tests.cpp b/test/unittest/transport/test_UDPv4Tests.cpp deleted file mode 100644 index 512095ac7fd..00000000000 --- a/test/unittest/transport/test_UDPv4Tests.cpp +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#if defined(_WIN32) -#define GET_PID _getpid -#else -#define GET_PID getpid -#endif // if defined(_WIN32) - -using IPLocator = eprosima::fastrtps::rtps::IPLocator; -using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport; - -static uint16_t g_default_port = 0; - -uint16_t get_port() -{ - uint16_t port = static_cast(GET_PID()); - - if (4000 > port) - { - port += 4000; - } - - return port; -} - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -class test_UDPv4Tests : public ::testing::Test -{ -public: - - test_UDPv4Tests() - { - HELPER_SetDescriptorDefaults(); - } - - ~test_UDPv4Tests() - { - eprosima::fastdds::dds::Log::KillThread(); - } - - void HELPER_SetDescriptorDefaults(); - void HELPER_WarmUpOutput( - test_UDPv4Transport& transport); - void HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber); - void HELPER_FillAckNackMessage( - CDRMessage_t& message); - void HELPER_FillHeartbeatMessage( - CDRMessage_t& message); - - test_UDPv4TransportDescriptor descriptor; - std::unique_ptr senderThread; - std::unique_ptr receiverThread; -}; - -/* - TEST_F(test_UDPv4Tests, DATA_messages_dropped) - { - // Given - descriptor.dropDataMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, SequenceNumber_t()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, ACKNACK_messages_dropped) - { - // Given - descriptor.dropAckNackMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, HEARTBEAT_messages_dropped) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillHeartbeatMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, Dropping_by_random_chance) - { - // Given - descriptor.percentageOfMessagesToDrop = 100; // To avoid a non-deterministic test - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(3u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, dropping_by_sequence_number) - { - // Given - std::vector sequenceNumbersToDrop(1); - sequenceNumbersToDrop.back().low = 1; - - descriptor.sequenceNumberDataMessagesToDrop = sequenceNumbersToDrop; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, sequenceNumbersToDrop.back()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, No_drops_when_unrequested) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - descriptor.dropDataMessagesPercentage = 100; - descriptor.granularMode = false; - - test_UDPv4Transport transportUnderTest(descriptor); // Default, no drops - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - IPLocator::setIPv4(locator, 239, 255, 1, 4); - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(0u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - void test_UDPv4Tests::HELPER_SetDescriptorDefaults() - { - descriptor.sendBufferSize = 80; - descriptor.receiveBufferSize = 80; - descriptor.dropDataMessagesPercentage = 0; - descriptor.dropDataFragMessagesPercentage = 0; - descriptor.dropAckNackMessagesPercentage = 0; - descriptor.dropHeartbeatMessagesPercentage = 0; - descriptor.percentageOfMessagesToDrop = 0; - descriptor.dropLogLength = 10; - descriptor.granularMode = false; - } - - void test_UDPv4Tests::HELPER_WarmUpOutput(test_UDPv4Transport& transport) - { - Locator_t outputChannelLocator; - outputChannelLocator.port = g_default_port; - outputChannelLocator.kind = LOCATOR_KIND_UDPv4; - ASSERT_TRUE(transport.OpenOutputChannel(outputChannelLocator)); - } - */ - -void test_UDPv4Tests::HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber) -{ - GuidPrefix_t prefix; - TopicKind_t topic = WITH_KEY; - EntityId_t entityID; - CacheChange_t change; - change.sequenceNumber = sequenceNumber; // Here is where the SN propagates from - RTPSMessageCreator::addMessageData(&message, prefix, &change, topic, entityID, false, nullptr); -} - -void test_UDPv4Tests::HELPER_FillAckNackMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumberSet_t set; - RTPSMessageCreator::addMessageAcknack(&message, prefix, prefix, entityID, entityID, set, 0, false); -} - -void test_UDPv4Tests::HELPER_FillHeartbeatMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumber_t sn1; - SequenceNumber_t sn2; - RTPSMessageCreator::addMessageHeartbeat(&message, prefix, entityID, entityID, sn1, sn2, 0, false, false); -} - -int main( - int argc, - char** argv) -{ - g_default_port = get_port(); - - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}