Skip to content

Commit

Permalink
PR IntelRealSense#12517 from Eran: DDS device signals, bulk query-opt…
Browse files Browse the repository at this point in the history
…ion & misc
  • Loading branch information
maloel authored Dec 26, 2023
2 parents c1154ea + 05b9b43 commit 9b58f84
Show file tree
Hide file tree
Showing 36 changed files with 626 additions and 351 deletions.
2 changes: 1 addition & 1 deletion src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace librealsense {

/*static*/ std::shared_ptr< context > context::make( char const * json_settings )
{
return make( json_settings ? json::parse( json_settings ) : json::object() );
return make( ( ! json_settings || ! *json_settings ) ? json::object() : json::parse( json_settings ) );
}


Expand Down
4 changes: 2 additions & 2 deletions src/dds/rs-dds-depth-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void dds_depth_sensor_proxy::add_no_metadata( frame * const f, streaming_impl &
}


void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json const & dds_md, streaming_impl & streaming )
{
if( auto du = rsutils::json::nested( dds_md, metadata_header_key, depth_units_key ) )
{
Expand All @@ -56,7 +56,7 @@ void dds_depth_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json
f->additional_data.depth_units = get_depth_scale();
}

super::add_frame_metadata( f, std::move( dds_md ), streaming );
super::add_frame_metadata( f, dds_md, streaming );
}


Expand Down
2 changes: 1 addition & 1 deletion src/dds/rs-dds-depth-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class dds_depth_sensor_proxy

protected:
void add_no_metadata( frame *, streaming_impl & ) override;
void add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming ) override;
void add_frame_metadata( frame *, nlohmann::json const & md, streaming_impl & ) override;
};


