From 8fcd7ca4833af25d0792524587792c750f41b717 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Dom=C3=ADnguez=20L=C3=B3pez?= <116071334+Mario-DL@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:12:40 +0100 Subject: [PATCH] Fix tsan potential deadlock between `StatefulWriter` and `FlowController` (#5432) * Refs #22339: Add BB test Signed-off-by: Mario Dominguez * Refs #22339: Fix tsan deadlock report Signed-off-by: Mario Dominguez * Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status() Signed-off-by: Mario Dominguez * Refs #22339: Apply Miguels suggestion Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez --- src/cpp/rtps/writer/StatefulWriter.cpp | 72 +++++++++---------- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 37 ++++++++++ 2 files changed, 73 insertions(+), 36 deletions(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index ced15bae493..f0dd8f2d883 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -1218,55 +1218,58 @@ bool StatefulWriter::matched_reader_remove( { ReaderProxy* rproxy = nullptr; std::unique_lock lock(mp_mutex); - std::unique_lock guard_locator_selector_general(locator_selector_general_); - std::unique_lock guard_locator_selector_async(locator_selector_async_); - for (ReaderProxyIterator it = matched_local_readers_.begin(); - it != matched_local_readers_.end(); ++it) { - if ((*it)->guid() == reader_guid) - { - EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); - rproxy = std::move(*it); - it = matched_local_readers_.erase(it); - break; - } - } + std::lock_guard guard_locator_selector_general(locator_selector_general_); + std::lock_guard guard_locator_selector_async(locator_selector_async_); - if (rproxy == nullptr) - { - for (ReaderProxyIterator it = matched_datasharing_readers_.begin(); - it != matched_datasharing_readers_.end(); ++it) + for (ReaderProxyIterator it = matched_local_readers_.begin(); + it != matched_local_readers_.end(); ++it) { if ((*it)->guid() == reader_guid) { EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); rproxy = std::move(*it); - it = matched_datasharing_readers_.erase(it); + it = matched_local_readers_.erase(it); break; } } - } - if (rproxy == nullptr) - { - for (ReaderProxyIterator it = matched_remote_readers_.begin(); - it != matched_remote_readers_.end(); ++it) + if (rproxy == nullptr) { - if ((*it)->guid() == reader_guid) + for (ReaderProxyIterator it = matched_datasharing_readers_.begin(); + it != matched_datasharing_readers_.end(); ++it) { - EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); - rproxy = std::move(*it); - it = matched_remote_readers_.erase(it); - break; + if ((*it)->guid() == reader_guid) + { + EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); + rproxy = std::move(*it); + it = matched_datasharing_readers_.erase(it); + break; + } } } - } - locator_selector_general_.locator_selector.remove_entry(reader_guid); - locator_selector_async_.locator_selector.remove_entry(reader_guid); - update_reader_info(locator_selector_general_, false); - update_reader_info(locator_selector_async_, false); + if (rproxy == nullptr) + { + for (ReaderProxyIterator it = matched_remote_readers_.begin(); + it != matched_remote_readers_.end(); ++it) + { + if ((*it)->guid() == reader_guid) + { + EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); + rproxy = std::move(*it); + it = matched_remote_readers_.erase(it); + break; + } + } + } + + locator_selector_general_.locator_selector.remove_entry(reader_guid); + locator_selector_async_.locator_selector.remove_entry(reader_guid); + update_reader_info(locator_selector_general_, false); + update_reader_info(locator_selector_async_, false); + } if (get_matched_readers_size() == 0) { @@ -1282,11 +1285,8 @@ bool StatefulWriter::matched_reader_remove( if (nullptr != listener_) { - // call the listener without locks taken - guard_locator_selector_async.unlock(); - guard_locator_selector_general.unlock(); + // listener is called without locks taken lock.unlock(); - listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr); } diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 1670ec49b0c..540281d62d5 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -1014,6 +1014,43 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants) } } } +TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock) +{ + // Create + PubSubWriter writer("HelloWorldTopic_no_potential_deadlock"); + PubSubReader reader("HelloWorldTopic_no_potential_deadlock"); + + writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE) + .durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS) + .history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS) + .history_depth(20) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS) + .history_depth(20) + .durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + auto data = default_helloworld_data_generator(30); + + std::thread th([&]() + { + reader.startReception(data); + reader.block_for_at_least(5); + }); + + writer.wait_discovery(); + writer.send(data); + + th.join(); + reader.destroy(); + writer.destroy(); +} } // namespace dds } // namespace fastdds