Skip to content

Commit

Permalink
guard dds-topic-reader against dtor/stop inside callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Mar 22, 2024
1 parent 93bc9fd commit 3b74738
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 21 deletions.
2 changes: 2 additions & 0 deletions third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class dds_stream : public dds_stream_base
dds_stream( std::string const & stream_name, std::string const & sensor_name );

public:
~dds_stream();

bool is_open() const override { return !! _reader; }
virtual void open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & ) = 0;
virtual void close();
Expand Down
15 changes: 11 additions & 4 deletions third-party/realdds/include/realdds/dds-topic-reader-thread.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.

// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.
#pragma once

#include "dds-topic-reader.h"

#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <thread>


namespace eprosima {
namespace fastdds {
namespace dds {
class GuardCondition;
} // namespace dds
} // namespace fastdds
} // namespace eprosima


namespace realdds {


Expand All @@ -31,7 +38,7 @@ class dds_topic_reader_thread : public dds_topic_reader
{
typedef dds_topic_reader super;

eprosima::fastdds::dds::GuardCondition _stopped;
std::shared_ptr< eprosima::fastdds::dds::GuardCondition > _stopped;
std::thread _th;

public:
Expand Down
8 changes: 5 additions & 3 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2022 Intel Corporation. All Rights Reserved.
// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved.
#pragma once

#include <fastdds/dds/subscriber/DataReaderListener.hpp>
Expand Down Expand Up @@ -31,11 +31,13 @@ class dds_subscriber;
// You may choose to create one via a 'subscriber' that manages the activities of several readers.
// on_data_available callback will be called when a sample is received.
//
class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
class dds_topic_reader
: public eprosima::fastdds::dds::DataReaderListener
, public std::enable_shared_from_this< dds_topic_reader >
{
protected:
std::shared_ptr< dds_topic > const _topic;
std::shared_ptr < dds_subscriber > const _subscriber;
std::shared_ptr< dds_subscriber > const _subscriber;

eprosima::fastdds::dds::DataReader * _reader = nullptr;

Expand Down
11 changes: 11 additions & 0 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ dds_device::impl::impl( std::shared_ptr< dds_participant > const & participant,
}


dds_device::impl::~impl()
{
if( _notifications_reader )
_notifications_reader->stop();
if( _metadata_reader )
_metadata_reader->stop();
}


void dds_device::impl::reset()
{
// _info should already be up-to-date
Expand All @@ -107,6 +116,8 @@ void dds_device::impl::reset()
_streams.clear();
_options.clear();
_extrinsics_map.clear();
if( _metadata_reader )
_metadata_reader->stop();
_metadata_reader.reset();
}

Expand Down
1 change: 1 addition & 0 deletions third-party/realdds/src/dds-device-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class dds_device::impl

impl( std::shared_ptr< dds_participant > const & participant,
topics::device_info const & info );
~impl();

void reset();

Expand Down
9 changes: 9 additions & 0 deletions third-party/realdds/src/dds-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ dds_stream::dds_stream( std::string const & stream_name, std::string const & sen
}


dds_stream::~dds_stream()
{
if( _reader )
_reader->stop();
}


void dds_video_stream::open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & subscriber )
{
if( is_open() )
Expand Down Expand Up @@ -63,6 +70,8 @@ void dds_motion_stream::open( std::string const & topic_name, std::shared_ptr< d

void dds_stream::close()
{
if( _reader )
_reader->stop();
_reader.reset();
}

Expand Down
48 changes: 34 additions & 14 deletions third-party/realdds/src/dds-topic-reader-thread.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.
// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved.

#include <realdds/dds-topic-reader-thread.h>
#include <realdds/dds-topic.h>
Expand All @@ -10,6 +10,7 @@
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/topic/Topic.hpp>

#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>

namespace realdds {
Expand All @@ -24,6 +25,7 @@ dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > c
dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber )
: super( topic, subscriber )
, _stopped( nullptr )
{
}

Expand All @@ -40,25 +42,31 @@ void dds_topic_reader_thread::run( qos const & rqos )
DDS_THROW( runtime_error, "on-data-available must be provided" );

_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos ) );
_stopped = std::make_shared< eprosima::fastdds::dds::GuardCondition >();

_th = std::thread(
[this, name = _topic->get()->get_name()]()
[this,
weak = std::weak_ptr< dds_topic_reader >( shared_from_this() ), // detect lifetime
name = _topic->get()->get_name(),
stopped = _stopped] // hold a copy so the wait-set is valid even if the reader is destroyed
{
eprosima::fastdds::dds::WaitSet wait_set;
auto & condition = _reader->get_statuscondition();
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available()
<< eprosima::fastdds::dds::StatusMask::subscription_matched()
<< eprosima::fastdds::dds::StatusMask::sample_lost() );
wait_set.attach_condition( condition );

wait_set.attach_condition( _stopped );
if( auto strong = weak.lock() )
{
auto & condition = _reader->get_statuscondition();
condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available()
<< eprosima::fastdds::dds::StatusMask::subscription_matched()
<< eprosima::fastdds::dds::StatusMask::sample_lost() );
wait_set.attach_condition( condition );

while( ! _stopped.get_trigger_value() )
wait_set.attach_condition( *stopped );
}
// We'll keep locking the object so it cannot destruct mid-callback, and exit out if we detect destruction
while( auto strong = weak.lock() )
{
eprosima::fastdds::dds::ConditionSeq active_conditions;
wait_set.wait( active_conditions, eprosima::fastrtps::c_TimeInfinite );

if( _stopped.get_trigger_value() )
if( stopped->get_trigger_value() )
break;

auto & changed = _reader->get_status_changes();
Expand All @@ -67,16 +75,22 @@ void dds_topic_reader_thread::run( qos const & rqos )
eprosima::fastdds::dds::SampleLostStatus status;
_reader->get_sample_lost_status( status );
on_sample_lost( _reader, status );
if( stopped->get_trigger_value() )
break;
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::data_available() ) )
{
on_data_available( _reader );
if( stopped->get_trigger_value() )
break;
}
if( changed.is_active( eprosima::fastdds::dds::StatusMask::subscription_matched() ) )
{
eprosima::fastdds::dds::SubscriptionMatchedStatus status;
_reader->get_subscription_matched_status( status );
on_subscription_matched( _reader, status );
if( stopped->get_trigger_value() )
break;
}
}
} );
Expand All @@ -87,8 +101,14 @@ void dds_topic_reader_thread::stop()
{
if( _th.joinable() )
{
_stopped.set_trigger_value( true );
_th.join();
_stopped->set_trigger_value( true );
// If we try to stop from within the thread (e.g., inside on_data_available), join() will terminate!
// If we detect such a case, then it's also possible our object will get destroyed (we may get here from the
// dtor) so we detach the thread instead.
if( _th.get_id() != std::this_thread::get_id() )
_th.join();
else
_th.detach();
}
super::stop();
}
Expand Down

0 comments on commit 3b74738

Please sign in to comment.