Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20120] TCPSendResources cleanup (backport #4300) #4511

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <fastrtps/types/TypeObjectFactory.h>
#include <fastrtps/types/DynamicPubSubType.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/utils/IPLocator.h>
#include "fastrtps/utils/shared_mutex.hpp"
Expand Down Expand Up @@ -1104,6 +1106,21 @@ bool PDP::remove_remote_participant(

this->mp_mutex->lock();

// Delete from sender resource list (TCP only)
LocatorList_t remote_participant_locators;
for (auto& remote_participant_default_locator : pdata->default_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_default_locator);
}
for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_metatraffic_locator);
}
if (!remote_participant_locators.empty())
{
mp_RTPSParticipant->update_removed_participant(remote_participant_locators);
}

// Return reader proxy objects to pool
for (auto pit : *pdata->m_readers)
{
Expand Down Expand Up @@ -1131,6 +1148,7 @@ bool PDP::remove_remote_participant(
participant_proxies_pool_.push_back(pdata);

this->mp_mutex->unlock();

return true;
}

Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#include <utility>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/transport/TransportDescriptorInterface.h>
#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/IPLocator.h>

#include <rtps/transport/UDPv4Transport.h>
#include <rtps/transport/TCPTransportInterface.h>

using namespace std;
using namespace eprosima::fastdds::rtps;
Expand Down Expand Up @@ -440,6 +441,24 @@ void NetworkFactory::update_network_interfaces()
}
}

