Skip to content

Commit

Permalink
fixup! device discovery tracking in -watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Jan 17, 2024
1 parent 641aa80 commit 195d77f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 86 deletions.
20 changes: 12 additions & 8 deletions third-party/realdds/include/realdds/dds-device-watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class dds_topic_reader;
// Watches topics::DEVICE_INFO_TOPIC_NAME and sends out notifications of additions/removals.
//
// Devices are kept track of and an actual database of dds_device objects is maintained. Devices get created as soon as
// they're discovered and held as long as they're being broadcast. Only devices that are broadcast are managed.
// they're discovered and held as long as they're being broadcast. A device that loses discovery (e.g., is undergoing a
// HW reset) and is then discovered again will reuse the same dds_device object. The topic-root is how devices are
// distinguished.
//
class dds_device_watcher
{
Expand All @@ -38,7 +40,7 @@ class dds_device_watcher
dds_device_watcher( std::shared_ptr< dds_participant > const & );
~dds_device_watcher();

// The callback is called right after construction and right before deletion
// The callback is called whenever discovery is lost or gained
typedef std::function< void( std::shared_ptr< dds_device > const & ) > on_device_change_callback;

void on_device_added( on_device_change_callback callback ) { _on_device_added = std::move( callback ); }
Expand All @@ -48,8 +50,8 @@ class dds_device_watcher
void stop();
bool is_stopped() const;

// Iterate over devices until the callback returns false; returns true if all devices were iterated over and this
// never happened.
// Iterate over discovered devices until the callback returns false; returns true if all devices were iterated over
// and this never happened.
//
bool foreach_device( std::function< bool( std::shared_ptr< dds_device > const & ) > ) const;

Expand All @@ -62,8 +64,7 @@ class dds_device_watcher
bool is_device_broadcast( std::shared_ptr< dds_device > const & ) const;

private:
void init();
void remove_device( std::string const & root );
void init();

std::shared_ptr< dds_participant > _participant;
std::shared_ptr< dds_topic_reader > _device_info_topic;
Expand All @@ -74,12 +75,15 @@ class dds_device_watcher
struct device_liveliness
{
std::shared_ptr< dds_device > alive; // reset in remove_device()
std::weak_ptr< dds_device > in_use; // kept around to detect if it's still being used, to recreate alive
std::weak_ptr< dds_device > in_use; // when !alive, to detect if it's still being used
dds_guid writer_guid;
dds_time last_seen;
};

void device_discovery_lost( device_liveliness &, std::lock_guard< std::mutex > & lock );

using liveliness_map = std::map< std::string /*root*/, device_liveliness >;
liveliness_map _root_liveliness;
liveliness_map _device_by_root;
mutable std::mutex _devices_mutex;
};

Expand Down
145 changes: 67 additions & 78 deletions third-party/realdds/src/dds-device-watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,41 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
LOG_DEBUG( "device-info from " << _participant->print( guid ) << " is missing a topic-root; ignoring: " << j );
continue;
}
bool const stopping = j.nested( "stopping", &json::is_boolean ).default_value( false );

