Skip to content

Commit

Permalink
PR #12275 from Eran: Context callback changes and device mutex removal
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Oct 11, 2023
2 parents 2c31402 + 8310364 commit 0ecde9f
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 127 deletions.
1 change: 1 addition & 0 deletions src/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once
#include "context.h"
#include "types.h" // notification
#include "core/extension.h"
#include "device.h"
#include <rsutils/string/from.h>
Expand Down
87 changes: 25 additions & 62 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
#include "dds/rsdds-device-factory.h"
#endif

#include <librealsense2/hpp/rs_types.hpp> // rs2_devices_changed_callback
#include <librealsense2/rs.h> // RS2_API_VERSION_STR
#include <src/librealsense-exception.h>

#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/string/from.h>
#include <rsutils/json.h>
using json = nlohmann::json;
Expand All @@ -19,7 +24,6 @@ namespace librealsense
context::context( json const & settings )
: _settings( settings )
, _device_mask( rsutils::json::get< unsigned >( settings, "device-mask", RS2_PRODUCT_LINE_ANY ) )
, _devices_changed_callback( nullptr, []( rs2_devices_changed_callback * ) {} )
{
static bool version_logged = false;
if( ! version_logged )
Expand All @@ -30,13 +34,13 @@ namespace librealsense

_factories.push_back( std::make_shared< backend_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );

#ifdef BUILD_WITH_DDS
_factories.push_back( std::make_shared< rsdds_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );
#endif
}
Expand Down Expand Up @@ -90,70 +94,29 @@ namespace librealsense
}


void context::invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added )
void context::invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & rs2_devices_info_removed,
std::vector< rs2_device_info > const & rs2_devices_info_added )
{
std::map<uint64_t, devices_changed_callback_ptr> devices_changed_callbacks;
{
std::lock_guard<std::mutex> lock( _devices_changed_callbacks_mtx );
devices_changed_callbacks = _devices_changed_callbacks;
}

for( auto & kvp : devices_changed_callbacks )
{
try
{
kvp.second->on_devices_changed( new rs2_device_list( { shared_from_this(), rs2_devices_info_removed } ),
new rs2_device_list( { shared_from_this(), rs2_devices_info_added } ) );
}
catch( ... )
{
LOG_ERROR( "Exception thrown from user callback handler" );
}
}

raise_devices_changed( rs2_devices_info_removed, rs2_devices_info_added );
}

void context::raise_devices_changed(const std::vector<rs2_device_info>& removed, const std::vector<rs2_device_info>& added)
{
if (_devices_changed_callback)
{
try
{
_devices_changed_callback->on_devices_changed(new rs2_device_list({ shared_from_this(), removed }),
new rs2_device_list({ shared_from_this(), added }));
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
catch (...)
{
LOG_ERROR("Exception thrown from user callback handler");
}
}
_devices_changed.raise( rs2_devices_info_removed, rs2_devices_info_added );
}


uint64_t context::register_internal_device_callback(devices_changed_callback_ptr callback)
rsutils::subscription context::on_device_changes( devices_changed_callback_ptr callback )
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
auto callback_id = unique_id::generate_id();
_devices_changed_callbacks.insert(std::make_pair(callback_id, std::move(callback)));
return callback_id;
}

void context::unregister_internal_device_callback(uint64_t cb_id)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
_devices_changed_callbacks.erase(cb_id);
}

void context::set_devices_changed_callback(devices_changed_callback_ptr callback)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
_devices_changed_callback = std::move(callback);
return _devices_changed.subscribe(
[ctx = shared_from_this(), callback]( std::vector< rs2_device_info > const & removed,
std::vector< rs2_device_info > const & added )
{
try
{
callback->on_devices_changed( new rs2_device_list( { ctx, removed } ),
new rs2_device_list( { ctx, added } ) );
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
} );
}


Expand Down
37 changes: 23 additions & 14 deletions src/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#pragma once

#include "types.h" // devices_changed_callback_ptr

