Skip to content

Commit

Permalink
Implemented timeout and fixed memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold committed Feb 12, 2024
1 parent 13a98c7 commit 4303076
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 34 deletions.
146 changes: 137 additions & 9 deletions tests/udpcap_test/src/udpcap_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@

#include "atomic_signalable.h"

// Create and destroy as UdpcapSocket
TEST(udpcap, RAII)
{
// Create a udpcap socket
Udpcap::UdpcapSocket udpcap_socket;
ASSERT_TRUE(udpcap_socket.isValid());

// Delete the socket
{
// Create a udpcap socket
Udpcap::UdpcapSocket udpcap_socket;
ASSERT_TRUE(udpcap_socket.isValid());
// Delete the socket
}
}

// Create and destroy a abound UdpcapSocket
TEST(udpcap, RAIIWithClose)
{
// Create a udpcap socket
Expand All @@ -49,6 +52,7 @@ TEST(udpcap, RAIIWithClose)
udpcap_socket.close();
}

// Create and destroy a bound UdpcapSocket with a thread waiting for a datagram
TEST(udpcap, RAIIWithSomebodyWaiting)
{
// Create a udpcap socket
Expand Down Expand Up @@ -88,6 +92,7 @@ TEST(udpcap, RAIIWithSomebodyWaiting)
// Delete the socket
}

// Receive a simple Hello World Message
TEST(udpcap, SimpleReceive)
{
atomic_signalable<int> received_messages(0);
Expand Down Expand Up @@ -148,6 +153,7 @@ TEST(udpcap, SimpleReceive)
receive_thread.join();
}

// Receive multiple small packages with a small delay between sending
TEST(udpcap, MultipleSmallPackages)
{
constexpr int num_packages_to_send = 10;
Expand Down Expand Up @@ -223,6 +229,7 @@ TEST(udpcap, MultipleSmallPackages)
receive_thread.join();
}

// Receive a datagram after it has been sent, so it had to be buffered
TEST(udpcap, SimpleReceiveWithBuffer)
{
atomic_signalable<int> received_messages(0);
Expand Down Expand Up @@ -282,9 +289,10 @@ TEST(udpcap, SimpleReceiveWithBuffer)
receive_thread.join();
}

// Receive multiple datagrams slower than they are sent, so they have to be buffered
TEST(udpcap, DelayedPackageReceiveMultiplePackages)
{
constexpr int num_packages_to_send = 100; // TODO: increase
constexpr int num_packages_to_send = 100;
constexpr int size_per_package = 1024;
constexpr std::chrono::milliseconds receive_delay(10);

Expand Down Expand Up @@ -317,7 +325,7 @@ TEST(udpcap, DelayedPackageReceiveMultiplePackages)
std::vector<char> received_datagram;
received_datagram.resize(65536);

size_t bytes_received = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), 0, &sender_address, &sender_port, error);
size_t bytes_received = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error);
received_datagram.resize(bytes_received);

if (error)
Expand Down Expand Up @@ -368,9 +376,129 @@ TEST(udpcap, DelayedPackageReceiveMultiplePackages)
receive_thread.join();
}

// Test the timeout of the receiveDatagram function
TEST(udpcap, Timeout)
{
atomic_signalable<int> received_messages(0);

// Create a udpcap socket
Udpcap::UdpcapSocket udpcap_socket;
ASSERT_TRUE(udpcap_socket.isValid());

{
bool success = udpcap_socket.bind(Udpcap::HostAddress::Any(), 14000);
ASSERT_TRUE(success);
}

// Initialize variables for the sender's address and port
Udpcap::HostAddress sender_address;
uint16_t sender_port(0);
Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR;

// Intialize an asio socket
asio::io_service io_service;
const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000);
asio::ip::udp::socket asio_socket(io_service, endpoint.protocol());
std::string buffer_string = "Hello World";


// Allocate buffer with max udp datagram size
std::vector<char> received_datagram;
received_datagram.resize(65536);

// Nothing is received while waiting
{
// Take Start time
auto start_time = std::chrono::steady_clock::now();

// blocking receive with a 100ms timeout
size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), 100, &sender_address, &sender_port, error);

// Take End time
auto end_time = std::chrono::steady_clock::now();

ASSERT_EQ(error, Udpcap::Error::TIMEOUT);
ASSERT_EQ(received_bytes, 0);
ASSERT_GE(std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(), 100);
}

// Something already is in the socket, so the call must return earlier
{
asio_socket.send_to(asio::buffer(buffer_string), endpoint);

// sleep 10ms
std::this_thread::sleep_for(std::chrono::milliseconds(10));

// Take Start time
auto start_time = std::chrono::steady_clock::now();

// blocking receive with a 500ms timeout
size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), 500, &sender_address, &sender_port, error);

// Take End time
auto end_time = std::chrono::steady_clock::now();

ASSERT_EQ(error, Udpcap::Error::OK);
ASSERT_EQ(received_bytes, buffer_string.size());
ASSERT_LE(std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(), 500);

// Resize the buffer and check the content
received_datagram.resize(received_bytes);
ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), buffer_string);
}

