diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index 17a9dc149f7..18eefbd2dd5 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -898,12 +898,22 @@ class SharedMemGlobal void lock_read_exclusive() { + if (OpenMode::ReadShared == open_mode()) + { + throw std::runtime_error("port is opened ReadShared"); + } + std::string lock_name = std::string(node_->domain_name) + "_port" + std::to_string(node_->port_id) + "_el"; read_exclusive_lock_ = std::unique_ptr(new RobustExclusiveLock(lock_name)); } void lock_read_shared() { + if (OpenMode::ReadExclusive == open_mode()) + { + throw std::runtime_error("port is opened ReadExclusive"); + } + std::string lock_name = std::string(node_->domain_name) + "_port" + std::to_string(node_->port_id) + "_sl"; read_shared_lock_ = std::unique_ptr(new RobustSharedLock(lock_name)); } @@ -1124,7 +1134,24 @@ class SharedMemGlobal std::stringstream ss; ss << port_node->port_id << " (" << port_node->uuid.to_string() << - ") because is ReadExclusive locked"; + ") because it was already locked"; + + err_reason = ss.str(); + port.reset(); + } + } + else if (open_mode == Port::OpenMode::ReadShared) + { + try + { + port->lock_read_shared(); + } + catch (const std::exception&) + { + std::stringstream ss; + + ss << port_node->port_id << " (" << port_node->uuid.to_string() << + ") because it had a ReadExclusive lock"; err_reason = ss.str(); port.reset(); diff --git a/test/blackbox/common/BlackboxTestsTransportSHM.cpp b/test/blackbox/common/BlackboxTestsTransportSHM.cpp index be5a48bc3ac..f3fc6a1dabd 100644 --- a/test/blackbox/common/BlackboxTestsTransportSHM.cpp +++ b/test/blackbox/common/BlackboxTestsTransportSHM.cpp @@ -16,6 +16,7 @@ #include "BlackboxTests.hpp" +#include "PubSubParticipant.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" @@ -28,6 +29,8 @@ using namespace eprosima::fastrtps; using SharedMemTransportDescriptor = eprosima::fastdds::rtps::SharedMemTransportDescriptor; using test_SharedMemTransportDescriptor = eprosima::fastdds::rtps::test_SharedMemTransportDescriptor; +using Locator = eprosima::fastdds::rtps::Locator; +using LocatorList = eprosima::fastdds::rtps::LocatorList; TEST(SHM, TransportPubSub) { @@ -70,6 +73,148 @@ TEST(SHM, TransportPubSub) reader.wait_participant_undiscovery(); } +<<<<<<< HEAD +======= +/* Regression test for redmine issue #20701 + * + * This test checks that the SHM transport will not listen on the same port + * in unicast and multicast at the same time. + * It does so by specifying custom default locators on a DataReader and then + * checking that the port mutation took place, thus producing a different port. + */ +TEST(SHM, SamePortUnicastMulticast) +{ + PubSubReader participant(TEST_TOPIC_NAME); + + Locator locator; + locator.kind = LOCATOR_KIND_SHM; + locator.port = global_port; + + LocatorList unicast_list; + LocatorList multicast_list; + + // Note: this is using knowledge of the SHM locator address format since + // SHMLocator is not exposed to the user. + locator.address[0] = 'U'; + unicast_list.push_back(locator); + + // Note: this is using knowledge of the SHM locator address format since + // SHMLocator is not exposed to the user. + locator.address[0] = 'M'; + multicast_list.push_back(locator); + + // Create the reader with the custom transport and locators + auto testTransport = std::make_shared(); + participant + .disable_builtin_transport() + .add_user_transport_to_pparams(testTransport) + .set_default_unicast_locators(unicast_list) + .set_default_multicast_locators(multicast_list) + .init(); + + ASSERT_TRUE(participant.isInitialized()); + + // Retrieve the listening locators and check that one port is different + LocatorList reader_locators; + participant.get_native_reader().get_listening_locators(reader_locators); + + ASSERT_EQ(reader_locators.size(), 2u); + auto it = reader_locators.begin(); + auto first_port = it->port; + ++it; + auto second_port = it->port; + EXPECT_NE(first_port, second_port); + EXPECT_TRUE(first_port == global_port || second_port == global_port); +} + +// Regression test for redmine #19500 +TEST(SHM, IgnoreNonExistentSegment) +{ + using namespace eprosima::fastdds::dds; + + // Set up log + BlackboxMockConsumer* helper_consumer = new BlackboxMockConsumer(); + Log::ClearConsumers(); // Remove default consumers + Log::RegisterConsumer(std::unique_ptr(helper_consumer)); // Registering a consumer transfer ownership + // Filter specific message + Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Warning); + Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT_SHM")); + Log::SetErrorStringFilter(std::regex("Error receiving data.*")); + + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + writer + .asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(std::make_shared()) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + reader + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(std::make_shared()) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + reader.wait_discovery(); + + // Create and quickly destroy several participants in several threads +#ifdef _WIN32 + constexpr size_t num_threads = 1; +#else + constexpr size_t num_threads = 10; +#endif // _WIN32 + std::vector threads; + for (size_t i = 0; i < num_threads; i++) + { + threads.push_back(std::thread([]() + { +#ifdef _WIN32 + constexpr size_t num_parts = 2; +#else + constexpr size_t num_parts = 10; +#endif // _WIN32 + for (size_t i = 0; i < num_parts; ++i) + { + PubSubWriter late_writer(TEST_TOPIC_NAME); + late_writer + .asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(std::make_shared()) + .init(); + ASSERT_TRUE(late_writer.isInitialized()); + } + })); + } + + // Destroy the writer participant. + writer.destroy(); + + // Check that reader receives the unmatched. + reader.wait_participant_undiscovery(); + + for (auto& thread : threads) + { + thread.join(); + } + // Check logs + Log::Flush(); + EXPECT_EQ(helper_consumer->ConsumedEntries().size(), 0u); + + // Clean-up + Log::Reset(); // This calls to ClearConsumers, which deletes the registered consumer +} + +>>>>>>> 3d159dc8c (Enforce SHM ports open mode exclusions (#4635)) TEST(SHM, Test300KFragmentation) { PubSubReader reader(TEST_TOPIC_NAME); diff --git a/test/unittest/transport/SharedMemTests.cpp b/test/unittest/transport/SharedMemTests.cpp index 7cfde9a6db0..5494ea4230d 100644 --- a/test/unittest/transport/SharedMemTests.cpp +++ b/test/unittest/transport/SharedMemTests.cpp @@ -1013,6 +1013,36 @@ TEST_F(SHMTransportTests, port_lock_read_exclusive) port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive); } +// Regression test for redmine issue #20701 +TEST_F(SHMTransportTests, port_lock_read_shared_then_exclusive) +{ + auto shared_mem_manager = SharedMemManager::create(domain_name); + + shared_mem_manager->remove_port(0); + + auto port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared); + ASSERT_THROW(shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive), + std::exception); + + port.reset(); + port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive); +} + +// Regression test for redmine issue #20701 +TEST_F(SHMTransportTests, port_lock_read_exclusive_then_shared) +{ + auto shared_mem_manager = SharedMemManager::create(domain_name); + + shared_mem_manager->remove_port(0); + + auto port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive); + ASSERT_THROW(shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared), + std::exception); + + port.reset(); + port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared); +} + TEST_F(SHMTransportTests, robust_exclusive_lock) { const std::string lock_name = "robust_exclusive_lock_test1_el";