Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Jan 2, 2024
1 parent ce93e02 commit 838d463
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 51 deletions.
50 changes: 25 additions & 25 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ std::string script_name()
}


nlohmann::json load_rs_settings( nlohmann::json const & local_settings )
rsutils::json load_rs_settings( rsutils::json const & local_settings )
{
nlohmann::json config;
rsutils::json config;

// Load the realsense configuration file settings
std::ifstream f( rsutils::os::get_special_folder( rsutils::os::special_folder::app_data ) + "realsense-config.json" );
if( f.good() )
{
try
{
config = nlohmann::json::parse( f );
config = rsutils::json::parse( f );
}
catch( std::exception const & e )
{
Expand Down Expand Up @@ -201,7 +201,7 @@ PYBIND11_MODULE(NAME, m) {
( char const * topic_name, eprosima::fastrtps::types::DynamicType_ptr dyn_type ),
callback( topic_name, dyn_type->get_name() ); ) );

m.def( "load_rs_settings", &load_rs_settings, "local-settings"_a = nlohmann::json::object() );
m.def( "load_rs_settings", &load_rs_settings, "local-settings"_a = rsutils::json::object() );
m.def( "script_name", &script_name );

py::class_< dds_participant,
Expand All @@ -210,10 +210,10 @@ PYBIND11_MODULE(NAME, m) {
participant( m, "participant" );
participant.def( py::init<>() )
.def( "init",
[]( dds_participant & self, nlohmann::json const & local_settings, realdds::dds_domain_id domain_id )
[]( dds_participant & self, rsutils::json const & local_settings, realdds::dds_domain_id domain_id )
{ self.init( domain_id, script_name(), local_settings ); },
"local-settings"_a = nlohmann::json::object(), "domain-id"_a = -1 )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = nlohmann::json::object() )
"local-settings"_a = rsutils::json::object(), "domain-id"_a = -1 )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = rsutils::json::object() )
.def( "is_valid", &dds_participant::is_valid )
.def( "guid", &dds_participant::guid )
.def( "create_guid", &dds_participant::create_guid )
Expand Down Expand Up @@ -469,7 +469,7 @@ PYBIND11_MODULE(NAME, m) {
"json"_a,
"version"_a = 0 )
.def( py::init( []( std::string const & json_string ) {
return flexible_msg( flexible_msg::data_format::JSON, nlohmann::json::parse( json_string ) );
return flexible_msg( flexible_msg::data_format::JSON, rsutils::json::parse( json_string ) );
} ) )
.def_static( "create_topic", static_cast< flexible_msg_create_topic * >( &flexible_msg::create_topic ) )
.def_readwrite( "data_format", &flexible_msg::_data_format )
Expand Down Expand Up @@ -777,10 +777,10 @@ PYBIND11_MODULE(NAME, m) {
// def callback( json ):
// print( json['value'] ) # calls __getitem__
// json['value'] = { 'more': True } # calls __setitem__
struct json_ref { nlohmann::json & j; };
struct json_ref { rsutils::json & j; };
py::class_< json_ref, std::shared_ptr< json_ref > >( m, "json_ref" )
.def( "__getitem__", []( json_ref const & jr, std::string const & key ) { return jr.j.at( key ); } )
.def( "__setitem__", []( json_ref & jr, std::string const & key, nlohmann::json const & value ) { jr.j[key] = value; } );
.def( "__setitem__", []( json_ref & jr, std::string const & key, rsutils::json const & value ) { jr.j[key] = value; } );

using realdds::dds_device_server;
py::class_< dds_device_server, std::shared_ptr< dds_device_server > >( m, "device_server" )
Expand All @@ -797,14 +797,14 @@ PYBIND11_MODULE(NAME, m) {
} )
.def(
"publish_notification",
[]( dds_device_server & self, nlohmann::json const & j ) { self.publish_notification( j ); },
[]( dds_device_server & self, rsutils::json const & j ) { self.publish_notification( j ); },
py::call_guard< py::gil_scoped_release >() )
.def( "publish_metadata", &dds_device_server::publish_metadata, py::call_guard< py::gil_scoped_release >() )
.def( "broadcast", &dds_device_server::broadcast )
.def( FN_FWD_R( dds_device_server, on_control,
false,
(dds_device_server &, std::string const &, py::object &&, json_ref &&),
( std::string const & id, nlohmann::json const & control, nlohmann::json & reply ),
( std::string const & id, rsutils::json const & control, rsutils::json & reply ),
return callback( self, id, json_to_py( control ), json_ref{ reply } ); ) );

