Skip to content

Commit

Permalink
change device on-metadata-available to signal
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Dec 13, 2023
1 parent c84ba97 commit 905b37b
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 62 deletions.
4 changes: 2 additions & 2 deletions src/dds/rs-dds-depth-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void dds_depth_sensor_proxy::add_no_metadata( frame * const f, streaming_impl &
}


void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json const & dds_md, streaming_impl & streaming )
{
if( auto du = rsutils::json::nested( dds_md, metadata_header_key, depth_units_key ) )
{
Expand All @@ -56,7 +56,7 @@ void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json
f->additional_data.depth_units = get_depth_scale();
}

super::add_frame_metadata( f, std::move( dds_md ), streaming );
super::add_frame_metadata( f, dds_md, streaming );
}


Expand Down
2 changes: 1 addition & 1 deletion src/dds/rs-dds-depth-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class dds_depth_sensor_proxy

protected:
void add_no_metadata( frame *, streaming_impl & ) override;
void add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) override;
void add_frame_metadata( frame *, nlohmann::json const & md, streaming_impl & ) override;
};


Expand Down
8 changes: 4 additions & 4 deletions src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ dds_device_proxy::dds_device_proxy( std::shared_ptr< const device_info > const &

if( _dds_dev->supports_metadata() )
{
_dds_dev->on_metadata_available(
[this]( nlohmann::json && dds_md )
_metadata_subscription = _dds_dev->on_metadata_available(
[this]( std::shared_ptr< const nlohmann::json > const & dds_md )
{
std::string stream_name = rsutils::json::get< std::string >( dds_md, stream_name_key );
std::string const & stream_name = rsutils::json::nested( *dds_md, stream_name_key ).string_ref();
auto it = _stream_name_to_owning_sensor.find( stream_name );
if( it != _stream_name_to_owning_sensor.end() )
it->second->handle_new_metadata( stream_name, std::move( dds_md ) );
it->second->handle_new_metadata( stream_name, dds_md );
} );
}

Expand Down
2 changes: 2 additions & 0 deletions src/dds/rs-dds-device-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class dds_device_proxy
std::map< std::string, std::shared_ptr< librealsense::stream > > _stream_name_to_librs_stream;
std::map< std::string, std::shared_ptr< dds_sensor_proxy > > _stream_name_to_owning_sensor;

rsutils::subscription _metadata_subscription;

int get_index_from_stream_name( const std::string & name ) const;
void set_profile_intrinsics( std::shared_ptr< stream_profile_interface > & profile,
const std::shared_ptr< realdds::dds_stream > & stream ) const;
Expand Down
22 changes: 14 additions & 8 deletions src/dds/rs-dds-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,20 @@ void dds_sensor_proxy::handle_motion_data( realdds::topics::imu_msg && imu,
}


void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name, nlohmann::json && dds_md )
void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & dds_md )
{
if( ! _md_enabled )
return;

auto it = _streaming_by_name.find( stream_name );
if( it != _streaming_by_name.end() )
it->second.syncer.enqueue_metadata(
rsutils::json::get< realdds::dds_nsec >( dds_md[metadata_header_key], timestamp_key ),
std::move( dds_md ) );
{
if( auto timestamp = rsutils::json::nested( *dds_md, metadata_header_key, timestamp_key ) )
it->second.syncer.enqueue_metadata( timestamp.value< realdds::dds_nsec >(), dds_md );
else
throw std::runtime_error( "missing metadata header/timestamp" );
}
// else we're not streaming -- must be another client that's subscribed
}

Expand All @@ -361,7 +365,9 @@ void dds_sensor_proxy::add_no_metadata( frame * const f, streaming_impl & stream
}