{
std::lock_guard< std::mutex > lock( _devices_mutex );
auto it = _root_liveliness.find( root );
if( it != _root_liveliness.end() )
auto it = _device_by_root.find( root );
if( it != _device_by_root.end() )
{
auto & is = it->second;
is.last_seen = now();
if( stopping )
auto & device = it->second;
device.last_seen = now();
if( j.nested( "stopping", &json::is_boolean ).default_value( false ) )
{
// We marked last-seen; nothing else to do with it
if( is.alive )
LOG_DEBUG( "[" << is.alive->debug_name() << "] device (from " << _participant->print( guid ) << ") is stopping" );
else
continue;
// This device is stopping for whatever reason (e.g., HW reset); remove it
if( device.alive )
{
LOG_DEBUG( "[" << device.alive->debug_name() << "] device (from " << _participant->print( guid ) << ") is stopping" );
device_discovery_lost( device, lock );
}
continue;
}
else if( is.alive )
else if( device.alive )
{
// We already know about this device; likely this was a broadcast meant for someone else
continue;
}
else if( is.alive = is.in_use.lock() )
else if( device.alive = device.in_use.lock() )
{
// Old device coming back to life
is.writer_guid = guid;
LOG_DEBUG( "[" << is.alive->debug_name() << "] device (from " << _participant->print( guid ) << ") back to life: " << j.dump( 4 ) );
device.writer_guid = guid;
LOG_DEBUG( "[" << device.alive->debug_name() << "] device (from "
<< _participant->print( guid ) << ") back to life: " << j.dump( 4 ) );
topics::device_info device_info = topics::device_info::from_json( j );
static_cast< dds_discovery_sink * >( is.alive.get() )->on_discovery_restored( device_info );
static_cast< dds_discovery_sink * >( device.alive.get() )
->on_discovery_restored( device_info );
if( _on_device_added )
{
std::thread( [device = is.alive, on_device_added = _on_device_added]()
std::thread( [device = device.alive, on_device_added = _on_device_added]()
{ on_device_added( device ); } )
.detach();
}
Expand All @@ -94,32 +97,25 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
}
}

if( stopping )
{
// This device is stopping for whatever reason (e.g., HW reset); remove it
remove_device( root );
continue;
}

LOG_DEBUG( "DDS device (from " << _participant->print( guid ) << ") detected: " << j.dump( 4 ) );
topics::device_info device_info = topics::device_info::from_json( j );

// Add a new device record into our dds devices map
std::shared_ptr< dds_device > device = std::make_shared< dds_device >( _participant, device_info );
std::shared_ptr< dds_device > new_device = std::make_shared< dds_device >( _participant, device_info );
{
std::lock_guard< std::mutex > lock( _devices_mutex );
auto & is = _root_liveliness[root];
is.alive = device;
is.writer_guid = guid;
is.last_seen = now();
auto & device = _device_by_root[root];
device.alive = new_device;
device.writer_guid = guid;
device.last_seen = now();
}

// NOTE: device removals are handled via the writer-removed notification; see on_subscription_matched() below
if( _on_device_added )
{
std::thread(
[device, on_device_added = _on_device_added]() { //
on_device_added( device );
[new_device, on_device_added = _on_device_added]() { //
on_device_added( new_device );
} )
.detach();
}
Expand All @@ -133,21 +129,21 @@ dds_device_watcher::dds_device_watcher( std::shared_ptr< dds_participant > const
{
dds_guid const guid
= status.last_publication_handle.operator const eprosima::fastrtps::rtps::GUID_t &();
liveliness_map::const_iterator it;
{
std::lock_guard< std::mutex > lock( _devices_mutex );
it = std::find_if( _root_liveliness.begin(),
_root_liveliness.end(),
[guid]( liveliness_map::value_type const & it )
{ return it.second.writer_guid == guid; } );
if( it == _root_liveliness.end() )
// This is OK, and is likely the broadcaster's writer itself; ignore
return;
}
if( auto device = it->second.alive )

std::lock_guard< std::mutex > lock( _devices_mutex );
auto it = std::find_if( _device_by_root.begin(),
_device_by_root.end(),
[guid]( liveliness_map::value_type const & it )
{ return it.second.writer_guid == guid; } );
// OK if not found: it's likely the broadcaster's writer itself; ignore
if( it != _device_by_root.end() )
{
LOG_DEBUG( "[" << device->debug_name() << "] device subscription lost (from " << _participant->print( guid ) << ")" );
remove_device( it->first );
auto & device = it->second;
if( device.alive )
{
LOG_DEBUG( "[" << device.alive->debug_name() << "] device subscription lost (from " << _participant->print( guid ) << ")" );
device_discovery_lost( device, lock );
}
}
}
} );
Expand Down Expand Up @@ -196,47 +192,40 @@ void dds_device_watcher::init()
}


void dds_device_watcher::remove_device( std::string const & root )
void dds_device_watcher::device_discovery_lost( device_liveliness & device, std::lock_guard< std::mutex > & )
{
std::shared_ptr< dds_device > device;
if( auto disconnected_device = device.alive )
{
std::lock_guard< std::mutex > lock( _devices_mutex );
auto it = _root_liveliness.find( root );
if( it == _root_liveliness.end() )
return;
auto & is = it->second;
device = is.alive;
if( ! device )
return;
static_cast< dds_discovery_sink * >( device.get() )->on_discovery_lost();
is.in_use = is.alive;
is.alive.reset(); // no longer alive; in_use will track whether it's being used
device.in_use = disconnected_device;
device.alive.reset(); // no longer alive; in_use will track whether it's being used
static_cast< dds_discovery_sink * >( disconnected_device.get() )->on_discovery_lost();

// rest must happen outside the mutex
std::thread(
[disconnected_device, on_device_removed = _on_device_removed]
{
if( on_device_removed )
on_device_removed( disconnected_device );
// If we're holding the device, it will get destroyed here, from another thread.
// Not sure why, but if we delete outside this thread (in the listener callback), it
// will cause some sort of invalid state in DDS. The thread will get killed and we won't get
// any notification of the remote participant getting removed... and the process will even
// hang on exit.
} )
.detach();
}
// rest must happen outside the mutex
std::thread(
[device, on_device_removed = _on_device_removed]()
{
if( on_device_removed )
on_device_removed( device );
// If we're holding the device, it will get destroyed here, from another thread.
// Not sure why, but if we delete the outside this thread (in the listener callback), it
// will cause some sort of invalid state in DDS. The thread will get killed and we won't get
// any notification of the remote participant getting removed... and the process will even
// hang on exit.
} )
.detach();
}


bool dds_device_watcher::foreach_device(
std::function< bool( std::shared_ptr< dds_device > const & ) > fn ) const
{
std::lock_guard< std::mutex > lock( _devices_mutex );
for( auto & root_liveliness : _root_liveliness )
for( auto & root_device : _device_by_root )
{
auto & is = root_liveliness.second;
if( is.alive )
if( ! fn( is.alive ) )
auto & is = root_device.second;
if( auto device = is.alive )
if( ! fn( device ) )
return false;
}
return true;
Expand All @@ -247,9 +236,9 @@ bool dds_device_watcher::is_device_broadcast( std::shared_ptr< dds_device > cons
{
auto & root = dev->device_info().topic_root();
std::lock_guard< std::mutex > lock( _devices_mutex );
auto it = _root_liveliness.find( root );
if( it == _root_liveliness.end() )
auto it = _device_by_root.find( root );
if( it == _device_by_root.end() )
return false;
return ! ! it->second.alive;
return !! it->second.alive;
}

0 comments on commit 195d77f

Please sign in to comment.