Skip to content

Commit

Permalink
change to signal & on_device_changes(); remove mutex in device
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Oct 10, 2023
1 parent 5799bdb commit bc48b50
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 86 deletions.
1 change: 1 addition & 0 deletions src/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once
#include "context.h"
#include "types.h" // notification
#include "core/extension.h"
#include "device.h"
#include <rsutils/string/from.h>
Expand Down
70 changes: 25 additions & 45 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
#include "dds/rsdds-device-factory.h"
#endif

#include <librealsense2/hpp/rs_types.hpp> // rs2_devices_changed_callback
#include <librealsense2/rs.h> // RS2_API_VERSION_STR
#include <src/librealsense-exception.h>

#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/string/from.h>
#include <rsutils/json.h>
using json = nlohmann::json;
Expand All @@ -29,13 +34,13 @@ namespace librealsense

_factories.push_back( std::make_shared< backend_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );

#ifdef BUILD_WITH_DDS
_factories.push_back( std::make_shared< rsdds_device_factory >(
*this,
[this]( std::vector< rs2_device_info > & removed, std::vector< rs2_device_info > & added )
[this]( std::vector< rs2_device_info > const & removed, std::vector< rs2_device_info > const & added )
{ invoke_devices_changed_callbacks( removed, added ); } ) );
#endif
}
Expand Down Expand Up @@ -89,54 +94,29 @@ namespace librealsense
}


void context::invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added )
void context::invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & rs2_devices_info_removed,
std::vector< rs2_device_info > const & rs2_devices_info_added )
{
std::map<uint64_t, devices_changed_callback_ptr> devices_changed_callbacks;
{
std::lock_guard<std::mutex> lock( _devices_changed_callbacks_mtx );
devices_changed_callbacks = _devices_changed_callbacks;
}

for( auto & kvp : devices_changed_callbacks )
{
try
{
kvp.second->on_devices_changed( new rs2_device_list( { shared_from_this(), rs2_devices_info_removed } ),
new rs2_device_list( { shared_from_this(), rs2_devices_info_added } ) );
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
catch( ... )
{
LOG_ERROR( "Exception thrown from user callback handler" );
}
}
_devices_changed.raise( rs2_devices_info_removed, rs2_devices_info_added );
}


uint64_t context::register_internal_device_callback(devices_changed_callback_ptr callback)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
auto callback_id = unique_id::generate_id();
_devices_changed_callbacks.insert(std::make_pair(callback_id, std::move(callback)));
return callback_id;
}

void context::unregister_internal_device_callback(uint64_t cb_id)
rsutils::subscription context::on_device_changes( devices_changed_callback_ptr callback )
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
_devices_changed_callbacks.erase(cb_id);
}

void context::set_devices_changed_callback(devices_changed_callback_ptr callback)
{
std::lock_guard<std::mutex> lock(_devices_changed_callbacks_mtx);
// unique_id::generate_id() will never be 0; so we use 0 for the "public" callback so it'll get overriden on
// subsequent calls
_devices_changed_callbacks[0] = std::move( callback );
return _devices_changed.subscribe(
[ctx = shared_from_this(), callback]( std::vector< rs2_device_info > const & removed,
std::vector< rs2_device_info > const & added )
{
try
{
callback->on_devices_changed( new rs2_device_list( { ctx, removed } ),
new rs2_device_list( { ctx, added } ) );
}
catch( std::exception const & e )
{
LOG_ERROR( "Exception thrown from user callback handler: " << e.what() );
}
} );
}


Expand Down
35 changes: 23 additions & 12 deletions src/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

#pragma once

#include "types.h" // devices_changed_callback_ptr

#include <rsutils/signal.h>
#include <nlohmann/json.hpp>
#include <vector>
#include <map>


struct rs2_devices_changed_callback;