void NetworkFactory::remove_participant_associated_send_resources(
SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const
{
for (auto& transport : mRegisteredTransports)
{
TCPTransportInterface* tcp_transport = dynamic_cast<TCPTransportInterface*>(transport.get());
if (tcp_transport)
{
tcp_transport->CloseOutputChannel(
send_resource_list,
remote_participant_locators,
participant_initial_peers);
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
13 changes: 13 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>

#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/SenderResource.h>
Expand Down Expand Up @@ -217,6 +218,18 @@ class NetworkFactory
*/
void update_network_interfaces();

/**
* Remove the given participants from the send resource list
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators List of locators associated to the remote participant.
* @param participant_initial_peers List of locators of the initial peers of the local participant.
*/
void remove_participant_associated_send_resources(
fastdds::rtps::SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const;

private:

std::vector<std::unique_ptr<fastdds::rtps::TransportInterface>> mRegisteredTransports;
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include <fastdds/rtps/writer/StatelessPersistentWriter.h>
#include <fastdds/rtps/writer/StatefulPersistentWriter.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>
Expand Down Expand Up @@ -2731,6 +2733,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints(
return should_match_local_endpoints;
}

void RTPSParticipantImpl::update_removed_participant(
const LocatorList_t& remote_participant_locators)
{
if (!remote_participant_locators.empty())
{
std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_);
m_network_Factory.remove_participant_associated_send_resources(
send_resource_list_,
remote_participant_locators,
m_att.builtin.initialPeersList);
}
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/messages/MessageReceiver.h>
Expand Down Expand Up @@ -1133,6 +1134,14 @@ class RTPSParticipantImpl
return match_local_endpoints_;
}

/**
* Method called on participant removal with the set of locators associated to the participant.
*
* @param remote_participant_locators Set of locators associated to the participant removed.
*/
void update_removed_participant(
const LocatorList_t& remote_participant_locators);

};
} // namespace rtps
} /* namespace rtps */
Expand Down
6 changes: 5 additions & 1 deletion src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
<<<<<<< HEAD
transport.CloseOutputChannel(channel_);
=======
transport.SenderResourceHasBeenClosed(locator_);
>>>>>>> fe116500c (TCPSendResources cleanup (#4300))
};

send_lambda_ = [this, &transport](
Expand Down Expand Up @@ -68,7 +72,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}

static TCPSenderResource* cast(
TransportInterface& transport,
const TransportInterface& transport,
SenderResource* sender_resource)
{
TCPSenderResource* returned_resource = nullptr;
Expand Down
130 changes: 121 additions & 9 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@

#include <rtps/transport/TCPTransportInterface.h>

<<<<<<< HEAD
=======
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstring>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
>>>>>>> fe116500c (TCPSendResources cleanup (#4300))
#include <utility>
#include <cstring>
#include <algorithm>
Expand Down Expand Up @@ -224,6 +237,35 @@ void TCPTransportInterface::clean()
}
}

<<<<<<< HEAD
=======
Locator TCPTransportInterface::remote_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const
{
Locator locator;
asio::error_code ec;
endpoint_to_locator(channel->remote_endpoint(ec), locator);
if (ec)
{
LOCATOR_INVALID(locator);
}
return locator;
}

Locator TCPTransportInterface::local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const
{
Locator locator;
asio::error_code ec;
endpoint_to_locator(channel->local_endpoint(ec), locator);
if (ec)
{
LOCATOR_INVALID(locator);
}
return locator;
}

>>>>>>> fe116500c (TCPSendResources cleanup (#4300))
void TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
Expand Down Expand Up @@ -570,6 +612,7 @@ bool TCPTransportInterface::transform_remote_locator(
return true;
}

<<<<<<< HEAD
void TCPTransportInterface::CloseOutputChannel(
std::shared_ptr<TCPChannelResource>& channel)
{
Expand All @@ -579,6 +622,26 @@ void TCPTransportInterface::CloseOutputChannel(
auto channel_resource = channel_resources_.find(physical_locator);
assert(channel_resource != channel_resources_.end());
(void)channel_resource;
=======
void TCPTransportInterface::SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator)
{
// The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction
// this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the
// socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already
// connected.
// If moving this unbind send with the respective channel disconnection to this point, the following problem arises:
// If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant
// isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has
// taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock).
// Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have
// already been destroyed and the listening thread had consequently finished).
// An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done
// via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from
// the client.
// The send resource locator is invalidated to prevent further use of associated channel.
LOCATOR_INVALID(locator);
>>>>>>> fe116500c (TCPSendResources cleanup (#4300))
}

bool TCPTransportInterface::CloseInputChannel(
Expand Down Expand Up @@ -1083,7 +1146,6 @@ bool TCPTransportInterface::Receive(
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
Expand Down Expand Up @@ -1327,10 +1389,8 @@ void TCPTransportInterface::SocketAccepted(
channel_weak_ptr, rtcp_manager_weak_ptr));

EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: "
<< channel->local_endpoint().address() << ":"
<< channel->local_endpoint().port() << "), remote: "
<< channel->remote_endpoint().address() << ":"
<< channel->remote_endpoint().port() << ")");
<< local_endpoint_to_locator(channel) << ", remote: "
<< remote_endpoint_to_locator(channel) << ")");
}
else
{
Expand Down Expand Up @@ -1375,10 +1435,8 @@ void TCPTransportInterface::SecureSocketAccepted(
channel_weak_ptr, rtcp_manager_weak_ptr));

EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: "
<< socket->lowest_layer().local_endpoint().address() << ":"
<< socket->lowest_layer().local_endpoint().port() << "), remote: "
<< socket->lowest_layer().remote_endpoint().address() << ":"
<< socket->lowest_layer().remote_endpoint().port() << ")");
<< local_endpoint_to_locator(secure_channel) << ", remote: "
<< remote_endpoint_to_locator(secure_channel) << ")");
}
else
{
Expand Down Expand Up @@ -1727,6 +1785,60 @@ void TCPTransportInterface::fill_local_physical_port(
}
}

void TCPTransportInterface::CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const
{
// Since send resources handle physical locators, we need to convert the remote participant locators to physical
std::set<Locator> remote_participant_physical_locators;
for (const Locator& remote_participant_locator : remote_participant_locators)
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator));

// Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario,
//initial peer can also work with the WANtoLANLocator of the remote participant.
if (IPLocator::hasWan(remote_participant_locator))
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator(
remote_participant_locator)));
}
}

// Exlude initial peers.
for (const auto& initial_peer : participant_initial_peers)
{
if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(),
IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end())
{
remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer));
}
}

for (const auto& remote_participant_physical_locator : remote_participant_physical_locators)
{
if (!IsLocatorSupported(remote_participant_physical_locator))
{
continue;
}
// Remove send resources for the associated remote participant locator
for (auto it = send_resource_list.begin(); it != send_resource_list.end();)
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get());

if (tcp_sender_resource)
{
if (tcp_sender_resource->locator() == remote_participant_physical_locator)
{
it = send_resource_list.erase(it);
continue;
}
}
++it;
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Loading
Loading