// 0ms timeout returns immediately, when nothing is in the socket
{
// Take Start time
auto start_time = std::chrono::steady_clock::now();

// blocking receive with a 0ms timeout
size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), 0, &sender_address, &sender_port, error);

// Take End time
auto end_time = std::chrono::steady_clock::now();

ASSERT_EQ(error, Udpcap::Error::TIMEOUT);
ASSERT_EQ(received_bytes, 0);
ASSERT_LE(std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(), 100);
}

// 0ms timeout returns immediately when something is in the socket
{
asio_socket.send_to(asio::buffer(buffer_string), endpoint);

// Take Start time
auto start_time = std::chrono::steady_clock::now();

// blocking receive with a 0ms timeout
size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), 0, &sender_address, &sender_port, error);

// Take End time
auto end_time = std::chrono::steady_clock::now();

ASSERT_EQ(error, Udpcap::Error::OK);
ASSERT_EQ(received_bytes, buffer_string.size());
ASSERT_LE(std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(), 100);

// Resize the buffer and check the content
received_datagram.resize(received_bytes);
ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), buffer_string);
}

// Close the socket
udpcap_socket.close();
}

// TODO: Write a test that tests the Source Address and Source Port

// TODO: Write a test that tests the timeout
// TODO: Test the returned errors of the receiveDatagram function

// TODO: test isclosed function

// TODO: rapidly create and destroy sockets to see if the memory is freed correctly https://stackoverflow.com/questions/29174938/googletest-and-memory-leaks

// TODO: Test Multicast Receive

// TODO: Test with multiple multicast sockets, that each only receive their own multicast group