namespace librealsense
{
class context;
Expand Down Expand Up @@ -60,11 +62,19 @@ namespace librealsense
//
static unsigned combine_device_masks( unsigned requested_mask, unsigned mask_in_settings );

std::vector<std::shared_ptr<device_info>> query_devices(int mask) const;
// Query any subset of available devices and return them as device-info objects from which actual devices can be
// created as needed.
//
// Devices will match both the requested mask and the device-mask from the context settings. See
// RS2_PRODUCT_LINE_... defines for possible values.
//
std::vector< std::shared_ptr< device_info > > query_devices( int mask ) const;

uint64_t register_internal_device_callback(devices_changed_callback_ptr callback);
void unregister_internal_device_callback(uint64_t cb_id);
void set_devices_changed_callback(devices_changed_callback_ptr callback);
using devices_changed_callback_ptr = std::shared_ptr< rs2_devices_changed_callback >;

// Subscribe to a notification to receive when the device-list changes.
//
rsutils::subscription on_device_changes( devices_changed_callback_ptr callback );

// Let the context maintain a list of custom devices. These can be anything, like playback devices or devices
// maintained by the user.
Expand All @@ -74,18 +84,19 @@ namespace librealsense
const nlohmann::json & get_settings() const { return _settings; }

private:
void invoke_devices_changed_callbacks( std::vector<rs2_device_info> & rs2_devices_info_removed,
std::vector<rs2_device_info> & rs2_devices_info_added );
void invoke_devices_changed_callbacks( std::vector< rs2_device_info > const & devices_removed,
std::vector< rs2_device_info > const & devices_added );

std::map< std::string, std::weak_ptr< device_info > > _user_devices;
std::map<uint64_t, devices_changed_callback_ptr> _devices_changed_callbacks;
std::map< std::string /*address*/, std::weak_ptr< device_info > > _user_devices;

rsutils::signal< std::vector< rs2_device_info > const & /*removed*/,
std::vector< rs2_device_info > const & /*added*/ >
_devices_changed;

nlohmann::json _settings; // Save operation settings
unsigned const _device_mask;

std::vector< std::shared_ptr< device_factory > > _factories;

std::mutex _devices_changed_callbacks_mtx;
};

}
16 changes: 5 additions & 11 deletions src/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ device::device( std::shared_ptr< const device_info > const & dev_info,
return;

// Update is_valid variable when device is invalid
std::lock_guard<std::mutex> lock(_device_changed_mtx);
for (auto& dev_info : removed->list)
{
if( dev_info.info->is_same_as( _dev_info ) )
Expand All @@ -61,22 +60,17 @@ device::device( std::shared_ptr< const device_info > const & dev_info,
}
});

_device_changed_callback_id
= get_context()->register_internal_device_callback( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
_device_change_subscription = get_context()->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}
}

device::~device()
{
*_is_alive = false;

if( _device_changed_callback_id )
get_context()->unregister_internal_device_callback( _device_changed_callback_id );

_sensors.clear();
}

Expand Down
15 changes: 6 additions & 9 deletions src/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include "device-info.h"

#include <rsutils/lazy.h>
#include <rsutils/subscription.h>
#include <chrono>
#include <memory>
#include <vector>
#include <atomic>


namespace librealsense {
Expand Down Expand Up @@ -52,11 +54,7 @@ class device

std::pair<uint32_t, rs2_extrinsics> get_extrinsics(const stream_interface& stream) const override;

bool is_valid() const override
{
std::lock_guard<std::mutex> lock(_device_changed_mtx);
return _is_valid;
}
bool is_valid() const override { return _is_valid; }

void tag_profiles(stream_profiles profiles) const override;

Expand All @@ -66,7 +64,7 @@ class device

virtual void stop_activity() const;

bool device_changed_notifications_on() const { return _device_changed_callback_id; }
bool device_changed_notifications_on() const { return _device_change_subscription.is_active(); }

format_conversion get_format_conversion() const;

Expand All @@ -83,9 +81,8 @@ class device
private:
std::vector<std::shared_ptr<sensor_interface>> _sensors;
std::shared_ptr< const device_info > _dev_info;
bool _is_valid;
mutable std::mutex _device_changed_mtx;
uint64_t _device_changed_callback_id = 0;
std::atomic< bool > _is_valid;
rsutils::subscription _device_change_subscription;
rsutils::lazy< std::vector< tagged_profile > > _profiles_tags;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed
Expand Down
9 changes: 5 additions & 4 deletions src/device_hub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace librealsense
device_hub::device_hub(std::shared_ptr<librealsense::context> ctx, int mask)
: _ctx( ctx )
, _mask( mask )
, _device_changes_callback_id( 0 )
{
_device_list = _ctx->query_devices(mask);

Expand All @@ -32,13 +31,15 @@ namespace librealsense
}
});

