diff --git a/src/api.h b/src/api.h index d451bb7aae..845d1fedb5 100644 --- a/src/api.h +++ b/src/api.h @@ -4,6 +4,7 @@ #pragma once #include "context.h" +#include "types.h" // notification #include "core/extension.h" #include "device.h" #include diff --git a/src/context.cpp b/src/context.cpp index 96e0694614..ec6a95bf16 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -9,6 +9,11 @@ #include "dds/rsdds-device-factory.h" #endif +#include // rs2_devices_changed_callback +#include // RS2_API_VERSION_STR +#include + +#include #include #include using json = nlohmann::json; @@ -19,7 +24,6 @@ namespace librealsense context::context( json const & settings ) : _settings( settings ) , _device_mask( rsutils::json::get< unsigned >( settings, "device-mask", RS2_PRODUCT_LINE_ANY ) ) - , _devices_changed_callback( nullptr, []( rs2_devices_changed_callback * ) {} ) { static bool version_logged = false; if( ! version_logged ) @@ -30,13 +34,13 @@ namespace librealsense _factories.push_back( std::make_shared< backend_device_factory >( *this, - [this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added ) + [this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added ) { invoke_devices_changed_callbacks( removed, added ); } ) ); #ifdef BUILD_WITH_DDS _factories.push_back( std::make_shared< rsdds_device_factory >( *this, - [this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added ) + [this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added ) { invoke_devices_changed_callbacks( removed, added ); } ) ); #endif } @@ -90,70 +94,29 @@ namespace librealsense } - void context::invoke_devices_changed_callbacks( std::vector & rs2_devices_info_removed, - std::vector & rs2_devices_info_added ) + void context::invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & rs2_devices_info_removed, + std::vector< rs2_device_info > const & rs2_devices_info_added ) { - std::map devices_changed_callbacks; - { - std::lock_guard lock( _devices_changed_callbacks_mtx ); - devices_changed_callbacks = _devices_changed_callbacks; - } - - for( auto & kvp : devices_changed_callbacks ) - { - try - { - kvp.second->on_devices_changed( new rs2_device_list( { shared_from_this(), rs2_devices_info_removed } ), - new rs2_device_list( { shared_from_this(), rs2_devices_info_added } ) ); - } - catch( ... ) - { - LOG_ERROR( "Exception thrown from user callback handler" ); - } - } - - raise_devices_changed( rs2_devices_info_removed, rs2_devices_info_added ); - } - - void context::raise_devices_changed(const std::vector& removed, const std::vector& added) - { - if (_devices_changed_callback) - { - try - { - _devices_changed_callback->on_devices_changed(new rs2_device_list({ shared_from_this(), removed }), - new rs2_device_list({ shared_from_this(), added })); - } - catch( std::exception const & e ) - { - LOG_ERROR( "Exception thrown from user callback handler: " << e.what() ); - } - catch (...) - { - LOG_ERROR("Exception thrown from user callback handler"); - } - } + _devices_changed.raise( rs2_devices_info_removed, rs2_devices_info_added ); } - uint64_t context::register_internal_device_callback(devices_changed_callback_ptr callback) + rsutils::subscription context::on_device_changes( devices_changed_callback_ptr callback ) { - std::lock_guard lock(_devices_changed_callbacks_mtx); - auto callback_id = unique_id::generate_id(); - _devices_changed_callbacks.insert(std::make_pair(callback_id, std::move(callback))); - return callback_id; - } - - void context::unregister_internal_device_callback(uint64_t cb_id) - { - std::lock_guard lock(_devices_changed_callbacks_mtx); - _devices_changed_callbacks.erase(cb_id); - } - - void context::set_devices_changed_callback(devices_changed_callback_ptr callback) - { - std::lock_guard lock(_devices_changed_callbacks_mtx); - _devices_changed_callback = std::move(callback); + return _devices_changed.subscribe( + [ctx = shared_from_this(), callback]( std::vector< rs2_device_info > const & removed, + std::vector< rs2_device_info > const & added ) + { + try + { + callback->on_devices_changed( new rs2_device_list( { ctx, removed } ), + new rs2_device_list( { ctx, added } ) ); + } + catch( std::exception const & e ) + { + LOG_ERROR( "Exception thrown from user callback handler: " << e.what() ); + } + } ); } diff --git a/src/context.h b/src/context.h index fef0d444ea..57ab4d8a8e 100644 --- a/src/context.h +++ b/src/context.h @@ -3,13 +3,15 @@ #pragma once -#include "types.h" // devices_changed_callback_ptr - +#include #include #include #include +struct rs2_devices_changed_callback; + + namespace librealsense { class context; @@ -60,11 +62,19 @@ namespace librealsense // static unsigned combine_device_masks( unsigned requested_mask, unsigned mask_in_settings ); - std::vector> query_devices(int mask) const; + // Query any subset of available devices and return them as device-info objects from which actual devices can be + // created as needed. + // + // Devices will match both the requested mask and the device-mask from the context settings. See + // RS2_PRODUCT_LINE_... defines for possible values. + // + std::vector< std::shared_ptr< device_info > > query_devices( int mask ) const; - uint64_t register_internal_device_callback(devices_changed_callback_ptr callback); - void unregister_internal_device_callback(uint64_t cb_id); - void set_devices_changed_callback(devices_changed_callback_ptr callback); + using devices_changed_callback_ptr = std::shared_ptr< rs2_devices_changed_callback >; + + // Subscribe to a notification to receive when the device-list changes. + // + rsutils::subscription on_device_changes( devices_changed_callback_ptr callback ); // Let the context maintain a list of custom devices. These can be anything, like playback devices or devices // maintained by the user. @@ -74,20 +84,19 @@ namespace librealsense const nlohmann::json & get_settings() const { return _settings; } private: - void invoke_devices_changed_callbacks( std::vector & rs2_devices_info_removed, - std::vector & rs2_devices_info_added ); - void raise_devices_changed(const std::vector& removed, const std::vector& added); + void invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & devices_removed, + std::vector< rs2_device_info > const & devices_added ); - std::map< std::string, std::weak_ptr< device_info > > _user_devices; - std::map _devices_changed_callbacks; + std::map< std::string /*address*/, std::weak_ptr< device_info > > _user_devices; + + rsutils::signal< std::vector< rs2_device_info > const & /*removed*/, + std::vector< rs2_device_info > const & /*added*/ > + _devices_changed; nlohmann::json _settings; // Save operation settings unsigned const _device_mask; std::vector< std::shared_ptr< device_factory > > _factories; - - devices_changed_callback_ptr _devices_changed_callback; - std::mutex _devices_changed_callbacks_mtx; }; } diff --git a/src/dds/rsdds-device-factory.cpp b/src/dds/rsdds-device-factory.cpp index 0716c977e5..aad693eed9 100644 --- a/src/dds/rsdds-device-factory.cpp +++ b/src/dds/rsdds-device-factory.cpp @@ -3,14 +3,16 @@ #include "rsdds-device-factory.h" #include "context.h" - #include "rs-dds-device-info.h" +#include // RS2_PRODUCT_LINE_... + #include #include #include #include +#include #include #include #include diff --git a/src/device.cpp b/src/device.cpp index 99e3ee4741..307820bf2c 100644 --- a/src/device.cpp +++ b/src/device.cpp @@ -34,49 +34,46 @@ librealsense::find_profile( rs2_stream stream, int index, std::vector< stream_in device::device( std::shared_ptr< const device_info > const & dev_info, bool device_changed_notifications ) : _dev_info( dev_info ) - , _is_valid( true ) - , _is_alive( std::make_shared< bool >( true ) ) + , _is_alive( std::make_shared< std::atomic< bool > >( true ) ) , _profiles_tags( [this]() { return get_profiles_tags(); } ) { if( device_changed_notifications ) { - std::weak_ptr< bool > weak = _is_alive; - auto cb = new devices_changed_callback_internal([this, weak](rs2_device_list* removed, rs2_device_list* added) - { - // The callback can be called from one thread while the object is being destroyed by another. - // Check if members can still be accessed. - auto alive = weak.lock(); - if( ! alive || ! *alive ) - return; - - // Update is_valid variable when device is invalid - std::lock_guard lock(_device_changed_mtx); - for (auto& dev_info : removed->list) + std::weak_ptr< std::atomic< bool > > weak_alive = _is_alive; + std::weak_ptr< const device_info > weak_dev_info = _dev_info; + auto cb = new devices_changed_callback_internal( + [weak_alive, weak_dev_info]( rs2_device_list * removed, rs2_device_list * added ) { - if( dev_info.info->is_same_as( _dev_info ) ) - { - _is_valid = false; + // The callback can be called from one thread while the object is being destroyed by another. + // Check if members can still be accessed. + auto alive = weak_alive.lock(); + if( ! alive || ! *alive ) return; + auto this_dev_info = weak_dev_info.lock(); + if( ! this_dev_info ) + return; + + // Update is_valid variable when device is invalid + for( auto & dev_info : removed->list ) + { + if( dev_info.info->is_same_as( this_dev_info ) ) + { + *alive = false; + return; + } } - } - }); + } ); - _device_changed_callback_id - = get_context()->register_internal_device_callback( { cb, - []( rs2_devices_changed_callback * p ) - { - p->release(); - } } ); + _device_change_subscription = get_context()->on_device_changes( { cb, + []( rs2_devices_changed_callback * p ) + { + p->release(); + } } ); } } device::~device() { - *_is_alive = false; - - if( _device_changed_callback_id ) - get_context()->unregister_internal_device_callback( _device_changed_callback_id ); - _sensors.clear(); } diff --git a/src/device.h b/src/device.h index c2f538d22d..27d36ae54a 100644 --- a/src/device.h +++ b/src/device.h @@ -10,9 +10,11 @@ #include "device-info.h" #include +#include #include #include #include +#include namespace librealsense { @@ -52,11 +54,7 @@ class device std::pair get_extrinsics(const stream_interface& stream) const override; - bool is_valid() const override - { - std::lock_guard lock(_device_changed_mtx); - return _is_valid; - } + bool is_valid() const override { return *_is_alive; } void tag_profiles(stream_profiles profiles) const override; @@ -66,7 +64,7 @@ class device virtual void stop_activity() const; - bool device_changed_notifications_on() const { return _device_changed_callback_id; } + bool device_changed_notifications_on() const { return _device_change_subscription.is_active(); } format_conversion get_format_conversion() const; @@ -83,12 +81,9 @@ class device private: std::vector> _sensors; std::shared_ptr< const device_info > _dev_info; - bool _is_valid; - mutable std::mutex _device_changed_mtx; - uint64_t _device_changed_callback_id = 0; + std::shared_ptr< std::atomic< bool > > _is_alive; + rsutils::subscription _device_change_subscription; rsutils::lazy< std::vector< tagged_profile > > _profiles_tags; - - std::shared_ptr< bool > _is_alive; // Ensures object can be accessed }; diff --git a/src/device_hub.cpp b/src/device_hub.cpp index 4d402cac47..82b92a8d00 100644 --- a/src/device_hub.cpp +++ b/src/device_hub.cpp @@ -14,7 +14,6 @@ namespace librealsense device_hub::device_hub(std::shared_ptr ctx, int mask) : _ctx( ctx ) , _mask( mask ) - , _device_changes_callback_id( 0 ) { _device_list = _ctx->query_devices(mask); @@ -32,13 +31,15 @@ namespace librealsense } }); - _device_changes_callback_id = _ctx->register_internal_device_callback({ cb, [](rs2_devices_changed_callback* p) { p->release(); } }); + _device_change_subscription = _ctx->on_device_changes( { cb, + []( rs2_devices_changed_callback * p ) + { + p->release(); + } } ); } device_hub::~device_hub() { - if (_device_changes_callback_id) - _ctx->unregister_internal_device_callback(_device_changes_callback_id); } std::shared_ptr device_hub::create_device(const std::string& serial, bool cycle_devices) diff --git a/src/device_hub.h b/src/device_hub.h index f01f036035..82b14455f1 100644 --- a/src/device_hub.h +++ b/src/device_hub.h @@ -5,7 +5,14 @@ #include "context.h" #include "device.h" +#include // RS2_PRODUCT_LINE_... + #include +#include +#include +#include +#include + namespace librealsense { @@ -52,7 +59,7 @@ namespace librealsense std::condition_variable _cv; std::vector> _device_list; int _camera_index = 0; - uint64_t _device_changes_callback_id; + rsutils::subscription _device_change_subscription; int _mask; }; } diff --git a/src/rs.cpp b/src/rs.cpp index aae13abdd2..a375c3de97 100644 --- a/src/rs.cpp +++ b/src/rs.cpp @@ -92,6 +92,7 @@ struct rs2_sensor : public rs2_options struct rs2_context { std::shared_ptr ctx; + mutable rsutils::subscription devices_changed_subscription; }; struct rs2_device_hub @@ -824,7 +825,7 @@ void rs2_set_devices_changed_callback(const rs2_context* context, rs2_devices_ch librealsense::devices_changed_callback_ptr cb( new librealsense::devices_changed_callback(callback, user), [](rs2_devices_changed_callback* p) { delete p; }); - context->ctx->set_devices_changed_callback(std::move(cb)); + context->devices_changed_subscription = context->ctx->on_device_changes( cb ); } HANDLE_EXCEPTIONS_AND_RETURN(, context, callback, user) @@ -882,7 +883,7 @@ void rs2_set_devices_changed_callback_cpp(rs2_context* context, rs2_devices_chan } }; VALIDATE_NOT_NULL(context); - context->ctx->set_devices_changed_callback( callback_ptr ); + context->devices_changed_subscription = context->ctx->on_device_changes( callback_ptr ); } HANDLE_EXCEPTIONS_AND_RETURN(, context, callback) diff --git a/src/rscore/device-factory.h b/src/rscore/device-factory.h index 592f45304b..92f6d01276 100644 --- a/src/rscore/device-factory.h +++ b/src/rscore/device-factory.h @@ -39,8 +39,8 @@ class device_factory public: // Callbacks take this form. // - using callback = std::function< void( std::vector< rs2_device_info > & devices_removed, - std::vector< rs2_device_info > & devices_added ) >; + using callback = std::function< void( std::vector< rs2_device_info > const & devices_removed, + std::vector< rs2_device_info > const & devices_added ) >; virtual ~device_factory() = default; diff --git a/third-party/rsutils/include/rsutils/signal.h b/third-party/rsutils/include/rsutils/signal.h index 29cfb51d0d..c798ef4ac9 100644 --- a/third-party/rsutils/include/rsutils/signal.h +++ b/third-party/rsutils/include/rsutils/signal.h @@ -9,6 +9,7 @@ #include #include #include +#include namespace rsutils { diff --git a/third-party/rsutils/include/rsutils/subscription.h b/third-party/rsutils/include/rsutils/subscription.h index dd3d70ddd3..4c37ef4c3c 100644 --- a/third-party/rsutils/include/rsutils/subscription.h +++ b/third-party/rsutils/include/rsutils/subscription.h @@ -44,6 +44,11 @@ class subscription return *this; } + // Return true if the subscription is active, meaning subscription-needs-to-be-cancelled-if-alive: + // The signal/subscription source may already be gone but we won't know that until we try to cancel, so active does + // not mean the source is alive! I.e., if true then a subscription was made and cancel() will get called. + bool is_active() const { return _d; } + // Unsubscribing is "canceling" the subscription. Will not throw if we've already cancelled. void cancel() {