using realdds::dds_stream;
Expand Down Expand Up @@ -894,21 +894,21 @@ PYBIND11_MODULE(NAME, m) {
[]( dds_device & self, std::function< void( dds_device &, py::object && ) > callback )
{
return std::make_shared< subscription >( self.on_metadata_available(
[&self, callback]( std::shared_ptr< const nlohmann::json > const & pj )
[&self, callback]( std::shared_ptr< const rsutils::json > const & pj )
{ FN_FWD_CALL( dds_device, "on_metadata_available", callback( self, json_to_py( *pj ) ); ) } ) );
} )
.def( "on_device_log",
[]( dds_device & self, std::function< void( dds_device &, dds_nsec, char, std::string const &, py::object && ) > callback )
{
return std::make_shared< subscription >( self.on_device_log(
[&self, callback]( dds_nsec timestamp, char type, std::string const & text, nlohmann::json const & data )
[&self, callback]( dds_nsec timestamp, char type, std::string const & text, rsutils::json const & data )
{ FN_FWD_CALL( dds_device, "on_device_log", callback( self, timestamp, type, text, json_to_py( data ) ); ) } ) );
} )
.def( "on_notification",
[]( dds_device & self, std::function< void( dds_device &, std::string const &, py::object && ) > callback )
{
return std::make_shared< subscription >( self.on_notification(
[&self, callback]( std::string const & id, nlohmann::json const & data )
[&self, callback]( std::string const & id, rsutils::json const & data )
{ FN_FWD_CALL( dds_device, "on_notification", callback( self, id, json_to_py( data ) ); ) } ) );
} )
.def( "n_streams", &dds_device::number_of_streams )
Expand All @@ -929,9 +929,9 @@ PYBIND11_MODULE(NAME, m) {
.def( "query_option_value", &dds_device::query_option_value )
.def(
"send_control",
[]( dds_device & self, nlohmann::json const & j, bool wait_for_reply )
[]( dds_device & self, rsutils::json const & j, bool wait_for_reply )
{
nlohmann::json reply;
rsutils::json reply;
self.send_control( j, wait_for_reply ? &reply : nullptr );
return reply;
},
Expand Down Expand Up @@ -1041,18 +1041,18 @@ PYBIND11_MODULE(NAME, m) {
.def( py::init<>() )
.def( FN_FWD( dds_metadata_syncer,
on_frame_ready,
( dds_metadata_syncer::frame_type, nlohmann::json const & ),
( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const nlohmann::json > const & metadata ),
callback( self.get_frame( fh ), metadata ? *metadata : nlohmann::json() ); ) )
( dds_metadata_syncer::frame_type, rsutils::json const & ),
( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const rsutils::json > const & metadata ),
callback( self.get_frame( fh ), metadata ? *metadata : rsutils::json() ); ) )
.def( FN_FWD( dds_metadata_syncer,
on_metadata_dropped,
( dds_metadata_syncer::key_type, nlohmann::json const & ),
( dds_metadata_syncer::key_type key, std::shared_ptr< const nlohmann::json > const & metadata ),
callback( key, metadata ? *metadata : nlohmann::json() ); ) )
( dds_metadata_syncer::key_type, rsutils::json const & ),
( dds_metadata_syncer::key_type key, std::shared_ptr< const rsutils::json > const & metadata ),
callback( key, metadata ? *metadata : rsutils::json() ); ) )
.def( "enqueue_frame", &dds_metadata_syncer::enqueue_frame )
.def( "enqueue_metadata",
[]( dds_metadata_syncer & self, dds_metadata_syncer::key_type key, nlohmann::json const & j )
{ self.enqueue_metadata( key, std::make_shared< const nlohmann::json >( j ) ); } );
[]( dds_metadata_syncer & self, dds_metadata_syncer::key_type key, rsutils::json const & j )
{ self.enqueue_metadata( key, std::make_shared< const rsutils::json >( j ) ); } );
metadata_syncer.attr( "max_frame_queue_size" ) = dds_metadata_syncer::max_frame_queue_size;
metadata_syncer.attr( "max_md_queue_size" ) = dds_metadata_syncer::max_md_queue_size;
}
12 changes: 6 additions & 6 deletions third-party/realdds/src/dds-device-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,21 @@ static void on_discovery_stream_header( std::shared_ptr< dds_stream_server > con
else if( auto motion_stream = std::dynamic_pointer_cast< dds_motion_stream_server >( stream ) )
{
intrinsics = rsutils::json::object( {
{ "accel", motion_stream->get_accel_intrinsics().to_json() },
{ "gyro", motion_stream->get_gyro_intrinsics().to_json() }
{ "accel", motion_stream->get_accel_intrinsics().to_json().moved() },
{ "gyro", motion_stream->get_gyro_intrinsics().to_json().moved() }
} );
}

auto stream_filters = rsutils::json::array();
for( auto & filter : stream->recommended_filters() )
stream_filters.push_back( filter );
topics::flexible_msg stream_options_message( json {
topics::flexible_msg stream_options_message( json::object( {
{ id_key, "stream-options" },
{ "stream-name", stream->name() },
{ "options" , std::move( stream_options ) },
{ "intrinsics" , intrinsics },
{ "options", std::move( stream_options ) },
{ "intrinsics", intrinsics.moved() },
{ "recommended-filters", std::move( stream_filters ) },
} );
} ) );
json_string = slice( stream_options_message.custom_data< char const >(), stream_options_message._data.size() );
LOG_DEBUG( "-----> JSON = " << shorten_json_string( json_string, 300 ) << " size " << json_string.length() );
//LOG_DEBUG( "-----> CBOR size = " << json::to_cbor( stream_options_message.json_data() ).size() );
Expand Down
2 changes: 0 additions & 2 deletions third-party/realdds/src/dds-stream-sensor-bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include <algorithm>
#include <iostream>

using nlohmann::json;


namespace realdds {

Expand Down
3 changes: 3 additions & 0 deletions third-party/rsutils/include/rsutils/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class json : public json_type
using json_type::operator=;


json_type && moved() { return std::move( *this ); }


// Returns true if the json has a certain key.
// Does not check the value at all, so it could be any type or null.
static bool has( json_type const & j, json_key const & key )
Expand Down
36 changes: 36 additions & 0 deletions third-party/rsutils/include/rsutils/py/pybind11.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,42 @@ constexpr auto json_to_py = pyjson::from_json;
constexpr auto py_to_json = pyjson::to_json;


// pybind11 caster specifically for rsutils::json
// This is a copy of the code for nlohmann, in pybind11_json.hpp above
#include <rsutils/json.h>
namespace pybind11 {
namespace detail {

template<>
struct type_caster< rsutils::json >
{
public:
PYBIND11_TYPE_CASTER( rsutils::json, _( "json" ) );

bool load( handle src, bool )
{
try
{
value = pyjson::to_json( src );
return true;
}
catch( ... )
{
return false;
}
}

static handle cast( rsutils::json src, return_value_policy /* policy */, handle /* parent */ )
{
object obj = pyjson::from_json( src );
return obj.release();
}
};

} // namespace detail
} // namespace pybind11


namespace py = pybind11;
using namespace pybind11::literals;

Expand Down
24 changes: 12 additions & 12 deletions tools/dds/dds-adapter/lrs-device-controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using rsutils::string::hexarray;
#include <algorithm>
#include <iostream>

using nlohmann::json;
using rsutils::json;
using namespace realdds;
using tools::lrs_device_controller;

Expand Down Expand Up @@ -337,7 +337,7 @@ extrinsics_map get_extrinsics_map( const rs2::device & dev )
}


std::shared_ptr< dds_stream_profile > create_dds_stream_profile( std::string const & type_string, nlohmann::json const & j )
std::shared_ptr< dds_stream_profile > create_dds_stream_profile( std::string const & type_string, rsutils::json const & j )
{
if( "motion" == type_string )
return dds_stream_profile::from_json< dds_motion_stream_profile >( j );
Expand Down Expand Up @@ -503,7 +503,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr<
return query_option( option );
} );
_dds_device_server->on_control(
[this]( std::string const & id, nlohmann::json const & control, nlohmann::json & reply )
[this]( std::string const & id, rsutils::json const & control, rsutils::json & reply )
{ return on_control( id, control, reply ); } );

