Skip to content

Commit

Permalink
Fix Locator Memory Leak in RTPS/UDP Transport
Browse files Browse the repository at this point in the history
Locators were not being cleaned up correctly because there were two
functions for cleaning up after remotes in `RtpsUdpDataLink`. These
changes merge those two functions.

Also:
- Don't create locator entries for unknown remotes in
  `RtpsUdpDataLink::update_locators`
- Change `RemoteInfo::ref_count_` from `size_t` to a `DDS::UInt32` to
  save a bit of space in 64-bit builds. If the RtpsRelay SEDP transport
  is serving 10,000 participants with 12 `RemoteInfo` for each remote
  builtin entity, then this saves almost half a megabyte of memory.
- Methods for getting GUIDs from `DiscoveredParticipant` to simplify
  code
- Remove an unused function in Sedp
  • Loading branch information
iguessthislldo committed Jul 18, 2024
1 parent 9244c3e commit 2df190c
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 122 deletions.
18 changes: 16 additions & 2 deletions dds/DCPS/RTPS/DiscoveredEntities.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ struct DiscoveredParticipant {
, participant_tokens_sent_(false)
#endif
{
const DCPS::GUID_t guid = DCPS::make_part_guid(p.participantProxy.guidPrefix);
assign(location_data_.guid, guid);
assign(location_data_.guid, make_part_guid());
location_data_.location = 0;
location_data_.change_mask = 0;
location_data_.local_timestamp.sec = 0;
Expand Down Expand Up @@ -206,6 +205,21 @@ struct DiscoveredParticipant {
return pdata_.dataKind == Security::DPDK_ENHANCED || pdata_.dataKind == Security::DPDK_SECURE;
}
#endif

const DCPS::GuidPrefix_t& prefix() const
{
return pdata_.participantProxy.guidPrefix;
}

DCPS::GUID_t make_guid(const DCPS::EntityId_t& entity) const
{
return DCPS::make_id(prefix(), entity);
}

DCPS::GUID_t make_part_guid() const
{
return DCPS::make_part_guid(prefix());
}
};

struct DiscoveredSubscription : DCPS::PoolAllocationBase {
Expand Down
134 changes: 65 additions & 69 deletions dds/DCPS/RTPS/Sedp.cpp

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions dds/DCPS/transport/framework/DataLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,8 @@ DataLink::peer_ids(const GUID_t& local_id) const
/// with a simultaneous call (in another thread) to one of this
/// DataLink's make_reservation() methods.
void
DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,
DataLinkSetMap& released_locals)
DataLink::release_reservations(const GUID_t& remote_id, const GUID_t& local_id,
DataLinkSetMap* released_locals)
{
DBG_ENTRY_LVL("DataLink", "release_reservations", 6);

Expand Down Expand Up @@ -550,11 +550,13 @@ DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,
}
RepoIdSet& ris = assoc_by_local_[local_id].associated_;
if (ris.size() == 1) {
DataLinkSet_rch& links = released_locals[local_id];
if (links.is_nil()) {
links = make_rch<DataLinkSet>();
if (released_locals) {
DataLinkSet_rch& links = (*released_locals)[local_id];
if (links.is_nil()) {
links = make_rch<DataLinkSet>();
}
links->insert_link(rchandle_from(this));
}
links->insert_link(rchandle_from(this));
assoc_by_local_.erase(local_id);
} else {
ris.erase(remote_id);
Expand Down
6 changes: 3 additions & 3 deletions dds/DCPS/transport/framework/DataLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ class OpenDDS_Dcps_Export DataLink
/// make_reservation() methods. All we know is that the supplied
/// GUID_t is considered to be a remote id. It could be a
/// remote subscriber or a remote publisher.
void release_reservations(GUID_t remote_id,
GUID_t local_id,
DataLinkSetMap& released_locals);
void release_reservations(const GUID_t& remote_id,
const GUID_t& local_id,
DataLinkSetMap* released_locals = 0);

void schedule_delayed_release();

Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/transport/framework/TransportClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ TransportClient::disassociate(const GUID_t& peerId)
}

OPENDDS_ASSERT(guid_ != GUID_UNKNOWN);
link->release_reservations(peerId, guid_, released);
link->release_reservations(peerId, guid_, &released);

if (!released.empty()) {

Expand Down
84 changes: 49 additions & 35 deletions dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

namespace {

bool compare_and_update_counts(CORBA::Long incoming, CORBA::Long& existing) {
bool compare_and_update_counts(CORBA::Long incoming, CORBA::Long& existing)
{
static const CORBA::Long ONE_QUARTER_MAX_POSITIVE = 0x20000000;
static const CORBA::Long THREE_QUARTER_MAX_POSITIVE = 0x60000000;
if (incoming <= existing &&
Expand Down Expand Up @@ -485,24 +486,41 @@ RtpsUdpDataLink::update_locators(const GUID_t& remote_id,
bool requires_inline_qos,
bool add_ref)
{
if (unicast_addresses.empty() && multicast_addresses.empty()) {
if (DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::update_locators: no addresses for %C\n"), LogGuid(remote_id).c_str()));
}
const bool log_error = log_level >= LogLevel::Error;
if (log_error && unicast_addresses.empty() && multicast_addresses.empty()) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::update_locators: "
"no addresses for %C\n", LogGuid(remote_id).c_str()));
}

remove_locator_and_bundling_cache(remote_id);

ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfo& info = locators_[remote_id];
const bool log_unicast_change = DCPS_debug_level > 3 && info.unicast_addrs_ != unicast_addresses;
const bool log_multicast_change = DCPS_debug_level > 3 && info.multicast_addrs_ != multicast_addresses;
info.unicast_addrs_.swap(unicast_addresses);
info.multicast_addrs_.swap(multicast_addresses);
info.requires_inline_qos_ = requires_inline_qos;
RemoteInfo* info = 0;
if (add_ref) {
++info.ref_count_;
info = &locators_[remote_id];
} else {
RemoteInfoMap::iterator it = locators_.find(remote_id);
locators_.find(remote_id);
if (it == locators_.end()) {
if (log_error) {
g.release();
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::update_locators: "
"no existing locators to update for %C\n", LogGuid(remote_id).c_str()));
}
return;
}
info = &it->second;
}

const bool log = DCPS_debug_level >= 4;
const bool log_unicast_change = log && info->unicast_addrs_ != unicast_addresses;
const bool log_multicast_change = log && info->multicast_addrs_ != multicast_addresses;
info->unicast_addrs_.swap(unicast_addresses);
info->multicast_addrs_.swap(multicast_addresses);
info->requires_inline_qos_ = requires_inline_qos;
if (add_ref) {
++info->ref_count_;
}

g.release();
Expand Down Expand Up @@ -653,29 +671,6 @@ RtpsUdpDataLink::associated(const GUID_t& local_id, const GUID_t& remote_id,
return associated;
}

void
RtpsUdpDataLink::disassociated(const GUID_t& local_id,
const GUID_t& remote_id)
{
release_reservations_i(remote_id, local_id);
remove_locator_and_bundling_cache(remote_id);
sq_.ignore_remote(remote_id);

ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfoMap::iterator pos = locators_.find(remote_id);
if (pos != locators_.end()) {
OPENDDS_ASSERT(pos->second.ref_count_ > 0);

--pos->second.ref_count_;
if (pos->second.ref_count_ == 0) {
locators_.erase(pos);
}
} else if (Transport_debug_level > 3) {
ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n", LogGuid(local_id).c_str()));
}
}

void
RtpsUdpDataLink::register_for_reader(const GUID_t& writerid,
const GUID_t& readerid,
Expand Down Expand Up @@ -956,6 +951,25 @@ RtpsUdpDataLink::release_reservations_i(const GUID_t& remote_id,

remove_locator_and_bundling_cache(remote_id);

{
ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfoMap::iterator pos = locators_.find(remote_id);
if (pos != locators_.end()) {
OPENDDS_ASSERT(pos->second.ref_count_ > 0);

--pos->second.ref_count_;
if (pos->second.ref_count_ == 0) {
locators_.erase(pos);
}
} else if (Transport_debug_level >= 4) {
ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: "
"remote id %C does not have any locators\n", String(conv).c_str()));
}
}

sq_.ignore_remote(remote_id);

for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
(*drop_it)->data_dropped(true);
}
Expand Down
4 changes: 1 addition & 3 deletions dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink
const NetworkAddress& last_addr_hint,
bool requires_inline_qos);

void disassociated(const GUID_t& local, const GUID_t& remote);

void register_for_reader(const GUID_t& writerid,
const GUID_t& readerid,
const NetworkAddressSet& addresses,
Expand Down Expand Up @@ -286,7 +284,7 @@ class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink
bool requires_inline_qos_;
NetworkAddress last_recv_addr_;
MonotonicTimePoint last_recv_time_;
size_t ref_count_;
DDS::UInt32 ref_count_;
bool insert_recv_addr(NetworkAddressSet& aset) const;
};

Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/transport/rtps_udp/RtpsUdpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ RtpsUdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& clien
if (link_) {
TransportClient_rch c = client.lock();
if (c) {
link_->disassociated(c->get_guid(), remote_id);
link_->release_reservations(c->get_guid(), remote_id);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions docs/news.d/rtps-udp-memory-leaks.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.. news-prs: 4731
.. news-start-section: Fixes
- Fixed memory leak of remote locators in the RTPS/UDP transport.
.. news-end-section
3 changes: 1 addition & 2 deletions tools/rtpsrelay/GuidAddrSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ GuidAddrSet::record_activity(const AddrPort& remote_address,
auto result = remote_map_.insert(std::make_pair(remote, src_guid));
if (result.second) {
relay_stats_reporter_.remote_map_size(static_cast<uint32_t>(remote_map_.size()), now);
}
if (!result.second && result.first->second != src_guid) {
} else if (result.first->second != src_guid) {
if (config_.log_activity()) {
ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) INFO: GuidAddrSet::record_activity change detected %C -> %C\n"),
guid_to_string(result.first->second).c_str(),
Expand Down

0 comments on commit 2df190c

Please sign in to comment.