#include <rsutils/signal.h>
#include <nlohmann/json.hpp>
#include <vector>
#include <map>


struct rs2_devices_changed_callback;


namespace librealsense
{
class context;
Expand Down Expand Up @@ -60,11 +62,19 @@ namespace librealsense
//
static unsigned combine_device_masks( unsigned requested_mask, unsigned mask_in_settings );

std::vector<std::shared_ptr<device_info>> query_devices(int mask) const;
// Query any subset of available devices and return them as device-info objects from which actual devices can be
// created as needed.
//
// Devices will match both the requested mask and the device-mask from the context settings. See
// RS2_PRODUCT_LINE_... defines for possible values.
//
std::vector< std::shared_ptr< device_info > > query_devices( int mask ) const;

uint64_t register_internal_device_callback(devices_changed_callback_ptr callback);
void unregister_internal_device_callback(uint64_t cb_id);
void set_devices_changed_callback(devices_changed_callback_ptr callback);
using devices_changed_callback_ptr = std::shared_ptr< rs2_devices_changed_callback >;

// Subscribe to a notification to receive when the device-list changes.
//
rsutils::subscription on_device_changes( devices_changed_callback_ptr callback );

// Let the context maintain a list of custom devices. These can be anything, like playback devices or devices
// maintained by the user.
Expand All @@ -74,20 +84,19 @@ namespace librealsense
const nlohmann::json & get_settings() const { return _settings; }

private:
void invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added );
void raise_devices_changed(const std::vector<rs2_device_info>& removed, const std::vector<rs2_device_info>& added);
void invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & devices_removed,
std::vector< rs2_device_info > const & devices_added );

std::map< std::string, std::weak_ptr< device_info > > _user_devices;
std::map<uint64_t, devices_changed_callback_ptr> _devices_changed_callbacks;
std::map< std::string /*address*/, std::weak_ptr< device_info > > _user_devices;

rsutils::signal< std::vector< rs2_device_info > const & /*removed*/,
std::vector< rs2_device_info > const & /*added*/ >
_devices_changed;

nlohmann::json _settings; // Save operation settings
unsigned const _device_mask;

std::vector< std::shared_ptr< device_factory > > _factories;

devices_changed_callback_ptr _devices_changed_callback;
std::mutex _devices_changed_callbacks_mtx;
};

}
4 changes: 3 additions & 1 deletion src/dds/rsdds-device-factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

#include "rsdds-device-factory.h"
#include "context.h"

#include "rs-dds-device-info.h"

#include <librealsense2/h/rs_context.h> // RS2_PRODUCT_LINE_...

#include <realdds/dds-device-watcher.h>
#include <realdds/dds-participant.h>
#include <realdds/dds-device.h>
#include <realdds/topics/device-info-msg.h>

