Skip to content

Commit

Permalink
add dds_topic_reader::wait_for_writers() and stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed May 15, 2024
1 parent d63bb35 commit cf094a3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
7 changes: 6 additions & 1 deletion third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include "dds-defines.h"

#include <rsutils/json-fwd.h>
#include <functional>
#include <memory>
#include <atomic>


namespace eprosima {
Expand Down Expand Up @@ -41,7 +43,7 @@ class dds_topic_reader

eprosima::fastdds::dds::DataReader * _reader = nullptr;

int _n_writers = 0;
std::atomic< int > _n_writers;

public:
dds_topic_reader( std::shared_ptr< dds_topic > const & topic );
Expand Down Expand Up @@ -89,6 +91,9 @@ class dds_topic_reader
// The callbacks should be set before we actually create the underlying DDS objects, so the reader does not
virtual void run( qos const & );

// Waits until writers are detected; return false on timeout
bool wait_for_writers( dds_time timeout );

// Go back to a pre-run() state, such that is_running() returns false
virtual void stop();

Expand Down
4 changes: 4 additions & 0 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ PYBIND11_MODULE(NAME, m) {
callback( self, status.total_count, status.total_count_change ); ) )
.def( "topic", &dds_topic_reader::topic )
.def( "run", &dds_topic_reader::run )
.def( "wait_for_writers", &dds_topic_reader::wait_for_writers )
.def( "stop", &dds_topic_reader::stop,
// GIL release needed: stop will wait and cause a hang if we're inside a callback
py::call_guard< py::gil_scoped_release >() )
.def( "qos", []() { return reader_qos(); } )
.def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } );

Expand Down
17 changes: 17 additions & 0 deletions third-party/realdds/src/dds-topic-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>

#include <rsutils/time/timer.h>
#include <rsutils/json.h>


Expand All @@ -32,6 +33,7 @@ dds_topic_reader::dds_topic_reader( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber )
: _topic( topic )
, _subscriber( subscriber )
, _n_writers( 0 )
{
}

Expand Down Expand Up @@ -113,6 +115,21 @@ void dds_topic_reader::run( qos const & rqos )
}


bool dds_topic_reader::wait_for_writers( dds_time timeout )
{
// Better to use on_subscription_matched, but that would require additional data members etc.
// For now, keep it simple:
rsutils::time::timer timer( std::chrono::nanoseconds( timeout.to_ns() ) );
while( _n_writers.load() < 1 )
{
if( timer.has_expired() )
return false;
std::this_thread::sleep_for( std::chrono::milliseconds( 250 ) );
}
return true;
}


void dds_topic_reader::stop()
{
if( _subscriber )
Expand Down

0 comments on commit cf094a3

Please sign in to comment.