_device_sn = _rs_dev.get_info( RS2_CAMERA_INFO_SERIAL_NUMBER );
Expand Down Expand Up @@ -599,7 +599,7 @@ lrs_device_controller::lrs_device_controller( rs2::device dev, std::shared_ptr<
_bridge.on_error(
[this]( std::string const & error_string )
{
nlohmann::json j = nlohmann::json::object( {
rsutils::json j = rsutils::json::object( {
{ "id", "error" },
{ "error", error_string },
} );
Expand All @@ -622,7 +622,7 @@ lrs_device_controller::~lrs_device_controller()
}


bool lrs_device_controller::on_open_streams( nlohmann::json const & control, nlohmann::json & reply )
bool lrs_device_controller::on_open_streams( rsutils::json const & control, rsutils::json & reply )
{
// Note that this function is called "start-streaming" but it's really a response to "open-streams" so does not
// actually start streaming. It simply sets and locks in which streams should be open when streaming starts.
Expand Down Expand Up @@ -666,23 +666,23 @@ void lrs_device_controller::publish_frame_metadata( const rs2::frame & f, realdd
if( ! _dds_device_server->has_metadata_readers() )
return;

nlohmann::json md_header = nlohmann::json::object( {
rsutils::json md_header = rsutils::json::object( {
{ "frame-number", f.get_frame_number() }, // communicated; up to client to pick up
{ "timestamp", timestamp.to_ns() }, // syncer key: needs to match the image timestamp, bit-for-bit!
{ "timestamp-domain", f.get_frame_timestamp_domain() } // needed if we're dealing with different domains!
} );
if( f.is< rs2::depth_frame >() )
md_header["depth-units"] = f.as< rs2::depth_frame >().get_units();

nlohmann::json metadata = nlohmann::json::object();
rsutils::json metadata = rsutils::json::object();
for( size_t i = 0; i < static_cast< size_t >( RS2_FRAME_METADATA_COUNT ); ++i )
{
rs2_frame_metadata_value val = static_cast< rs2_frame_metadata_value >( i );
if( f.supports_frame_metadata( val ) )
metadata[rs2_frame_metadata_to_string( val )] = f.get_frame_metadata( val );
}

nlohmann::json md_msg = nlohmann::json::object( {
rsutils::json md_msg = rsutils::json::object( {
{ "stream-name", stream_name_from_rs2( f.get_profile() ) },
{ "header", std::move( md_header ) },
{ "metadata", std::move( metadata ) },
Expand Down Expand Up @@ -838,9 +838,9 @@ size_t lrs_device_controller::get_index_of_profile( const realdds::dds_stream_pr
}


bool lrs_device_controller::on_control( std::string const & id, nlohmann::json const & control, nlohmann::json & reply )
bool lrs_device_controller::on_control( std::string const & id, rsutils::json const & control, rsutils::json & reply )
{
static std::map< std::string, bool ( lrs_device_controller::* )( nlohmann::json const &, nlohmann::json & ) > const
static std::map< std::string, bool ( lrs_device_controller::* )(rsutils::json const &, rsutils::json & ) > const
control_handlers{
{ "hw-reset", &lrs_device_controller::on_hardware_reset },
{ "open-streams", &lrs_device_controller::on_open_streams },
Expand All @@ -854,14 +854,14 @@ bool lrs_device_controller::on_control( std::string const & id, nlohmann::json c
}


bool lrs_device_controller::on_hardware_reset( nlohmann::json const & control, nlohmann::json & reply )
bool lrs_device_controller::on_hardware_reset( rsutils::json const & control, rsutils::json & reply )
{
_rs_dev.hardware_reset();
return true;
}


bool lrs_device_controller::on_hwm( nlohmann::json const & control, nlohmann::json & reply )
bool lrs_device_controller::on_hwm( rsutils::json const & control, rsutils::json & reply )
{
rs2::debug_protocol dp( _rs_dev );
if( ! dp )
Expand Down
12 changes: 6 additions & 6 deletions tools/dds/dds-adapter/lrs-device-controller.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2022 Intel Corporation. All Rights Reserved.

#pragma once

#include <librealsense2/rs.hpp> // Include RealSense Cross Platform API
#include <realdds/dds-stream-sensor-bridge.h>
#include <realdds/dds-stream-profile.h>
#include <nlohmann/json_fwd.hpp>

#include <rsutils/json-fwd.h>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -40,10 +40,10 @@ class lrs_device_controller

void publish_frame_metadata( const rs2::frame & f, realdds::dds_time const & );

bool on_control( std::string const & id, nlohmann::json const & control, nlohmann::json & reply );
bool on_hardware_reset( nlohmann::json const &, nlohmann::json & );
bool on_hwm( nlohmann::json const &, nlohmann::json & );
bool on_open_streams( nlohmann::json const &, nlohmann::json & );
bool on_control( std::string const & id, rsutils::json const & control, rsutils::json & reply );
bool on_hardware_reset( rsutils::json const &, rsutils::json & );
bool on_hwm( rsutils::json const &, rsutils::json & );
bool on_open_streams( rsutils::json const &, rsutils::json & );

void override_default_profiles( const std::map< std::string, realdds::dds_stream_profiles > & stream_name_to_profiles,
std::map< std::string, size_t > & stream_name_to_default_profile ) const;
Expand Down
15 changes: 15 additions & 0 deletions unit-tests/dds/test-control-reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@
device_info = dds.message.device_info()
device_info.topic_root = 'server/device'


with test.closure( 'Start the server participant' ):
participant = dds.participant()
participant.init( 123, 'server' )

with test.closure( 'Create the server' ):
device_info.name = 'Some device'
s1p1 = dds.video_stream_profile( 9, dds.video_encoding.rgb, 10, 10 )
s1profiles = [s1p1]
s1 = dds.color_stream_server( 's1', 'sensor' )
s1.init_profiles( s1profiles, 0 )
server = dds.device_server( participant, device_info.topic_root )
server.init( [s1], [], {} )


with test.remote.fork( nested_indent=None ) as remote:
if remote is None: # we're the fork

Expand Down

0 comments on commit 838d463

Please sign in to comment.