void dds_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_sensor_proxy::add_frame_metadata( frame * const f,
nlohmann::json const & dds_md,
streaming_impl & streaming )
{
nlohmann::json const & md_header = rsutils::json::nested( dds_md, metadata_header_key );
nlohmann::json const & md = rsutils::json::nested( dds_md, metadata_key );
Expand Down Expand Up @@ -434,14 +440,14 @@ void dds_sensor_proxy::start( rs2_frame_callback_sptr callback )
auto & streaming = _streaming_by_name[dds_stream->name()];
streaming.syncer.on_frame_release( frame_releaser );
streaming.syncer.on_frame_ready(
[this, &streaming]( syncer_type::frame_holder && fh, nlohmann::json && md )
[this, &streaming]( syncer_type::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & md )
{
if( _is_streaming ) // stop was not called
{
if( md.empty() )
if( ! md )
add_no_metadata( static_cast< frame * >( fh.get() ), streaming );
else
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ), streaming );
add_frame_metadata( static_cast< frame * >( fh.get() ), *md, streaming );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
}
} );
Expand Down
5 changes: 3 additions & 2 deletions src/dds/rs-dds-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ class dds_sensor_proxy : public software_sensor
void handle_motion_data( realdds::topics::imu_msg &&,
const std::shared_ptr< stream_profile_interface > &,
streaming_impl & );
void handle_new_metadata( std::string const & stream_name, nlohmann::json && metadata );
void handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & metadata );

virtual void add_no_metadata( frame *, streaming_impl & );
virtual void add_frame_metadata( frame * const, nlohmann::json && metadata, streaming_impl & );
virtual void add_frame_metadata( frame *, nlohmann::json const & metadata, streaming_impl & );

friend class dds_device_proxy; // Currently calls handle_new_metadata
};
Expand Down
5 changes: 3 additions & 2 deletions third-party/realdds/include/realdds/dds-device.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "dds-stream-profile.h"
#include "dds-stream.h"

#include <rsutils/subscription.h>
#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <vector>
Expand Down Expand Up @@ -65,8 +66,8 @@ class dds_device

bool supports_metadata() const;

typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback;
void on_metadata_available( on_metadata_available_callback cb );
typedef std::function< void( std::shared_ptr< const nlohmann::json > const & md ) > on_metadata_available_callback;
rsutils::subscription on_metadata_available( on_metadata_available_callback && cb );

typedef std::function< void(
dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) >
Expand Down
14 changes: 7 additions & 7 deletions third-party/realdds/include/realdds/dds-metadata-syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class dds_metadata_syncer
typedef std::unique_ptr< frame_type, on_frame_release_callback > frame_holder;

// Metadata is intended to be JSON
typedef nlohmann::json metadata_type;
typedef std::shared_ptr< const nlohmann::json > metadata_type;

// So our main callback gets this generic frame and metadata:
typedef std::function< void( frame_holder &&, metadata_type && metadata ) > on_frame_ready_callback;
typedef std::function< void( frame_holder &&, metadata_type const & metadata ) > on_frame_ready_callback;

// And we provide other callbacks, for control, testing, etc.
typedef std::function< void( key_type, metadata_type && ) > on_metadata_dropped_callback;
typedef std::function< void( key_type, metadata_type const & ) > on_metadata_dropped_callback;

private:
using key_frame = std::pair< key_type, frame_holder >;
Expand All @@ -70,9 +70,9 @@ class dds_metadata_syncer
std::deque< key_metadata > _metadata_queue;
std::mutex _queues_lock;

on_frame_release_callback _on_frame_release = nullptr;
on_frame_ready_callback _on_frame_ready = nullptr;
on_metadata_dropped_callback _on_metadata_dropped = nullptr;
on_frame_release_callback _on_frame_release;
on_frame_ready_callback _on_frame_ready;
on_metadata_dropped_callback _on_metadata_dropped;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed

Expand All @@ -81,7 +81,7 @@ class dds_metadata_syncer
virtual ~dds_metadata_syncer();

void enqueue_frame( key_type, frame_holder && );
void enqueue_metadata( key_type, metadata_type && );
void enqueue_metadata( key_type, metadata_type const & );

