Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context callback changes and device mutex removal #12275

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once
#include "context.h"
#include "types.h" // notification
#include "core/extension.h"
#include "device.h"
#include <rsutils/string/from.h>
Expand Down
87 changes: 25 additions & 62 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
#include "dds/rsdds-device-factory.h"
#endif

#include <librealsense2/hpp/rs_types.hpp> // rs2_devices_changed_callback
#include <librealsense2/rs.h> // RS2_API_VERSION_STR
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the comment is needed.
Tomorow if we add another used and it compiles, we will need to update this comment?
Will not happen :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it helps -- at least for this version of the code.
But you're right, it likely won't get updated.
Hopefully I'm getting the context to be focused-enough so that it'll never have to be updated again...

#include <src/librealsense-exception.h>

#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/string/from.h>
#include <rsutils/json.h>
using json = nlohmann::json;
Expand All @@ -19,7 +24,6 @@ namespace librealsense
context::context( json const & settings )
: _settings( settings )
, _device_mask( rsutils::json::get< unsigned >( settings, "device-mask", RS2_PRODUCT_LINE_ANY ) )
, _devices_changed_callback( nullptr, []( rs2_devices_changed_callback * ) {} )
{
static bool version_logged = false;
if( ! version_logged )
Expand All @@ -30,13 +34,13 @@ namespace librealsense

_factories.push_back( std::make_shared< backend_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );

#ifdef BUILD_WITH_DDS
_factories.push_back( std::make_shared< rsdds_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );
#endif
}
Expand Down Expand Up @@ -90,70 +94,29 @@ namespace librealsense
}


void context::invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added )
void context::invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & rs2_devices_info_removed,
std::vector< rs2_device_info > const & rs2_devices_info_added )
{
std::map<uint64_t, devices_changed_callback_ptr> devices_changed_callbacks;
{
std::lock_guard<std::mutex> lock( _devices_changed_callbacks_mtx );
devices_changed_callbacks = _devices_changed_callbacks;
}

for( auto & kvp : devices_changed_callbacks )
{
try
{
kvp.second->on_devices_changed( new rs2_device_list( { shared_from_this(), rs2_devices_info_removed } ),
new rs2_device_list( { shared_from_this(), rs2_devices_info_added } ) );
}
catch( ... )
{
LOG_ERROR( "Exception thrown from user callback handler" );
}
}

raise_devices_changed( rs2_devices_info_removed, rs2_devices_info_added );
}

void context::raise_devices_changed(const std::vector<rs2_device_info>& removed, const std::vector<rs2_device_info>& added)
{
if (_devices_changed_callback)
{
try
{
_devices_changed_callback->on_devices_changed(new rs2_device_list({ shared_from_this(), removed }),
new rs2_device_list({ shared_from_this(), added }));
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
catch (...)
{
LOG_ERROR("Exception thrown from user callback handler");
}
}
_devices_changed.raise( rs2_devices_info_removed, rs2_devices_info_added );
}


uint64_t context::register_internal_device_callback(devices_changed_callback_ptr callback)
rsutils::subscription context::on_device_changes( devices_changed_callback_ptr callback )
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
auto callback_id = unique_id::generate_id();
_devices_changed_callbacks.insert(std::make_pair(callback_id, std::move(callback)));
return callback_id;
}

void context::unregister_internal_device_callback(uint64_t cb_id)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
_devices_changed_callbacks.erase(cb_id);
}

void context::set_devices_changed_callback(devices_changed_callback_ptr callback)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
_devices_changed_callback = std::move(callback);
return _devices_changed.subscribe(
[ctx = shared_from_this(), callback]( std::vector< rs2_device_info > const & removed,
std::vector< rs2_device_info > const & added )
{
try
{
callback->on_devices_changed( new rs2_device_list( { ctx, removed } ),
new rs2_device_list( { ctx, added } ) );
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
} );
}


Expand Down
37 changes: 23 additions & 14 deletions src/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#pragma once

#include "types.h" // devices_changed_callback_ptr