#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/shared-ptr-singleton.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/string/slice.h>
Expand Down
57 changes: 27 additions & 30 deletions src/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,46 @@ librealsense::find_profile( rs2_stream stream, int index, std::vector< stream_in
device::device( std::shared_ptr< const device_info > const & dev_info,
bool device_changed_notifications )
: _dev_info( dev_info )
, _is_valid( true )
, _is_alive( std::make_shared< bool >( true ) )
, _is_alive( std::make_shared< std::atomic< bool > >( true ) )
, _profiles_tags( [this]() { return get_profiles_tags(); } )
{
if( device_changed_notifications )
{
std::weak_ptr< bool > weak = _is_alive;
auto cb = new devices_changed_callback_internal([this, weak](rs2_device_list* removed, rs2_device_list* added)
{
// The callback can be called from one thread while the object is being destroyed by another.
// Check if members can still be accessed.
auto alive = weak.lock();
if( ! alive || ! *alive )
return;

// Update is_valid variable when device is invalid
std::lock_guard<std::mutex> lock(_device_changed_mtx);
for (auto& dev_info : removed->list)
std::weak_ptr< std::atomic< bool > > weak_alive = _is_alive;
std::weak_ptr< const device_info > weak_dev_info = _dev_info;
auto cb = new devices_changed_callback_internal(
[weak_alive, weak_dev_info]( rs2_device_list * removed, rs2_device_list * added )
{
if( dev_info.info->is_same_as( _dev_info ) )
{
_is_valid = false;
// The callback can be called from one thread while the object is being destroyed by another.
// Check if members can still be accessed.
auto alive = weak_alive.lock();
if( ! alive || ! *alive )
return;
auto this_dev_info = weak_dev_info.lock();
if( ! this_dev_info )
return;

// Update is_valid variable when device is invalid
for( auto & dev_info : removed->list )
{
if( dev_info.info->is_same_as( this_dev_info ) )
{
*alive = false;
return;
}
}
}
});
} );

_device_changed_callback_id
= get_context()->register_internal_device_callback( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
_device_change_subscription = get_context()->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}
}

device::~device()
{
*_is_alive = false;

if( _device_changed_callback_id )
get_context()->unregister_internal_device_callback( _device_changed_callback_id );

_sensors.clear();
}

Expand Down
17 changes: 6 additions & 11 deletions src/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include "device-info.h"

#include <rsutils/lazy.h>
#include <rsutils/subscription.h>
#include <chrono>
#include <memory>
#include <vector>
#include <atomic>


namespace librealsense {
Expand Down Expand Up @@ -52,11 +54,7 @@ class device

std::pair<uint32_t, rs2_extrinsics> get_extrinsics(const stream_interface& stream) const override;

bool is_valid() const override
{
std::lock_guard<std::mutex> lock(_device_changed_mtx);
return _is_valid;
}
bool is_valid() const override { return *_is_alive; }

void tag_profiles(stream_profiles profiles) const override;

Expand All @@ -66,7 +64,7 @@ class device

virtual void stop_activity() const;

bool device_changed_notifications_on() const { return _device_changed_callback_id; }
bool device_changed_notifications_on() const { return _device_change_subscription.is_active(); }

format_conversion get_format_conversion() const;

Expand All @@ -83,12 +81,9 @@ class device
private:
std::vector<std::shared_ptr<sensor_interface>> _sensors;
std::shared_ptr< const device_info > _dev_info;
bool _is_valid;
mutable std::mutex _device_changed_mtx;
uint64_t _device_changed_callback_id = 0;
std::shared_ptr< std::atomic< bool > > _is_alive;
rsutils::subscription _device_change_subscription;
rsutils::lazy< std::vector< tagged_profile > > _profiles_tags;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed
};


Expand Down
9 changes: 5 additions & 4 deletions src/device_hub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace librealsense
device_hub::device_hub(std::shared_ptr<librealsense::context> ctx, int mask)
: _ctx( ctx )
, _mask( mask )
, _device_changes_callback_id( 0 )
{
_device_list = _ctx->query_devices(mask);

Expand All @@ -32,13 +31,15 @@ namespace librealsense
}
});

_device_changes_callback_id = _ctx->register_internal_device_callback({ cb, [](rs2_devices_changed_callback* p) { p->release(); } });
_device_change_subscription = _ctx->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}

device_hub::~device_hub()
{
if (_device_changes_callback_id)
_ctx->unregister_internal_device_callback(_device_changes_callback_id);
}

std::shared_ptr<device_interface> device_hub::create_device(const std::string& serial, bool cycle_devices)
Expand Down
9 changes: 8 additions & 1 deletion src/device_hub.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@

#include "context.h"
#include "device.h"
#include <librealsense2/h/rs_context.h> // RS2_PRODUCT_LINE_...

#include <limits>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <memory>


namespace librealsense
{
Expand Down Expand Up @@ -52,7 +59,7 @@ namespace librealsense
std::condition_variable _cv;
std::vector<std::shared_ptr<device_info>> _device_list;
int _camera_index = 0;
uint64_t _device_changes_callback_id;
rsutils::subscription _device_change_subscription;
int _mask;
};
}
Loading

0 comments on commit 0ecde9f

Please sign in to comment.