Skip to content

Commit

Permalink
Fix tsan potential deadlock between StatefulWriter and `FlowControl…
Browse files Browse the repository at this point in the history
…ler` (#5432)

* Refs #22339: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Fix tsan deadlock report

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Take writer's mutex before rproxy->stop() and check_acked_status()

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #22339: Apply Miguels suggestion

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Dec 13, 2024
1 parent 0deeb14 commit 8fcd7ca
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 36 deletions.
72 changes: 36 additions & 36 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1218,55 +1218,58 @@ bool StatefulWriter::matched_reader_remove(
{
ReaderProxy* rproxy = nullptr;
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_local_readers_.erase(it);
break;
}
}
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
it = matched_local_readers_.erase(it);
break;
}
}
}

if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
if (rproxy == nullptr)
{
if ((*it)->guid() == reader_guid)
for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
it != matched_datasharing_readers_.end(); ++it)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_datasharing_readers_.erase(it);
break;
}
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
if (rproxy == nullptr)
{
for (ReaderProxyIterator it = matched_remote_readers_.begin();
it != matched_remote_readers_.end(); ++it)
{
if ((*it)->guid() == reader_guid)
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
rproxy = std::move(*it);
it = matched_remote_readers_.erase(it);
break;
}
}
}

locator_selector_general_.locator_selector.remove_entry(reader_guid);
locator_selector_async_.locator_selector.remove_entry(reader_guid);
update_reader_info(locator_selector_general_, false);
update_reader_info(locator_selector_async_, false);
}

if (get_matched_readers_size() == 0)
{
Expand All @@ -1282,11 +1285,8 @@ bool StatefulWriter::matched_reader_remove(

if (nullptr != listener_)
{
// call the listener without locks taken
guard_locator_selector_async.unlock();
guard_locator_selector_general.unlock();
// listener is called without locks taken
lock.unlock();

listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
}

Expand Down
37 changes: 37 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,43 @@ TEST(DDSBasic, successful_destruction_among_intraprocess_participants)
}
}
}
TEST(DDSBasic, reliable_volatile_writer_secure_builtin_no_potential_deadlock)
{
// Create
PubSubWriter<HelloWorldPubSubType> writer("HelloWorldTopic_no_potential_deadlock");
PubSubReader<HelloWorldPubSubType> reader("HelloWorldTopic_no_potential_deadlock");

writer.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(20)
.init();

ASSERT_TRUE(writer.isInitialized());

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(20)
.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

auto data = default_helloworld_data_generator(30);

std::thread th([&]()
{
reader.startReception(data);
reader.block_for_at_least(5);
});

writer.wait_discovery();
writer.send(data);

th.join();
reader.destroy();
writer.destroy();
}

} // namespace dds
} // namespace fastdds
Expand Down

0 comments on commit 8fcd7ca

Please sign in to comment.