#include <rsutils/signal.h>
#include <nlohmann/json.hpp>
#include <vector>
#include <map>


struct rs2_devices_changed_callback;


namespace librealsense
{
class context;
Expand Down Expand Up @@ -60,11 +62,19 @@ namespace librealsense
//
static unsigned combine_device_masks( unsigned requested_mask, unsigned mask_in_settings );

std::vector<std::shared_ptr<device_info>> query_devices(int mask) const;
// Query any subset of available devices and return them as device-info objects from which actual devices can be
// created as needed.
//
// Devices will match both the requested mask and the device-mask from the context settings. See
// RS2_PRODUCT_LINE_... defines for possible values.
//
std::vector< std::shared_ptr< device_info > > query_devices( int mask ) const;

uint64_t register_internal_device_callback(devices_changed_callback_ptr callback);
void unregister_internal_device_callback(uint64_t cb_id);
void set_devices_changed_callback(devices_changed_callback_ptr callback);
using devices_changed_callback_ptr = std::shared_ptr< rs2_devices_changed_callback >;

// Subscribe to a notification to receive when the device-list changes.
//
rsutils::subscription on_device_changes( devices_changed_callback_ptr callback );

// Let the context maintain a list of custom devices. These can be anything, like playback devices or devices
// maintained by the user.
Expand All @@ -74,20 +84,19 @@ namespace librealsense
const nlohmann::json & get_settings() const { return _settings; }

private:
void invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added );
void raise_devices_changed(const std::vector<rs2_device_info>& removed, const std::vector<rs2_device_info>& added);
void invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & devices_removed,
std::vector< rs2_device_info > const & devices_added );

std::map< std::string, std::weak_ptr< device_info > > _user_devices;
std::map<uint64_t, devices_changed_callback_ptr> _devices_changed_callbacks;
std::map< std::string /*address*/, std::weak_ptr< device_info > > _user_devices;

rsutils::signal< std::vector< rs2_device_info > const & /*removed*/,
std::vector< rs2_device_info > const & /*added*/ >
_devices_changed;

nlohmann::json _settings; // Save operation settings
unsigned const _device_mask;

std::vector< std::shared_ptr< device_factory > > _factories;

devices_changed_callback_ptr _devices_changed_callback;
std::mutex _devices_changed_callbacks_mtx;
};

}
4 changes: 3 additions & 1 deletion src/dds/rsdds-device-factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

#include "rsdds-device-factory.h"
#include "context.h"

#include "rs-dds-device-info.h"

#include <librealsense2/h/rs_context.h> // RS2_PRODUCT_LINE_...

#include <realdds/dds-device-watcher.h>
#include <realdds/dds-participant.h>
#include <realdds/dds-device.h>
#include <realdds/topics/device-info-msg.h>

