From c0e8c0f64986a08016d541fb0478f2b454ae2ad9 Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 17 Apr 2024 14:33:05 +0300 Subject: [PATCH 01/15] change ROS wrapper reference --- third-party/realdds/readme.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third-party/realdds/readme.md b/third-party/realdds/readme.md index bed1a290b4..7aa2186514 100644 --- a/third-party/realdds/readme.md +++ b/third-party/realdds/readme.md @@ -39,7 +39,7 @@ Consumes data produced by a server: a *stream* is the client to a stream-server, #### ROS2 -[ROS2](https://docs.ros.org/) is one of the clients for which effort has been expended in order to be compliant. This compliance is direct-to-ROS compliance that does not require use of a custom node (based on librealsense) for translation (as the [ROS wrapper for librealsense](https://github.com/IntelRealSense/realsense-ros) is): ROS is a DDS participant like any other Client. +[ROS2](https://docs.ros.org/) is one of the clients for which effort has been expended in order to be compliant. This is direct-to-ROS compliance that does not require use of a custom node (based on librealsense) for translation (as the [ROS Wrapper for IntelĀ® RealSenseā„¢ cameras](https://github.com/IntelRealSense/realsense-ros) is): ROS is a DDS participant like any other Client. Specifically, at this time, the requirement is that ROS nodes should be able to: 1. See any streams that RealDDS exposes From c4da6fbb76a4923e79d0adeb203d6c9ac47e9487 Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 17 Apr 2024 14:21:00 +0300 Subject: [PATCH 02/15] add rsutils::ios::word-parser; make_less_screamy uses it --- .../rsutils/include/rsutils/ios/word-format.h | 105 ++++++++++++++++++ .../rsutils/string/make-less-screamy.h | 21 +++- third-party/rsutils/src/make-less-screamy.cpp | 33 ------ third-party/rsutils/src/word-format.cpp | 54 +++++++++ 4 files changed, 175 insertions(+), 38 deletions(-) create mode 100644 third-party/rsutils/include/rsutils/ios/word-format.h delete mode 100644 third-party/rsutils/src/make-less-screamy.cpp create mode 100644 third-party/rsutils/src/word-format.cpp diff --git a/third-party/rsutils/include/rsutils/ios/word-format.h b/third-party/rsutils/include/rsutils/ios/word-format.h new file mode 100644 index 0000000000..291fa584ce --- /dev/null +++ b/third-party/rsutils/include/rsutils/ios/word-format.h @@ -0,0 +1,105 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2024 Intel Corporation. All Rights Reserved. +#pragma once + +#include +#include + + +namespace rsutils { +namespace ios { + + +// Parse an input 0-terminated string into words, to output to a stream. +// +// Words are delimited_by() any of a set of characters, which can then be removed or replaced by others as needed to +// provide custom output. +// +// Custom capitalization can also be provided, with built-in uppercase, lower-case, or first-letter-only options. +// +// E.g., with underscores: +// MACRO_VARIABLE -> macro-variable // replace _ with -, make lower-case +// my_sharona -> My Sharona // replace _ with space and capitalize first letters +// +// By default: +// - spaces and tabs are delimiters +// - capitalization stays the same +// +struct word_format +{ + char const * const _c_str; + char const * _delimiters; + + typedef char ( *replacement_function )( char previous, char current ); + + replacement_function _delimiterizer = rf_same; // can replace any delimiters with another character + replacement_function _capitalizer = rf_same; // can capitalize non-delimiters as needed + + word_format( char const * c_str, char const * delimiters = " \t" ) + : _c_str( c_str ) + , _delimiters( delimiters ) + { + } + word_format( std::string const & str, char const * delimiters = " \t" ) + : word_format( str.c_str(), delimiters ) + { + } + + word_format & capitalize_with( replacement_function capitalizer ) + { + _capitalizer = capitalizer; + return *this; + } + word_format & capitalize_first_letter() { return capitalize_with( rf_cap_first ); } + word_format & uppercase() { return capitalize_with( rf_upper ); } + word_format & lowercase() { return capitalize_with( rf_lower ); } + + word_format & delimited_by( char const * delims, replacement_function delimiterizer = rf_same ) + { + _delimiters = delims; + _delimiterizer = delimiterizer; + return *this; + } + + std::string str() const; + + // Possible replacement functions + static char rf_same( char, char ch ) { return ch; } + static char rf_lower( char, char ch ) { return std::tolower( ch ); } + static char rf_upper( char, char ch ) { return std::toupper( ch ); } + static char rf_cap_first( char prev, char ch ) + { + bool const is_first = ( prev == 0 ); + return is_first ? std::toupper( ch ) : std::tolower( ch ); + } + static char rf_space( char, char ) { return ' '; } + static char rf_dash( char, char ) { return '-'; } +}; + + +std::ostream & operator<<( std::ostream & os, word_format const & ); + + +// Convert names to our standard "dash" notation: +// - delimited by whitespace, underscores, dashes +// - replaced by dashes +// - all lowercase +// E.g.: +// SomethingNice -> something-nice +// HA_HA -> ha-ha +// My name -> my-name +// This is known as dash-case or kebab-case. We use the former because it's just easier to remember... +// https://stackoverflow.com/questions/11273282/whats-the-name-for-hyphen-separated-case +// +inline word_format dash_case( char const * c_str ) +{ + return word_format( c_str ).lowercase().delimited_by( " _-\t", word_format::rf_dash ); +} +inline word_format dash_case( std::string const & str ) +{ + return dash_case( str.c_str() ); +} + + +} // namespace ios +} // namespace rsutils diff --git a/third-party/rsutils/include/rsutils/string/make-less-screamy.h b/third-party/rsutils/include/rsutils/string/make-less-screamy.h index 1d69110b9b..54fed2716b 100644 --- a/third-party/rsutils/include/rsutils/string/make-less-screamy.h +++ b/third-party/rsutils/include/rsutils/string/make-less-screamy.h @@ -1,9 +1,8 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. - +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #pragma once -#include +#include namespace rsutils { @@ -14,9 +13,21 @@ namespace string { // human-readable form (e.g., "Product Line D500"). // // - underscores are replaced by spaces -// - letters are made lower-case; the beginning of a words upper-case +// - letters are made lower-case; the beginning of words upper-case // -std::string make_less_screamy( std::string ); +inline std::string make_less_screamy( char const * c_str ) +{ + return rsutils::ios::word_format( c_str ) + .delimited_by( "_", rsutils::ios::word_format::rf_space ) + .capitalize_first_letter() + .str(); +} + + +inline std::string make_less_screamy( std::string const & str ) +{ + return make_less_screamy( str.c_str() ); +} } // namespace string diff --git a/third-party/rsutils/src/make-less-screamy.cpp b/third-party/rsutils/src/make-less-screamy.cpp deleted file mode 100644 index 1ccb3e2c6e..0000000000 --- a/third-party/rsutils/src/make-less-screamy.cpp +++ /dev/null @@ -1,33 +0,0 @@ -// License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. - -#include - - -namespace rsutils { -namespace string { - - -std::string make_less_screamy( std::string res ) -{ - bool first = true; - for( char & ch : res ) - { - if( ch != '_' ) - { - if( ! first ) - ch = tolower( ch ); - first = false; - } - else - { - ch = ' '; - first = true; - } - } - return res; -} - - -} // namespace string -} // namespace rsutils diff --git a/third-party/rsutils/src/word-format.cpp b/third-party/rsutils/src/word-format.cpp new file mode 100644 index 0000000000..622f2e0512 --- /dev/null +++ b/third-party/rsutils/src/word-format.cpp @@ -0,0 +1,54 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2024 Intel Corporation. All Rights Reserved. + +#include +#include +#include + + +namespace rsutils { +namespace ios { + + +std::string word_format::str() const +{ + return rsutils::string::from( *this ); +} + + +std::ostream & operator<<( std::ostream & os, word_format const & word ) +{ + char prev_letter = 0; + char prev_delim = 0; + for( char const * pch = word._c_str; *pch; ++pch ) + { + if( std::strchr( word._delimiters, *pch ) ) + { + if( auto ch = word._delimiterizer( prev_delim, *pch ) ) + os << ch; + prev_letter = 0; + prev_delim = *pch; + } + else if( std::islower( prev_letter ) && std::isupper( *pch ) ) + { + // Going from lower-case to upper-case counts as a delimiter followed by a new word + // E.g.: leaseDuration -> lease duration + if( auto ch = word._delimiterizer( 0, 0 ) ) + os << ch; + if( auto ch = word._capitalizer( 0, prev_letter = *pch ) ) + os << ch; + } + else + { + if( auto ch = word._capitalizer( prev_letter, *pch ) ) + os << ch; + prev_delim = 0; + prev_letter = *pch; + } + } + return os; +} + + +} // namespace ios +} // namespace rsutils From 01c149d4bbc898a20671279467d297fd55a2ea70 Mon Sep 17 00:00:00 2001 From: Eran Date: Tue, 16 Apr 2024 20:07:43 +0300 Subject: [PATCH 03/15] add rsutils::ios::field & indent --- .../rsutils/include/rsutils/ios/field.h | 64 +++++++++++++++ .../rsutils/include/rsutils/ios/indent.h | 48 +++++++++++ third-party/rsutils/src/ios.cpp | 80 +++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 third-party/rsutils/include/rsutils/ios/field.h create mode 100644 third-party/rsutils/include/rsutils/ios/indent.h create mode 100644 third-party/rsutils/src/ios.cpp diff --git a/third-party/rsutils/include/rsutils/ios/field.h b/third-party/rsutils/include/rsutils/ios/field.h new file mode 100644 index 0000000000..fee0b3d31e --- /dev/null +++ b/third-party/rsutils/include/rsutils/ios/field.h @@ -0,0 +1,64 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2024 Intel Corporation. All Rights Reserved. +#pragma once + +#include +#include + + +namespace rsutils { +namespace ios { + + +// Many times when we write the contents of some structure to a stream, multiple "fields" are involved. We want to allow +// a standard way of providing: +// - field separation +// - value separation +// - both same-line and indented (multiline) options for big dumps +// - flexibility: not requiring a field name, value, nor any formatting or either +// +// Indented output puts each field on its own line. To enable multiline output, simply set the indent: +// os << rsutils::ios::indent(); +// os << my_structure; +// os << rsutils::ios::unindent(); +// See group() for another way of doing this. +// +struct field +{ + // Separate one field from the previous: + // os << field::separator << "whatever"; + // If any indent is set, this will be on a separate line! + static std::ostream & separator( std::ostream & ); + + // Separate one field from the previous, but force inline: + // os << field::sameline << "another one"; + static std::ostream & sameline( std::ostream & ); + + // Inserts a separator between the field name and value + // os << field::separator << "key" << field::value << value; + static std::ostream & value( std::ostream & ); + + // Start a "group" of fields that are indented, e.g.: + // os << "quality-of-service" << field::group() << _qos; + // Which will output: + // quality-of-service + // field1 value1 + // field2 value2 + // ... + // When inline: + // quality-of-service[ field1 value1 field2 value2 ... ] + // Only valid to use while the ostream is valid! + // + struct group + { + mutable std::ostream * pos = nullptr; + ~group(); // needed to unindent/close the group + }; +}; + + +std::ostream & operator<<( std::ostream &, field::group const & ); + + +} // namespace ios +} // namespace rsutils diff --git a/third-party/rsutils/include/rsutils/ios/indent.h b/third-party/rsutils/include/rsutils/ios/indent.h new file mode 100644 index 0000000000..47646d013c --- /dev/null +++ b/third-party/rsutils/include/rsutils/ios/indent.h @@ -0,0 +1,48 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2024 Intel Corporation. All Rights Reserved. +#pragma once + +#include +#include + + +namespace rsutils { +namespace ios { + + +long & indent_flag( std::ios_base & ); + + +inline long get_indent( std::ios_base & s ) { return indent_flag( s ); } +inline bool has_indent( std::ios_base & s ) { return get_indent( s ) > 0; } +inline void set_indent( std::ios_base & s, long indent ) { indent_flag( s ) = std::max( indent, 0L ); } +inline void add_indent( std::ios_base & s, long d ) { set_indent( s, get_indent( s ) + d ); } + + +// os << "struct" << ios::indent() +// << field( "member1" ) << s.member1 +// << field( "member2" ) << s.member2 +// << ios::unindent(); +struct indent +{ + int d; + + indent( int d_indent = 4 ) + : d( d_indent ) + { + } +}; +struct unindent : indent +{ + unindent( int d_indent = 4 ) + : indent( -d_indent ) + { + } +}; + + +std::ostream & operator<<( std::ostream &, indent const & ); + + +} // namespace ios +} // namespace rsutils diff --git a/third-party/rsutils/src/ios.cpp b/third-party/rsutils/src/ios.cpp new file mode 100644 index 0000000000..3b105f88ac --- /dev/null +++ b/third-party/rsutils/src/ios.cpp @@ -0,0 +1,80 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2024 Intel Corporation. All Rights Reserved. + +#include +#include +#include + + +namespace rsutils { +namespace ios { + + +long & indent_flag( std::ios_base & s ) +{ + static int const index = std::ios_base::xalloc(); + return s.iword( index ); +} + + +std::ostream & operator<<( std::ostream & os, indent const & indent ) +{ + add_indent( os, indent.d ); + return os; +} + + +/*static*/ std::ostream & field::sameline( std::ostream & os ) +{ + return os << ' '; +} + + +/*static*/ std::ostream & field::separator( std::ostream & os ) +{ + if( int i = get_indent( os ) ) + { + os << '\n'; + while( i-- ) + os << ' '; + } + else + { + sameline( os ); + } + return os; +} + + +/*static*/ std::ostream & field::value( std::ostream & os ) +{ + os << ' '; + return os; +} + + +std::ostream & operator<<( std::ostream & os, field::group const & group ) +{ + if( has_indent( os ) ) + os << indent(); + else + os << "["; + group.pos = &os; + return os; +} + + +field::group::~group() +{ + if( pos ) + { + if( has_indent( *pos ) ) + *pos << unindent(); + else + *pos << " ]"; + } +} + + +} +} // namespace rsutils From ae662cf9ccc007905b03cab37ddee1456b838e5b Mon Sep 17 00:00:00 2001 From: Eran Date: Tue, 2 Apr 2024 10:31:54 +0300 Subject: [PATCH 04/15] separate, overridable dds-participant::qos --- .../realdds/include/realdds/dds-participant.h | 25 +++++++- third-party/realdds/src/dds-participant.cpp | 60 ++++++++++--------- third-party/realdds/src/dds-serialization.cpp | 1 + 3 files changed, 56 insertions(+), 30 deletions(-) diff --git a/third-party/realdds/include/realdds/dds-participant.h b/third-party/realdds/include/realdds/dds-participant.h index d2c2370727..e195dad999 100644 --- a/third-party/realdds/include/realdds/dds-participant.h +++ b/third-party/realdds/include/realdds/dds-participant.h @@ -1,9 +1,11 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2022 Intel Corporation. All Rights Reserved. +// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved. #pragma once #include "dds-defines.h" +#include + #include #include #include @@ -57,12 +59,31 @@ class dds_participant dds_participant( const dds_participant & ) = delete; ~dds_participant(); +public: + // Centralizes our default QoS settings, so that the user can then override before calling init(). + // Note that init() will try to override further with information from the settings it is passed. + // + class qos : public eprosima::fastdds::dds::DomainParticipantQos + { + using super = eprosima::fastdds::dds::DomainParticipantQos; + + public: + qos( std::string const & participant_name ); + }; + // Creates the underlying DDS participant and sets the QoS. // If callbacks are needed, set them before calling init. Note they may be called before init returns! // // The domain ID may be -1: in this case the settings "domain" is queried and a default of 0 is used // - void init( dds_domain_id, std::string const & participant_name, rsutils::json const & settings ); + void init( dds_domain_id did, std::string const & participant_name, rsutils::json const & settings ) + { + qos pqos( participant_name ); + init( did, pqos, settings ); + } + // Same, with custom QoS + // + void init( dds_domain_id, qos &, rsutils::json const & settings ); bool is_valid() const { return ( nullptr != _participant ); } bool operator!() const { return ! is_valid(); } diff --git a/third-party/realdds/src/dds-participant.cpp b/third-party/realdds/src/dds-participant.cpp index b09f27fab3..3a75cbfabb 100644 --- a/third-party/realdds/src/dds-participant.cpp +++ b/third-party/realdds/src/dds-participant.cpp @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2022 Intel Corporation. All Rights Reserved. +// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved. #include #include @@ -186,12 +186,36 @@ struct dds_participant::listener_impl : public eprosima::fastdds::dds::DomainPar }; -void dds_participant::init( dds_domain_id domain_id, std::string const & participant_name, rsutils::json const & settings ) +dds_participant::qos::qos( std::string const & participant_name ) +{ + name( participant_name ); + + // Indicates for how much time should a remote DomainParticipant consider the local DomainParticipant to be alive. + wire_protocol().builtin.discovery_config.leaseDuration = { 10, 0 }; // [sec,nsec] + + // Disable shared memory, use only UDP + // Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened + // correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files, + // C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux + auto udp_transport = std::make_shared< eprosima::fastdds::rtps::UDPv4TransportDescriptor >(); + // Also change the receive buffers: we deal with lots of information and, without this, we'll get dropped frames and + // unusual behavior... + udp_transport->sendBufferSize = 16 * 1024 * 1024; + udp_transport->receiveBufferSize = 16 * 1024 * 1024; + // The server may not be able to handle fragmented packets; certain hardware cannot handle more than 1500 bytes! + // UDP default is 64K; overridable via 'udp/max-message-size' + //udp_transport->maxMessageSize = 1470; TODO this affects reading, too! We need writer-only property for this... + transport().use_builtin_transports = false; + transport().user_transports.push_back( udp_transport ); +} + + +void dds_participant::init( dds_domain_id domain_id, qos & pqos, rsutils::json const & settings ) { if( is_valid() ) { DDS_THROW( runtime_error, - "participant is already initialized; cannot init '" << participant_name << "' on domain id " + "participant is already initialized; cannot init '" << pqos.name() << "' on domain id " << domain_id ); } @@ -206,25 +230,7 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici // Initialize the timestr "start" time timestr{ realdds::now() }; - DomainParticipantQos pqos; - pqos.name( participant_name ); - - // Indicates for how much time should a remote DomainParticipant consider the local DomainParticipant to be alive. - pqos.wire_protocol().builtin.discovery_config.leaseDuration = { 10, 0 }; // [sec,nsec] - - // Disable shared memory, use only UDP - // Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened - // correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files, - // C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux - auto udp_transport = std::make_shared< eprosima::fastdds::rtps::UDPv4TransportDescriptor >(); - // Also change the send/receive buffers: we deal with lots of information and, without this, we'll get dropped - // frames and unusual behavior... - udp_transport->sendBufferSize = 16 * 1024 * 1024; - udp_transport->receiveBufferSize = 16 * 1024 * 1024; - pqos.transport().use_builtin_transports = false; - pqos.transport().user_transports.push_back( udp_transport ); - - // Above are defaults + // The QoS given are what the user wants to use, but are supplied BEFORE any overrides from the settings override_participant_qos_from_json( pqos, settings ); // NOTE: the listener callbacks we use are all specific to FastDDS and so are always enabled: @@ -236,10 +242,7 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici = DDS_API_CALL( _participant_factory->create_participant( domain_id, pqos, _domain_listener.get(), par_mask ) ); if( ! _participant ) - { - DDS_THROW( runtime_error, - "failed creating participant " + participant_name + " on domain id " + std::to_string( domain_id ) ); - } + DDS_THROW( runtime_error, "failed creating participant " << pqos.name() << " on domain id " << domain_id ); if( settings.is_object() ) _settings = settings; @@ -248,8 +251,9 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici else DDS_THROW( runtime_error, "provided settings are invalid: " << settings ); - LOG_DEBUG( "participant '" << participant_name << "' (" << realdds::print_raw_guid( guid() ) << ") is up on domain " - << domain_id << " with settings: " << _settings.dump( 4 ) ); + LOG_DEBUG( "participant " << realdds::print_raw_guid( guid() ) << " " << pqos << "\nis up on domain " << domain_id + << " from settings: " << std::setw( 4 ) << _settings ); + #ifdef BUILD_EASYLOGGINGPP // DDS participant destruction happens when all contexts are done with it but, in some situations (e.g., Python), it // means that it may happen last -- even after ELPP has already been destroyed! So, just like we keep the participant diff --git a/third-party/realdds/src/dds-serialization.cpp b/third-party/realdds/src/dds-serialization.cpp index ac4ff1a79d..7bb5006470 100644 --- a/third-party/realdds/src/dds-serialization.cpp +++ b/third-party/realdds/src/dds-serialization.cpp @@ -360,6 +360,7 @@ static void override_udp_settings( eprosima::fastdds::rtps::UDPTransportDescript { j.nested( "send-buffer-size" ).get_ex( udp.sendBufferSize ); j.nested( "receive-buffer-size" ).get_ex( udp.receiveBufferSize ); + j.nested( "max-message-size" ).get_ex( udp.maxMessageSize ); if( ! parse_ip_list( j, "whitelist", &udp.interfaceWhiteList ) ) LOG_WARNING( "invalid UDP whitelist in settings" ); } From 31d18e9a09ea4d40790305b47c95312e7c57ab46 Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 17 Apr 2024 14:31:26 +0300 Subject: [PATCH 05/15] add participant, flow-controller serialization --- .../include/realdds/dds-serialization.h | 37 +- third-party/realdds/src/dds-serialization.cpp | 430 ++++++++++++++++-- 2 files changed, 434 insertions(+), 33 deletions(-) diff --git a/third-party/realdds/include/realdds/dds-serialization.h b/third-party/realdds/include/realdds/dds-serialization.h index c4d75fcfc4..e606ad7f74 100644 --- a/third-party/realdds/include/realdds/dds-serialization.h +++ b/third-party/realdds/include/realdds/dds-serialization.h @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #pragma once #include @@ -15,14 +15,25 @@ std::ostream & operator<<( std::ostream &, ReliabilityQosPolicyKind ); std::ostream & operator<<( std::ostream &, ReliabilityQosPolicy const & ); std::ostream & operator<<( std::ostream &, DurabilityQosPolicyKind ); std::ostream & operator<<( std::ostream &, DurabilityQosPolicy const & ); +std::ostream & operator<<( std::ostream &, HistoryQosPolicyKind ); std::ostream & operator<<( std::ostream &, HistoryQosPolicy const & ); std::ostream & operator<<( std::ostream &, LivelinessQosPolicyKind ); std::ostream & operator<<( std::ostream &, LivelinessQosPolicy const & ); std::ostream & operator<<( std::ostream &, DataSharingQosPolicy const & ); std::ostream & operator<<( std::ostream &, RTPSEndpointQos const & ); +std::ostream & operator<<( std::ostream &, PublishModeQosPolicy const & ); class DomainParticipantQos; +std::ostream & operator<<( std::ostream &, DomainParticipantQos const & ); + +class DataWriterQos; +std::ostream & operator<<( std::ostream &, DataWriterQos const & ); + } // namespace dds +namespace rtps { +std::ostream & operator<<( std::ostream &, TransportDescriptorInterface const & ); +std::ostream & operator<<( std::ostream &, FlowControllerDescriptor const & ); +} } // namespace fastdds } // namespace eprosima @@ -37,6 +48,7 @@ void from_json( rsutils::json const &, Duration_t & ); namespace rtps { std::ostream & operator<<( std::ostream &, class WriterProxyData const & ); std::ostream & operator<<( std::ostream &, class ReaderProxyData const & ); +std::ostream & operator<<( std::ostream &, BuiltinAttributes const & ); } // namespace rtps } // namespace fastrtps @@ -46,6 +58,9 @@ std::ostream & operator<<( std::ostream &, class ReaderProxyData const & ); namespace realdds { +class dds_participant; + + eprosima::fastdds::dds::ReliabilityQosPolicyKind reliability_kind_from_string( std::string const & ); eprosima::fastdds::dds::DurabilityQosPolicyKind durability_kind_from_string( std::string const & ); eprosima::fastdds::dds::HistoryQosPolicyKind history_kind_from_string( std::string const & ); @@ -101,6 +116,7 @@ void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPol // void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQosPolicy & qos, rsutils::json const & ); + // Override QoS endpoint from a JSON source. // The JSON is an object: // { @@ -110,6 +126,25 @@ void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQos void override_endpoint_qos_from_json( eprosima::fastdds::dds::RTPSEndpointQos & qos, rsutils::json const & ); +// Override QoS publish- from a JSON source. +// The JSON is an object: +// { +// "flow-control": "name" +// } +// +void override_publish_mode_qos_from_json( eprosima::fastdds::dds::PublishModeQosPolicy & qos, rsutils::json const &, dds_participant const & ); + + +// Override FlowController settings from a JSON source. +// The JSON is an object: +// { +// "max-bytes-per-period": 16384, +// "period-ms": 250 +// } +// +void override_flow_controller_from_json( eprosima::fastdds::rtps::FlowControllerDescriptor &, rsutils::json const & ); + + // Override participant QoS from a JSON source. // The JSON is an object: // { diff --git a/third-party/realdds/src/dds-serialization.cpp b/third-party/realdds/src/dds-serialization.cpp index 7bb5006470..e3e333d04a 100644 --- a/third-party/realdds/src/dds-serialization.cpp +++ b/third-party/realdds/src/dds-serialization.cpp @@ -1,19 +1,44 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2023 Intel Corporation. All Rights Reserved. +// Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. #include #include #include +#include + +#include +#include #include #include #include +#include #include #include #include #include +#include +#include +#include + +using rsutils::json; +using rsutils::ios::field; + + +#define field_changed( Inst, Field, Default ) \ + ( Inst.Field != Default.Field ? "*" : "" ) + +// Takes a member of a struct 'Inst.M' and outputs " [*]" where the '*' is there if the value differs from +// the default (contained in a variable _def). +// +#define dump_field_as( Inst, Field, Converter ) \ + field::separator << rsutils::ios::dash_case( #Field ) \ + << field::value << Converter( Inst.Field ) \ + << field_changed( Inst, Field, _def ) +#define dump_field( Inst, Field ) \ + dump_field_as( Inst, Field, /* no converter */ ) namespace eprosima { @@ -36,7 +61,7 @@ std::ostream & operator<<( std::ostream & os, ReliabilityQosPolicyKind kind ) std::ostream & operator<<( std::ostream & os, ReliabilityQosPolicy const & qos ) { - return os << qos.kind; + return os << field::separator << qos.kind; } @@ -59,12 +84,27 @@ std::ostream & operator<<( std::ostream & os, DurabilityQosPolicyKind kind ) std::ostream & operator<<( std::ostream & os, DurabilityQosPolicy const & qos ) { - return os << qos.kind; + return os << field::separator << qos.kind; +} + + +std::ostream & operator<<( std::ostream & os, HistoryQosPolicyKind kind ) +{ + switch( kind ) + { + case eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS: + return os << "keep-last"; + case eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS: + return os << "keep-all"; + } + return os << (int) kind; } std::ostream & operator<<( std::ostream & os, HistoryQosPolicy const & qos ) { + os << field::separator << "kind" << field::value << qos.kind; + os << field::separator << "depth" << field::value << qos.depth; return os; } @@ -108,7 +148,205 @@ std::ostream & operator<<( std::ostream & os, RTPSEndpointQos const & qos ) } +std::ostream & operator<<( std::ostream & os, TransportConfigQos const & qos ) +{ + for( auto const & transport : qos.user_transports ) + { + if( auto udp = std::dynamic_pointer_cast< rtps::UDPTransportDescriptor >( transport ) ) + os << field::separator << "udp" << field::group() << *udp; + else if( auto socket + = std::dynamic_pointer_cast< rtps::SocketTransportDescriptor >( transport ) ) + os << field::separator << "socket" << field::group() << *socket; + else + os << field::separator << "unknown" << field::group() << *transport; + } + TransportConfigQos const _def; + os << dump_field( qos, use_builtin_transports ); + os << dump_field( qos, listen_socket_buffer_size ); + os << dump_field( qos, send_socket_buffer_size ); + return os; +} + + +std::ostream & operator<<( std::ostream & os, WireProtocolConfigQos const & qos ) +{ + WireProtocolConfigQos _def; + os << dump_field( qos, participant_id ); + os << dump_field_as( qos, prefix, realdds::print_raw_guid_prefix ); + os << field::separator << "builtin" << field::group() << qos.builtin; + + //fastrtps::rtps::PortParameters port; + //fastrtps::rtps::ThroughputControllerDescriptor throughput_controller; + //rtps::LocatorList default_unicast_locator_list; + //rtps::LocatorList default_multicast_locator_list; + //rtps::ExternalLocators default_external_unicast_locators; + //bool ignore_non_matching_locators = false; + return os; +} + + +std::ostream & operator<<( std::ostream & os, PropertyPolicyQos const & qos ) +{ + for( auto & prop : qos.properties() ) + os << field::separator << prop.name() << field::value << prop.value(); + return os; +} + + +std::ostream & operator<<( std::ostream & os, DomainParticipantQos const & qos ) +{ + os << "'" << qos.name() << "'"; + os << rsutils::ios::indent(); + if( ! qos.flow_controllers().empty() ) + { + field::group g; + os << field::separator << "flow-controllers" << g; + for( auto const & controller : qos.flow_controllers() ) + os << field::separator << *controller; + } + //EntityFactoryQosPolicy entity_factory_; + //ParticipantResourceLimitsQos allocation_; + os << field::separator << "transport" << field::group() << qos.transport(); + os << field::separator << "wire-protocol" << field::group() << qos.wire_protocol(); + if( ! qos.properties().properties().empty() ) + os << field::separator << "properties" << field::group() << qos.properties(); + return os << rsutils::ios::unindent(); +} + + +std::ostream & operator<<( std::ostream & os, DataWriterQos const & qos ) +{ + os << /*field::separator << "reliability" << field::group() <<*/ qos.reliability(); + if( ! ( qos.durability() == eprosima::fastdds::dds::DurabilityQosPolicy() ) ) + os << /*field::separator << "durability" << field::group() <<*/ qos.durability(); + if( ! ( qos.liveliness() == eprosima::fastdds::dds::LivelinessQosPolicy() ) ) + os << field::separator << "liveliness" << field::value << qos.liveliness(); + if( ! ( qos.history() == eprosima::fastdds::dds::HistoryQosPolicy() ) ) + os << field::separator << "history" << field::group() << qos.history(); + if( qos.publish_mode().flow_controller_name + && qos.publish_mode().flow_controller_name != fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT ) + os << field::separator << "flow-control" << field::value << "'" << qos.publish_mode().flow_controller_name << "'"; + return os; + + //DurabilityServiceQosPolicy durability_service_; + //DeadlineQosPolicy deadline_; + //LatencyBudgetQosPolicy latency_budget_; + //DestinationOrderQosPolicy destination_order_; + //HistoryQosPolicy history_; + //ResourceLimitsQosPolicy resource_limits_; + //TransportPriorityQosPolicy transport_priority_; + //LifespanQosPolicy lifespan_; + //UserDataQosPolicy user_data_; + //OwnershipQosPolicy ownership_; + //OwnershipStrengthQosPolicy ownership_strength_; + //WriterDataLifecycleQosPolicy writer_data_lifecycle_; + //PublishModeQosPolicy publish_mode_; + //DataRepresentationQosPolicy representation_; + //PropertyPolicyQos properties_; + //RTPSReliableWriterQos reliable_writer_qos_; + //RTPSEndpointQos endpoint_; + //WriterResourceLimitsQos writer_resource_limits_; + //fastrtps::rtps::ThroughputControllerDescriptor throughput_controller_; + //DataSharingQosPolicy data_sharing_; +} + + } // namespace dds +namespace rtps { + + +std::ostream & operator<<( std::ostream & os, TransportDescriptorInterface const & transport ) +{ + os << field::separator << "max-initial-peers-range" << field::value << transport.max_initial_peers_range(); + os << field::separator << "max-message-size" << field::value << transport.max_message_size(); + return os; +} + + +std::ostream & operator<<( std::ostream & os, SocketTransportDescriptor const & socket ) +{ + os << field::separator << "send-buffer-size" << field::value << socket.sendBufferSize; + os << field::separator << "receive-buffer-size" << socket.receiveBufferSize; + os << field::separator << "time-to-live" << unsigned( socket.TTL ); + if( ! socket.interfaceWhiteList.empty() ) + { + os << field::separator << "whitelist"; + for( auto & ip : socket.interfaceWhiteList ) + os << ' ' << ip; + } + return operator<<( os, static_cast< TransportDescriptorInterface const & >( socket ) ); +} + + +std::ostream & operator<<( std::ostream & os, UDPTransportDescriptor const & udp ) +{ + os << field::separator << "non-blocking-send" << field::value << udp.non_blocking_send; + return operator<<( os, static_cast< SocketTransportDescriptor const & >( udp ) ); +} + + +std::ostream & operator<<( std::ostream & os, FlowControllerSchedulerPolicy scheduler ) +{ + switch( scheduler ) + { + case FlowControllerSchedulerPolicy::FIFO: + os << "FIFO"; + break; + case FlowControllerSchedulerPolicy::ROUND_ROBIN: + os << "ROUND-ROBIN"; + break; + case FlowControllerSchedulerPolicy::HIGH_PRIORITY: + os << "HIGH-PRIORITY"; + break; + case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION: + os << "PRIORITY-WITH-RESERVATION"; + break; + } + return os; +} + + +void to_json( json & j, FlowControllerSchedulerPolicy scheduler ) +{ + j = ( rsutils::string::from() << scheduler ).str(); +} + + +void from_json( json const & j, FlowControllerSchedulerPolicy & scheduler ) +{ + if( ! j.is_string() ) + throw rsutils::json::type_error::create( 317, "scheduler should be a string", &j ); + auto str = j.string_ref(); + if( str == "FIFO" ) + scheduler = FlowControllerSchedulerPolicy::FIFO; + else if( str == "HIGH-PRIORITY" ) + scheduler = FlowControllerSchedulerPolicy::HIGH_PRIORITY; + else if( str == "ROUND-ROBIN" ) + scheduler = FlowControllerSchedulerPolicy::ROUND_ROBIN; + else if( str == "PRIORITY-WITH-RESERVATION" ) + scheduler = FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION; + else + throw json::type_error::create( 317, "unknown scheduler value '" + str + "'", &j ); +} + + +std::ostream & operator<<( std::ostream & os, FlowControllerDescriptor const & controller ) +{ + if( ! controller.name ) + { + os << 'null'; + } + else + { + os << "'" << controller.name << "'"; + os << " " << controller.scheduler; + os << " " << controller.max_bytes_per_period << " bytes every " << controller.period_ms << " ms"; + } + return os; +} + + +} // namespace rtps } // namespace fastdds } // namespace eprosima @@ -118,7 +356,7 @@ namespace fastrtps { // Allow j["key"] = qos.lease_duration; -void to_json( rsutils::json & j, Duration_t const & duration ) +void to_json( json & j, Duration_t const & duration ) { if( duration == c_TimeInfinite ) j = "infinite"; @@ -130,7 +368,7 @@ void to_json( rsutils::json & j, Duration_t const & duration ) // Allow j.get< eprosima::fastrtps::Duration_t >(); -void from_json( rsutils::json const & j, Duration_t & duration ) +void from_json( json const & j, Duration_t & duration ) { if( j.is_string() ) { @@ -140,7 +378,7 @@ void from_json( rsutils::json const & j, Duration_t & duration ) else if( rsutils::string::nocase_equal( s, "invalid" ) ) duration = c_TimeInvalid; else - throw rsutils::json::type_error::create( 317, "unknown duration value '" + s + "'", &j ); + throw json::type_error::create( 317, "unknown duration value '" + s + "'", &j ); } else duration = realdds::dds_time( j.get< double >() ); @@ -152,26 +390,68 @@ namespace rtps { std::ostream & operator<<( std::ostream & os, WriterProxyData const & info ) { - os << "'" << info.topicName() << "' ["; - os << /*"type " <<*/ info.typeName(); - os << /*" reliability"*/ " " << info.m_qos.m_reliability; + field::group group; + os << "'" << info.topicName() << "' " << group; + os << /*field::separator << "type" <<*/ field::value << info.typeName(); + os << /*field::separator << "reliability" << field::group() <<*/ info.m_qos.m_reliability; if( ! ( info.m_qos.m_durability == eprosima::fastdds::dds::DurabilityQosPolicy() ) ) - os << /*" durability"*/ " " << info.m_qos.m_durability; + os << /*field::separator << "durability" << field::group() <<*/ info.m_qos.m_durability; if( ! ( info.m_qos.m_liveliness == eprosima::fastdds::dds::LivelinessQosPolicy() ) ) - os << " liveliness" " " << info.m_qos.m_liveliness; - os << "]"; + os << field::separator << "liveliness" << field::value << info.m_qos.m_liveliness; + if( info.m_qos.m_publishMode.flow_controller_name ) + os << field::separator << "flow-controller" << field::value << "'" + << info.m_qos.m_publishMode.flow_controller_name << "'"; return os; } std::ostream & operator<<( std::ostream & os, ReaderProxyData const & info ) { - os << "'" << info.topicName() << "' ["; - os << /*"type " <<*/ info.typeName(); - os << /*" reliability"*/ " " << info.m_qos.m_reliability; - if( !(info.m_qos.m_durability == eprosima::fastdds::dds::DurabilityQosPolicy()) ) - os << /*" durability"*/ " " << info.m_qos.m_durability; - os << "]"; + field::group group; + os << "'" << info.topicName() << "' " << group; + os << /*field::separator << "type" <<*/ field::value << info.typeName(); + os << /*field::separator << "reliability" << field::group() <<*/ info.m_qos.m_reliability; + if( ! ( info.m_qos.m_durability == eprosima::fastdds::dds::DurabilityQosPolicy() ) ) + os << /*field::separator << "durability" << field::group() <<*/ info.m_qos.m_durability; + return os; +} + + +std::ostream & operator<<( std::ostream & os, DiscoverySettings const & qos ) +{ + DiscoverySettings const _def; + //DiscoveryProtocol_t discoveryProtocol = DiscoveryProtocol_t::SIMPLE; + //bool use_SIMPLE_EndpointDiscoveryProtocol = true; + //bool use_STATIC_EndpointDiscoveryProtocol = false; + os << dump_field_as( qos, leaseDuration, json ); + //Duration_t leaseDuration_announcementperiod = { 3, 0 }; + //InitialAnnouncementConfig initial_announcements; + //SimpleEDPAttributes m_simpleEDP; + //PDPFactory m_PDPfactory{}; + //Duration_t discoveryServer_client_syncperiod = { 0, 450 * 1000000 }; // 450 milliseconds + //eprosima::fastdds::rtps::RemoteServerList_t m_DiscoveryServers; + //ParticipantFilteringFlags_t ignoreParticipantFlags = ParticipantFilteringFlags::NO_FILTER; + return os; +} + + +std::ostream & operator<<( std::ostream & os, BuiltinAttributes const & qos ) +{ + os << field::separator << "discovery-config" << field::group() << qos.discovery_config; + //bool use_WriterLivelinessProtocol = true; + //TypeLookupSettings typelookup_config; + //LocatorList_t metatrafficUnicastLocatorList; + //LocatorList_t metatrafficMulticastLocatorList; + //fastdds::rtps::ExternalLocators metatraffic_external_unicast_locators; + //LocatorList_t initialPeersList; + //MemoryManagementPolicy_t readerHistoryMemoryPolicy = + // MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + //uint32_t readerPayloadSize = BUILTIN_DATA_MAX_SIZE; + //MemoryManagementPolicy_t writerHistoryMemoryPolicy = + // MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + //uint32_t writerPayloadSize = BUILTIN_DATA_MAX_SIZE; + //uint32_t mutation_tries = 100u; + //bool avoid_builtin_multicast = true; return os; } @@ -244,44 +524,44 @@ eprosima::fastrtps::rtps::MemoryManagementPolicy_t history_memory_policy_from_st } -void override_reliability_qos_from_json( eprosima::fastdds::dds::ReliabilityQosPolicy & qos, rsutils::json const & j ) +void override_reliability_qos_from_json( eprosima::fastdds::dds::ReliabilityQosPolicy & qos, json const & j ) { if( j.is_string() ) qos.kind = reliability_kind_from_string( j.string_ref() ); else if( j.is_object() ) { - if( auto kind_j = j.nested( "kind", &rsutils::json::is_string ) ) + if( auto kind_j = j.nested( "kind", &json::is_string ) ) qos.kind = reliability_kind_from_string( kind_j.string_ref() ); } } -void override_durability_qos_from_json( eprosima::fastdds::dds::DurabilityQosPolicy & qos, rsutils::json const & j ) +void override_durability_qos_from_json( eprosima::fastdds::dds::DurabilityQosPolicy & qos, json const & j ) { if( j.is_string() ) - qos.kind = durability_kind_from_string( j.get_ref< const rsutils::json::string_t & >() ); + qos.kind = durability_kind_from_string( j.get_ref< const json::string_t & >() ); else if( j.is_object() ) { - if( auto kind_j = j.nested( "kind", &rsutils::json::is_string ) ) + if( auto kind_j = j.nested( "kind", &json::is_string ) ) qos.kind = durability_kind_from_string( kind_j.string_ref() ); } } -void override_history_qos_from_json( eprosima::fastdds::dds::HistoryQosPolicy & qos, rsutils::json const & j ) +void override_history_qos_from_json( eprosima::fastdds::dds::HistoryQosPolicy & qos, json const & j ) { if( j.is_number_unsigned() ) qos.depth = j.get< int32_t >(); else if( j.is_object() ) { - if( auto kind_j = j.nested( "kind", &rsutils::json::is_string ) ) + if( auto kind_j = j.nested( "kind", &json::is_string ) ) qos.kind = history_kind_from_string( kind_j.string_ref() ); j.nested( "depth" ).get_ex( qos.depth ); } } -void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPolicy & qos, rsutils::json const & j ) +void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPolicy & qos, json const & j ) { if( j.is_object() ) { @@ -312,7 +592,7 @@ void override_liveliness_qos_from_json( eprosima::fastdds::dds::LivelinessQosPol } -void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQosPolicy & qos, rsutils::json const & j ) +void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQosPolicy & qos, json const & j ) { if( j.is_boolean() ) { @@ -328,17 +608,61 @@ void override_data_sharing_qos_from_json( eprosima::fastdds::dds::DataSharingQos } -void override_endpoint_qos_from_json( eprosima::fastdds::dds::RTPSEndpointQos & qos, rsutils::json const & j ) +void override_endpoint_qos_from_json( eprosima::fastdds::dds::RTPSEndpointQos & qos, json const & j ) { if( j.is_object() ) { - if( auto policy_j = j.nested( "history-memory-policy", &rsutils::json::is_string ) ) + if( auto policy_j = j.nested( "history-memory-policy", &json::is_string ) ) qos.history_memory_policy = history_memory_policy_from_string( policy_j.string_ref() ); } } -static bool parse_ip_list( rsutils::json const & j, std::string const & key, std::vector< std::string > * output ) +void override_publish_mode_qos_from_json( eprosima::fastdds::dds::PublishModeQosPolicy & qos, json const & j, dds_participant const & participant ) +{ + // At this time, this must be either missing, or a string for the flow-controller name + if( ! j.exists() ) + return; + if( auto j_name = j.nested( "flow-control" ) ) + { + if( j_name.is_null() ) + { + qos = eprosima::fastdds::dds::PublishModeQosPolicy(); + } + else if( j_name.is_string() ) + { + auto & controller_name = j_name.string_ref(); + for( auto & controller : participant.get()->get_qos().flow_controllers() ) + { + if( ! strcmp( controller_name.c_str(), controller->name ) ) + { + qos.kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE; + qos.flow_controller_name = controller->name; + return; + } + } + DDS_THROW( runtime_error, "invalid flow-control name in publish-mode; got " << j_name ); + } + else + DDS_THROW( runtime_error, "flow-control must be a name; got " << j_name ); + } +} + + +void override_flow_controller_from_json( eprosima::fastdds::rtps::FlowControllerDescriptor & controller, json const & j ) +{ + if( ! j.exists() ) + return; + if( ! j.is_object() ) + DDS_THROW( runtime_error, "flow-controller must be an object; got " << j ); + + j.nested( "max-bytes-per-period", &json::is_number_integer ).get_ex( controller.max_bytes_per_period ); + j.nested( "period-ms", &json::is_number_integer ).get_ex( controller.period_ms ); + j.nested( "scheduler" ).get_ex( controller.scheduler ); +} + + +static bool parse_ip_list( json const & j, std::string const & key, std::vector< std::string > * output ) { if( auto whitelist_j = j.nested( key ) ) { @@ -356,7 +680,7 @@ static bool parse_ip_list( rsutils::json const & j, std::string const & key, std } -static void override_udp_settings( eprosima::fastdds::rtps::UDPTransportDescriptor & udp, rsutils::json const & j ) +static void override_udp_settings( eprosima::fastdds::rtps::UDPTransportDescriptor & udp, json const & j ) { j.nested( "send-buffer-size" ).get_ex( udp.sendBufferSize ); j.nested( "receive-buffer-size" ).get_ex( udp.receiveBufferSize ); @@ -366,7 +690,7 @@ static void override_udp_settings( eprosima::fastdds::rtps::UDPTransportDescript } -void override_participant_qos_from_json( eprosima::fastdds::dds::DomainParticipantQos & qos, rsutils::json const & j ) +void override_participant_qos_from_json( eprosima::fastdds::dds::DomainParticipantQos & qos, json const & j ) { if( ! j.is_object() ) return; @@ -383,6 +707,48 @@ void override_participant_qos_from_json( eprosima::fastdds::dds::DomainParticipa break; } } + + if( auto max_bytes_j = j.nested( "max-out-message-bytes", &json::is_number_unsigned ) ) + { + // Override maximum message size, in bytes, for SENDING messages on RTPS. + // If you need both send & receive, override udp/max-message-size. + auto & props = qos.properties().properties(); + auto value = std::to_string( max_bytes_j.get< uint32_t >() ); + bool found = false; + for( auto & prop : props ) + { + if( prop.name() == "fastdds.max_message_size" ) + { + prop.value() = value; + found = true; + break; + } + } + if( ! found ) + props.emplace_back( "fastdds.max_message_size", value ); + } + + if( auto controllers_j = j.nested( "flow-controllers", &json::is_object ) ) + { + class controller_wrapper : public eprosima::fastdds::rtps::FlowControllerDescriptor + { + std::string _name; + + public: + controller_wrapper( std::string const & controller_name ) + : _name( controller_name ) + { + this->name = _name.c_str(); + } + }; + for( auto it : controllers_j.get_json().items() ) + { + auto p_controller = std::make_shared< controller_wrapper >( it.key() ); + // Right now all the values are at their defaults; override them: + override_flow_controller_from_json( *p_controller, it.value() ); + qos.flow_controllers().push_back( p_controller ); + } + } } From 536b132b63307a8cf662726d105132a345065549 Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 17 Apr 2024 14:32:39 +0300 Subject: [PATCH 06/15] topic-writer qos overrides publish-mode; changed function --- .../realdds/include/realdds/dds-topic-writer.h | 9 ++++----- third-party/realdds/src/dds-device-impl.cpp | 2 +- third-party/realdds/src/dds-device-server.cpp | 2 +- .../realdds/src/dds-notification-server.cpp | 2 +- third-party/realdds/src/dds-topic-writer.cpp | 15 ++++++++------- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/third-party/realdds/include/realdds/dds-topic-writer.h b/third-party/realdds/include/realdds/dds-topic-writer.h index bb6e6b13d3..dcb09d2dbf 100644 --- a/third-party/realdds/include/realdds/dds-topic-writer.h +++ b/third-party/realdds/include/realdds/dds-topic-writer.h @@ -1,6 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2022 Intel Corporation. All Rights Reserved. - +// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved. #pragma once #include @@ -73,11 +72,11 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, // default eprosima::fastdds::dds::DurabilityQosPolicyKind durability = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS ); // default is transient local - - // Override default values with JSON contents - void override_from_json( rsutils::json const & ); }; + // Override default values with JSON contents + void override_qos_from_json( qos &, rsutils::json const & ); + // The callbacks should be set before we actually create the underlying DDS objects, so the writer does not void run( qos const & = qos() ); diff --git a/third-party/realdds/src/dds-device-impl.cpp b/third-party/realdds/src/dds-device-impl.cpp index e732ae2fe9..98cc5009b5 100644 --- a/third-party/realdds/src/dds-device-impl.cpp +++ b/third-party/realdds/src/dds-device-impl.cpp @@ -556,7 +556,7 @@ void dds_device::impl::create_control_writer() _control_writer = std::make_shared< dds_topic_writer >( topic ); dds_topic_writer::qos wqos( eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 - wqos.override_from_json( _device_settings.nested( "control" ) ); + _control_writer->override_qos_from_json( wqos, _device_settings.nested( "control" ) ); _control_writer->run( wqos ); } diff --git a/third-party/realdds/src/dds-device-server.cpp b/third-party/realdds/src/dds-device-server.cpp index e59af7bcf8..b10a327735 100644 --- a/third-party/realdds/src/dds-device-server.cpp +++ b/third-party/realdds/src/dds-device-server.cpp @@ -219,7 +219,7 @@ void dds_device_server::init( std::vector< std::shared_ptr< dds_stream_server > _metadata_writer = std::make_shared< dds_topic_writer >( topic, _publisher ); dds_topic_writer::qos wqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); wqos.history().depth = 10; // default is 1 - wqos.override_from_json( _subscriber->get_participant()->settings().nested( "device", "metadata" ) ); + _metadata_writer->override_qos_from_json( wqos, _subscriber->get_participant()->settings().nested( "device", "metadata" ) ); _metadata_writer->run( wqos ); } } diff --git a/third-party/realdds/src/dds-notification-server.cpp b/third-party/realdds/src/dds-notification-server.cpp index 8e6ea4fbbb..0bcbf0b783 100644 --- a/third-party/realdds/src/dds-notification-server.cpp +++ b/third-party/realdds/src/dds-notification-server.cpp @@ -86,7 +86,7 @@ dds_notification_server::dds_notification_server( std::shared_ptr< dds_publisher // If history is too small writer will not be able to re-transmit needed samples. // Setting history to cover known use-cases plus some spare wqos.history().depth = 24; - wqos.override_from_json( publisher->get_participant()->settings().nested( "device", "notification" ) ); + _writer->override_qos_from_json( wqos, publisher->get_participant()->settings().nested( "device", "notification" ) ); _writer->run( wqos ); } diff --git a/third-party/realdds/src/dds-topic-writer.cpp b/third-party/realdds/src/dds-topic-writer.cpp index 72ced8853e..7051b44f38 100644 --- a/third-party/realdds/src/dds-topic-writer.cpp +++ b/third-party/realdds/src/dds-topic-writer.cpp @@ -87,16 +87,17 @@ dds_topic_writer::qos::qos( eprosima::fastdds::dds::ReliabilityQosPolicyKind rel } -void dds_topic_writer::qos::override_from_json( rsutils::json const & qos_settings ) +void dds_topic_writer::override_qos_from_json( qos & wqos, rsutils::json const & qos_settings ) { // Default values should be set before we're called: // All we do here is override those - if specified! - override_reliability_qos_from_json( reliability(), qos_settings.nested( "reliability" ) ); - override_durability_qos_from_json( durability(), qos_settings.nested( "durability" ) ); - override_history_qos_from_json( history(), qos_settings.nested( "history" ) ); - override_liveliness_qos_from_json( liveliness(), qos_settings.nested( "liveliness" ) ); - override_data_sharing_qos_from_json( data_sharing(), qos_settings.nested( "data-sharing" ) ); - override_endpoint_qos_from_json( endpoint(), qos_settings.nested( "endpoint" ) ); + override_reliability_qos_from_json( wqos.reliability(), qos_settings.nested( "reliability" ) ); + override_durability_qos_from_json( wqos.durability(), qos_settings.nested( "durability" ) ); + override_history_qos_from_json( wqos.history(), qos_settings.nested( "history" ) ); + override_liveliness_qos_from_json( wqos.liveliness(), qos_settings.nested( "liveliness" ) ); + override_data_sharing_qos_from_json( wqos.data_sharing(), qos_settings.nested( "data-sharing" ) ); + override_endpoint_qos_from_json( wqos.endpoint(), qos_settings.nested( "endpoint" ) ); + override_publish_mode_qos_from_json( wqos.publish_mode(), qos_settings.nested( "publish-mode" ), *publisher()->get_participant() ); } From 4f03e5a2726a9df3d41329b642cd20b429e4854f Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 17 Apr 2024 14:36:24 +0300 Subject: [PATCH 07/15] update pyrealdds --- third-party/realdds/py/pyrealdds.cpp | 41 ++++++++++++++++++---------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/third-party/realdds/py/pyrealdds.cpp b/third-party/realdds/py/pyrealdds.cpp index 83a1c7d499..bc7531b55c 100644 --- a/third-party/realdds/py/pyrealdds.cpp +++ b/third-party/realdds/py/pyrealdds.cpp @@ -97,9 +97,17 @@ json load_rs_settings( json const & local_settings ) // Patch any script-specific settings // NOTE: this is also accessed by pyrealsense2, where a "context" hierarchy is still used - auto script = script_name(); - if( auto script_settings = config.nested( script, "context", "dds" ) ) - settings.override( script_settings, "config-file/" + script + "/context" ); + try + { + auto script = script_name(); + if( auto script_settings = config.nested( script, "context", "dds" ) ) + settings.override( script_settings, "config-file/" + script + "/context" ); + } + catch( std::exception const & e ) + { + // Expected if we're not in a script + LOG_DEBUG( "failed to get script name: " << e.what() ); + } // We should always have DDS enabled if( settings.is_object() ) @@ -257,19 +265,15 @@ PYBIND11_MODULE(NAME, m) { []( dds_participant & self, json const & local_settings, realdds::dds_domain_id domain_id ) { self.init( domain_id, script_name(), local_settings ); }, "local-settings"_a = json::object(), "domain-id"_a = -1 ) - .def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = json::object() ) + .def( "init", + py::overload_cast< realdds::dds_domain_id, std::string const &, json const & >( &dds_participant::init ), + "domain-id"_a, "participant-name"_a, "local-settings"_a = json::object() ) .def( "is_valid", &dds_participant::is_valid ) .def( "guid", &dds_participant::guid ) .def( "create_guid", &dds_participant::create_guid ) .def( "__bool__", &dds_participant::is_valid ) - .def( "name", - []( dds_participant const & self ) { - eprosima::fastdds::dds::DomainParticipantQos qos; - if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) ) - return std::string( qos.name() ); - return std::string(); - } ) - .def( "name_from_guid", []( dds_guid const & guid ) { return dds_participant::name_from_guid( guid ); } ) + .def( "name", &dds_participant::name ) + .def_static( "name_from_guid", []( dds_guid const & guid ) { return dds_participant::name_from_guid( guid ); } ) .def( "names", []( dds_participant const & self ) { return self.get()->get_participant_names(); } ) .def( "settings", &dds_participant::settings ) .def( "__repr__", @@ -282,9 +286,7 @@ PYBIND11_MODULE(NAME, m) { } else { - eprosima::fastdds::dds::DomainParticipantQos qos; - if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) ) - os << " \"" << qos.name() << "\""; + os << " \"" << self.name() << "\""; os << " " << realdds::print_guid( self.guid() ); } os << ">"; @@ -411,9 +413,13 @@ PYBIND11_MODULE(NAME, m) { using writer_qos = realdds::dds_topic_writer::qos; py::class_< writer_qos >( m, "writer_qos" ) // + .def_property_readonly( "flow_controller", + []( writer_qos const & self ) -> std::string + { return self.publish_mode().flow_controller_name; } ) .def( "__repr__", []( writer_qos const & self ) { std::ostringstream os; os << "<" SNAME ".writer_qos"; + os << self; os << ">"; return os.str(); } ); @@ -436,6 +442,7 @@ PYBIND11_MODULE(NAME, m) { ( eprosima::fastdds::dds::PublicationMatchedStatus const & status ), callback( self, status.current_count_change ); ) ) .def( "topic", &dds_topic_writer::topic ) + .def( "override_qos_from_json", &dds_topic_writer::override_qos_from_json ) .def( "run", &dds_topic_writer::run ) .def( "has_readers", &dds_topic_writer::has_readers ) .def( "wait_for_readers", &dds_topic_writer::wait_for_readers ) @@ -578,6 +585,7 @@ PYBIND11_MODULE(NAME, m) { .def_readwrite( "width", &image_msg::width ) .def_readwrite( "height", &image_msg::height ) .def_readwrite( "timestamp", &image_msg::timestamp ) + .def( "__bool__", &image_msg::is_valid ) .def( "__repr__", []( image_msg const & self ) { @@ -627,6 +635,8 @@ PYBIND11_MODULE(NAME, m) { std::string const & ) >( &blob_msg::create_topic ) ) .def( "data", []( blob_msg const & self ) { return self.data(); } ) .def( "size", []( blob_msg const & self ) { return self.data().size(); } ) + .def( "crc", []( blob_msg const & self ) { return rsutils::number::calc_crc32( self.data().data(), self.data().size() ); } ) + .def( "__bool__", &blob_msg::is_valid ) .def( "__repr__", []( blob_msg const & self ) { @@ -671,6 +681,7 @@ PYBIND11_MODULE(NAME, m) { "timestamp", []( imu_msg const & self ) { return self.timestamp(); }, []( imu_msg & self, dds_time time ) { self.timestamp( time ); } ) + .def( "__bool__", &imu_msg::is_valid ) .def( "__repr__", []( imu_msg const & self ) { From 7d34fd085d4d8fdf9942b78e755cc5812bdf5c5d Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 18 Apr 2024 08:56:49 +0300 Subject: [PATCH 08/15] add format_coversion doc --- src/device.h | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/device.h b/src/device.h index 1c2241020b..630df0ab42 100644 --- a/src/device.h +++ b/src/device.h @@ -21,10 +21,30 @@ namespace librealsense { +// Device profiles undergo format conversion before becoming available to the user. This is typically done within each +// sensor's init_stream_profiles(), but the behavior can be overriden by the user through the device JSON settings (see +// device ctor) to enable querying the device's raw or basic formats only. +// +// Note that streaming may be available only with full (the default) profiles. The others are for inspection only, and +// can directly be queried in rs-enumerate-devices. +// +// Note also that default profiles (those returned by get_profiles_tags()) must take the conversion method into account +// or profiles may not get tagged correctly in non-full modes. +// enum class format_conversion { + // Report raw profiles as provided by the camera, without any manipulation whatsoever by librealsense. raw, + + // Take the raw profiles, perform the librealsense mappings, but then remove any "unwanted" mappings: + // - any conversion between formats is thrown away (e.g., YUYV->RGB8) + // - colored infrared is removed + // - interleaved (Y12I, Y8I) are kept (so become Y16 and Y8, respectively) + // See formats_converter::drop_non_basic_formats() basic, + + // The default conversion mode: all the librealsense mappings are intact; the user will see profiles including + // format conversions (e.g., YUYV->RGB8), etc. full }; From c673840ddf4ee8556787f8957c8f474788da967b Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 24 Apr 2024 09:56:50 +0300 Subject: [PATCH 09/15] update dds-sniffer --- tools/dds/dds-sniffer/rs-dds-sniffer.cpp | 36 ++++++------------------ 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/tools/dds/dds-sniffer/rs-dds-sniffer.cpp b/tools/dds/dds-sniffer/rs-dds-sniffer.cpp index 17cf401534..bc925b8d5a 100644 --- a/tools/dds/dds-sniffer/rs-dds-sniffer.cpp +++ b/tools/dds/dds-sniffer/rs-dds-sniffer.cpp @@ -1,5 +1,5 @@ // License: Apache 2.0. See LICENSE file in root directory. -// Copyright(c) 2022 Intel Corporation. All Rights Reserved. +// Copyright(c) 2022-4 Intel Corporation. All Rights Reserved. #include "rs-dds-sniffer.h" @@ -32,22 +32,6 @@ using namespace TCLAP; using namespace eprosima::fastdds::dds; using namespace eprosima::fastrtps::types; -// FastDDS GUID_t: (MSB first, little-endian; see GuidUtils.hpp) -// 2 bytes - vendor ID -// 2 bytes - host -// 4 bytes - process (2 pid, 2 random) -// 4 bytes - participant -// 4 bytes - entity ID (reader/writer) -// For example: -// Participant 1 - 01.0f.be.05.f0.09.86.b6.01.00.00.00|0.0.1.c1 -// Writer under participant 1 - 01.0f.be.05.f0.09.86.b6.01.00.00.00|0.0.1.2 -// Participant 2 of same process - 01.0f.be.05.f0.09.86.b6.02.00.00.00|0.0.1.c1 -// Reader under participant 2 - 01.0f.be.05.f0.09.86.b6.02.00.00.00|0.0.1.7 -// Participant 3 other process - 01.0f.be.05.88.50.ea.4a.01.00.00.00|0.0.1.c1 -// Note same host for all, participant and entity IDs may be repeat for different processes -// To differentiate entities of different participant with same name we append process GUID values to the name -constexpr uint8_t GUID_PROCESS_LOCATION = 4; - static eprosima::fastrtps::rtps::GuidPrefix_t std_prefix; static inline realdds::print_guid print_guid( realdds::dds_guid const & guid ) @@ -335,7 +319,7 @@ void dds_sniffer::on_type_discovery( char const * topic_name, DynamicType_ptr dy // Register type with participant TypeSupport type_support( DDS_API_CALL( new DynamicPubSubType( dyn_type ) ) ); DDS_API_CALL( type_support.register_type( _participant.get() ) ); - std::cout << "Discovered topic '" << topic_name << "' of type '" << type_support->getName() << "'" << std::endl; + std::cout << "+Topic '" << topic_name << "' of type '" << type_support->getName() << "'" << std::endl; if( _print_topic_samples ) { @@ -500,8 +484,8 @@ void dds_sniffer::print_writer_discovered( realdds::dds_guid guid, } else { - std::cout << "DataWriter " << print_guid( guid ) << " publishing topic '" << topic_name - << ( discovered ? "' discovered" : "' removed" ) << std::endl; + std::cout << ( discovered ? "+" : "-" ) << print_guid( guid ) << " publishing topic '" << topic_name << "'" + << std::endl; } } @@ -518,8 +502,8 @@ void dds_sniffer::print_reader_discovered( realdds::dds_guid guid, } else { - std::cout << "DataReader " << print_guid( guid ) << " reading topic '" << topic_name - << ( discovered ? "' discovered" : "' removed" ) << std::endl; + std::cout << ( discovered ? "+" : "-" ) << print_guid( guid ) << " reading topic '" << topic_name << "'" + << std::endl; } } @@ -535,12 +519,8 @@ void dds_sniffer::print_participant_discovered( realdds::dds_guid guid, } else { - //prefix_.value[4] = static_cast( pid & 0xFF ); - //prefix_.value[5] = static_cast( ( pid >> 8 ) & 0xFF ); - uint16_t pid - = guid.guidPrefix.value[GUID_PROCESS_LOCATION] + ( guid.guidPrefix.value[GUID_PROCESS_LOCATION + 1] << 8 ); - std::cout << "Participant " << print_guid( guid ) << " '" << participant_name << "' (" << std::hex << pid - << std::dec << ") " << ( discovered ? " discovered" : " removed" ) << std::endl; + std::cout << ( discovered ? "+" : "-" ) << "Participant '" << participant_name << "' " + << realdds::print_raw_guid( guid ) << std::endl; } } From 953add0f1b9e129432dc5b578af66c8c54f08123 Mon Sep 17 00:00:00 2001 From: Eran Date: Sun, 12 May 2024 09:23:09 +0300 Subject: [PATCH 10/15] update topic-send; add topic-sink --- third-party/realdds/scripts/topic-send.py | 53 ++++++++------ third-party/realdds/scripts/topic-sink.py | 89 +++++++++++++++++++++++ 2 files changed, 121 insertions(+), 21 deletions(-) create mode 100644 third-party/realdds/scripts/topic-sink.py diff --git a/third-party/realdds/scripts/topic-send.py b/third-party/realdds/scripts/topic-send.py index 02c29233b5..6f9fb991e7 100644 --- a/third-party/realdds/scripts/topic-send.py +++ b/third-party/realdds/scripts/topic-send.py @@ -1,5 +1,5 @@ # License: Apache 2.0. See LICENSE file in root directory. -# Copyright(c) 2023 Intel Corporation. All Rights Reserved. +# Copyright(c) 2023-4 Intel Corporation. All Rights Reserved. from argparse import ArgumentParser from argparse import ArgumentTypeError as ArgumentError # NOTE: only ArgumentTypeError passes along the original error string @@ -42,7 +42,15 @@ def e( *a, **kw ): dds.debug( args.debug ) -settings = {} +settings = { + 'flow-controllers': { + 'blob': { + 'max-bytes-per-period': 256 * 1470, + 'period-ms': 250 + } + }, + 'max-out-message-bytes': 1470 + } participant = dds.participant() participant.init( dds.load_rs_settings( settings ), args.domain ) @@ -54,28 +62,28 @@ def e( *a, **kw ): e( '--blob requires --topic' ) sys.exit( 1 ) topic_path = args.topic + import os if not os.path.isfile( args.blob ): e( '--blob does not exist:', args.blob ) sys.exit( 1 ) writer = dds.topic_writer( dds.message.blob.create_topic( participant, topic_path )) - writer.run( dds.topic_writer.qos() ) # reliable - # Let the client pick up on the new entity - if we send it too quickly, they won't see it before we disappear... - time.sleep( 1 ) + wqos = dds.topic_writer.qos() # reliable + #writer.override_qos_from_json( wqos, { 'publish-mode': { 'flow-control': 'blob' } } ) + writer.run( wqos ) + if not writer.wait_for_readers( dds.time( 2. ) ): + e( 'Timeout waiting for readers' ) + sys.exit( 1 ) with open( args.blob, mode='rb' ) as file: # b is important -> binary blob = dds.message.blob( file.read() ) - if not writer.has_readers(): - e( 'No readers exist on topic:', topic_path ) - sys.exit( 1 ) i( f'Writing {blob} on {topic_path} ...' ) start = dds.now() blob.write_to( writer ) - if args.ack: - if not writer.wait_for_acks( dds.time( 5. ) ): # seconds - e( 'Timeout waiting for ack' ) - sys.exit( 1 ) - i( f'Acknowledged ({dds.timestr( dds.now(), start )})' ) - else: - i( f'Done' ) + # We must wait for acks, since we use a flow controller and write_to() will return before we've + # actually finished the send + if not writer.wait_for_acks( dds.time( 5. ) ): # seconds + e( 'Timeout waiting for ack' ) + sys.exit( 1 ) + i( f'Acknowledged' ) elif args.device: info = dds.message.device_info() @@ -91,8 +99,9 @@ def e( *a, **kw ): wait_for_reply = True i( f'Sending {message} on {info.topic_root}' ) + start = dds.now() reply = device.send_control( message, wait_for_reply ) - i( f'Got back {reply}' ) + i( f'{reply}' ) if args.debug or not wait_for_reply: # Sleep a bit, to allow us to catch and display any replies @@ -106,10 +115,8 @@ def e( *a, **kw ): topic_path = args.topic writer = dds.topic_writer( dds.message.flexible.create_topic( participant, topic_path )) writer.run( dds.topic_writer.qos() ) - # Let the client pick up on the new entity - if we send it too quickly, they won't see it before we disappear... - time.sleep( 1 ) - if not writer.has_readers(): - e( 'No readers exist on topic:', topic_path ) + if not writer.wait_for_readers( dds.time( 2. ) ): + e( 'Timeout waiting for readers' ) sys.exit( 1 ) start = dds.now() dds.message.flexible( message ).write_to( writer ) @@ -118,6 +125,10 @@ def e( *a, **kw ): if not writer.wait_for_acks( dds.time( 5. ) ): # seconds e( 'Timeout waiting for ack' ) sys.exit( 1 ) - i( f'Acknowledged ({dds.timestr( dds.now(), start )})' ) + i( f'Acknowledged' ) + # NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if + # all the packets are sent, they may need resending (reliable) but if we exit they won't be... + +i( f'After {dds.timestr( dds.now(), start )}' ) diff --git a/third-party/realdds/scripts/topic-sink.py b/third-party/realdds/scripts/topic-sink.py new file mode 100644 index 0000000000..ba3df0514f --- /dev/null +++ b/third-party/realdds/scripts/topic-sink.py @@ -0,0 +1,89 @@ +# License: Apache 2.0. See LICENSE file in root directory. +# Copyright(c) 2024 Intel Corporation. All Rights Reserved. + +from argparse import ArgumentParser +from argparse import ArgumentTypeError as ArgumentError # NOTE: only ArgumentTypeError passes along the original error string +args = ArgumentParser() +args.add_argument( '--debug', action='store_true', help='enable debug mode' ) +args.add_argument( '--quiet', action='store_true', help='no output' ) +args.add_argument( '--topic', metavar='', help='the topic on which to listen' ) +args.add_argument( '--blob', action='store_true', help='when set, listen for blobs instead of flexible messages' ) +def domain_arg(x): + t = int(x) + if t <= 0 or t > 232: + raise ArgumentError( f'--domain should be [0-232]' ) + return t +args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' ) +args = args.parse_args() + + +if args.quiet: + def i( *a, **kw ): + pass +else: + def i( *a, **kw ): + print( '-I-', *a, **kw ) +def e( *a, **kw ): + print( '-E-', *a, **kw ) + +import sys + +if not args.topic: + e( '--topic is required' ) + sys.exit( 1 ) +topic_path = args.topic + + +import pyrealdds as dds +import time + +dds.debug( args.debug ) + +settings = {} + +participant = dds.participant() +participant.init( dds.load_rs_settings( settings ), args.domain ) + +if args.blob: + reader = dds.topic_reader( dds.message.blob.create_topic( participant, topic_path )) + def on_data_available( reader ): + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.blob.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'-----> {msg}', ) + got_something = True + +else: + reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) + import json + def on_data_available( reader ): + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.flexible.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'-----> {json.dumps( msg.json_data(), indent=4 )}', ) + got_something = True + +reader.on_data_available( on_data_available ) +reader.run( dds.topic_reader.qos() ) + +# Keep waiting until the user breaks... +import signal +def handler(signum, frame): + #res = input("Ctrl-c was pressed. Do you really want to exit? y/n ") + #if res == 'y': + sys.exit( 0 ) +signal.signal( signal.SIGINT, handler ) +i( 'Press ctrl+C to break...' ) +while True: + time.sleep(1) + From 69a5d49a47a8bc431d24eb2d4bd1650b29f8e9f1 Mon Sep 17 00:00:00 2001 From: Eran Date: Wed, 8 May 2024 12:36:10 +0300 Subject: [PATCH 11/15] add recommended-filters to stream-options docs --- third-party/realdds/doc/initialization.md | 43 +++++++++++++++-------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/third-party/realdds/doc/initialization.md b/third-party/realdds/doc/initialization.md index c8b467213f..1a2443b192 100644 --- a/third-party/realdds/doc/initialization.md +++ b/third-party/realdds/doc/initialization.md @@ -174,33 +174,48 @@ Information about a specific stream: float bias_variances[3]; } ``` - e.g.: - ```JSON - "intrinsics": { - "accel": [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0], - "gyro": [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] - } - ``` -- `options` is an array of option objects, same as `device-options` above +- `options` is an array of option objects, same as `device-options` above; stream options are shown in the Viewer +- `recommended-filters` is an array of filter names to be enabled in the Viewer -Stream options are shown in the Viewer. +E.g.: + +```JSON +{ + "id": "stream-options", + "stream-name": "Motion", + "intrinsics": { + "accel": [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0], + "gyro": [1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] + }, + "options": [], + "recommended-filters": [] +} +``` #### Video Stream Intrinsics ```JSON { "id": "stream-options", + "stream-name": "Depth", "intrinsics": { "width": 1280, "height": 720, "principal-point": [640.2379150390625,357.3431396484375], "focal-length": [631.3428955078125,631.3428955078125] }, - "options": [ - ["Backlight Compensation",0.0,0.0,1.0,1.0,0.0,"Enable / disable backlight compensation"], - ["Brightness",0.0,-64.0,64.0,1.0,0.0,"UVC image brightness"], - ], - "stream-name": "Infrared 1" + "options": [], + "recommended-filters": [ + "Decimation Filter", + "HDR Merge", + "Filter By Sequence id", + "Threshold Filter", + "Depth to Disparity", + "Spatial Filter", + "Temporal Filter", + "Hole Filling Filter", + "Disparity to Depth" + ] } ``` From d63bb3584ae085cb0a2ff3b628549d8404860fe6 Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 2 May 2024 18:40:37 +0300 Subject: [PATCH 12/15] add topic-sink --image --- third-party/realdds/scripts/topic-sink.py | 26 +++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/third-party/realdds/scripts/topic-sink.py b/third-party/realdds/scripts/topic-sink.py index ba3df0514f..c3e5a46ac5 100644 --- a/third-party/realdds/scripts/topic-sink.py +++ b/third-party/realdds/scripts/topic-sink.py @@ -7,7 +7,8 @@ args.add_argument( '--debug', action='store_true', help='enable debug mode' ) args.add_argument( '--quiet', action='store_true', help='no output' ) args.add_argument( '--topic', metavar='', help='the topic on which to listen' ) -args.add_argument( '--blob', action='store_true', help='when set, listen for blobs instead of flexible messages' ) +args.add_argument( '--blob', action='store_true', help='when set, listen for blobs' ) +args.add_argument( '--image', action='store_true', help='when set, listen for images' ) def domain_arg(x): t = int(x) if t <= 0 or t > 232: @@ -57,6 +58,24 @@ def on_data_available( reader ): break i( f'-----> {msg}', ) got_something = True + reader.on_data_available( on_data_available ) + reader.run( dds.topic_reader.qos() ) + +elif args.image: + reader = dds.topic_reader( dds.message.image.create_topic( participant, topic_path )) + def on_data_available( reader ): + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.image.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'-----> {msg}', ) + got_something = True + reader.on_data_available( on_data_available ) + reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) ) else: reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) @@ -72,9 +91,8 @@ def on_data_available( reader ): break i( f'-----> {json.dumps( msg.json_data(), indent=4 )}', ) got_something = True - -reader.on_data_available( on_data_available ) -reader.run( dds.topic_reader.qos() ) + reader.on_data_available( on_data_available ) + reader.run( dds.topic_reader.qos() ) # Keep waiting until the user breaks... import signal From cf094a3007e26305d9c330a236a9f87a7582caf8 Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 9 May 2024 14:04:40 +0300 Subject: [PATCH 13/15] add dds_topic_reader::wait_for_writers() and stop() --- .../realdds/include/realdds/dds-topic-reader.h | 7 ++++++- third-party/realdds/py/pyrealdds.cpp | 4 ++++ third-party/realdds/src/dds-topic-reader.cpp | 17 +++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/third-party/realdds/include/realdds/dds-topic-reader.h b/third-party/realdds/include/realdds/dds-topic-reader.h index ccaaf1be0f..5f48ef76a1 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader.h +++ b/third-party/realdds/include/realdds/dds-topic-reader.h @@ -4,10 +4,12 @@ #include #include +#include "dds-defines.h" #include #include #include +#include namespace eprosima { @@ -41,7 +43,7 @@ class dds_topic_reader eprosima::fastdds::dds::DataReader * _reader = nullptr; - int _n_writers = 0; + std::atomic< int > _n_writers; public: dds_topic_reader( std::shared_ptr< dds_topic > const & topic ); @@ -89,6 +91,9 @@ class dds_topic_reader // The callbacks should be set before we actually create the underlying DDS objects, so the reader does not virtual void run( qos const & ); + // Waits until writers are detected; return false on timeout + bool wait_for_writers( dds_time timeout ); + // Go back to a pre-run() state, such that is_running() returns false virtual void stop(); diff --git a/third-party/realdds/py/pyrealdds.cpp b/third-party/realdds/py/pyrealdds.cpp index bc7531b55c..a4c8b8a4ac 100644 --- a/third-party/realdds/py/pyrealdds.cpp +++ b/third-party/realdds/py/pyrealdds.cpp @@ -408,6 +408,10 @@ PYBIND11_MODULE(NAME, m) { callback( self, status.total_count, status.total_count_change ); ) ) .def( "topic", &dds_topic_reader::topic ) .def( "run", &dds_topic_reader::run ) + .def( "wait_for_writers", &dds_topic_reader::wait_for_writers ) + .def( "stop", &dds_topic_reader::stop, + // GIL release needed: stop will wait and cause a hang if we're inside a callback + py::call_guard< py::gil_scoped_release >() ) .def( "qos", []() { return reader_qos(); } ) .def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } ); diff --git a/third-party/realdds/src/dds-topic-reader.cpp b/third-party/realdds/src/dds-topic-reader.cpp index be267403f5..6dcaa96b1a 100644 --- a/third-party/realdds/src/dds-topic-reader.cpp +++ b/third-party/realdds/src/dds-topic-reader.cpp @@ -16,6 +16,7 @@ #include #include +#include #include @@ -32,6 +33,7 @@ dds_topic_reader::dds_topic_reader( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ) : _topic( topic ) , _subscriber( subscriber ) + , _n_writers( 0 ) { } @@ -113,6 +115,21 @@ void dds_topic_reader::run( qos const & rqos ) } +bool dds_topic_reader::wait_for_writers( dds_time timeout ) +{ + // Better to use on_subscription_matched, but that would require additional data members etc. + // For now, keep it simple: + rsutils::time::timer timer( std::chrono::nanoseconds( timeout.to_ns() ) ); + while( _n_writers.load() < 1 ) + { + if( timer.has_expired() ) + return false; + std::this_thread::sleep_for( std::chrono::milliseconds( 250 ) ); + } + return true; +} + + void dds_topic_reader::stop() { if( _subscriber ) From b0a3a09aa057e24c4bdae5a3559e3edf1ce93c9e Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 9 May 2024 14:08:58 +0300 Subject: [PATCH 14/15] topic-sink now listens to multiple topics --- third-party/realdds/scripts/topic-send.py | 4 +- third-party/realdds/scripts/topic-sink.py | 161 +++++++++++++++------- 2 files changed, 111 insertions(+), 54 deletions(-) diff --git a/third-party/realdds/scripts/topic-send.py b/third-party/realdds/scripts/topic-send.py index 6f9fb991e7..cbd988de71 100644 --- a/third-party/realdds/scripts/topic-send.py +++ b/third-party/realdds/scripts/topic-send.py @@ -78,7 +78,7 @@ def e( *a, **kw ): i( f'Writing {blob} on {topic_path} ...' ) start = dds.now() blob.write_to( writer ) - # We must wait for acks, since we use a flow controller and write_to() will return before we've + # We must wait for acks, since we use a flow controller and write_to() will return before we've # actually finished the send if not writer.wait_for_acks( dds.time( 5. ) ): # seconds e( 'Timeout waiting for ack' ) @@ -126,7 +126,7 @@ def e( *a, **kw ): e( 'Timeout waiting for ack' ) sys.exit( 1 ) i( f'Acknowledged' ) - # NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if + # NOTE: if we don't wait for acks there's no guarrantee that the message is received; even if # all the packets are sent, they may need resending (reliable) but if we exit they won't be... i( f'After {dds.timestr( dds.now(), start )}' ) diff --git a/third-party/realdds/scripts/topic-sink.py b/third-party/realdds/scripts/topic-sink.py index c3e5a46ac5..f66de33644 100644 --- a/third-party/realdds/scripts/topic-sink.py +++ b/third-party/realdds/scripts/topic-sink.py @@ -6,18 +6,26 @@ args = ArgumentParser() args.add_argument( '--debug', action='store_true', help='enable debug mode' ) args.add_argument( '--quiet', action='store_true', help='no output' ) -args.add_argument( '--topic', metavar='', help='the topic on which to listen' ) -args.add_argument( '--blob', action='store_true', help='when set, listen for blobs' ) -args.add_argument( '--image', action='store_true', help='when set, listen for images' ) +args.add_argument( '--flexible', metavar='', action='append', help='the flexible (reliable) topic on which to listen' ) +args.add_argument( '--flexible-be', metavar='', action='append', help='the flexible (best-effort) topic on which to listen' ) +args.add_argument( '--blob', metavar='', action='append', help='the blob topic on which to listen' ) +args.add_argument( '--image', metavar='', action='append', help='the image topic on which to listen' ) def domain_arg(x): t = int(x) if t <= 0 or t > 232: raise ArgumentError( f'--domain should be [0-232]' ) return t args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' ) +def time_arg(x): + t = float(x) + if t < 0.: + raise ValueError( f'--time should be >=0' ) + return t +args.add_argument( '--wait', metavar='', type=time_arg, default=5., help='seconds to wait for writers (default 5; 0=disable)' ) +args.add_argument( '--time', metavar='', type=time_arg, default=5., help='runtime before stopping, in seconds (default 0=forever)' ) +args.add_argument( '--not-ready', action='store_true', help='start output immediately, without waiting for all topics' ) args = args.parse_args() - if args.quiet: def i( *a, **kw ): pass @@ -29,11 +37,9 @@ def e( *a, **kw ): import sys -if not args.topic: - e( '--topic is required' ) +if not args.flexible and not args.flexible_be and not args.blob and not args.image: + e( '--flexible, --flexible-be, --blob, or --image required' ) sys.exit( 1 ) -topic_path = args.topic - import pyrealdds as dds import time @@ -45,63 +51,114 @@ def e( *a, **kw ): participant = dds.participant() participant.init( dds.load_rs_settings( settings ), args.domain ) -if args.blob: +readers = [] +ready = args.not_ready + +# BLOB +def on_blob_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.blob.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{msg}', ) + got_something = True +for topic_path in args.blob or []: reader = dds.topic_reader( dds.message.blob.create_topic( participant, topic_path )) - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.blob.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {msg}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_blob_available ) reader.run( dds.topic_reader.qos() ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) -elif args.image: +# IMAGE +def on_image_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.image.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{msg}', ) + got_something = True +for topic_path in args.image or []: reader = dds.topic_reader( dds.message.image.create_topic( participant, topic_path )) - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.image.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {msg}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_image_available ) reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) -else: +# FLEXIBLE +def on_flexible_available( reader ): + if not ready: + return + got_something = False + while True: + sample = dds.message.sample_info() + msg = dds.message.flexible.take_next( reader, sample ) + if not msg: + if not got_something: + raise RuntimeError( "expected message not received!" ) + break + i( f'{json.dumps( msg.json_data(), indent=4 )}', ) + got_something = True +for topic_path in args.flexible_be or []: reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) import json - def on_data_available( reader ): - got_something = False - while True: - sample = dds.message.sample_info() - msg = dds.message.flexible.take_next( reader, sample ) - if not msg: - if not got_something: - raise RuntimeError( "expected message not received!" ) - break - i( f'-----> {json.dumps( msg.json_data(), indent=4 )}', ) - got_something = True - reader.on_data_available( on_data_available ) + reader.on_data_available( on_flexible_available ) + reader.run( dds.topic_reader.qos( dds.reliability.best_effort, dds.durability.volatile ) ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) +for topic_path in args.flexible or []: + reader = dds.topic_reader( dds.message.flexible.create_topic( participant, topic_path )) + import json + reader.on_data_available( on_flexible_available ) reader.run( dds.topic_reader.qos() ) + i( f'Waiting for {topic_path}' ) + if args.wait and not reader.wait_for_writers( dds.time( args.wait ) ): + e( f'Timeout waiting for writers on {topic_path}' ) + sys.exit( 1 ) + readers.append( reader ) # Keep waiting until the user breaks... +def stop(): + global readers, ready + ready = False + for reader in readers: + reader.stop() # must stop or we get hangs! + del readers + sys.exit( 0 ) + import signal def handler(signum, frame): #res = input("Ctrl-c was pressed. Do you really want to exit? y/n ") #if res == 'y': - sys.exit( 0 ) -signal.signal( signal.SIGINT, handler ) -i( 'Press ctrl+C to break...' ) -while True: - time.sleep(1) + stop() +signal.signal( signal.SIGINT, lambda signum, frame: stop() ) + +ready = True +if args.time: + time.sleep( args.time ) + stop() +else: + i( 'Press ctrl+C to break...' ) + while True: + time.sleep(1) From 63b1b5338537fb4bedfcf435577b7585dfc2103f Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 9 May 2024 14:11:24 +0300 Subject: [PATCH 15/15] add topic-reader.stop() in dds tests for stability --- unit-tests/dds/test-device-discovery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/unit-tests/dds/test-device-discovery.py b/unit-tests/dds/test-device-discovery.py index 95c170999c..5246dc0e22 100644 --- a/unit-tests/dds/test-device-discovery.py +++ b/unit-tests/dds/test-device-discovery.py @@ -300,6 +300,7 @@ def __exit__( self, type, value, traceback ): del watcher + device_info.stop() del device_info del participant test.print_results_and_exit()