Skip to content

Commit

Permalink
Added doc and test for thread-safety of join/leaveMulticastGroup (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold authored Feb 26, 2024
1 parent af37bfa commit d30fe4c
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 6 deletions.
160 changes: 159 additions & 1 deletion tests/udpcap_test/src/udpcap_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ TEST(udpcap, MulticastReceive)
receive_thread2.join();
}

// Create and destroy a bound many Udpcap sockets with a thread waiting for a datagram
TEST(udpcap, ManySockets)
{
constexpr int num_udpcap_socket = 100;
Expand Down Expand Up @@ -805,4 +806,161 @@ TEST(udpcap, ManySockets)

// Join the send thread
send_thread.join();
}
}

// Create many Udpcap multicast sockets and join / leave multicast groups while receiving datagrams
TEST(udpcap, ManyMulticastSockets)
{
constexpr int num_udpcap_socket = 10;
constexpr int num_test_loops = 5;
constexpr char* multicast_group_1 = "225.0.0.1";
constexpr char* multicast_group_2 = "225.0.0.2";
constexpr uint16_t port = 14000;

// Create asio sockets to send datagrams to the multicast groups
asio::io_service io_service;
asio::ip::udp::socket asio_socket1(io_service, asio::ip::udp::v4());
asio::ip::udp::socket asio_socket2(io_service, asio::ip::udp::v4());
asio::ip::udp::endpoint endpoint1(asio::ip::make_address(multicast_group_1), port);
asio::ip::udp::endpoint endpoint2(asio::ip::make_address(multicast_group_2), port);
asio_socket1.set_option(asio::ip::multicast::hops(1));
asio_socket2.set_option(asio::ip::multicast::hops(1));
asio_socket1.set_option(asio::ip::multicast::enable_loopback(true));
asio_socket2.set_option(asio::ip::multicast::enable_loopback(true));

asio_socket1.connect(endpoint1);
asio_socket2.connect(endpoint2);

// Thread that constantly pushes datagrams via the asio sockets
std::thread send_thread1([&asio_socket1]()
{
std::string buffer_string = "Hello World";
while (true)
{
asio::error_code ec;
asio_socket1.send(asio::buffer(buffer_string), 0, ec);
if (ec)
{
break;
}
}
});

std::thread send_thread2([&asio_socket2]()
{
std::string buffer_string = "Hello World";
while (true)
{
asio::error_code ec;
asio_socket2.send(asio::buffer(buffer_string), 0, ec);
if (ec)
{
break;
}
}
});

// Create num_udpcap_socket udpcap sockets
std::vector<Udpcap::UdpcapSocket> udpcap_sockets;
std::vector<std::thread> receive_threads;

// Reserve space for the sockets
udpcap_sockets.reserve(num_udpcap_socket);

for (int i = 0; i < num_udpcap_socket; i++)
{
udpcap_sockets.emplace_back();
ASSERT_TRUE(udpcap_sockets.back().isValid());
const bool success = udpcap_sockets.back().bind(Udpcap::HostAddress::Any(), port);
ASSERT_TRUE(success);
udpcap_sockets.back().setMulticastLoopbackEnabled(true);

// Create a receive thread that constantly receives datagrams
receive_threads.emplace_back([&udpcap_sockets, i, multicast_group_1, multicast_group_2]()
{
while (true)
{
// Initialize variables for the sender's address and port
Udpcap::HostAddress sender_address;
uint16_t sender_port(0);
Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR;

// Allocate buffer with max udp datagram size
std::vector<char> received_datagram;
received_datagram.resize(65536);

// blocking receive
const size_t received_bytes = udpcap_sockets[i].receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error);
received_datagram.resize(received_bytes);

if (error)
{
// Indicates that somebody closed the socket
ASSERT_EQ(error, Udpcap::Error(Udpcap::Error::ErrorCode::SOCKET_CLOSED));

// Check that the socket is closed
ASSERT_TRUE(udpcap_sockets[i].isClosed());

break;
}
else
{
// Check if the received datagram is valid and contains "Hello World"
ASSERT_FALSE(received_datagram.empty());
ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World");
}
}
});
}

for (int i = 0; i < num_test_loops; i++)
{
// Join the multicast group 1
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_1));
ASSERT_TRUE(success);
}

// Join the multicast group 2
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_2));
ASSERT_TRUE(success);
}

// Leave the multicast group 1
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_1));
ASSERT_TRUE(success);
}

// Leave the multicast group 2
for (auto& udpcap_socket : udpcap_sockets)
{
const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_2));
ASSERT_TRUE(success);
}
}

// Close the sockets
for (auto& udpcap_socket : udpcap_sockets)
{
udpcap_socket.close();
}

// Join the threads
for (auto& receive_thread : receive_threads)
{
receive_thread.join();
}

// Close the asio sockets
asio_socket1.close();
asio_socket2.close();

// Join the send threads
send_thread1.join();
send_thread2.join();
}
23 changes: 18 additions & 5 deletions udpcap/include/udpcap/udpcap_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace Udpcap
*
* Thread safety:
* - There must only be 1 thread calling receiveDatagram() at the same time
* - It is safe to call close() while another thread is calling receiveDatagram()
* - It is safe to call close(), join and leave multicast groups while another thread is calling receiveDatagram()
* - Other modifications to the socket must not be made while another thread is calling receiveDatagram()
*/
class UdpcapSocket
Expand Down Expand Up @@ -152,9 +152,13 @@ namespace Udpcap
*
* Thread safety:
* - This method must not be called from multiple threads at the same time
* - While one thread is calling this method, another thread may call close()
* - While one thread is calling this method, no modifications must be made to the socket (except close())
*
* - While one thread is calling this method, another thread may call one (and only one) of the following functions:
* - close()
* - joinMulticastGroup()
* - leaveMulticastGroup()
* - setMulticastLoopbackEnabled()
* - While one thread is calling this method, no other modifications must be made to the socket
*
* @param data [out]: The destination memory
* @param max_len [in]: The maximum bytes available at the destination
* @param timeout_ms [in]: Maximum time to wait for a datagram in ms. If -1, the method will block until a datagram is available
Expand Down Expand Up @@ -195,6 +199,9 @@ namespace Udpcap
* Joining a multicast group fails, when the Socket is invalid, not bound,
* the given address is not a multicast address or this Socket has already
* joined the group.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param group_address: The multicast group to join
*
Expand All @@ -208,6 +215,9 @@ namespace Udpcap
* Leaving a multicast group fails, when the Socket is invalid, not bound,
* the given address is not a multicast address or this Socket has not
* joined the group, yet.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param group_address: The multicast group to leave
*
Expand All @@ -219,6 +229,9 @@ namespace Udpcap
* @brief Sets whether local multicast traffic should be received
*
* If not set, the default value is true.
*
* Thread safety:
* - This function may be called while another thread is calling receiveDatagram()
*
* @param enables whether local multicast traffic should be received
*/
Expand All @@ -233,7 +246,7 @@ namespace Udpcap
* @brief Closes the socket
*
* Thread safety:
* - It is safe to call this method while another thread is calling receiveDatagram()
* - This function may be called while another thread is calling receiveDatagram()
*/
UDPCAP_EXPORT void close();

Expand Down

0 comments on commit d30fe4c

Please sign in to comment.