Skip to content

Commit

Permalink
Fix assertion deleting a Datawriter configured with persistent and fl…
Browse files Browse the repository at this point in the history
…ow controller (#5364)

* Refs #22021. Regression test

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #21867. Fix deinit of persistence writer when using flow

Signed-off-by: Ricardo González <[email protected]>

* Refs #22021. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #22021. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

---------

Signed-off-by: Ricardo González Moreno <[email protected]>
Signed-off-by: Ricardo González <[email protected]>
  • Loading branch information
richiware authored Nov 7, 2024
1 parent fe29535 commit 7458386
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
bool found = false, found_in_users = false;
Endpoint* p_endpoint = nullptr;
BaseReader* reader = nullptr;
BaseWriter* writer {nullptr};

if (endpoint.entityId.is_writer())
{
Expand All @@ -1970,6 +1971,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
{
writer = *wit;
m_userWriterList.erase(wit);
found_in_users = true;
break;
Expand All @@ -1980,6 +1982,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
{
writer = *wit;
p_endpoint = *wit;
m_allWriterList.erase(wit);
found = true;
Expand Down Expand Up @@ -2069,6 +2072,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
reader->local_actions_on_reader_removed();
}
else if (writer)
{
writer->local_actions_on_writer_removed();
}
delete(p_endpoint);
return true;
}
Expand Down Expand Up @@ -2160,6 +2167,10 @@ void RTPSParticipantImpl::deleteAllUserEndpoints()
{
static_cast<BaseReader*>(endpoint)->local_actions_on_reader_removed();
}
else if (WRITER == kind)
{
static_cast<BaseWriter*>(endpoint)->local_actions_on_writer_removed();
}

// remove the endpoints
delete(endpoint);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/BaseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ void BaseWriter::init(
}
}

void BaseWriter::deinit()
void BaseWriter::local_actions_on_writer_removed()
{
// First, unregister changes from FlowController. This action must be protected.
{
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/writer/BaseWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class BaseWriter

bool is_async() const final;

virtual void local_actions_on_writer_removed();

#ifdef FASTDDS_STATISTICS

bool add_statistics_listener(
Expand Down Expand Up @@ -354,8 +356,6 @@ class BaseWriter
void init(
const WriterAttributes& att);

void deinit();

void add_guid(
LocatorSelectorSender& locator_selector,
const GUID_t& remote_guid);
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/writer/StatefulPersistentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ StatefulPersistentWriter::StatefulPersistentWriter(

StatefulPersistentWriter::~StatefulPersistentWriter()
{
deinit();
}

/*
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ void StatefulWriter::init(
StatefulWriter::~StatefulWriter()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter destructor");
}

void StatefulWriter::local_actions_on_writer_removed()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter local_actions_on_writer_removed");

// Disable timed events, because their callbacks use cache changes
if (disable_positive_acks_)
Expand All @@ -267,7 +272,7 @@ StatefulWriter::~StatefulWriter()
}

// This must be the next action, as it frees CacheChange_t from the async thread.
deinit();
BaseWriter::local_actions_on_writer_removed();

// Stop all active proxies and pass them to the pool
{
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatefulWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StatefulWriter : public BaseWriter

virtual ~StatefulWriter();

void local_actions_on_writer_removed() override;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/writer/StatelessPersistentWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ StatelessPersistentWriter::StatelessPersistentWriter(

StatelessPersistentWriter::~StatelessPersistentWriter()
{
deinit();
}

/*
Expand Down
7 changes: 6 additions & 1 deletion src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ void StatelessWriter::init(
StatelessWriter::~StatelessWriter()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter destructor"; );
deinit();
}

void StatelessWriter::local_actions_on_writer_removed()
{
EPROSIMA_LOG_INFO(RTPS_WRITER, "StatelessWriter local_actions_on_writer_removed"; );
BaseWriter::local_actions_on_writer_removed();
}

void StatelessWriter::get_builtin_guid()
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatelessWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StatelessWriter : public BaseWriter

virtual ~StatelessWriter();

void local_actions_on_writer_removed() override;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
Expand Down
55 changes: 55 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubFlowControllers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdio>
#include <thread>

#include <gtest/gtest.h>
Expand Down Expand Up @@ -228,6 +229,60 @@ TEST_P(PubSubFlowControllers, AsyncMultipleWritersFlowController64kb)
entities.block_for_all();
}

TEST_P(PubSubFlowControllers, AsyncPubSubAsReliableData64kbWithParticipantFlowControlAndPersistence)
{
PubSubReader<Data64kbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data64kbPubSubType> writer(TEST_TOPIC_NAME);

// Get info about current test
auto info = ::testing::UnitTest::GetInstance()->current_test_info();

// Create DB file name from test name and PID
std::ostringstream ss;
std::string test_case_name(info->test_case_name());
std::string test_name(info->name());
ss <<
test_case_name.replace(test_case_name.find_first_of('/'), 1, "_") << "_" <<
test_name.replace(test_name.find_first_of('/'), 1, "_") << "_" << GET_PID() << ".db";
std::string db_file_name = {ss.str()};

reader.history_depth(3).
reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS).init();

ASSERT_TRUE(reader.isInitialized());

uint32_t bytesPerPeriod = 65000;
uint32_t periodInMs = 500;
writer.add_flow_controller_descriptor_to_pparams(scheduler_policy_, bytesPerPeriod, periodInMs);

writer.history_depth(3)
.asynchronously(eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE)
.make_transient(db_file_name, "33.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_data64kb_data_generator(3);

reader.startReception(data);

// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();

reader.destroy();
writer.destroy();
std::remove(db_file_name.c_str());
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/common/DDSBlackboxTestsPersistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cstdio>
#include <thread>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
Expand Down

0 comments on commit 7458386

Please sign in to comment.