diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 0983c708b90..6d981df056c 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -44,6 +44,8 @@ #include #include +#include + #include #include #include "fastrtps/utils/shared_mutex.hpp" @@ -1104,6 +1106,21 @@ bool PDP::remove_remote_participant( this->mp_mutex->lock(); + // Delete from sender resource list (TCP only) + LocatorList_t remote_participant_locators; + for (auto& remote_participant_default_locator : pdata->default_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_default_locator); + } + for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_metatraffic_locator); + } + if (!remote_participant_locators.empty()) + { + mp_RTPSParticipant->update_removed_participant(remote_participant_locators); + } + // Return reader proxy objects to pool for (auto pit : *pdata->m_readers) { @@ -1131,6 +1148,7 @@ bool PDP::remove_remote_participant( participant_proxies_pool_.push_back(pdata); this->mp_mutex->unlock(); + return true; } diff --git a/src/cpp/rtps/network/NetworkFactory.cpp b/src/cpp/rtps/network/NetworkFactory.cpp index 58ef285d74c..a4c6cd85957 100644 --- a/src/cpp/rtps/network/NetworkFactory.cpp +++ b/src/cpp/rtps/network/NetworkFactory.cpp @@ -18,12 +18,13 @@ #include #include +#include #include #include #include #include -#include +#include using namespace std; using namespace eprosima::fastdds::rtps; @@ -440,6 +441,24 @@ void NetworkFactory::update_network_interfaces() } } +void NetworkFactory::remove_participant_associated_send_resources( + SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const +{ + for (auto& transport : mRegisteredTransports) + { + TCPTransportInterface* tcp_transport = dynamic_cast(transport.get()); + if (tcp_transport) + { + tcp_transport->CloseOutputChannel( + send_resource_list, + remote_participant_locators, + participant_initial_peers); + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/network/NetworkFactory.h b/src/cpp/rtps/network/NetworkFactory.h index ade996764b7..97cee2fdc8b 100644 --- a/src/cpp/rtps/network/NetworkFactory.h +++ b/src/cpp/rtps/network/NetworkFactory.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -217,6 +218,18 @@ class NetworkFactory */ void update_network_interfaces(); + /** + * Remove the given participants from the send resource list + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators List of locators associated to the remote participant. + * @param participant_initial_peers List of locators of the initial peers of the local participant. + */ + void remove_participant_associated_send_resources( + fastdds::rtps::SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const; + private: std::vector> mRegisteredTransports; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 0cdc4b0aedc..ae17735436a 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -51,6 +51,8 @@ #include #include +#include + #include #include #include @@ -2731,6 +2733,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints( return should_match_local_endpoints; } +void RTPSParticipantImpl::update_removed_participant( + const LocatorList_t& remote_participant_locators) +{ + if (!remote_participant_locators.empty()) + { + std::lock_guard guard(m_send_resources_mutex_); + m_network_Factory.remove_participant_associated_send_resources( + send_resource_list_, + remote_participant_locators, + m_att.builtin.initialPeersList); + } +} + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index cb042f45d29..9720fcd1830 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -1133,6 +1134,14 @@ class RTPSParticipantImpl return match_local_endpoints_; } + /** + * Method called on participant removal with the set of locators associated to the participant. + * + * @param remote_participant_locators Set of locators associated to the participant removed. + */ + void update_removed_participant( + const LocatorList_t& remote_participant_locators); + }; } // namespace rtps } /* namespace rtps */ diff --git a/src/cpp/rtps/transport/TCPSenderResource.hpp b/src/cpp/rtps/transport/TCPSenderResource.hpp index c5d136938e4..746b7ee0c58 100644 --- a/src/cpp/rtps/transport/TCPSenderResource.hpp +++ b/src/cpp/rtps/transport/TCPSenderResource.hpp @@ -39,7 +39,11 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { +<<<<<<< HEAD transport.CloseOutputChannel(channel_); +======= + transport.SenderResourceHasBeenClosed(locator_); +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) }; send_lambda_ = [this, &transport]( @@ -68,7 +72,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource } static TCPSenderResource* cast( - TransportInterface& transport, + const TransportInterface& transport, SenderResource* sender_resource) { TCPSenderResource* returned_resource = nullptr; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index aa8eab949cc..821d54e09c1 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -14,6 +14,19 @@ #include +<<<<<<< HEAD +======= +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) #include #include #include @@ -224,6 +237,35 @@ void TCPTransportInterface::clean() } } +<<<<<<< HEAD +======= +Locator TCPTransportInterface::remote_endpoint_to_locator( + const std::shared_ptr& channel) const +{ + Locator locator; + asio::error_code ec; + endpoint_to_locator(channel->remote_endpoint(ec), locator); + if (ec) + { + LOCATOR_INVALID(locator); + } + return locator; +} + +Locator TCPTransportInterface::local_endpoint_to_locator( + const std::shared_ptr& channel) const +{ + Locator locator; + asio::error_code ec; + endpoint_to_locator(channel->local_endpoint(ec), locator); + if (ec) + { + LOCATOR_INVALID(locator); + } + return locator; +} + +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) void TCPTransportInterface::bind_socket( std::shared_ptr& channel) { @@ -570,6 +612,7 @@ bool TCPTransportInterface::transform_remote_locator( return true; } +<<<<<<< HEAD void TCPTransportInterface::CloseOutputChannel( std::shared_ptr& channel) { @@ -579,6 +622,26 @@ void TCPTransportInterface::CloseOutputChannel( auto channel_resource = channel_resources_.find(physical_locator); assert(channel_resource != channel_resources_.end()); (void)channel_resource; +======= +void TCPTransportInterface::SenderResourceHasBeenClosed( + fastrtps::rtps::Locator_t& locator) +{ + // The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction + // this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the + // socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already + // connected. + // If moving this unbind send with the respective channel disconnection to this point, the following problem arises: + // If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant + // isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has + // taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock). + // Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have + // already been destroyed and the listening thread had consequently finished). + // An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done + // via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from + // the client. + // The send resource locator is invalidated to prevent further use of associated channel. + LOCATOR_INVALID(locator); +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) } bool TCPTransportInterface::CloseInputChannel( @@ -1083,7 +1146,6 @@ bool TCPTransportInterface::Receive( { std::shared_ptr rtcp_message_manager; if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status()) - { std::unique_lock lock(rtcp_message_manager_mutex_); rtcp_message_manager = rtcp_manager.lock(); @@ -1327,10 +1389,8 @@ void TCPTransportInterface::SocketAccepted( channel_weak_ptr, rtcp_manager_weak_ptr)); EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: " - << channel->local_endpoint().address() << ":" - << channel->local_endpoint().port() << "), remote: " - << channel->remote_endpoint().address() << ":" - << channel->remote_endpoint().port() << ")"); + << local_endpoint_to_locator(channel) << ", remote: " + << remote_endpoint_to_locator(channel) << ")"); } else { @@ -1375,10 +1435,8 @@ void TCPTransportInterface::SecureSocketAccepted( channel_weak_ptr, rtcp_manager_weak_ptr)); EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: " - << socket->lowest_layer().local_endpoint().address() << ":" - << socket->lowest_layer().local_endpoint().port() << "), remote: " - << socket->lowest_layer().remote_endpoint().address() << ":" - << socket->lowest_layer().remote_endpoint().port() << ")"); + << local_endpoint_to_locator(secure_channel) << ", remote: " + << remote_endpoint_to_locator(secure_channel) << ")"); } else { @@ -1727,6 +1785,60 @@ void TCPTransportInterface::fill_local_physical_port( } } +void TCPTransportInterface::CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const +{ + // Since send resources handle physical locators, we need to convert the remote participant locators to physical + std::set remote_participant_physical_locators; + for (const Locator& remote_participant_locator : remote_participant_locators) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator)); + + // Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario, + //initial peer can also work with the WANtoLANLocator of the remote participant. + if (IPLocator::hasWan(remote_participant_locator)) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator( + remote_participant_locator))); + } + } + + // Exlude initial peers. + for (const auto& initial_peer : participant_initial_peers) + { + if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(), + IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end()) + { + remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer)); + } + } + + for (const auto& remote_participant_physical_locator : remote_participant_physical_locators) + { + if (!IsLocatorSupported(remote_participant_physical_locator)) + { + continue; + } + // Remove send resources for the associated remote participant locator + for (auto it = send_resource_list.begin(); it != send_resource_list.end();) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get()); + + if (tcp_sender_resource) + { + if (tcp_sender_resource->locator() == remote_participant_physical_locator) + { + it = send_resource_list.erase(it); + continue; + } + } + ++it; + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index 6d3f6009c4b..04290601de1 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -186,6 +186,21 @@ class TCPTransportInterface : public TransportInterface Locator& locator) const = 0; /** +<<<<<<< HEAD +======= + * Converts a remote endpoint to a locator if possible. Otherwise, it sets an invalid locator. + */ + Locator remote_endpoint_to_locator( + const std::shared_ptr& channel) const; + + /** + * Converts a local endpoint to a locator if possible. Otherwise, it sets an invalid locator. + */ + Locator local_endpoint_to_locator( + const std::shared_ptr& channel) const; + + /** +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) * Shutdown method to close the connections of the transports. */ void shutdown() override; @@ -225,9 +240,15 @@ class TCPTransportInterface : public TransportInterface bool CloseInputChannel( const Locator&) override; +<<<<<<< HEAD //! Removes all outbound sockets on the given port. void CloseOutputChannel( std::shared_ptr& channel); +======= + //! Resets the locator bound to the sender resource. + void SenderResourceHasBeenClosed( + fastrtps::rtps::Locator_t& locator); +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) //! Reports whether Locators correspond to the same port. bool DoInputLocatorsMatch( @@ -452,11 +473,26 @@ class TCPTransportInterface : public TransportInterface void fill_local_physical_port( Locator& locator) const; +<<<<<<< HEAD bool get_non_blocking_send() const { return non_blocking_send_; } +======= + /** + * Close the output channel associated to the given remote participant but if its locators belong to the + * given list of initial peers. + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators Set of locators associated to the remote participant. + * @param participant_initial_peers List of locators associated to the initial peers of the local participant. + */ + void CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const; +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPSenderResource.hpp b/src/cpp/rtps/transport/UDPSenderResource.hpp index 4db78236c72..70165141f80 100644 --- a/src/cpp/rtps/transport/UDPSenderResource.hpp +++ b/src/cpp/rtps/transport/UDPSenderResource.hpp @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(socket_); + transport.SenderResourceHasBeenClosed(socket_); }; send_lambda_ = [this, &transport]( diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 7b9fe4acd10..f4b683f0639 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel( return true; } -void UDPTransportInterface::CloseOutputChannel( +void UDPTransportInterface::SenderResourceHasBeenClosed( eProsimaUDPSocket& socket) { socket.cancel(); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index 46adcc8b19a..3ec5c00e708 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -49,7 +49,7 @@ class UDPTransportInterface : public TransportInterface const Locator&) override; //! Removes all outbound sockets on the given port. - void CloseOutputChannel( + void SenderResourceHasBeenClosed( eProsimaUDPSocket& socket); //! Reports whether Locators correspond to the same port. diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index e23328a9abe..8060a66a9e2 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -23,6 +23,15 @@ #include #include +<<<<<<< HEAD +======= +#include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp" +#include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" +#include "DatagramInjectionTransport.hpp" + +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -724,6 +733,495 @@ TEST_P(TransportTCP, TCPv6_autofill_port) EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port); } +<<<<<<< HEAD +======= +// Test TCP transport on LARGE_DATA topology +TEST_P(TransportTCP, large_data_topology) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + // Limited to 12 readers and 12 writers so as not to exceed the system's file descriptor limit. + uint16_t n_participants = 12; + constexpr uint32_t samples_per_participant = 10; + + /* Test configuration */ + std::vector>> readers; + std::vector>> writers; + + for (uint16_t i = 0; i < n_participants; i++) + { + readers.emplace_back(new PubSubReader(TEST_TOPIC_NAME)); + writers.emplace_back(new PubSubWriter(TEST_TOPIC_NAME)); + } + + // Create a vector of ports and shuffle it + std::vector ports; + for (uint16_t i = 0; i < 2 * n_participants; i++) + { + ports.push_back(7200 + i); + } + auto rng = std::default_random_engine{}; + std::shuffle(ports.begin(), ports.end(), rng); + + // Reliable Keep_all to wait for all acked as end condition + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .lease_duration(eprosima::fastrtps::c_TimeInfinite, eprosima::fastrtps::Duration_t(3, 0)) + .resource_limits_max_instances(1) + .resource_limits_max_samples_per_instance(samples_per_participant); + + readers[i]->reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .lease_duration(eprosima::fastrtps::c_TimeInfinite, eprosima::fastrtps::Duration_t(3, 0)) + .resource_limits_max_instances(n_participants) + .resource_limits_max_samples_per_instance(samples_per_participant); + + // Force TCP EDP discovery & data communication and UDP PDP discovery (NO SHM) + writers[i]->setup_large_data_tcp(use_ipv6, ports[i]); + readers[i]->setup_large_data_tcp(use_ipv6, ports[n_participants + i]); + } + + // Init participants + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->init(); + readers[i]->init(); + ASSERT_TRUE(writers[i]->isInitialized()); + ASSERT_TRUE(readers[i]->isInitialized()); + } + + // Wait for discovery + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->wait_discovery(n_participants, std::chrono::seconds(0)); + ASSERT_EQ(writers[i]->get_matched(), n_participants); + readers[i]->wait_discovery(std::chrono::seconds(0), n_participants); + ASSERT_EQ(readers[i]->get_matched(), n_participants); + } + + // Send and receive data + std::list data; + data = default_keyedhelloworld_per_participant_data_generator(n_participants, samples_per_participant); + + for (auto& reader : readers) + { + reader->startReception(data); + } + + auto validate_key = [](const std::list& data, uint16_t participant_key) + { + for (const auto& sample : data) + { + ASSERT_EQ(sample.key(), participant_key); + } + }; + + for (uint16_t i = 0; i < n_participants; i++) + { + auto start = std::next(data.begin(), i * samples_per_participant ); + auto end = std::next(start, samples_per_participant); + auto writer_data(std::list(start, end)); + validate_key(writer_data, i); + writers[i]->send(writer_data); + EXPECT_TRUE(writer_data.empty()); + } + + for (auto& reader : readers) + { + reader->block_for_all(); + } + for (auto& writer : writers) + { + EXPECT_TRUE(writer->waitForAllAcked(std::chrono::seconds(5))); + } + + // Destroy participants + readers.clear(); + writers.clear(); +} + +// This test verifies that if having a server with several listening ports, only the first one is used. +TEST_P(TransportTCP, multiple_listening_ports) +{ + // Create a server with several listening ports + PubSubReader* server = new PubSubReader(TEST_TOPIC_NAME); + uint16_t server_port_1 = 10000; + uint16_t server_port_2 = 10001; + + std::shared_ptr server_transport; + if (use_ipv6) + { + server_transport = std::make_shared(); + } + else + { + server_transport = std::make_shared(); + } + server_transport->add_listener_port(server_port_1); + server_transport->add_listener_port(server_port_2); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Create two clients each one connecting to a different port + PubSubWriter* client_1 = new PubSubWriter(TEST_TOPIC_NAME); + PubSubWriter* client_2 = new PubSubWriter(TEST_TOPIC_NAME); + std::shared_ptr client_transport_1; + std::shared_ptr client_transport_2; + Locator_t initialPeerLocator_1; + Locator_t initialPeerLocator_2; + if (use_ipv6) + { + client_transport_1 = std::make_shared(); + client_transport_2 = std::make_shared(); + initialPeerLocator_1.kind = LOCATOR_KIND_TCPv6; + initialPeerLocator_2.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator_1, "::1"); + IPLocator::setIPv6(initialPeerLocator_2, "::1"); + } + else + { + client_transport_1 = std::make_shared(); + client_transport_2 = std::make_shared(); + initialPeerLocator_1.kind = LOCATOR_KIND_TCPv4; + initialPeerLocator_2.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator_1, 127, 0, 0, 1); + IPLocator::setIPv4(initialPeerLocator_2, 127, 0, 0, 1); + } + client_1->disable_builtin_transport().add_user_transport_to_pparams(client_transport_1); + client_2->disable_builtin_transport().add_user_transport_to_pparams(client_transport_2); + initialPeerLocator_1.port = server_port_1; + initialPeerLocator_2.port = server_port_2; + LocatorList_t initial_peer_list_1; + LocatorList_t initial_peer_list_2; + initial_peer_list_1.push_back(initialPeerLocator_1); + initial_peer_list_2.push_back(initialPeerLocator_2); + client_1->initial_peers(initial_peer_list_1); + client_2->initial_peers(initial_peer_list_2); + client_1->init(); + client_2->init(); + ASSERT_TRUE(client_1->isInitialized()); + ASSERT_TRUE(client_2->isInitialized()); + + // Wait for discovery. + server->wait_discovery(); + client_1->wait_discovery(); + client_2->wait_discovery(std::chrono::seconds(1)); + EXPECT_EQ(server->get_matched(), 1U); + EXPECT_EQ(client_1->get_matched(), 1U); + EXPECT_EQ(client_2->get_matched(), 0U); + + // Send data + auto data = default_helloworld_data_generator(); + server->startReception(data); + client_1->send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block server until reception finished. + server->block_for_all(); + // Wait for all data to be acked. + EXPECT_TRUE(client_1->waitForAllAcked(std::chrono::milliseconds(100))); + + // Release TCP client and server resources. + delete client_1; + delete client_2; + delete server; +} + +// Test TCP send resource cleaning. This test matches a server with a client and then releases the +// client resources. After PDP unbind message, the server removes the client +// from the send resource list. +TEST_P(TransportTCP, send_resource_cleanup) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubWriter( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Server + // Create a server with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. The low level transport of this chaining transport will be UDP. + // This will allow us to get send_resource_list_ from the server participant when UDP transport gets its OpenOutputChannel() + // method called. This should happen after TCP transports connection is established. We can then see how many TCP send + // resources exist. + // For the cleanup test we follow that same procedure. Firstly we destroy both participants and then instantiate a new + // UDP participant. The send resource list will get updated with no TCP send resource. + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | Empty <-------------------------------------------------- clean transports | + // | | | | | + // | 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 0 get_send_resource_list() | | | + // |__________________________________________________________| |_____________________| + // + uint16_t server_port = 10000; + test_transport_->add_listener_port(server_port); + auto low_level_transport = std::make_shared(); + auto server_chaining_transport = std::make_shared(low_level_transport); + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + server_chaining_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Client + auto initialize_client = [&](PubSubWriter* client) + { + std::shared_ptr client_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + client_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + client->disable_builtin_transport().add_user_transport_to_pparams(client_transport); + initialPeerLocator.port = server_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + client->init(); + }; + auto initialize_udp_participant = [&](PubSubWriter* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_client(client.get()); + ASSERT_TRUE(client->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(1, std::chrono::seconds(0)); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 2); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = server_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + client.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + server->wait_writer_undiscovery(); + + // Create new udp client. + udp_participant.reset(new PubSubWriter(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // Check that the send_resource_list has size 0. This means that the send resource + // for the client has been removed. + send_resource_list = server_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 0); + send_resource_list.clear(); +} + +// Test TCP send resource cleaning. In this case, since the send resource has been created from an initial_peer, +// the send resource should not be removed. +TEST_P(TransportTCP, send_resource_cleanup_initial_peer) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr> client(new PubSubWriter(TEST_TOPIC_NAME)); + std::unique_ptr> udp_participant(new PubSubReader( + TEST_TOPIC_NAME)); + std::unique_ptr> server(new PubSubReader(TEST_TOPIC_NAME)); + + // Client + // Create a client with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. This will allow us to get send_resource_list_ + // from the client participant when its transport gets its OpenOutputChannel() method called. + + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | 1 TCP (initial peer) <-------------------------------------------------- clean transports | + // | | | | | + // | 1 TCP + 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | (initial peer) | | | + // |__________________________________________________________| |_____________________| + // + + uint16_t server_port = 10000; + LocatorList_t initial_peer_list; + Locator_t initialPeerLocator; + if (use_ipv6) + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + initialPeerLocator.port = server_port; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + + auto low_level_transport = std::make_shared(); + auto client_chaining_transport = std::make_shared(low_level_transport); + client->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + client_chaining_transport).init(); + ASSERT_TRUE(client->isInitialized()); + + // Server + auto initialize_server = [&](PubSubReader* server) + { + std::shared_ptr server_transport; + if (use_ipv6) + { + server_transport = std::make_shared(); + } + else + { + server_transport = std::make_shared(); + } + server_transport->add_listener_port(server_port); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport); + server->init(); + }; + auto initialize_udp_participant = [&](PubSubReader* udp_participant) + { + auto udp_participant_transport = std::make_shared(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + client->wait_discovery(1, std::chrono::seconds(0)); + server->wait_discovery(std::chrono::seconds(0), 1); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(2, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = client_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + server.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + client->wait_reader_undiscovery(); + + // Create new client instances. + udp_participant.reset(new PubSubReader(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(1, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // Check that the send_resource_list has size 1. This means that the send resource + // for the first client hasn't been removed because it was created from an initial_peer. + send_resource_list = client_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + send_resource_list.clear(); + + // If relaunching the server, the client should connect again. + server.reset(new PubSubReader(TEST_TOPIC_NAME)); + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(2, std::chrono::seconds(0)); +} + +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/common/DatagramInjectionTransport.cpp b/test/blackbox/common/DatagramInjectionTransport.cpp index a4b74f43b3f..3b1259a0de0 100644 --- a/test/blackbox/common/DatagramInjectionTransport.cpp +++ b/test/blackbox/common/DatagramInjectionTransport.cpp @@ -42,6 +42,24 @@ std::set DatagramInjectionTransportDescriptor::get_ return receivers_; } +void DatagramInjectionTransportDescriptor::update_send_resource_list( + const SendResourceList& send_resource_list) +{ + std::lock_guard guard(mtx_); + + send_resource_list_.clear(); + for (const auto& resource : send_resource_list) + { + send_resource_list_.insert(resource.get()); + } +} + +std::set DatagramInjectionTransportDescriptor::get_send_resource_list() +{ + std::lock_guard guard(mtx_); + return send_resource_list_; +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/DatagramInjectionTransport.hpp b/test/blackbox/common/DatagramInjectionTransport.hpp index 3cfff45d6c9..aa1cd4b0873 100644 --- a/test/blackbox/common/DatagramInjectionTransport.hpp +++ b/test/blackbox/common/DatagramInjectionTransport.hpp @@ -17,6 +17,9 @@ #include #include +#include + +using SenderResource = eprosima::fastrtps::rtps::SenderResource; namespace eprosima { namespace fastdds { @@ -37,10 +40,16 @@ class DatagramInjectionTransportDescriptor : public ChainingTransportDescriptor std::set get_receivers(); + void update_send_resource_list( + const SendResourceList& send_resource_list); + + std::set get_send_resource_list(); + private: std::mutex mtx_; std::set receivers_; + std::set send_resource_list_; }; class DatagramInjectionTransport : public ChainingTransport @@ -60,23 +69,25 @@ class DatagramInjectionTransport : public ChainingTransport } bool send( - eprosima::fastrtps::rtps::SenderResource* /*low_sender_resource*/, - const eprosima::fastrtps::rtps::octet* /*send_buffer*/, - uint32_t /*send_buffer_size*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_begin*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_end*/, - const std::chrono::steady_clock::time_point& /*timeout*/) override + eprosima::fastrtps::rtps::SenderResource* low_sender_resource, + const eprosima::fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_begin, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_end, + const std::chrono::steady_clock::time_point& timeout) override { - return true; + return low_sender_resource->send(send_buffer, send_buffer_size, destination_locators_begin, + destination_locators_end, timeout); } void receive( - TransportReceiverInterface* /*next_receiver*/, - const eprosima::fastrtps::rtps::octet* /*receive_buffer*/, - uint32_t /*receive_buffer_size*/, - const eprosima::fastrtps::rtps::Locator_t& /*local_locator*/, - const eprosima::fastrtps::rtps::Locator_t& /*remote_locator*/) override + TransportReceiverInterface* next_receiver, + const eprosima::fastrtps::rtps::octet* receive_buffer, + uint32_t receive_buffer_size, + const eprosima::fastrtps::rtps::Locator_t& local_locator, + const eprosima::fastrtps::rtps::Locator_t& remote_locator) override { + next_receiver->OnDataReceived(receive_buffer, receive_buffer_size, local_locator, remote_locator); } bool OpenInputChannel( @@ -92,6 +103,15 @@ class DatagramInjectionTransport : public ChainingTransport return ret_val; } + bool OpenOutputChannel( + SendResourceList& send_resource_list, + const Locator& loc) override + { + bool ret_val = ChainingTransport::OpenOutputChannel(send_resource_list, loc); + parent_->update_send_resource_list(send_resource_list); + return ret_val; + } + private: DatagramInjectionTransportDescriptor* parent_ = nullptr; diff --git a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h index 66749f7e5d8..2812fbabc54 100644 --- a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h +++ b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h @@ -26,6 +26,7 @@ #include #include #include +#include #if HAVE_SECURITY #include @@ -312,6 +313,8 @@ class RTPSParticipantImpl MOCK_METHOD(bool, ignore_participant, (const GuidPrefix_t&)); + MOCK_METHOD(bool, update_removed_participant, (rtps::LocatorList_t&)); + private: MockParticipantListener listener_; diff --git a/test/unittest/rtps/discovery/CMakeLists.txt b/test/unittest/rtps/discovery/CMakeLists.txt index 240e9f0223d..a8cdbaf6156 100644 --- a/test/unittest/rtps/discovery/CMakeLists.txt +++ b/test/unittest/rtps/discovery/CMakeLists.txt @@ -95,8 +95,132 @@ else() target_link_libraries(EdpTests ${PRIVACY} fastcdr) endif() +<<<<<<< HEAD add_gtest(EdpTests SOURCES ${EDPTESTS_SOURCE}) if(ANDROID) set_property(TARGET EdpTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") -endif() \ No newline at end of file +endif() +======= +gtest_discover_tests(EdpTests) + +#PDP TESTS + +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + +set(PDPTESTS_SOURCE PDPTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDP.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicDataType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/StringMatching.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/string_convert.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/ThreadSettings.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/data/WriterProxyData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/data/ReaderProxyData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/FlowControllerConsts.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/AnnotationDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicDataFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicPubSubType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypePtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicDataPtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilder.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilderPtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilderFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeMember.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/MemberDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/AnnotationParameterValue.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeIdentifier.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeIdentifierTypes.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObject.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObjectFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObjectHashId.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeNamesGenerator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypesBase.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/BuiltinAnnotationsTypeObject.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp + ${TCPTransportInterface_SOURCE} + ) + +add_executable(PDPTests ${PDPTESTS_SOURCE}) + +target_compile_definitions(PDPTests PRIVATE FASTRTPS_NO_LIB + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_include_directories(PDPTests PRIVATE + ${PROJECT_SOURCE_DIR}/test/mock/rtps/BuiltinProtocols + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ExternalLocatorsProcessor + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSParticipantImpl + ${PROJECT_SOURCE_DIR}/test/mock/rtps/Endpoint + ${PROJECT_SOURCE_DIR}/test/mock/rtps/EDP + ${PROJECT_SOURCE_DIR}/test/mock/rtps/WLP + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatelessReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReaderHistory + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatelessWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/WriterHistory + ${PROJECT_SOURCE_DIR}/test/mock/rtps/TimedEvent + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ResourceEvent + ${PROJECT_SOURCE_DIR}/test/mock/rtps/SecurityManager + ${PROJECT_SOURCE_DIR}/test/mock/rtps/TypeLookupManager + ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ${Asio_INCLUDE_DIR} + ) + +target_link_libraries(PDPTests foonathan_memory + GTest::gmock + ${CMAKE_DL_LIBS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) +if(QNX) + target_link_libraries(PDPTests socket) +endif() +if(MSVC OR MSVC_IDE) + target_link_libraries(PDPTests ${PRIVACY} fastcdr iphlpapi Shlwapi ws2_32) +else() + target_link_libraries(PDPTests ${PRIVACY} fastcdr) +endif() + +gtest_discover_tests(PDPTests) +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) diff --git a/test/unittest/rtps/security/CMakeLists.txt b/test/unittest/rtps/security/CMakeLists.txt index a1a7a347fa6..e8341a54332 100644 --- a/test/unittest/rtps/security/CMakeLists.txt +++ b/test/unittest/rtps/security/CMakeLists.txt @@ -40,6 +40,7 @@ set(SOURCES_SECURITY_TEST_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/security/exceptions/SecurityException.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/TimedConditionVariable.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/security/cryptography/AESGCMGMAC_Types.cpp ${PROJECT_SOURCE_DIR}/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.cpp ) diff --git a/test/unittest/statistics/rtps/CMakeLists.txt b/test/unittest/statistics/rtps/CMakeLists.txt index 7ea0f1a216b..cf4e0c8bf11 100644 --- a/test/unittest/statistics/rtps/CMakeLists.txt +++ b/test/unittest/statistics/rtps/CMakeLists.txt @@ -38,10 +38,145 @@ target_include_directories(RTPSStatisticsTests PRIVATE ) target_link_libraries(RTPSStatisticsTests fastrtps fastcdr GTest::gtest GTest::gmock) +<<<<<<< HEAD add_gtest(RTPSStatisticsTests SOURCES ${STATISTICS_RTPS_TESTS_SOURCE}) +======= +gtest_discover_tests(RTPSStatisticsTests) + +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + +set(STATISTICS_RTPS_MONITORSERVICETESTS_SOURCE + MonitorServiceTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicDataType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/string_convert.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/ThreadSettings.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/data/WriterProxyData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/FlowControllerConsts.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/AnnotationDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicData.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicDataFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicPubSubType.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypePtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicDataPtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilder.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilderPtr.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeBuilderFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/DynamicTypeMember.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/MemberDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/AnnotationParameterValue.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeIdentifier.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeIdentifierTypes.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObject.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObjectFactory.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeObjectHashId.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypeNamesGenerator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/TypesBase.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/dynamic-types/BuiltinAnnotationsTypeObject.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/history/TopicPayloadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/history/TopicPayloadPoolRegistry.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/ResourceEvent.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/TimedEventImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/TimedEvent.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/writer/LocatorSelectorSender.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/rtps/monitor-service/MonitorService.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/rtps/monitor-service/MonitorServiceListener.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/monitorservice_typesPubSubTypes.cxx + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/monitorservice_types.cxx + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/monitorservice_typesv1.cxx + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/types.cxx + ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/typesv1.cxx + ${TCPTransportInterface_SOURCE} + ) + +add_executable(MonitorServiceTests ${STATISTICS_RTPS_MONITORSERVICETESTS_SOURCE}) + +target_compile_definitions(MonitorServiceTests PRIVATE FASTRTPS_NO_LIB + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNAL_DEBUG> # Internal debug activated. + ) + +target_include_directories(MonitorServiceTests PRIVATE + mock/StatisticsBase + ${PROJECT_SOURCE_DIR}/test/mock/rtps/BuiltinProtocols + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ExternalLocatorsProcessor + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSParticipantImpl + ${PROJECT_SOURCE_DIR}/test/mock/rtps/Endpoint + ${PROJECT_SOURCE_DIR}/test/mock/rtps/EDP + ${PROJECT_SOURCE_DIR}/test/mock/rtps/WLP + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatelessReader + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReaderHistory + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/StatelessWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/WriterHistory + ${PROJECT_SOURCE_DIR}/test/mock/rtps/SecurityManager + ${PROJECT_SOURCE_DIR}/test/mock/rtps/TypeLookupManager + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ${Asio_INCLUDE_DIR} + ) + +target_link_libraries(MonitorServiceTests + fastcdr + GTest::gtest + GTest::gmock + $<$:OpenSSL::SSL$OpenSSL::Crypto>) + +gtest_discover_tests(MonitorServiceTests) + +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) if(QNX) target_link_libraries(RTPSStatisticsTests socket) endif() +<<<<<<< HEAD if(ANDROID) set_property(TARGET RTPSStatisticsTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&") -endif() \ No newline at end of file +endif() +======= +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) diff --git a/test/unittest/transport/CMakeLists.txt b/test/unittest/transport/CMakeLists.txt index 0facbdf1dac..5da2050ab53 100644 --- a/test/unittest/transport/CMakeLists.txt +++ b/test/unittest/transport/CMakeLists.txt @@ -46,6 +46,24 @@ if(TLS_FOUND) # ${CMAKE_CURRENT_BINARY_DIR}/permissions_helloworld.smime COPYONLY) endif() +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + set(UDPV4TESTS_SOURCE UDPv4Tests.cpp mock/MockReceiverResource.cpp @@ -57,10 +75,15 @@ set(UDPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp +<<<<<<< HEAD ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp +======= + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp @@ -82,6 +105,7 @@ set(UDPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv6Transport.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp @@ -111,11 +135,14 @@ set(TCPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv4Transport.cpp +<<<<<<< HEAD ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp +======= +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -155,11 +182,14 @@ set(TCPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv6Transport.cpp +<<<<<<< HEAD ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp +======= +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -176,6 +206,7 @@ if(TLS_FOUND) ) endif() +<<<<<<< HEAD set(TEST_UDPV4TESTS_SOURCE test_UDPv4Tests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp @@ -200,6 +231,8 @@ set(TEST_UDPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp ) +======= +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) set(SHAREDMEMTESTS_SOURCE SharedMemTests.cpp mock/MockReceiverResource.cpp @@ -213,10 +246,14 @@ set(SHAREDMEMTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp +<<<<<<< HEAD ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp +======= + ${TCPTransportInterface_SOURCE} +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -260,7 +297,15 @@ target_include_directories(UDPv4Tests PRIVATE ${PROJECT_SOURCE_DIR}/src/cpp $<$:${ANDROID_IFADDRS_INCLUDE_DIR}> ) +<<<<<<< HEAD target_link_libraries(UDPv4Tests GTest::gtest ${MOCKS}) +======= +target_link_libraries(UDPv4Tests + fastcdr + GTest::gtest + ${MOCKS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) if(QNX) target_link_libraries(UDPv4Tests socket) endif() @@ -328,6 +373,7 @@ if(NOT DISABLE_UDPV6_TESTS) set(TRANSPORT_XFAIL_LIST ${TRANSPORT_XFAIL_LIST} XFAIL_TCP6) endif() +<<<<<<< HEAD add_executable(test_UDPv4Tests ${TEST_UDPV4TESTS_SOURCE}) target_compile_definitions(test_UDPv4Tests PRIVATE BOOST_ASIO_STANDALONE @@ -357,6 +403,83 @@ endif() add_gtest(test_UDPv4Tests SOURCES ${TEST_UDPV4TESTS_SOURCE}) set(TRANSPORT_XFAIL_LIST ${TRANSPORT_XFAIL_LIST} XFAIL_TEST_UDP4) +======= +########################## +# IPv6 tests +########################## +option(DISABLE_UDPV6_TESTS "Disable UDPv6 tests because fails in some systems" OFF) + +if(NOT DISABLE_UDPV6_TESTS) + ########################## + # UDPv6 tests + ########################## + add_executable(UDPv6Tests ${UDPV6TESTS_SOURCE}) + target_compile_definitions(UDPv6Tests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) + target_include_directories(UDPv6Tests PRIVATE + ${Asio_INCLUDE_DIR} + ${PROJECT_SOURCE_DIR}/test/mock/rtps/MessageReceiver + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource + ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) + target_link_libraries(UDPv6Tests + fastcdr + GTest::gtest + ${MOCKS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) + if(QNX) + target_link_libraries(UDPv6Tests socket) + endif() + if(MSVC OR MSVC_IDE) + target_link_libraries(UDPv6Tests ${PRIVACY} iphlpapi Shlwapi ) + endif() + gtest_discover_tests(UDPv6Tests) + + ########################## + # TCPv6 tests + ########################## + add_executable(TCPv6Tests ${TCPV6TESTS_SOURCE}) + target_compile_definitions(TCPv6Tests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) + target_include_directories(TCPv6Tests PRIVATE + ${Asio_INCLUDE_DIR} + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ParticipantProxyData + ${PROJECT_SOURCE_DIR}/test/mock/dds/QosPolicies + ${PROJECT_SOURCE_DIR}/test/mock/rtps/MessageReceiver + ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource + ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + $<$:${ANDROID_IFADDRS_INCLUDE_DIR}> + ) + target_link_libraries(TCPv6Tests + fastcdr + GTest::gtest + ${MOCKS} + $<$:OpenSSL::SSL$OpenSSL::Crypto>) + if(QNX) + target_link_libraries(TCPv6Tests socket) + endif() + if(MSVC OR MSVC_IDE) + target_link_libraries(TCPv6Tests ${PRIVACY} fastcdr iphlpapi Shlwapi) + else() + target_link_libraries(TCPv6Tests ${PRIVACY} fastcdr) + endif() + gtest_discover_tests(TCPv6Tests) +endif() + +########################## +# TCPv4 tests +########################## +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) add_executable(TCPv4Tests ${TCPV4TESTS_SOURCE}) target_compile_definitions(TCPv4Tests PRIVATE BOOST_ASIO_STANDALONE diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index c06ca6bbe2e..feb63623bc3 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -21,7 +21,11 @@ #include "mock/MockTCPChannelResource.h" #include "mock/MockTCPv4Transport.h" #include +<<<<<<< HEAD #include +======= +#include +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) #include #include #include @@ -964,96 +968,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) sem.wait(); } } -/* - TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) - { - eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); - using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; - - TCPv4TransportDescriptor recvDescriptor; - recvDescriptor.add_listener_port(g_default_port + 1); - recvDescriptor.apply_security = true; - recvDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - //recvDescriptor.tls_config.password = "testkey"; - //recvDescriptor.tls_config.password = "test"; - //recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; - //recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - recvDescriptor.tls_config.verify_file = "maincacert.pem"; // This CA only know about mainsub certificates - //recvDescriptor.tls_config.verify_file = "ca.pem"; - // Server doesn't accept clients without certs - recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT | TLSVerifyMode::VERIFY_PEER; - recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport receiveTransportUnderTest(recvDescriptor); - receiveTransportUnderTest.init(); - - Locator_t inputLocator; - inputLocator.kind = LOCATOR_KIND_TCPv4; - inputLocator.port = g_default_port + 1; - IPLocator::setIPv4(inputLocator, 127, 0, 0, 1); - IPLocator::setLogicalPort(inputLocator, 7410); - - Locator_t outputLocator; - outputLocator.kind = LOCATOR_KIND_TCPv4; - IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); - outputLocator.port = g_default_port + 1; - IPLocator::setLogicalPort(outputLocator, 7410); - - TCPv4TransportDescriptor sendDescriptor2; - sendDescriptor2.apply_security = true; - sendDescriptor2.tls_config.handshake_role = TLSHSRole::SERVER; - sendDescriptor2.tls_config.password = "test"; - sendDescriptor2.tls_config.cert_chain_file = "server.pem"; - sendDescriptor2.tls_config.private_key_file = "server.pem"; - //sendDescriptor2.tls_config.password = "testkey"; - //sendDescriptor2.tls_config.cert_chain_file = "mainsubcert.pem"; - //sendDescriptor2.tls_config.private_key_file = "mainsubkey.pem"; - sendDescriptor2.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; - sendDescriptor2.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport sendTransportUnderTest2(sendDescriptor2); - sendTransportUnderTest2.init(); - - { - MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); - - ASSERT_TRUE(sendTransportUnderTest2.OpenOutputChannel(outputLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - Semaphore sem; - std::function recCallback = [&]() - { - EXPECT_FALSE(true); // Should not receive - sem.post(); - }; - - msg_recv->setCallback(recCallback); - - auto sendThreadFunction = [&]() - { - bool sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - int count = 0; - while (!sent && count < 30) - { - sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; - } - EXPECT_FALSE(sent); - sem.post(); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - } - ASSERT_TRUE(sendTransportUnderTest2.CloseOutputChannel(outputLocator)); - } - */ +// TODO(eduponz): TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) { @@ -1900,6 +1816,213 @@ TEST_F(TCPv4Tests, non_blocking_send) } #endif // ifndef _WIN32 +<<<<<<< HEAD +======= +// This test verifies that a server can reconnect to a client after the client has once failed in a +// openLogicalPort request +TEST_F(TCPv4Tests, reconnect_after_open_port_failure) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + uint16_t port = g_default_port; + // Create a TCP Server transport + TCPv4TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Create a TCP Client transport + TCPv4TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // Connect client to server + EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + + // Logical port is opened + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try reconnect the server and close server's input channel before client's open logical + // port request, and then delete server and reconnect + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + serverTransportUnderTest.reset(); + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Clear test + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + client_resource_list.clear(); +} + +// This test verifies that OpenOutputChannel correctly handles a remote locator with +// same physical port as the local listening port. +TEST_F(TCPv4Tests, opening_output_channel_with_same_locator_as_local_listening_port) +{ + TCPv4Transport transportUnderTest(descriptor); + transportUnderTest.init(); + + // Two locators with the same port as the local listening port, but different addresses + Locator_t lowerOutputChannelLocator; + lowerOutputChannelLocator.kind = LOCATOR_KIND_TCPv4; + lowerOutputChannelLocator.port = g_default_port; + IPLocator::setLogicalPort(lowerOutputChannelLocator, g_default_port); + Locator_t higherOutputChannelLocator = lowerOutputChannelLocator; + IPLocator::setIPv4(lowerOutputChannelLocator, 1, 1, 1, 1); + IPLocator::setIPv4(higherOutputChannelLocator, 255, 255, 255, 255); + + SendResourceList send_resource_list; + + // If the remote address is lower than the local one, no channel must be created but it must be added to the send_resource_list + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, lowerOutputChannelLocator)); + ASSERT_FALSE(transportUnderTest.is_output_channel_open_for(lowerOutputChannelLocator)); + ASSERT_EQ(send_resource_list.size(), 1u); + // If the remote address is higher than the local one, a CONNECT channel must be created and added to the send_resource_list + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, higherOutputChannelLocator)); + ASSERT_TRUE(transportUnderTest.is_output_channel_open_for(higherOutputChannelLocator)); + ASSERT_EQ(send_resource_list.size(), 2u); +} + +// This test verifies that the send resource list is correctly cleaned both in LAN and WAN cases. +TEST_F(TCPv4Tests, remove_from_send_resource_list) +{ + // Three scenarios are considered: LAN, WAN1 and WAN2 + // LAN: The remote locator is in the same LAN as the local locator + // WAN1: The remote locator is in a different LAN than the local locator, and initial peers have LAN and WAN remote addresses. + // WAN2: The remote locator is in a different LAN than the local locator, and initial peers have WANtoLANLocator ([0][WAN] address). + std::vector test_cases = { + "LAN", + "WAN1", + "WAN2" + }; + + for (const std::string& test_case : test_cases) + { + TCPv4TransportDescriptor send_descriptor; + + MockTCPv4Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); + + Locator_t discovery_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port, discovery_locator); + IPLocator::setLogicalPort(discovery_locator, 7410); + + Locator_t initial_peer_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 1, initial_peer_locator); + IPLocator::setLogicalPort(initial_peer_locator, 7410); + LocatorList_t initial_peer_list; + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(discovery_locator, g_test_wan_address); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + + if (test_case == "WAN2") + { + initial_peer_locator = IPLocator::WanToLanLocator(initial_peer_locator); + } + } + + initial_peer_list.push_back(initial_peer_locator); + + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, discovery_locator)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, initial_peer_locator)); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(wrong_output_locator, g_test_wan_address); + } + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2); + + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(discovery_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + if (test_case == "WAN2") + { + // In WAN2, the remote_participant_physical_locators are the real Locators, not the WANtoLANLocators. + IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + } + remote_participant_physical_locators.push_back(initial_peer_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + } +} + +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index d0c77c98277..d7729ca6cf1 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -135,9 +136,9 @@ TEST_F(TCPv6Tests, opening_and_closing_output_channel) ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.OpenOutputChannel(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_TRUE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_TRUE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_FALSE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_FALSE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); */ } @@ -328,186 +329,184 @@ TEST_F(TCPv6Tests, non_blocking_send) } #endif // ifndef _WIN32 +<<<<<<< HEAD /* TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) { eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); +======= +// This test verifies that a server can reconnect to a client after the client has once failed in a +// openLogicalPort request +TEST_F(TCPv6Tests, reconnect_after_open_port_failure) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + uint16_t port = g_default_port; + // Create a TCP Server transport + TCPv6TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Create a TCP Client transport + TCPv6TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // Connect client to server + EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + + // Logical port is opened + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try reconnect the server and close server's input channel before client's open logical + // port request, and then delete server and reconnect + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + serverTransportUnderTest.reset(); + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Clear test + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + client_resource_list.clear(); +} - using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; - using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - - TCPv6TransportDescriptor recvDescriptor; - recvDescriptor.add_listener_port(g_default_port); - recvDescriptor.apply_security = true; - recvDescriptor.tls_config.password = "testkey"; - recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; - recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - recvDescriptor.tls_config.verify_file = "maincacert.pem"; - // Server doesn't accept clients without certs - recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER | TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT; - recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - recvDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); - recvDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION); - recvDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2); - recvDescriptor.tls_config.add_option(TLSOptions::NO_SSLV3); - TCPv6Transport receiveTransportUnderTest(recvDescriptor); - receiveTransportUnderTest.init(); - - TCPv6TransportDescriptor sendDescriptor; - sendDescriptor.apply_security = true; - sendDescriptor.tls_config.password = "testkey"; - sendDescriptor.tls_config.cert_chain_file = "mainsubcert.pem"; - sendDescriptor.tls_config.private_key_file = "mainsubkey.pem"; - sendDescriptor.tls_config.verify_file = "maincacert.pem"; - sendDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; - sendDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - sendDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); - sendDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION); - sendDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2); - sendDescriptor.tls_config.add_option(TLSOptions::NO_SSLV3); - TCPv6Transport sendTransportUnderTest(sendDescriptor); - sendTransportUnderTest.init(); - - Locator_t inputLocator; - inputLocator.kind = LOCATOR_KIND_TCPv6; - inputLocator.port = g_default_port; - IPLocator::setIPv4(inputLocator, "::1"); - IPLocator::setLogicalPort(inputLocator, 7410); - - Locator_t outputLocator; - outputLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv4(outputLocator, "::1"); - outputLocator.port = g_default_port; - IPLocator::setLogicalPort(outputLocator, 7410); - - { - MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); - - ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(outputLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - Semaphore sem; - std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; - - msg_recv->setCallback(recCallback); - - auto sendThreadFunction = [&]() - { - bool sent = sendTransportUnderTest.send(message, 5, outputLocator, inputLocator); - while (!sent) - { - sent = sendTransportUnderTest.send(message, 5, outputLocator, inputLocator); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - } - ASSERT_TRUE(sendTransportUnderTest.CloseOutputChannel(outputLocator)); - } - */ -// TODO SKIP AT THIS MOMENT -/* - TEST_F(TCPv6Tests, send_and_receive_between_ports) - { - descriptor.listening_ports.push_back(g_default_port); - TCPv6Transport transportUnderTest(descriptor); - transportUnderTest.init(); - - Locator_t localLocator; - localLocator.port = g_default_port; - localLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(localLocator, "::1"); - - Locator_t outputChannelLocator; - outputChannelLocator = g_default_port; - outputChannelLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(outputChannelLocator, "::1"); - - MockReceiverResource receiver(transportUnderTest, localLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - - ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); // Includes loopback - ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(localLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - std::this_thread::sleep_for(std::chrono::seconds(5)); - - Semaphore sem; - std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message,msg_recv->data,5), 0); - sem.post(); - }; - - msg_recv->setCallback(recCallback); - - auto sendThreadFunction = [&]() - { - EXPECT_TRUE(transportUnderTest.send(message, 5, outputChannelLocator, localLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); - } - - TEST_F(TCPv6Tests, send_to_loopback) - { +TEST_F(TCPv6Tests, opening_output_channel_with_same_locator_as_local_listening_port) +{ + descriptor.add_listener_port(g_default_port); TCPv6Transport transportUnderTest(descriptor); transportUnderTest.init(); - Locator_t multicastLocator; - multicastLocator.set_port(g_default_port); - multicastLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(multicastLocator, 0xff31, 0, 0, 0, 0, 0, 0, 0); - - Locator_t outputChannelLocator; - outputChannelLocator.set_port(g_default_port + 1); - outputChannelLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(outputChannelLocator, 0,0,0,0,0,0,0,1); // Loopback - - MockReceiverResource receiver(transportUnderTest, multicastLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); - - ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); - ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(multicastLocator)); - octet message[5] = { 'H','e','l','l','o' }; + // Two locators with the same port as the local listening port, but different addresses + Locator_t lowerOutputChannelLocator; + lowerOutputChannelLocator.kind = LOCATOR_KIND_TCPv6; + lowerOutputChannelLocator.port = g_default_port; + IPLocator::setLogicalPort(lowerOutputChannelLocator, g_default_port); + Locator_t higherOutputChannelLocator = lowerOutputChannelLocator; + IPLocator::setIPv6(lowerOutputChannelLocator, "::"); + IPLocator::setIPv6(higherOutputChannelLocator, "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"); + + SendResourceList send_resource_list; + + // If the remote address is lower than the local one, no channel must be created but it must be added to the send_resource_list + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, lowerOutputChannelLocator)); + ASSERT_FALSE(transportUnderTest.is_output_channel_open_for(lowerOutputChannelLocator)); + ASSERT_EQ(send_resource_list.size(), 1); + // If the remote address is higher than the local one, a CONNECT channel must be created and added to the send_resource_list + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, higherOutputChannelLocator)); + ASSERT_TRUE(transportUnderTest.is_output_channel_open_for(higherOutputChannelLocator)); + ASSERT_EQ(send_resource_list.size(), 2u); +} - Semaphore sem; - std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message,msg_recv->data,5), 0); - sem.post(); - }; +// This test verifies that the send resource list is correctly cleaned and the channel resource is removed +// from the channel_resources_map. +TEST_F(TCPv6Tests, remove_from_send_resource_list) +{ + TCPv6TransportDescriptor send_descriptor; + MockTCPv6Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); +>>>>>>> fe116500c (TCPSendResources cleanup (#4300)) + + Locator_t output_locator_1; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port, output_locator_1); + IPLocator::setLogicalPort(output_locator_1, 7410); + + Locator_t output_locator_2; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 1, output_locator_2); + IPLocator::setLogicalPort(output_locator_2, 7410); + + LocatorList_t initial_peer_list; + initial_peer_list.push_back(output_locator_2); + + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_1)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_2)); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(output_locator_1); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + remote_participant_physical_locators.push_back(output_locator_2); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); +} - msg_recv->setCallback(recCallback); +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_ports) - auto sendThreadFunction = [&]() - { - EXPECT_TRUE(transportUnderTest.send(message, 5, outputChannelLocator, multicastLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); - } - */ #endif // ifndef __APPLE__ void TCPv6Tests::HELPER_SetDescriptorDefaults() diff --git a/test/unittest/transport/test_UDPv4Tests.cpp b/test/unittest/transport/test_UDPv4Tests.cpp deleted file mode 100644 index 512095ac7fd..00000000000 --- a/test/unittest/transport/test_UDPv4Tests.cpp +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#if defined(_WIN32) -#define GET_PID _getpid -#else -#define GET_PID getpid -#endif // if defined(_WIN32) - -using IPLocator = eprosima::fastrtps::rtps::IPLocator; -using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport; - -static uint16_t g_default_port = 0; - -uint16_t get_port() -{ - uint16_t port = static_cast(GET_PID()); - - if (4000 > port) - { - port += 4000; - } - - return port; -} - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -class test_UDPv4Tests : public ::testing::Test -{ -public: - - test_UDPv4Tests() - { - HELPER_SetDescriptorDefaults(); - } - - ~test_UDPv4Tests() - { - eprosima::fastdds::dds::Log::KillThread(); - } - - void HELPER_SetDescriptorDefaults(); - void HELPER_WarmUpOutput( - test_UDPv4Transport& transport); - void HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber); - void HELPER_FillAckNackMessage( - CDRMessage_t& message); - void HELPER_FillHeartbeatMessage( - CDRMessage_t& message); - - test_UDPv4TransportDescriptor descriptor; - std::unique_ptr senderThread; - std::unique_ptr receiverThread; -}; - -/* - TEST_F(test_UDPv4Tests, DATA_messages_dropped) - { - // Given - descriptor.dropDataMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, SequenceNumber_t()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, ACKNACK_messages_dropped) - { - // Given - descriptor.dropAckNackMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, HEARTBEAT_messages_dropped) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillHeartbeatMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, Dropping_by_random_chance) - { - // Given - descriptor.percentageOfMessagesToDrop = 100; // To avoid a non-deterministic test - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(3u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, dropping_by_sequence_number) - { - // Given - std::vector sequenceNumbersToDrop(1); - sequenceNumbersToDrop.back().low = 1; - - descriptor.sequenceNumberDataMessagesToDrop = sequenceNumbersToDrop; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, sequenceNumbersToDrop.back()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, No_drops_when_unrequested) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - descriptor.dropDataMessagesPercentage = 100; - descriptor.granularMode = false; - - test_UDPv4Transport transportUnderTest(descriptor); // Default, no drops - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - IPLocator::setIPv4(locator, 239, 255, 1, 4); - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(0u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - void test_UDPv4Tests::HELPER_SetDescriptorDefaults() - { - descriptor.sendBufferSize = 80; - descriptor.receiveBufferSize = 80; - descriptor.dropDataMessagesPercentage = 0; - descriptor.dropDataFragMessagesPercentage = 0; - descriptor.dropAckNackMessagesPercentage = 0; - descriptor.dropHeartbeatMessagesPercentage = 0; - descriptor.percentageOfMessagesToDrop = 0; - descriptor.dropLogLength = 10; - descriptor.granularMode = false; - } - - void test_UDPv4Tests::HELPER_WarmUpOutput(test_UDPv4Transport& transport) - { - Locator_t outputChannelLocator; - outputChannelLocator.port = g_default_port; - outputChannelLocator.kind = LOCATOR_KIND_UDPv4; - ASSERT_TRUE(transport.OpenOutputChannel(outputChannelLocator)); - } - */ - -void test_UDPv4Tests::HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber) -{ - GuidPrefix_t prefix; - TopicKind_t topic = WITH_KEY; - EntityId_t entityID; - CacheChange_t change; - change.sequenceNumber = sequenceNumber; // Here is where the SN propagates from - RTPSMessageCreator::addMessageData(&message, prefix, &change, topic, entityID, false, nullptr); -} - -void test_UDPv4Tests::HELPER_FillAckNackMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumberSet_t set; - RTPSMessageCreator::addMessageAcknack(&message, prefix, prefix, entityID, entityID, set, 0, false); -} - -void test_UDPv4Tests::HELPER_FillHeartbeatMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumber_t sn1; - SequenceNumber_t sn2; - RTPSMessageCreator::addMessageHeartbeat(&message, prefix, entityID, entityID, sn1, sn2, 0, false, false); -} - -int main( - int argc, - char** argv) -{ - g_default_port = get_port(); - - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}