diff --git a/src/dds/rs-dds-device-proxy.cpp b/src/dds/rs-dds-device-proxy.cpp index 8f99bb46d3..945e1764a5 100644 --- a/src/dds/rs-dds-device-proxy.cpp +++ b/src/dds/rs-dds-device-proxy.cpp @@ -23,6 +23,7 @@ #include #include #include +#include using rsutils::json; @@ -586,10 +587,14 @@ bool dds_device_proxy::check_fw_compatibility( const std::vector< uint8_t > & im try { // Start DFU + auto const crc = rsutils::number::calc_crc32( image.data(), image.size() ); + json dfu_start{ + { realdds::topics::control::key::id, realdds::topics::control::dfu_start::id }, + { realdds::topics::control::dfu_start::key::size, image.size() }, + { realdds::topics::control::dfu_start::key::crc, crc }, + }; json reply; - _dds_dev->send_control( - json::object( { { realdds::topics::control::key::id, realdds::topics::control::dfu_start::id } } ), - &reply ); + _dds_dev->send_control( dfu_start, &reply ); // Set up a reply handler that will get the "dfu-ready" message std::mutex mutex; diff --git a/third-party/realdds/include/realdds/dds-stream.h b/third-party/realdds/include/realdds/dds-stream.h index cc4571297b..4c1f837a94 100644 --- a/third-party/realdds/include/realdds/dds-stream.h +++ b/third-party/realdds/include/realdds/dds-stream.h @@ -38,6 +38,8 @@ class dds_stream : public dds_stream_base dds_stream( std::string const & stream_name, std::string const & sensor_name ); public: + ~dds_stream(); + bool is_open() const override { return !! _reader; } virtual void open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & ) = 0; virtual void close(); diff --git a/third-party/realdds/include/realdds/dds-topic-reader-thread.h b/third-party/realdds/include/realdds/dds-topic-reader-thread.h index 5cbf426f42..61a4eb3b3a 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader-thread.h +++ b/third-party/realdds/include/realdds/dds-topic-reader-thread.h @@ -1,14 +1,21 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. - +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #pragma once #include "dds-topic-reader.h" -#include #include +namespace eprosima { +namespace fastdds { +namespace dds { +class GuardCondition; +} // namespace dds +} // namespace fastdds +} // namespace eprosima + + namespace realdds { @@ -31,7 +38,7 @@ class dds_topic_reader_thread : public dds_topic_reader { typedef dds_topic_reader super; - eprosima::fastdds::dds::GuardCondition _stopped; + std::shared_ptr< eprosima::fastdds::dds::GuardCondition > _stopped; std::thread _th; public: diff --git a/third-party/realdds/include/realdds/dds-topic-reader.h b/third-party/realdds/include/realdds/dds-topic-reader.h index 18b18cb6ce..ccaaf1be0f 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader.h +++ b/third-party/realdds/include/realdds/dds-topic-reader.h @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2022 Intel Corporation. All Rights Reserved. +// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved. #pragma once #include @@ -31,11 +31,13 @@ class dds_subscriber; // You may choose to create one via a 'subscriber' that manages the activities of several readers. // on_data_available callback will be called when a sample is received. // -class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener +class dds_topic_reader + : public eprosima::fastdds::dds::DataReaderListener + , public std::enable_shared_from_this< dds_topic_reader > { protected: std::shared_ptr< dds_topic > const _topic; - std::shared_ptr < dds_subscriber > const _subscriber; + std::shared_ptr< dds_subscriber > const _subscriber; eprosima::fastdds::dds::DataReader * _reader = nullptr; diff --git a/third-party/realdds/include/realdds/topics/dds-topic-names.h b/third-party/realdds/include/realdds/topics/dds-topic-names.h index b83e430ff0..2d241a4a40 100644 --- a/third-party/realdds/include/realdds/topics/dds-topic-names.h +++ b/third-party/realdds/include/realdds/topics/dds-topic-names.h @@ -153,6 +153,10 @@ namespace control { } namespace dfu_start { extern std::string const id; + namespace key { + extern std::string const crc; + extern std::string const size; + } } namespace dfu_apply { using notification::dfu_apply::id; diff --git a/third-party/realdds/src/dds-device-impl.cpp b/third-party/realdds/src/dds-device-impl.cpp index 852e97e534..d0d87d8540 100644 --- a/third-party/realdds/src/dds-device-impl.cpp +++ b/third-party/realdds/src/dds-device-impl.cpp @@ -92,6 +92,15 @@ dds_device::impl::impl( std::shared_ptr< dds_participant > const & participant, } +dds_device::impl::~impl() +{ + if( _notifications_reader ) + _notifications_reader->stop(); + if( _metadata_reader ) + _metadata_reader->stop(); +} + + void dds_device::impl::reset() { // _info should already be up-to-date @@ -107,6 +116,8 @@ void dds_device::impl::reset() _streams.clear(); _options.clear(); _extrinsics_map.clear(); + if( _metadata_reader ) + _metadata_reader->stop(); _metadata_reader.reset(); } diff --git a/third-party/realdds/src/dds-device-impl.h b/third-party/realdds/src/dds-device-impl.h index 3dfaae4188..c029566356 100644 --- a/third-party/realdds/src/dds-device-impl.h +++ b/third-party/realdds/src/dds-device-impl.h @@ -73,6 +73,7 @@ class dds_device::impl impl( std::shared_ptr< dds_participant > const & participant, topics::device_info const & info ); + ~impl(); void reset(); diff --git a/third-party/realdds/src/dds-stream.cpp b/third-party/realdds/src/dds-stream.cpp index 478ae8c7b5..87176fc300 100644 --- a/third-party/realdds/src/dds-stream.cpp +++ b/third-party/realdds/src/dds-stream.cpp @@ -25,6 +25,13 @@ dds_stream::dds_stream( std::string const & stream_name, std::string const & sen } +dds_stream::~dds_stream() +{ + if( _reader ) + _reader->stop(); +} + + void dds_video_stream::open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & subscriber ) { if( is_open() ) @@ -63,6 +70,8 @@ void dds_motion_stream::open( std::string const & topic_name, std::shared_ptr< d void dds_stream::close() { + if( _reader ) + _reader->stop(); _reader.reset(); } diff --git a/third-party/realdds/src/dds-topic-reader-thread.cpp b/third-party/realdds/src/dds-topic-reader-thread.cpp index d397d1ae1f..742553da0d 100644 --- a/third-party/realdds/src/dds-topic-reader-thread.cpp +++ b/third-party/realdds/src/dds-topic-reader-thread.cpp @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #include #include @@ -10,6 +10,7 @@ #include #include +#include #include namespace realdds { @@ -24,6 +25,7 @@ dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > c dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ) : super( topic, subscriber ) + , _stopped( nullptr ) { } @@ -40,25 +42,31 @@ void dds_topic_reader_thread::run( qos const & rqos ) DDS_THROW( runtime_error, "on-data-available must be provided" ); _reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos ) ); + _stopped = std::make_shared< eprosima::fastdds::dds::GuardCondition >(); _th = std::thread( - [this, name = _topic->get()->get_name()]() + [this, + weak = std::weak_ptr< dds_topic_reader >( shared_from_this() ), // detect lifetime + name = _topic->get()->get_name(), + stopped = _stopped] // hold a copy so the wait-set is valid even if the reader is destroyed { eprosima::fastdds::dds::WaitSet wait_set; - auto & condition = _reader->get_statuscondition(); - condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available() - << eprosima::fastdds::dds::StatusMask::subscription_matched() - << eprosima::fastdds::dds::StatusMask::sample_lost() ); - wait_set.attach_condition( condition ); - - wait_set.attach_condition( _stopped ); + if( auto strong = weak.lock() ) + { + auto & condition = _reader->get_statuscondition(); + condition.set_enabled_statuses( eprosima::fastdds::dds::StatusMask::data_available() + << eprosima::fastdds::dds::StatusMask::subscription_matched() + << eprosima::fastdds::dds::StatusMask::sample_lost() ); + wait_set.attach_condition( condition ); - while( ! _stopped.get_trigger_value() ) + wait_set.attach_condition( *stopped ); + } + // We'll keep locking the object so it cannot destruct mid-callback, and exit out if we detect destruction + while( auto strong = weak.lock() ) { eprosima::fastdds::dds::ConditionSeq active_conditions; wait_set.wait( active_conditions, eprosima::fastrtps::c_TimeInfinite ); - - if( _stopped.get_trigger_value() ) + if( stopped->get_trigger_value() ) break; auto & changed = _reader->get_status_changes(); @@ -67,16 +75,22 @@ void dds_topic_reader_thread::run( qos const & rqos ) eprosima::fastdds::dds::SampleLostStatus status; _reader->get_sample_lost_status( status ); on_sample_lost( _reader, status ); + if( stopped->get_trigger_value() ) + break; } if( changed.is_active( eprosima::fastdds::dds::StatusMask::data_available() ) ) { on_data_available( _reader ); + if( stopped->get_trigger_value() ) + break; } if( changed.is_active( eprosima::fastdds::dds::StatusMask::subscription_matched() ) ) { eprosima::fastdds::dds::SubscriptionMatchedStatus status; _reader->get_subscription_matched_status( status ); on_subscription_matched( _reader, status ); + if( stopped->get_trigger_value() ) + break; } } } ); @@ -87,8 +101,14 @@ void dds_topic_reader_thread::stop() { if( _th.joinable() ) { - _stopped.set_trigger_value( true ); - _th.join(); + _stopped->set_trigger_value( true ); + // If we try to stop from within the thread (e.g., inside on_data_available), join() will terminate! + // If we detect such a case, then it's also possible our object will get destroyed (we may get here from the + // dtor) so we detach the thread instead. + if( _th.get_id() != std::this_thread::get_id() ) + _th.join(); + else + _th.detach(); } super::stop(); } diff --git a/third-party/realdds/src/topics/dds-topic-names.cpp b/third-party/realdds/src/topics/dds-topic-names.cpp index a4002928ac..5c067b6f1c 100644 --- a/third-party/realdds/src/topics/dds-topic-names.cpp +++ b/third-party/realdds/src/topics/dds-topic-names.cpp @@ -136,6 +136,10 @@ namespace control { } namespace dfu_start { std::string const id( "dfu-start", 9 ); + namespace key { + std::string const crc( "crc", 3 ); + std::string const size( "size", 4 ); + } } namespace dfu_apply { //using notification::dfu_apply::id; diff --git a/tools/dds/dds-adapter/lrs-device-controller.cpp b/tools/dds/dds-adapter/lrs-device-controller.cpp index 84b753e56f..876c599bd4 100644 --- a/tools/dds/dds-adapter/lrs-device-controller.cpp +++ b/tools/dds/dds-adapter/lrs-device-controller.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -1205,9 +1206,11 @@ struct lrs_device_controller::dfu_support std::string uid; std::weak_ptr< dds_device_server > server; std::weak_ptr< lrs_device_controller > controller; - std::shared_ptr< realdds::dds_topic_reader > reader; + std::shared_ptr< realdds::dds_topic_reader_thread > reader; std::shared_ptr< realdds::topics::blob_msg > image; realdds::dds_guid_prefix initiator; + size_t image_size; + uint32_t image_crc; std::string debug_name() const { @@ -1215,6 +1218,19 @@ struct lrs_device_controller::dfu_support return rsutils::string::from() << "[" << dev->debug_name() << "] "; return {}; } + + // reset to a non-DFU state + void reset( char const * error = nullptr ) + { + if( reader ) + reader->stop(); + if( auto c = controller.lock() ) + { + if( error ) + LOG_ERROR( debug_name() << "DFU " << error ); + c->_dfu.reset(); // no longer in DFU state + } + } }; @@ -1239,6 +1255,8 @@ bool lrs_device_controller::on_dfu_start( rsutils::json const & control, rsutils _dfu->initiator = realdds::guid_from_string( reply[realdds::topics::reply::key::sample][0].string_ref() ).guidPrefix; _dfu->uid = _rs_dev.get_info( RS2_CAMERA_INFO_FIRMWARE_UPDATE_ID ); + control.at( realdds::topics::control::dfu_start::key::size ).get_to( _dfu->image_size ); // throws + control.at( realdds::topics::control::dfu_start::key::crc ).get_to( _dfu->image_crc ); // throws // Open a DFU topic and wait for the image on another thread auto topic = topics::blob_msg::create_topic( _dds_device_server->participant(), @@ -1269,19 +1287,29 @@ bool lrs_device_controller::on_dfu_start( rsutils::json const & control, rsutils } else { - size_t const n_bytes = blob.data().size(); - auto const crc = rsutils::number::calc_crc32( blob.data().data(), blob.data().size() ); - LOG_INFO( dfu->debug_name() << "DFU image received, " << n_bytes << " bytes, crc " << crc ); + LOG_INFO( dfu->debug_name() << "DFU image received" ); // Build a reply rsutils::json j = rsutils::json::object( { { realdds::topics::notification::key::id, realdds::topics::notification::dfu_ready::id }, - { "size", n_bytes }, - { "crc", crc } } ); + } ); try { // Check the image + size_t const n_bytes = blob.data().size(); + if( n_bytes != dfu->image_size ) + throw std::runtime_error( rsutils::string::from() + << "image size (" << n_bytes << ") does not match expected (" + << dfu->image_size << ")" ); + + auto const crc = rsutils::number::calc_crc32( blob.data().data(), blob.data().size() ); + if( crc != dfu->image_crc ) + throw std::runtime_error( rsutils::string::from() + << "image CRC (" << rsutils::string::hexdump( crc ) + << ") does not match expected (" + << rsutils::string::hexdump( dfu->image_crc ) << ")" ); + rs2_error * e = nullptr; bool is_compatible = rs2_check_firmware_compatibility( dfu->rsdev.get().get(), blob.data().data(), @@ -1300,8 +1328,7 @@ bool lrs_device_controller::on_dfu_start( rsutils::json const & control, rsutils j[realdds::topics::reply::key::status] = "check-fw-compat"; j[realdds::topics::reply::key::explanation] = e.what(); LOG_ERROR( dfu->debug_name() << "DFU image check failed: " << e.what() << "; exiting DFU state" ); - if( auto controller = dfu->controller.lock() ) - controller->_dfu.reset(); // no longer in DFU state + dfu->reset(); } if( auto server = dfu->server.lock() ) @@ -1323,11 +1350,7 @@ bool lrs_device_controller::on_dfu_start( rsutils::json const & control, rsutils { if( ! dfu->image ) { - if( auto controller = dfu->controller.lock() ) - { - LOG_ERROR( dfu->debug_name() << "DFU timed out waiting for image; resetting" ); - controller->_dfu.reset(); // no longer in DFU state - } + dfu->reset( "timed out waiting for image; resetting" ); if( auto server = dfu->server.lock() ) { rsutils::json j = rsutils::json::object( @@ -1347,11 +1370,7 @@ bool lrs_device_controller::on_dfu_start( rsutils::json const & control, rsutils std::this_thread::sleep_for( std::chrono::seconds( 10 ) ); if( auto dfu = weak_dfu.lock() ) { - if( auto controller = dfu->controller.lock() ) - { - LOG_ERROR( dfu->debug_name() << "DFU timed out waiting for apply; resetting" ); - controller->_dfu.reset(); // no longer in DFU state - } + dfu->reset( "timed out waiting for apply; resetting" ); if( auto server = dfu->server.lock() ) { rsutils::json j = rsutils::json::object( @@ -1510,8 +1529,7 @@ bool lrs_device_controller::on_dfu_apply( rsutils::json const & control, rsutils } // Whether successful or not, we're done with the DFU - if( auto controller = dfu->controller.lock() ) - controller->_dfu.reset(); + dfu->reset(); } ) .detach(); diff --git a/unit-tests/dds/metadata-server.py b/unit-tests/dds/metadata-server.py deleted file mode 100644 index 0b4a1c182c..0000000000 --- a/unit-tests/dds/metadata-server.py +++ /dev/null @@ -1,54 +0,0 @@ -# License: Apache 2.0. See LICENSE file in root directory. -# Copyright(c) 2023 Intel Corporation. All Rights Reserved. - -import pyrealdds as dds -from rspy import log, test - -dds.debug( log.is_debug_on(), log.nested ) - -import d435i - - -participant = dds.participant() -participant.init( 123, "server" ) - - -# set up a server device with a single color stream -device_server = dds.device_server( participant, d435i.device_info.topic_root ) - -color_stream = dds.color_stream_server( "Color", "RGB Camera" ) -color_stream.enable_metadata() # not there in d435i by default -color_stream.init_profiles( d435i.color_stream_profiles(), 0 ) -color_stream.init_options( [] ) -color_stream.set_intrinsics( d435i.color_stream_intrinsics() ) - -def on_control( server, id, control, reply ): - # the control has already been output to debug by the calling code, as will the reply - return True # otherwise the control will be flagged as error - -device_server.on_control( on_control ) -device_server.init( [color_stream], [], {} ) - - -def broadcast(): - global device_server - device_server.broadcast( d435i.device_info ) - - -def new_image( width, height, bpp, timestamp_as_ns = None ): - i = dds.message.image() - i.width = width - i.height = height - i.data = bytearray( width * height * bpp ) - if timestamp_as_ns is not None: - i.timestamp = dds.time.from_ns( timestamp_as_ns ) - return i - - -def publish_image( img, timestamp ): - img.timestamp = timestamp - color_stream.publish_image( img ) - - -# From here down, we're in "interactive" mode (see test-metadata.py) -# ... diff --git a/unit-tests/dds/test-metadata-syncer.py b/unit-tests/dds/test-md-syncer.py similarity index 100% rename from unit-tests/dds/test-metadata-syncer.py rename to unit-tests/dds/test-md-syncer.py diff --git a/unit-tests/dds/test-metadata.py b/unit-tests/dds/test-metadata.py index 2184851aab..71340221ce 100644 --- a/unit-tests/dds/test-metadata.py +++ b/unit-tests/dds/test-metadata.py @@ -10,29 +10,73 @@ # latter and we have a problem where the broadcaster does not work. #test:donotrun:linux - import pyrealdds as dds from rspy import log, test -from time import sleep -from rspy.timer import Timer +import d435i -dds.debug( log.is_debug_on(), 'C ' ) -log.nested = 'C ' -import d435i +with test.remote.fork( nested_indent=' S' ) as remote: + if remote is None: # we're the server fork + + dds.debug( log.is_debug_on(), log.nested ) + + participant = dds.participant() + participant.init( 123, "server" ) + + # set up a server device with a single color stream + device_server = dds.device_server( participant, d435i.device_info.topic_root ) + + color_stream = dds.color_stream_server( 'Color', 'RGB Camera' ) + color_stream.enable_metadata() # not there in d435i by default + color_stream.init_profiles( d435i.color_stream_profiles(), 0 ) + color_stream.init_options( [] ) + color_stream.set_intrinsics( d435i.color_stream_intrinsics() ) + + def on_control( server, id, control, reply ): + # the control has already been output to debug by the calling code, as will the reply + return True # otherwise the control will be flagged as error + + device_server.on_control( on_control ) + device_server.init( [color_stream], [], {} ) + + + def broadcast(): + global device_server + device_server.broadcast( d435i.device_info ) + + def new_image( width, height, bpp, timestamp_as_ns = None ): + i = dds.message.image() + i.width = width + i.height = height + i.data = bytearray( width * height * bpp ) + if timestamp_as_ns is not None: + i.timestamp = dds.time.from_ns( timestamp_as_ns ) + return i -participant = dds.participant() -participant.init( 123, "client" ) + def publish_image( img, timestamp ): + img.timestamp = timestamp + color_stream.publish_image( img ) -# run the server in another process: GHA has some issue with it in the same process... -import os.path -cwd = os.path.dirname(os.path.realpath(__file__)) -remote_script = os.path.join( cwd, 'metadata-server.py' ) -with test.remote( remote_script, nested_indent=" S" ) as remote: - remote.wait_until_ready() + # From here down, we're in "interactive" mode (see test-metadata.py) + # ... + raise StopIteration() + + + ############################################################################################################### + # The client + # + + from time import sleep + from rspy.timer import Timer + + dds.debug( log.is_debug_on(), 'C ' ) + log.nested = 'C ' + + participant = dds.participant() + participant.init( 123, "client" ) # set up the client device and keep all its streams - this is connected directly and we can get notifications on it! device_direct = dds.device( participant, d435i.device_info ) @@ -233,7 +277,6 @@ def __exit__( self, type, value, traceback ): # with test.closure( "Metadata without a stream name is ignored" ): pass - # - ############################################################################################# - # -test.print_results_and_exit() + + +test.print_results()