Skip to content

Commit

Permalink
Effectively assert automatic/manual_by_participant liveliness (#4501) (
Browse files Browse the repository at this point in the history
…#4596)

* Refs #20584: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20584: Fix

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20584: Update LivelinessManager tests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20584: Linter

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20584: Apply reviewer suggestions

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20584: Fix PubSubParticipant comparison and add padding between writer announcement period and lease duration test's writers

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit f708244)

Co-authored-by: Mario Domínguez López <[email protected]>
  • Loading branch information
mergify[bot] and Mario-DL authored Mar 26, 2024
1 parent 1b7eaea commit 62a752f
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 140 deletions.
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,14 @@ class LivelinessManager
Duration_t lease_duration);

/**
* @brief Asserts liveliness of writers with given liveliness kind
* @brief Asserts liveliness of writers with given liveliness kind and GuidPrefix
* @param kind Liveliness kind
* @param guid_prefix The guid prefix of the writers to assert liveliness of
* @return True if liveliness was successfully asserted
*/
bool assert_liveliness(
LivelinessQosPolicyKind kind);
LivelinessQosPolicyKind kind,
GuidPrefix_t guid_prefix);

/**
* @brief A method to check any writer of the given kind is alive
Expand Down
4 changes: 3 additions & 1 deletion src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,9 @@ bool WLP::assert_liveliness_manual_by_participant()
{
if (manual_by_participant_writers_.size() > 0)
{
return pub_liveliness_manager_->assert_liveliness(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS);
return pub_liveliness_manager_->assert_liveliness(
MANUAL_BY_PARTICIPANT_LIVELINESS_QOS,
mp_participant->getGuid().guidPrefix);
}
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ void WLPListener::onNewCacheChangeAdded(
history->getMutex()->unlock();
if (mp_WLP->automatic_readers_)
{
mp_WLP->sub_liveliness_manager_->assert_liveliness(AUTOMATIC_LIVELINESS_QOS);
mp_WLP->sub_liveliness_manager_->assert_liveliness(AUTOMATIC_LIVELINESS_QOS, guidP);
}
if (livelinessKind == MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
{
mp_WLP->sub_liveliness_manager_->assert_liveliness(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS);
mp_WLP->sub_liveliness_manager_->assert_liveliness(MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, guidP);
}
mp_WLP->mp_builtinProtocols->mp_PDP->getMutex()->unlock();
history->getMutex()->lock();
Expand Down
9 changes: 6 additions & 3 deletions src/cpp/rtps/writer/LivelinessManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ bool LivelinessManager::assert_liveliness(
{
for (LivelinessData& w: writers_)
{
if (w.kind == writer.kind)
if (w.kind == writer.kind &&
w.guid.guidPrefix == guid.guidPrefix)
{
assert_writer_liveliness(w);
}
Expand Down Expand Up @@ -232,7 +233,8 @@ bool LivelinessManager::assert_liveliness(
}

bool LivelinessManager::assert_liveliness(
LivelinessQosPolicyKind kind)
LivelinessQosPolicyKind kind,
GuidPrefix_t guid_prefix)
{

if (!manage_automatic_ && kind == LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS)
Expand All @@ -253,7 +255,8 @@ bool LivelinessManager::assert_liveliness(

for (LivelinessData& writer: writers_)
{
if (writer.kind == kind)
if (writer.kind == kind &&
guid_prefix == writer.guid.guidPrefix)
{
assert_writer_liveliness(writer);
}
Expand Down
16 changes: 16 additions & 0 deletions test/blackbox/api/dds-pim/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,22 @@ class PubSubParticipant
});
}

template<class _Rep,
class _Period
>
size_t sub_wait_liveliness_lost_for(
unsigned int expected_num_lost,
const std::chrono::duration<_Rep, _Period>& max_wait)
{
std::unique_lock<std::mutex> lock(sub_liveliness_mutex_);
sub_liveliness_cv_.wait_for(lock, max_wait, [this, &expected_num_lost]() -> bool
{
return sub_times_liveliness_lost_ >= expected_num_lost;
});

return sub_times_liveliness_lost_;
}

PubSubParticipant& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
Expand Down
16 changes: 16 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,22 @@ class PubSubParticipant
});
}

template<class _Rep,
class _Period
>
size_t sub_wait_liveliness_lost_for(
unsigned int expected_num_lost,
const std::chrono::duration<_Rep, _Period>& max_wait)
{
std::unique_lock<std::mutex> lock(sub_liveliness_mutex_);
sub_liveliness_cv_.wait_for(lock, max_wait, [this, &expected_num_lost]() -> bool
{
return sub_times_liveliness_lost_ >= expected_num_lost;
});

return sub_times_liveliness_lost_;
}

PubSubParticipant& property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
Expand Down
52 changes: 52 additions & 0 deletions test/blackbox/common/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <gtest/gtest.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include <rtps/transport/test_UDPv4Transport.h>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;

Expand Down Expand Up @@ -1946,6 +1948,56 @@ TEST(LivelinessTests, Detect_Deadlock_ManualByParticipant_Intraprocess)
// Test failure is due to timeout
}

// Regression test of Refs #20584, github issue #4373
TEST(LivelinessTests, Reader_Successfully_Asserts_Liveliness_on_a_Disconnected_Writer)
{
// Create a TestTransport to simulate a network shutdown (Ctrl+C)
auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

// Create two writer participants
PubSubWriter<HelloWorldPubSubType> writer_1(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldPubSubType> writer_2(TEST_TOPIC_NAME + "2");

// Create a reader participant containing 2 subscribers and readers
PubSubParticipant<HelloWorldPubSubType> reader(0, 2, 0, 2);

reader.init_participant();
// Define the reader's lease duration in 1.6 secs
reader.sub_liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 600000000));

// Create Subscribers and readers, one for each writer
reader.sub_topic_name(TEST_TOPIC_NAME);
reader.init_subscriber(0);
reader.sub_topic_name(TEST_TOPIC_NAME + "2");
reader.init_subscriber(1);

// Create writers
writer_1.disable_builtin_transport()
.lease_duration(c_TimeInfinite, 1)
.add_user_transport_to_pparams(testTransport)
.liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 0))
.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_announcement_period(eprosima::fastrtps::Time_t(0, 500000000))
.init();

writer_2.lease_duration(c_TimeInfinite, 1)
.liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 0))
.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_announcement_period(eprosima::fastrtps::Time_t(0, 500000000))
.init();

// Wait for discovery to occur. Liveliness should be recovered twice,
// one per matched reader.
reader.sub_wait_liveliness_recovered(2);

// Simulate a Ctrl+C in one of the writers
eprosima::fastdds::rtps::test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;

// After 1.6 secs, we should receive a on_liveliness_changed(status lost)
// in the TEST_TOPIC_NAME reader that was matched with the disconnected writer_1
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(1, std::chrono::seconds(4)), 1u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading

0 comments on commit 62a752f

Please sign in to comment.