Skip to content

Commit

Permalink
Create InitialConnection for TCP initial peers 2.13 & 2.10 & 2.6 (#4947)
Browse files Browse the repository at this point in the history
* Refs #20650: Add test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20650: Create initial connect for initial peers

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
(cherry picked from commit a0a4fee)

# Conflicts:
#	test/blackbox/common/BlackboxTestsTransportTCP.cpp
  • Loading branch information
cferreiragonz authored and mergify[bot] committed Jun 27, 2024
1 parent 01ab5cd commit c477680
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 3 deletions.
12 changes: 11 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,17 @@ bool PDPSimple::create_dcps_participant_endpoints()

WriterAttributes watt = create_builtin_writer_attributes();
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.remoteLocatorList = m_discovery.initialPeersList;
if (!m_discovery.initialPeersList.empty())
{
if (mp_RTPSParticipant->has_tcp_transports())
{
mp_RTPSParticipant->create_tcp_connections(m_discovery.initialPeersList);
}
else
{
watt.endpoint.remoteLocatorList = m_discovery.initialPeersList;
}
}

if (mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod != UINT32_MAX &&
mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.periodMillisecs != 0)
Expand Down
172 changes: 170 additions & 2 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ TEST(TransportTCP, Client_reconnection)
delete requester;
}

// Test copy constructor and copy assignment for TCPv4
// Test zero listening port for TCPv4
TEST_P(TransportTCP, TCPv4_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -637,7 +637,7 @@ TEST_P(TransportTCP, TCPv4_autofill_port)
EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port);
}

// Test copy constructor and copy assignment for TCPv6
// Test zero listening port for TCPv6
TEST_P(TransportTCP, TCPv6_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -1188,6 +1188,174 @@ TEST_P(TransportTCP, send_resource_cleanup_initial_peer)
client->wait_discovery(2, std::chrono::seconds(0));
}

<<<<<<< HEAD
=======
// Test TCP transport on large message with best effort reliability
TEST_P(TransportTCP, large_message_send_receive)
{
// Prepare data to be sent before participants discovery so it is ready to be sent as soon as possible.
std::list<Data1mb> data;
data = default_data300kb_data_generator(1);

uint16_t writer_port = global_port;

/* Test configuration */
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

std::shared_ptr<TCPTransportDescriptor> writer_transport;
std::shared_ptr<TCPTransportDescriptor> reader_transport;
Locator_t initialPeerLocator;
if (use_ipv6)
{
reader_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
writer_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
initialPeerLocator.kind = LOCATOR_KIND_TCPv6;
IPLocator::setIPv6(initialPeerLocator, "::1");
}
else
{
reader_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
writer_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1);
}
writer_transport->tcp_negotiation_timeout = 100;
reader_transport->tcp_negotiation_timeout = 100;

// Add listener port to server
writer_transport->add_listener_port(writer_port);

// Add initial peer to client
initialPeerLocator.port = writer_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);

// Setup participants
writer.disable_builtin_transport()
.add_user_transport_to_pparams(writer_transport);

reader.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(reader_transport);

// Init participants
writer.init();
reader.init();
ASSERT_TRUE(writer.isInitialized());
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery(1, std::chrono::seconds(0));
reader.wait_discovery(std::chrono::seconds(0), 1);

// Send and receive data
reader.startReception(data);

writer.send(data);
EXPECT_TRUE(data.empty());

reader.block_for_all();
}

// Test TCP transport on large message with best effort reliability and LARGE_DATA mode
TEST_P(TransportTCP, large_message_large_data_send_receive)
{
// Prepare data to be sent. before participants discovery so it is ready to be sent as soon as possible.
// The writer might try to send the data before the reader has negotiated the connection.
// If the negotiation timeout is too short, the writer will fail to send the data and the reader will not receive it.
// LARGE_DATA participant discovery is tipically faster than tcp negotiation.
std::list<Data1mb> data;
data = default_data300kb_data_generator(1);

/* Test configuration */
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

uint32_t tcp_negotiation_timeout = 100;
writer.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout);
reader.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout);

// Init participants
writer.init();
reader.init();
ASSERT_TRUE(writer.isInitialized());
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery(1, std::chrono::seconds(0));
reader.wait_discovery(std::chrono::seconds(0), 1);

// Send and receive data
reader.startReception(data);

writer.send(data);
EXPECT_TRUE(data.empty());

reader.block_for_all();
}

// Test CreateInitialConnection for TCP
TEST_P(TransportTCP, TCP_initial_peers_connection)
{
PubSubWriter<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p3(TEST_TOPIC_NAME);

// Add TCP Transport with listening port
auto p1_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p1_transport->add_listener_port(global_port);
auto p2_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p2_transport->add_listener_port(global_port + 1);
auto p3_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p3_transport->add_listener_port(global_port - 1);

// Add initial peer to client
Locator_t initialPeerLocator;
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1);
initialPeerLocator.port = global_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);

// Setup participants
p1.disable_builtin_transport()
.add_user_transport_to_pparams(p1_transport);

p2.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p2_transport);

p3.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p3_transport);

// Init participants
p1.init();
p2.init();
p3.init();
ASSERT_TRUE(p1.isInitialized());
ASSERT_TRUE(p2.isInitialized());
ASSERT_TRUE(p3.isInitialized());

// Wait for discovery
p1.wait_discovery(2, std::chrono::seconds(0));
p2.wait_discovery(std::chrono::seconds(0), 1);
p3.wait_discovery(std::chrono::seconds(0), 1);

// Send and receive data
auto data = default_helloworld_data_generator();
p2.startReception(data);
p3.startReception(data);

p1.send(data);
EXPECT_TRUE(data.empty());

p2.block_for_all();
p3.block_for_all();
}

>>>>>>> a0a4feeff (Create InitialConnection for TCP initial peers 2.13 & 2.10 & 2.6 (#4947))
#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit c477680

Please sign in to comment.