Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Jan 16, 2024
1 parent 272f72e commit 43f27e1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 50 deletions.
7 changes: 5 additions & 2 deletions third-party/realdds/src/dds-device-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ dds_device_server::dds_device_server( std::shared_ptr< dds_participant > const &
, _topic_root( topic_root )
, _control_dispatcher( QUEUE_MAX_SIZE )
{
LOG_DEBUG( "device server created @ '" << _topic_root << "'" );
LOG_DEBUG( "[" << _topic_root << "] device server created" );
_control_dispatcher.start();
}

Expand All @@ -68,7 +68,7 @@ dds_guid const & dds_device_server::guid() const
dds_device_server::~dds_device_server()
{
_stream_name_to_server.clear();
LOG_DEBUG( "device server deleted @ '" << _topic_root << "'" );
LOG_DEBUG( "[" << _topic_root << "] device server deleted" );
}


Expand Down Expand Up @@ -237,9 +237,12 @@ void dds_device_server::broadcast( topics::device_info const & device_info )
{
if( _broadcaster )
DDS_THROW( runtime_error, "device server was already broadcast" );
if( ! _notification_server )
DDS_THROW( runtime_error, "not initialized" );
if( device_info.topic_root() != _topic_root )
DDS_THROW( runtime_error, "topic roots do not match" );
_broadcaster = std::make_shared< dds_device_broadcaster >( _publisher, device_info );
_notification_server->trigger_discovery_notifications();
}


Expand Down
15 changes: 10 additions & 5 deletions third-party/realdds/src/dds-device-watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
if( stopping )
{
// We marked last-seen; nothing else to do with it
if( is.alive )
LOG_DEBUG( "[" << is.alive->debug_name() << "] device (from " << _participant->print( guid ) << ") is stopping" );
else
continue;
}
else if( is.alive )
{
Expand All @@ -72,7 +76,7 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
{
// Old device coming back to life
is.writer_guid = guid;
LOG_DEBUG( "DDS device (from " << _participant->print( guid ) << ") back to life: " << j.dump( 4 ) );
LOG_DEBUG( "[" << is.alive->debug_name() << "] device (from " << _participant->print( guid ) << ") back to life: " << j.dump( 4 ) );
topics::device_info device_info = topics::device_info::from_json( j );
static_cast< dds_discovery_sink * >( is.alive.get() )->on_discovery_restored( device_info );
if( _on_device_added )
Expand All @@ -93,8 +97,6 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
if( stopping )
{
// This device is stopping for whatever reason (e.g., HW reset); remove it
LOG_DEBUG( "DDS device (from " << _participant->print( guid ) << ") is stopping: " << root );
// TODO notify the device?
remove_device( root );
continue;
}
Expand Down Expand Up @@ -142,8 +144,11 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
// This is OK, and is likely the broadcaster's writer itself; ignore
return;
}
LOG_DEBUG( "DDS device (from " << _participant->print( guid ) << ") disconnected: " << it->first );
remove_device( it->first );
if( auto device = it->second.alive )
{
LOG_DEBUG( "[" << device->debug_name() << "] device subscription lost (from " << _participant->print( guid ) << ")" );
remove_device( it->first );
}
}
} );

Expand Down
100 changes: 57 additions & 43 deletions unit-tests/dds/test-device-discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from rspy import log, test
import pyrealdds as dds
from time import sleep

with test.remote.fork( nested_indent=' S' ) as remote:
if remote is None: # we're the server fork
Expand All @@ -15,10 +16,11 @@
participant.init( 123, 'server' )

def create_device_info( props ):
di = dds.message.device_info()
di.serial = props.get( 'serial', str( participant.create_guid() ) )
di.name = props.get( 'name', f'device{di.serial}' )
di.topic_root = props.get( 'topic_root', f'path/to/{di.name}' )
global broadcasters, publisher
serial = props.setdefault( 'serial', str( participant.create_guid() ) )
props.setdefault( 'name', f'device{serial}' )
props.setdefault( 'topic-root', f'device{serial}' )
di = dds.message.device_info.from_json( props )
return di

def create_server( root ):
Expand Down Expand Up @@ -63,9 +65,8 @@ def on_device_info_available( reader ):
break
j = msg.json_data()
log.d( f'on_device_info_available {j}' )
di = dds.message.device_info.from_json( j )
global broadcast_devices
broadcast_devices.append( di )
broadcast_devices.append( j )
broadcast_received.set()
device_info.on_data_available( on_device_info_available )
device_info.run( dds.topic_reader.qos() )
Expand Down Expand Up @@ -167,7 +168,8 @@ def __exit__( self, type, value, traceback ):
remote.run( 'd1.broadcast( di1 )' )
test.check_equal( len(broadcast_devices), 1 )
test.check_equal( len(devices), 1 )
d1 = devices['path/to/device123'] # remember it -- we'll re-add it later and want to test it's the same!
d1 = devices[f'device123'] # remember it -- we'll re-add it later and want to test it's the same!
d1guid = d1.guid()