void on_frame_release( on_frame_release_callback cb ) { _on_frame_release = cb; }
void on_frame_ready( on_frame_ready_callback cb ) { _on_frame_ready = cb; }
Expand Down
20 changes: 11 additions & 9 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,8 @@ PYBIND11_MODULE(NAME, m) {
.def( FN_FWD( dds_device,
on_metadata_available,
( dds_device &, py::object && ),
( nlohmann::json && j ),
callback( self, json_to_py( j ) ); ) )
( std::shared_ptr< const nlohmann::json > const & pj ),
callback( self, json_to_py( *pj ) ); ) )
.def( FN_FWD( dds_device,
on_device_log,
(dds_device &, dds_time const &, char, std::string const &, py::object && ),
Expand Down Expand Up @@ -1027,16 +1027,18 @@ PYBIND11_MODULE(NAME, m) {
.def( py::init<>() )
.def( FN_FWD( dds_metadata_syncer,
on_frame_ready,
( dds_metadata_syncer::frame_type, nlohmann::json && ),
( dds_metadata_syncer::frame_holder && fh, nlohmann::json && metadata ),
callback( self.get_frame( fh ), std::move( metadata ) ); ) )
( dds_metadata_syncer::frame_type, nlohmann::json const & ),
( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & metadata ),
callback( self.get_frame( fh ), metadata ? *metadata : nlohmann::json() ); ) )
.def( FN_FWD( dds_metadata_syncer,
on_metadata_dropped,
( dds_metadata_syncer::key_type, nlohmann::json && ),
( dds_metadata_syncer::key_type key, nlohmann::json && metadata ),
callback( key, std::move( metadata ) ); ) )
( dds_metadata_syncer::key_type, nlohmann::json const & ),
( dds_metadata_syncer::key_type key, std::shared_ptr< const nlohmann::json > const & metadata ),
callback( key, metadata ? *metadata : nlohmann::json() ); ) )
.def( "enqueue_frame", &dds_metadata_syncer::enqueue_frame )
.def( "enqueue_metadata", &dds_metadata_syncer::enqueue_metadata );
.def( "enqueue_metadata",
[]( dds_metadata_syncer & self, dds_metadata_syncer::key_type key, nlohmann::json const & j )
{ self.enqueue_metadata( key, std::make_shared< const nlohmann::json >( j ) ); } );
metadata_syncer.attr( "max_frame_queue_size" ) = dds_metadata_syncer::max_frame_queue_size;
metadata_syncer.attr( "max_md_queue_size" ) = dds_metadata_syncer::max_md_queue_size;
}
5 changes: 3 additions & 2 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,11 +562,12 @@ void dds_device::impl::create_metadata_reader()
topics::flexible_msg message;
while( topics::flexible_msg::take_next( *_metadata_reader, &message ) )
{
if( message.is_valid() && _on_metadata_available )
if( message.is_valid() && _on_metadata_available.size() )
{
try
{
_on_metadata_available( std::move( message.json_data() ) );
auto sptr = std::make_shared< const nlohmann::json >( message.json_data() );
_on_metadata_available.raise( sptr );
}
catch( std::exception const & e )
{
Expand Down
11 changes: 8 additions & 3 deletions third-party/realdds/src/dds-device-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <fastdds/rtps/common/Guid.h>

#include <rsutils/signal.h>
#include <nlohmann/json.hpp>

#include <map>
Expand Down Expand Up @@ -85,8 +86,12 @@ class dds_device::impl
void set_option_value( const std::shared_ptr< dds_option > & option, float new_value );
float query_option_value( const std::shared_ptr< dds_option > & option );

typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback;
void on_metadata_available( on_metadata_available_callback cb ) { _on_metadata_available = cb; }
using on_metadata_available_signal = rsutils::signal< std::shared_ptr< const nlohmann::json > const & >;
using on_metadata_available_callback = on_metadata_available_signal::callback;
rsutils::subscription on_metadata_available( on_metadata_available_callback && cb )
{
return _on_metadata_available.subscribe( std::move( cb ) );
}

typedef std::function< void(
dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) >
Expand Down Expand Up @@ -117,7 +122,7 @@ class dds_device::impl
static notification_handlers const _notification_handlers;
void handle_notification( nlohmann::json const &, eprosima::fastdds::dds::SampleInfo const & );

on_metadata_available_callback _on_metadata_available;
on_metadata_available_signal _on_metadata_available;
on_device_log_callback _on_device_log;
on_notification_callback _on_notification;
};
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/src/dds-device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ bool dds_device::supports_metadata() const
return !! _impl->_metadata_reader;
}

void dds_device::on_metadata_available( on_metadata_available_callback cb )
rsutils::subscription dds_device::on_metadata_available( on_metadata_available_callback && cb )
{
_impl->on_metadata_available( cb );
return _impl->on_metadata_available( std::move( cb ) );
}

void dds_device::on_device_log( on_device_log_callback cb )
Expand Down
35 changes: 15 additions & 20 deletions third-party/realdds/src/dds-metadata-syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame )
std::unique_lock< std::mutex > lock( _queues_lock );
// Expect increasing order
if( ! _frame_queue.empty() && _frame_queue.back().first >= id )
DDS_THROW( runtime_error,
"frame " + std::to_string( id ) + " cannot be enqueued after "
+ std::to_string( _frame_queue.back().first ) );
DDS_THROW( runtime_error, "frame " << id << " cannot be enqueued after " << _frame_queue.back().first );