Expand Down
20 changes: 4 additions & 16 deletions src/dds/rs-dds-device-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ dds_device_proxy::dds_device_proxy( std::shared_ptr< const device_info > const &

if( _dds_dev->supports_metadata() )
{
_dds_dev->on_metadata_available(
[this]( nlohmann::json && dds_md )
_metadata_subscription = _dds_dev->on_metadata_available(
[this]( std::shared_ptr< const nlohmann::json > const & dds_md )
{
std::string stream_name = rsutils::json::get< std::string >( dds_md, stream_name_key );
std::string const & stream_name = rsutils::json::nested( *dds_md, stream_name_key ).string_ref();
auto it = _stream_name_to_owning_sensor.find( stream_name );
if( it != _stream_name_to_owning_sensor.end() )
it->second->handle_new_metadata( stream_name, std::move( dds_md ) );
it->second->handle_new_metadata( stream_name, dds_md );
} );
}

Expand Down Expand Up @@ -496,10 +496,6 @@ void dds_device_proxy::hardware_reset()
nlohmann::json control = nlohmann::json::object( { { "id", "hw-reset" } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed to reset: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
}


Expand All @@ -510,10 +506,6 @@ std::vector< uint8_t > dds_device_proxy::send_receive_raw_data( const std::vecto
nlohmann::json control = nlohmann::json::object( { { "id", "hwm" }, { "data", hexdata } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed HWM: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
rsutils::string::hexarray data;
if( ! rsutils::json::get_ex( reply, "data", &data ) )
throw std::runtime_error( "Failed HWM: missing 'data' in reply" );
Expand Down Expand Up @@ -541,10 +533,6 @@ std::vector< uint8_t > dds_device_proxy::build_command( uint32_t opcode,
{ "build-command", true } } );
nlohmann::json reply;
_dds_dev->send_control( control, &reply );
std::string default_status( "OK", 2 );
if( rsutils::json::get( reply, "status", default_status ) != default_status )
throw std::runtime_error( "Failed build-command: "
+ rsutils::json::get( reply, "explanation", std::string( "unknown reason" ) ) );
if( ! rsutils::json::get_ex( reply, "data", &hexdata ) )
throw std::runtime_error( "Failed HWM: missing 'data' in reply" );
return hexdata.detach();
Expand Down
2 changes: 2 additions & 0 deletions src/dds/rs-dds-device-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class dds_device_proxy
std::map< std::string, std::shared_ptr< librealsense::stream > > _stream_name_to_librs_stream;
std::map< std::string, std::shared_ptr< dds_sensor_proxy > > _stream_name_to_owning_sensor;

rsutils::subscription _metadata_subscription;

int get_index_from_stream_name( const std::string & name ) const;
void set_profile_intrinsics( std::shared_ptr< stream_profile_interface > & profile,
const std::shared_ptr< realdds::dds_stream > & stream ) const;
Expand Down
22 changes: 14 additions & 8 deletions src/dds/rs-dds-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,20 @@ void dds_sensor_proxy::handle_motion_data( realdds::topics::imu_msg && imu,
}


void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name, nlohmann::json && dds_md )
void dds_sensor_proxy::handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & dds_md )
{
if( ! _md_enabled )
return;

auto it = _streaming_by_name.find( stream_name );
if( it != _streaming_by_name.end() )
it->second.syncer.enqueue_metadata(
rsutils::json::get< realdds::dds_nsec >( dds_md[metadata_header_key], timestamp_key ),
std::move( dds_md ) );
{
if( auto timestamp = rsutils::json::nested( *dds_md, metadata_header_key, timestamp_key ) )
it->second.syncer.enqueue_metadata( timestamp.value< realdds::dds_nsec >(), dds_md );
else
throw std::runtime_error( "missing metadata header/timestamp" );
}
// else we're not streaming -- must be another client that's subscribed
}

Expand All @@ -361,7 +365,9 @@ void dds_sensor_proxy::add_no_metadata( frame * const f, streaming_impl & stream
}


void dds_sensor_proxy::add_frame_metadata( frame * const f, nlohmann::json && dds_md, streaming_impl & streaming )
void dds_sensor_proxy::add_frame_metadata( frame * const f,
nlohmann::json const & dds_md,
streaming_impl & streaming )
{
nlohmann::json const & md_header = rsutils::json::nested( dds_md, metadata_header_key );
nlohmann::json const & md = rsutils::json::nested( dds_md, metadata_key );
Expand Down Expand Up @@ -434,14 +440,14 @@ void dds_sensor_proxy::start( rs2_frame_callback_sptr callback )
auto & streaming = _streaming_by_name[dds_stream->name()];
streaming.syncer.on_frame_release( frame_releaser );
streaming.syncer.on_frame_ready(
[this, &streaming]( syncer_type::frame_holder && fh, nlohmann::json && md )
[this, &streaming]( syncer_type::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & md )
{
if( _is_streaming ) // stop was not called
{
if( md.empty() )
if( ! md )
add_no_metadata( static_cast< frame * >( fh.get() ), streaming );
else
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ), streaming );
add_frame_metadata( static_cast< frame * >( fh.get() ), *md, streaming );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
}
} );
Expand Down
5 changes: 3 additions & 2 deletions src/dds/rs-dds-sensor-proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ class dds_sensor_proxy : public software_sensor
void handle_motion_data( realdds::topics::imu_msg &&,
const std::shared_ptr< stream_profile_interface > &,
streaming_impl & );
void handle_new_metadata( std::string const & stream_name, nlohmann::json && metadata );
void handle_new_metadata( std::string const & stream_name,
std::shared_ptr< const nlohmann::json > const & metadata );

virtual void add_no_metadata( frame *, streaming_impl & );
virtual void add_frame_metadata( frame * const, nlohmann::json && metadata, streaming_impl & );
virtual void add_frame_metadata( frame *, nlohmann::json const & metadata, streaming_impl & );

friend class dds_device_proxy; // Currently calls handle_new_metadata
};
Expand Down
4 changes: 3 additions & 1 deletion src/ds/d500/d500-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace librealsense
namespace ds
{
const uint16_t D555E_PID = 0x0B56;
const uint16_t D555E_RECOVERY_PID = 0x0ADE;

namespace xu_id
{
Expand All @@ -37,7 +38,8 @@ namespace librealsense
};

static const std::map< std::uint16_t, std::string > rs500_sku_names = {
{ ds::D555E_PID, "Intel RealSense D555e" }
{ ds::D555E_PID, "Intel RealSense D555e" },
{ ds::D555E_RECOVERY_PID, "Intel RealSense D555e Recovery" }
};

bool d500_try_fetch_usb_device(std::vector<platform::usb_device_info>& devices,
Expand Down
4 changes: 4 additions & 0 deletions src/fw-update/fw-update-factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace librealsense
return RS2_PRODUCT_LINE_D400;
if( ds::RS_D400_USB2_RECOVERY_PID == usb_info.pid )
return RS2_PRODUCT_LINE_D400;
if( ds::D555E_RECOVERY_PID == usb_info.pid )
return RS2_PRODUCT_LINE_D500;
return 0;
}

Expand Down Expand Up @@ -57,6 +59,8 @@ namespace librealsense
case ds::RS_D400_RECOVERY_PID:
case ds::RS_D400_USB2_RECOVERY_PID:
return std::make_shared< ds_d400_update_device >( shared_from_this(), usb );
case ds::D555E_RECOVERY_PID:
return std::make_shared< ds_d500_update_device >( shared_from_this(), usb );
default:
// Do nothing
break;
Expand Down
3 changes: 3 additions & 0 deletions third-party/realdds/doc/control.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Returns or sets the current `value` of an option within a device.

- `id` is `query-option` or `set-option`
- `option-name` is the unique option name within its owner
- Usually this is a single string value, but can also be an array of names for bulk queries: `["option1", "option2", ...]`
- `stream-name` is the unique name of the stream it's in within the device
- for a device option, this may be omitted
- for `set-option`, `value` is the new value (float)
Expand All @@ -75,6 +76,8 @@ Returns or sets the current `value` of an option within a device.
The reply should include the original control, plus:

- `value` will include the current or new value
- In the case of bulk queries, this will be an array of values, in the same size and ordering as the control's `option-name` array: `[<value-of-option1>, <value-of-option2>, ...]`
- If the `option-name` array is empty, all option values will be returned in a `option-values` key, instead (see below)

```JSON
{
Expand Down
16 changes: 11 additions & 5 deletions third-party/realdds/include/realdds/dds-device.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "dds-stream-profile.h"
#include "dds-stream.h"

#include <rsutils/subscription.h>
#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <vector>
Expand Down Expand Up @@ -46,6 +47,11 @@ class dds_device
// Wait until ready. Will throw if not ready within the timeout!
void wait_until_ready( size_t timeout_ns = 5000 );

// Utility function for checking replies:
// If 'p_explanation' is nullptr, will throw if the reply status is not 'ok'.
// Otherise will return a false if not 'ok', and the explanation will be filled out.
static bool check_reply( nlohmann::json const & reply, std::string * p_explanation = nullptr );

//----------- below this line, a device must be running!

size_t number_of_streams() const;
Expand All @@ -65,16 +71,16 @@ class dds_device

bool supports_metadata() const;

typedef std::function< void( nlohmann::json && md ) > on_metadata_available_callback;
void on_metadata_available( on_metadata_available_callback cb );
typedef std::function< void( std::shared_ptr< const nlohmann::json > const & md ) > on_metadata_available_callback;
rsutils::subscription on_metadata_available( on_metadata_available_callback && );

typedef std::function< void(
dds_time const & timestamp, char type, std::string const & text, nlohmann::json const & data ) >
on_device_log_callback;
void on_device_log( on_device_log_callback cb );
rsutils::subscription on_device_log( on_device_log_callback && cb );

typedef std::function< bool( std::string const & id, nlohmann::json const & ) > on_notification_callback;
void on_notification( on_notification_callback );
typedef std::function< void( std::string const & id, nlohmann::json const & ) > on_notification_callback;
rsutils::subscription on_notification( on_notification_callback && );

private:
class impl;
Expand Down
14 changes: 7 additions & 7 deletions third-party/realdds/include/realdds/dds-metadata-syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class dds_metadata_syncer
typedef std::unique_ptr< frame_type, on_frame_release_callback > frame_holder;

// Metadata is intended to be JSON
typedef nlohmann::json metadata_type;
typedef std::shared_ptr< const nlohmann::json > metadata_type;

// So our main callback gets this generic frame and metadata:
typedef std::function< void( frame_holder &&, metadata_type && metadata ) > on_frame_ready_callback;
typedef std::function< void( frame_holder &&, metadata_type const & metadata ) > on_frame_ready_callback;

// And we provide other callbacks, for control, testing, etc.
typedef std::function< void( key_type, metadata_type && ) > on_metadata_dropped_callback;
typedef std::function< void( key_type, metadata_type const & ) > on_metadata_dropped_callback;

private:
using key_frame = std::pair< key_type, frame_holder >;
Expand All @@ -70,9 +70,9 @@ class dds_metadata_syncer
std::deque< key_metadata > _metadata_queue;
std::mutex _queues_lock;

on_frame_release_callback _on_frame_release = nullptr;
on_frame_ready_callback _on_frame_ready = nullptr;
on_metadata_dropped_callback _on_metadata_dropped = nullptr;
on_frame_release_callback _on_frame_release;
on_frame_ready_callback _on_frame_ready;
on_metadata_dropped_callback _on_metadata_dropped;

std::shared_ptr< bool > _is_alive; // Ensures object can be accessed

Expand All @@ -81,7 +81,7 @@ class dds_metadata_syncer
virtual ~dds_metadata_syncer();

void enqueue_frame( key_type, frame_holder && );
void enqueue_metadata( key_type, metadata_type && );
void enqueue_metadata( key_type, metadata_type const & );

void on_frame_release( on_frame_release_callback cb ) { _on_frame_release = cb; }
void on_frame_ready( on_frame_ready_callback cb ) { _on_frame_ready = cb; }
Expand Down
6 changes: 5 additions & 1 deletion third-party/realdds/include/realdds/dds-topic-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <nlohmann/json_fwd.hpp>
#include <memory>
#include <atomic>


namespace eprosima {
Expand Down Expand Up @@ -39,7 +40,7 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener

eprosima::fastdds::dds::DataWriter * _writer = nullptr;

int _n_readers = 0;
std::atomic< int > _n_readers;

public:
dds_topic_writer( std::shared_ptr< dds_topic > const & topic );
Expand Down Expand Up @@ -80,6 +81,9 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener
// The callbacks should be set before we actually create the underlying DDS objects, so the writer does not
void run( qos const & = qos() );

// Waits until readers are detected; return false on timeout
bool wait_for_readers( dds_time timeout );

// Waits until all changes were acknowledged; return false on timeout
bool wait_for_acks( dds_time timeout );

Expand Down
Loading

0 comments on commit 9b58f84

Please sign in to comment.