From cf094a3007e26305d9c330a236a9f87a7582caf8 Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 9 May 2024 14:04:40 +0300 Subject: [PATCH] add dds_topic_reader::wait_for_writers() and stop() --- .../realdds/include/realdds/dds-topic-reader.h | 7 ++++++- third-party/realdds/py/pyrealdds.cpp | 4 ++++ third-party/realdds/src/dds-topic-reader.cpp | 17 +++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/third-party/realdds/include/realdds/dds-topic-reader.h b/third-party/realdds/include/realdds/dds-topic-reader.h index ccaaf1be0f..5f48ef76a1 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader.h +++ b/third-party/realdds/include/realdds/dds-topic-reader.h @@ -4,10 +4,12 @@ #include #include +#include "dds-defines.h" #include #include #include +#include namespace eprosima { @@ -41,7 +43,7 @@ class dds_topic_reader eprosima::fastdds::dds::DataReader * _reader = nullptr; - int _n_writers = 0; + std::atomic< int > _n_writers; public: dds_topic_reader( std::shared_ptr< dds_topic > const & topic ); @@ -89,6 +91,9 @@ class dds_topic_reader // The callbacks should be set before we actually create the underlying DDS objects, so the reader does not virtual void run( qos const & ); + // Waits until writers are detected; return false on timeout + bool wait_for_writers( dds_time timeout ); + // Go back to a pre-run() state, such that is_running() returns false virtual void stop(); diff --git a/third-party/realdds/py/pyrealdds.cpp b/third-party/realdds/py/pyrealdds.cpp index bc7531b55c..a4c8b8a4ac 100644 --- a/third-party/realdds/py/pyrealdds.cpp +++ b/third-party/realdds/py/pyrealdds.cpp @@ -408,6 +408,10 @@ PYBIND11_MODULE(NAME, m) { callback( self, status.total_count, status.total_count_change ); ) ) .def( "topic", &dds_topic_reader::topic ) .def( "run", &dds_topic_reader::run ) + .def( "wait_for_writers", &dds_topic_reader::wait_for_writers ) + .def( "stop", &dds_topic_reader::stop, + // GIL release needed: stop will wait and cause a hang if we're inside a callback + py::call_guard< py::gil_scoped_release >() ) .def( "qos", []() { return reader_qos(); } ) .def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } ); diff --git a/third-party/realdds/src/dds-topic-reader.cpp b/third-party/realdds/src/dds-topic-reader.cpp index be267403f5..6dcaa96b1a 100644 --- a/third-party/realdds/src/dds-topic-reader.cpp +++ b/third-party/realdds/src/dds-topic-reader.cpp @@ -16,6 +16,7 @@ #include #include +#include #include @@ -32,6 +33,7 @@ dds_topic_reader::dds_topic_reader( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ) : _topic( topic ) , _subscriber( subscriber ) + , _n_writers( 0 ) { } @@ -113,6 +115,21 @@ void dds_topic_reader::run( qos const & rqos ) } +bool dds_topic_reader::wait_for_writers( dds_time timeout ) +{ + // Better to use on_subscription_matched, but that would require additional data members etc. + // For now, keep it simple: + rsutils::time::timer timer( std::chrono::nanoseconds( timeout.to_ns() ) ); + while( _n_writers.load() < 1 ) + { + if( timer.has_expired() ) + return false; + std::this_thread::sleep_for( std::chrono::milliseconds( 250 ) ); + } + return true; +} + + void dds_topic_reader::stop() { if( _subscriber )