#############################################################################################
with test.closure( "Broadcast second device" ):
Expand All @@ -177,7 +179,7 @@ def __exit__( self, type, value, traceback ):
remote.run( 'd2.broadcast( di2 )' )
test.check_equal( len(broadcast_devices), 3 ) # each broadcast is of ALL the devices
test.check_equal( len(devices), 2 )
d2guid = devices[f'path/to/device456'].guid()
d2 = devices[f'device456'] # remember it -- we'll re-add it later and want to test it's the same!

#############################################################################################
with test.closure( "Add another client; expect rebroadcast of all" ):
Expand All @@ -195,62 +197,74 @@ def __exit__( self, type, value, traceback ):
test.check( watcher.is_device_broadcast( dev ) )

#############################################################################################
with test.closure( "Set one option to a non-default value" ):
option = next( o for o in d1.streams()[0].options() if o.get_name() == 'Custom Option' )
if test.check( option ):
test.check_equal( option.get_value(), 5. )
d1.set_option_value( option, 8. )
test.check_equal( option.stream().name(), 's1' )

#############################################################################################
with test.closure( "Remove both; this should stop the broadcaster thread" ):
with test.closure( "Disconnect one & remove the other" ):
with change_expected( n_removed=2 ):
remote.run( 'del d1' )
remote.run( 'd1.broadcast_disconnect( dds.time( 2. ) )' )
remote.run( 'del d2' )
test.check_equal( len(watcher.devices()), 0 )

#############################################################################################
with test.closure( "The devices should no longer be broadcasting" ):
with test.closure( "Both should go offline & not ready" ):
test.check_false( watcher.is_device_broadcast( d1 ) )
test.check_false( d1.is_online() )
test.check_false( d1.is_ready() )
test.check_false( watcher.is_device_broadcast( d2 ) )
test.check_false( d2.is_online() )
test.check_false( d2.is_ready() )

#############################################################################################
with test.closure( "Add one back, without a broadcast" ):
with test.closure( "Unbroadcast server still sends out init messages" ):
info = dds.message.device_info()
info.name = 'Test Device'
info.topic_root = 'device123'
dds.device( participant, info ).wait_until_ready() # Will cause a broadcast of init msgs

#############################################################################################
with test.closure( "Previous init should make the device ready (but still offline)" ):
test.check( d1.is_ready() )
test.check_false( d1.is_online() )

#############################################################################################
with test.closure( "Rebroadcast the disconnected device" ):
with change_expected( n_added=1 ):
remote.run( 'd1.broadcast( di1 )' )
test.check( watcher.is_device_broadcast( d1 ) )
test.check( d1.is_online() )
test.check( d1.is_ready() )

#############################################################################################
with test.closure( "It needs to reinitialize to get ready again" ):
d1.wait_until_ready() # NOTE: requires server to resend init messages on broadcast
test.check( d1.is_ready() )
test.check_equal( len(devices), 1 )
test.check_equal( devices['device123'].guid(), d1guid ) # Same device

#############################################################################################
with test.closure( "Add the other back, without a broadcast" ):
detect_broadcast()
detect_change()
remote.run( 'd1 = create_server( di1.topic_root )' )
remote.run( 'd2 = create_server( di2.topic_root )' )
#sleep( 2 ) # allow some time
test.check_equal( len(broadcast_devices), 0 )
test.check_equal( devices_added, 0 )
test.check_false( d2.is_online() )

#############################################################################################
with test.closure( "It should remain offline (not yet rediscovered)" ):
test.check_equal( len(devices), 0 )
test.check_equal( len(watcher.devices()), 0 )
test.check_false( d1.is_online() )
with test.closure( "It should get ready!" ):
d2.wait_until_ready()
test.check( d2.is_ready() )

#############################################################################################
with test.closure( "But it should get ready!" ):
d1.wait_until_ready()
test.check( d1.is_ready() )
test.check_false( d1.is_online() )
test.check_equal( len(devices), 0 )
test.check_false( watcher.is_device_broadcast( d1 ) )
with test.closure( "It should remain offline (not yet rediscovered)" ):
test.check_false( d2.is_online() )
test.check_false( watcher.is_device_broadcast( d2 ) )

#############################################################################################
with test.closure( "Now broadcast it; it should come online" ):
with change_expected( n_added=1 ):
remote.run( 'd1.broadcast( di1 )' )
test.check_equal( len(devices), 1 )
test.check( d1.is_online() )
test.check( watcher.is_device_broadcast( d1 ) )

#############################################################################################
with test.closure( "Check that the option value is the new value" ):
test.check_equal( option.get_value(), 8. ) # cached
test.check_false( option.stream() ) # no longer valid
d1.query_option_value( option )
new_option = next( o for o in d1.streams()[0].options() if o.get_name() == 'Custom Option' )
test.check_equal( new_option.get_value(), 8. ) # cached, but should get the current value on init...?
remote.run( 'd2.broadcast( di2 )' )
test.check( d2.is_online() )
test.check( watcher.is_device_broadcast( d2 ) )


del watcher
Expand Down

0 comments on commit 43f27e1

Please sign in to comment.