_device_changes_callback_id = _ctx->register_internal_device_callback({ cb, [](rs2_devices_changed_callback* p) { p->release(); } });
_device_change_subscription = _ctx->on_device_changes( { cb,
[]( rs2_devices_changed_callback * p )
{
p->release();
} } );
}

device_hub::~device_hub()
{
if (_device_changes_callback_id)
_ctx->unregister_internal_device_callback(_device_changes_callback_id);
}

std::shared_ptr<device_interface> device_hub::create_device(const std::string& serial, bool cycle_devices)
Expand Down
3 changes: 2 additions & 1 deletion src/device_hub.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <librealsense2/h/rs_context.h> // RS2_PRODUCT_LINE_...
#include "context.h"
#include "device.h"
#include <limits>
Expand Down Expand Up @@ -52,7 +53,7 @@ namespace librealsense
std::condition_variable _cv;
std::vector<std::shared_ptr<device_info>> _device_list;
int _camera_index = 0;
uint64_t _device_changes_callback_id;
rsutils::subscription _device_change_subscription;
int _mask;
};
}
5 changes: 3 additions & 2 deletions src/rs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ struct rs2_sensor : public rs2_options
struct rs2_context
{
std::shared_ptr<librealsense::context> ctx;
mutable rsutils::subscription devices_changed_subscription;
};

struct rs2_device_hub
Expand Down Expand Up @@ -824,7 +825,7 @@ void rs2_set_devices_changed_callback(const rs2_context* context, rs2_devices_ch
librealsense::devices_changed_callback_ptr cb(
new librealsense::devices_changed_callback(callback, user),
[](rs2_devices_changed_callback* p) { delete p; });
context->ctx->set_devices_changed_callback(std::move(cb));
context->devices_changed_subscription = context->ctx->on_device_changes( cb );
}
HANDLE_EXCEPTIONS_AND_RETURN(, context, callback, user)

Expand Down Expand Up @@ -882,7 +883,7 @@ void rs2_set_devices_changed_callback_cpp(rs2_context* context, rs2_devices_chan
} };

VALIDATE_NOT_NULL(context);
context->ctx->set_devices_changed_callback( callback_ptr );
context->devices_changed_subscription = context->ctx->on_device_changes( callback_ptr );
}
HANDLE_EXCEPTIONS_AND_RETURN(, context, callback)

Expand Down
4 changes: 2 additions & 2 deletions src/rscore/device-factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class device_factory
public:
// Callbacks take this form.
//
using callback = std::function< void( std::vector< rs2_device_info > & devices_removed,
std::vector< rs2_device_info > & devices_added ) >;
using callback = std::function< void( std::vector< rs2_device_info > const & devices_removed,
std::vector< rs2_device_info > const & devices_added ) >;

virtual ~device_factory() = default;

Expand Down
5 changes: 5 additions & 0 deletions third-party/rsutils/include/rsutils/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class subscription
return *this;
}

// Return true if the subscription is active, meaning subscription-needs-to-be-cancelled-if-alive:
// The signal/subscription source may already be gone but we won't know that until we try to cancel, so active does
// not mean the source is alive! I.e., if true then a subscription was made and cancel() will get called.
bool is_active() const { return _d; }

// Unsubscribing is "canceling" the subscription. Will not throw if we've already cancelled.
void cancel()
{
Expand Down

0 comments on commit bc48b50

Please sign in to comment.