#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/shared-ptr-singleton.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/string/slice.h>
Expand Down
57 changes: 27 additions & 30 deletions src/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,46 @@ librealsense::find_profile( rs2_stream stream, int index, std::vector< stream_in
device::device( std::shared_ptr< const device_info > const & dev_info,
bool device_changed_notifications )
: _dev_info( dev_info )
, _is_valid( true )
, _is_alive( std::make_shared< bool >( true ) )
, _is_alive( std::make_shared< std::atomic< bool > >( true ) )
, _profiles_tags( [this]() { return get_profiles_tags(); } )
{
if( device_changed_notifications )
{
std::weak_ptr< bool > weak = _is_alive;
auto cb = new devices_changed_callback_internal([this, weak](rs2_device_list* removed, rs2_device_list* added)
{
// The callback can be called from one thread while the object is being destroyed by another.
// Check if members can still be accessed.
auto alive = weak.lock();
if( ! alive || ! *alive )
return;

// Update is_valid variable when device is invalid
std::lock_guard<std::mutex> lock(_device_changed_mtx);
for (auto& dev_info : removed->list)
std::weak_ptr< std::atomic< bool > > weak_alive = _is_alive;
std::weak_ptr< const device_info > weak_dev_info = _dev_info;
auto cb = new devices_changed_callback_internal(
[weak_alive, weak_dev_info]( rs2_device_list * removed, rs2_device_list * added )
{
if( dev_info.info->is_same_as( _dev_info ) )
{
_is_valid = false;
// The callback can be called from one thread while the object is being destroyed by another.
// Check if members can still be accessed.
auto alive = weak_alive.lock();
if( ! alive || ! *alive )
return;
auto this_dev_info = weak_dev_info.lock();
if( ! this_dev_info )
return;

// Update is_valid variable when device is invalid
for( auto & dev_info : removed->list )
{
if( dev_info.info->is_same_as( this_dev_info ) )
{
*alive = false;
return;
}
}
}
});
} );

_device_changed_callback_id
= get_context()->register_internal_device_callback( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
_device_change_subscription = get_context()->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}
}

device::~device()
{
*_is_alive = false;

if( _device_changed_callback_id )
get_context()->unregister_internal_device_callback( _device_changed_callback_id );

_sensors.clear();
}

Expand Down
17 changes: 6 additions & 11 deletions src/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include "device-info.h"

#include <rsutils/lazy.h>
#include <rsutils/subscription.h>
#include <chrono>
#include <memory>
#include <vector>
#include <atomic>


namespace librealsense {
Expand Down Expand Up @@ -52,11 +54,7 @@ class device

std::pair<uint32_t, rs2_extrinsics> get_extrinsics(const stream_interface& stream) const override;

bool is_valid() const override
{
std::lock_guard<std::mutex> lock(_device_changed_mtx);
return _is_valid;
}
bool is_valid() const override { return *_is_alive; }

void tag_profiles(stream_profiles profiles) const override;

Expand All @@ -66,7 +64,7 @@ class device

virtual void stop_activity() const;

bool device_changed_notifications_on() const { return _device_changed_callback_id; }
bool device_changed_notifications_on() const { return _device_change_subscription.is_active(); }

format_conversion get_format_conversion() const;

Expand All @@ -83,12 +81,9 @@ class device
private:
std::vector<std::shared_ptr<sensor_interface>> _sensors;
std::shared_ptr< const device_info > _dev_info;
bool _is_valid;
mutable std::mutex _device_changed_mtx;
uint64_t _device_changed_callback_id = 0;
std::shared_ptr< std::atomic< bool > > _is_alive;
rsutils::subscription _device_change_subscription;
rsutils::lazy< std::vector< tagged_profile > > _profiles_tags;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed
};


Expand Down
9 changes: 5 additions & 4 deletions src/device_hub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace librealsense
device_hub::device_hub(std::shared_ptr<librealsense::context> ctx, int mask)
: _ctx( ctx )
, _mask( mask )
, _device_changes_callback_id( 0 )
{
_device_list = _ctx->query_devices(mask);

Expand All @@ -32,13 +31,15 @@ namespace librealsense
}
});

_device_changes_callback_id = _ctx->register_internal_device_callback({ cb, [](rs2_devices_changed_callback* p) { p->release(); } });
_device_change_subscription = _ctx->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}

device_hub::~device_hub()
{
if (_device_changes_callback_id)
_ctx->unregister_internal_device_callback(_device_changes_callback_id);
}

std::shared_ptr<device_interface> device_hub::create_device(const std::string& serial, bool cycle_devices)
Expand Down
9 changes: 8 additions & 1 deletion src/device_hub.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@

#include "context.h"
#include "device.h"
#include <librealsense2/h/rs_context.h> // RS2_PRODUCT_LINE_...

#include <limits>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <memory>


namespace librealsense
{
Expand Down Expand Up @@ -52,7 +59,7 @@ namespace librealsense
std::condition_variable _cv;
std::vector<std::shared_ptr<device_info>> _device_list;
int _camera_index = 0;
uint64_t _device_changes_callback_id;
rsutils::subscription _device_change_subscription;
int _mask;
};
}
Loading
Loading