// We must push the new one before releasing the lock, else someone else may push theirs ahead of ours
_frame_queue.push_back( key_frame{ id, std::move( frame ) } );
Expand All @@ -52,21 +50,19 @@ void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame )
}


void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type && md )
void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type const & md )
{
std::weak_ptr< bool > alive = _is_alive;
if( !alive.lock() ) // Check if was destructed by another thread
if( ! alive.lock() ) // Check if was destructed by another thread
return;

std::unique_lock< std::mutex > lock( _queues_lock );
// Expect increasing order
if( ! _metadata_queue.empty() && _metadata_queue.back().first >= id )
DDS_THROW( runtime_error,
"metadata " + std::to_string( id ) + " cannot be enqueued after "
+ std::to_string( _metadata_queue.back().first ) );
DDS_THROW( runtime_error, "metadata " << id << " cannot be enqueued after " << _metadata_queue.back().first );

// We must push the new one before releasing the lock, else someone else may push theirs ahead of ours
_metadata_queue.push_back( key_metadata{ id, std::move( md ) } );
_metadata_queue.push_back( key_metadata{ id, md } );

while( _metadata_queue.size() > max_md_queue_size )
if( ! drop_metadata( lock ) ) // Lock released and aquired around callbacks, check we are alive
Expand All @@ -79,7 +75,7 @@ void dds_metadata_syncer::enqueue_metadata( key_type id, metadata_type && md )
void dds_metadata_syncer::search_for_match( std::unique_lock< std::mutex > & lock )
{
// Wait for frame + metadata set
while( !_frame_queue.empty() && !_metadata_queue.empty() )
while( ! _frame_queue.empty() && ! _metadata_queue.empty() )
{
// We're looking for metadata with the same ID as the next frame
auto const frame_key = _frame_queue.front().first;
Expand All @@ -88,18 +84,18 @@ void dds_metadata_syncer::search_for_match( std::unique_lock< std::mutex > & loc
if( frame_key < md_key )
{
// Newer metadata: we can release the frame
if( !handle_frame_without_metadata( lock ) )
if( ! handle_frame_without_metadata( lock ) )
return;
}
else if( frame_key == md_key )
{
if( !handle_match( lock ) )
if( ! handle_match( lock ) )
return;
}
else
{
// Throw away any old metadata (with ID < the frame) since the frame ID will keep increasing
if( !drop_metadata( lock ) )
if( ! drop_metadata( lock ) )
return;
}
}
Expand All @@ -118,8 +114,8 @@ bool dds_metadata_syncer::handle_match( std::unique_lock< std::mutex > & lock )
if( _on_frame_ready )
{
lock.unlock();
_on_frame_ready( std::move( fh ), std::move( md ) );
if( !alive.lock() ) // Check if was destructed by another thread during callback
_on_frame_ready( std::move( fh ), md );
if( ! alive.lock() ) // Check if was destructed by another thread during callback
return false;
lock.lock();
}
Expand All @@ -138,9 +134,8 @@ bool dds_metadata_syncer::handle_frame_without_metadata( std::unique_lock< std::
if( _on_frame_ready )
{
lock.unlock();
metadata_type md;
_on_frame_ready( std::move( fh ), std::move( md ) );
if( !alive.lock() ) // Check if was destructed by another thread during callback
_on_frame_ready( std::move( fh ), metadata_type() );
if( ! alive.lock() ) // Check if was destructed by another thread during callback
return false;
lock.lock();
}
Expand All @@ -159,8 +154,8 @@ bool dds_metadata_syncer::drop_metadata( std::unique_lock< std::mutex > & lock )
if( _on_metadata_dropped )
{
lock.unlock();
_on_metadata_dropped( key, std::move( md ) );
if( !alive.lock() ) // Check if was destructed by another thread during callback
_on_metadata_dropped( key, md );
if( ! alive.lock() ) // Check if was destructed by another thread during callback
return false;
lock.lock();
}
Expand Down

0 comments on commit 905b37b

Please sign in to comment.