// TODO: test isclosed function
// TODO: Create many sockets in threads, wait for them and destroy them to see if there are any race conditions that lead to crashes
4 changes: 2 additions & 2 deletions udpcap/include/udpcap/udpcap_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ namespace Udpcap
/**
* @brief Blocks for the given time until a packet arives and copies it to the given memory
*
* If the socket is not bound, this method will return immediatelly.
* If the socket is not bound, this method will return immediately.
* If a source_adress or source_port is provided, these will be filled with
* the according information from the packet. If the given time elapses
* before a datagram was available, no data is copied and 0 is returned.
Expand All @@ -146,7 +146,7 @@ namespace Udpcap
*
* @param data [out]: The destination memory
* @param max_len [in]: The maximum bytes available at the destination
* @param timeout_ms [in]: Maximum time to wait for a datagram in ms
* @param timeout_ms [in]: Maximum time to wait for a datagram in ms. If -1, the method will block until a datagram is available
* @param source_address [out]: the sender address of the datagram
* @param source_port [out]: the sender port of the datagram
* @param error [out]: The error that occured
Expand Down
4 changes: 2 additions & 2 deletions udpcap/src/udpcap_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ namespace Udpcap

size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, unsigned long timeout_ms, HostAddress* source_address, uint16_t* source_port, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, timeout_ms, source_address, source_port, error); }
size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, unsigned long timeout_ms, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, timeout_ms, nullptr, nullptr, error); }
size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, 0, nullptr, nullptr, error); }
size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, HostAddress* source_address, uint16_t* source_port, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, 0, source_address, source_port, error); }
size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, -1, nullptr, nullptr, error); }
size_t UdpcapSocket::receiveDatagram(char* data, size_t max_len, HostAddress* source_address, uint16_t* source_port, Udpcap::Error& error) { return udpcap_socket_private_->receiveDatagram(data, max_len, -1, source_address, source_port, error); }

bool UdpcapSocket::joinMulticastGroup (const HostAddress& group_address) { return udpcap_socket_private_->joinMulticastGroup(group_address); }
bool UdpcapSocket::leaveMulticastGroup (const HostAddress& group_address) { return udpcap_socket_private_->leaveMulticastGroup(group_address); }
Expand Down
84 changes: 64 additions & 20 deletions udpcap/src/udpcap_socket_private.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ namespace Udpcap
{
}

UdpcapSocketPrivate::~UdpcapSocketPrivate()
{
close();
}
//UdpcapSocketPrivate::~UdpcapSocketPrivate()
//{
// // @todo: reinvestigate why it crashes on close. (Maybe check if i have implemented copy / move constructors properly)
Expand Down Expand Up @@ -486,6 +490,17 @@ namespace Udpcap
, uint16_t* source_port
, Udpcap::Error& error)
{
// calculate until when to wait. If timeout_ms is 0 or smaller, we will wait forever.
std::chrono::steady_clock::time_point wait_until;
if (timeout_ms < 0)
{
wait_until = std::chrono::steady_clock::time_point::max();
}
else
{
wait_until = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
}

if (!is_valid_)
{
// Invalid socket, cannot bind => fail!
Expand Down Expand Up @@ -529,7 +544,16 @@ namespace Udpcap
return 0;
}

// Iterate through all devices and check if they have data
// Iterate through all devices and check if they have data. There is
// no other API (that I know of) to check whether data is available on
// a PCAP device other than trying to claim it. There is a very valid
// possibility that no device will have any data available. In that
// case, we use the native Win32 event handles to wait for new data
// becoming available. We however cannot do that here before trying to
// receive the data, as waiting on the event would clear the event
// state and we don't have information about the amount of data being
// availabe (e.g. there are 2 packets available, but the event is
// cleared after we waited for the first one).
for (const auto& pcap_dev : pcap_devices_)
{
CallbackArgsRawPtr callback_args(data, max_len, source_address, source_port, bound_port_, pcpp::LinkLayerType::LINKTYPE_NULL);
Expand Down Expand Up @@ -565,22 +589,25 @@ namespace Udpcap
// may still be data left in the buffer without the event being set.
if (!received_any_data)
{
// TODO: make WaitForMultipleObjects use the timeout
unsigned long remaining_time_to_wait_ms = INFINITE;
//unsigned long remaining_time_to_wait_ms = 0;
//if (wait_forever)
//{
// remaining_time_to_wait_ms = INFINITE;
//}
//else
//{
// auto now = std::chrono::steady_clock::now();
// if (now < wait_until)
// {
// remaining_time_to_wait_ms = static_cast<unsigned long>(std::chrono::duration_cast<std::chrono::milliseconds>(wait_until - now).count());
// }
//}
// Check if we are out of time and return an error if so.
auto now = std::chrono::steady_clock::now();
if (now >= wait_until)
{
error = Udpcap::Error::TIMEOUT;
return 0;
}

// If we are not out of time, we calculate how many milliseconds we are allowed to wait for new data.
unsigned long remaining_time_to_wait_ms = 0;
const bool wait_forever = (timeout_ms < 0); // Original parameter "timeout_ms" is negative if we want to wait forever
if (wait_forever)
{
remaining_time_to_wait_ms = INFINITE;
}
else
{
remaining_time_to_wait_ms = static_cast<unsigned long>(std::chrono::duration_cast<std::chrono::milliseconds>(wait_until - now).count());
}

DWORD num_handles = static_cast<DWORD>(pcap_win32_handles_.size());
if (num_handles > MAXIMUM_WAIT_OBJECTS)
Expand All @@ -599,9 +626,24 @@ namespace Udpcap
// all pcap devices for data.
continue;
}
else
else if ((wait_result >= WAIT_ABANDONED_0) && wait_result <= (WAIT_ABANDONED_0 + num_handles - 1))
{
error = Udpcap::Error(Udpcap::Error::GENERIC_ERROR, "Internal error \"WAIT_ABANDONED\" while waiting for data: " + std::system_category().message(GetLastError()));
LOG_DEBUG(error.ToString()); // This should never happen in a proper application
}
else if (wait_result == WAIT_TIMEOUT)
{
// LOG_DEBUG("Receive error: WAIT_TIMEOUT");
error = Udpcap::Error::TIMEOUT;
return 0;
}
else if (wait_result == WAIT_FAILED)
{
// TODO: Handle errors, especially closed and timeout errors
// This probably indicates a closed socket. But we don't need to
// check it here, we can simply continue the loop, as the first
// thing the loop does is checking for a closed socket.
LOG_DEBUG("Receive error: WAIT_FAILED: " + std::system_category().message(GetLastError()));
continue;
}
}
}
Expand Down Expand Up @@ -995,11 +1037,13 @@ namespace Udpcap
else
{
// Set the filter
if (pcap_setfilter(pcap_dev.pcap_handle_, &filter_program) == PCAP_ERROR)
auto set_filter_error = pcap_setfilter(pcap_dev.pcap_handle_, &filter_program);
if (set_filter_error == PCAP_ERROR)
{
pcap_perror(pcap_dev.pcap_handle_, ("UdpcapSocket ERROR: Unable to set filter \"" + filter_string + "\"").c_str());
pcap_freecode(&filter_program); // TODO: Check if I need to free the filter program at other places as well (e.g. destructor)
}

pcap_freecode(&filter_program);
}
}

Expand Down
3 changes: 2 additions & 1 deletion udpcap/src/udpcap_socket_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ namespace Udpcap
static const int MAX_PACKET_SIZE = 65536; // Npcap Doc: A snapshot length of 65535 should be sufficient, on most if not all networks, to capture all the data available from the packet.

UdpcapSocketPrivate();
~UdpcapSocketPrivate() = default;
~UdpcapSocketPrivate();

// Copy
UdpcapSocketPrivate(UdpcapSocketPrivate const&) = delete;
Expand All @@ -145,6 +145,7 @@ namespace Udpcap
std::vector<char> receiveDatagram_OLD(HostAddress* source_address = nullptr, uint16_t* source_port = nullptr);
std::vector<char> receiveDatagram_OLD(unsigned long timeout_ms, HostAddress* source_address = nullptr, uint16_t* source_port = nullptr);

// TODO: cleanup
size_t receiveDatagram_OLD(char* data, size_t max_len, HostAddress* source_address = nullptr, uint16_t* source_port = nullptr);
size_t receiveDatagram_OLD(char* data, size_t max_len, unsigned long timeout_ms, HostAddress* source_address = nullptr, uint16_t* source_port = nullptr);

Expand Down

0 comments on commit 4303076

Please sign in to comment.