diff --git a/tests/udpcap_test/CMakeLists.txt b/tests/udpcap_test/CMakeLists.txt index e27936d..bd97916 100644 --- a/tests/udpcap_test/CMakeLists.txt +++ b/tests/udpcap_test/CMakeLists.txt @@ -35,6 +35,8 @@ add_executable (${PROJECT_NAME} target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_14) +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} FILES ${sources}) + target_link_libraries (${PROJECT_NAME} udpcap::udpcap GTest::gtest_main diff --git a/tests/udpcap_test/src/udpcap_test.cpp b/tests/udpcap_test/src/udpcap_test.cpp index 767bf90..a7fe88b 100644 --- a/tests/udpcap_test/src/udpcap_test.cpp +++ b/tests/udpcap_test/src/udpcap_test.cpp @@ -108,8 +108,15 @@ TEST(udpcap, SimpleReceive) ASSERT_TRUE(success); } + // Create an asio UDP sender socket + asio::io_service io_service; + const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); + asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); + asio_socket.connect(endpoint); + const auto asio_local_endpoint = asio_socket.local_endpoint(); + // Blocking receive a datagram - std::thread receive_thread([&udpcap_socket, &received_messages]() + std::thread receive_thread([&udpcap_socket, &received_messages, &asio_local_endpoint]() { // Initialize variables for the sender's address and port Udpcap::HostAddress sender_address; @@ -131,15 +138,13 @@ TEST(udpcap, SimpleReceive) ASSERT_FALSE(received_datagram.empty()); ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World"); + // Check if the sender's address and port are correct + ASSERT_EQ(sender_address.toString(), asio_local_endpoint.address().to_string()); + ASSERT_EQ(sender_port, asio_local_endpoint.port()); + received_messages++; }); - // Create an asio UDP sender socket - asio::io_service io_service; - - const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); - asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); - std::string buffer_string = "Hello World"; asio_socket.send_to(asio::buffer(buffer_string), endpoint); @@ -173,11 +178,21 @@ TEST(udpcap, MultipleSmallPackages) ASSERT_TRUE(success); } + // Create an asio UDP sender socket + asio::io_service io_service; + const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); + asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); + asio_socket.connect(endpoint); + const auto asio_local_endpoint = asio_socket.local_endpoint(); + // Receive datagrams in a separate thread - std::thread receive_thread([&udpcap_socket, &received_messages, num_packages_to_send]() + std::thread receive_thread([&udpcap_socket, &received_messages, num_packages_to_send, &asio_local_endpoint]() { while (true) { + // Initialize variables for the sender's address and port + Udpcap::HostAddress sender_address; + uint16_t sender_port(0); Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR; // Allocate buffer with max udp datagram size @@ -185,7 +200,7 @@ TEST(udpcap, MultipleSmallPackages) received_datagram.resize(65536); // blocking receive - const size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), error); + const size_t received_bytes = udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error); if (error) { @@ -207,16 +222,14 @@ TEST(udpcap, MultipleSmallPackages) ASSERT_FALSE(received_datagram.empty()); ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World"); + // Check if the sender's address and port are correct + ASSERT_EQ(sender_address.toString(), asio_local_endpoint.address().to_string()); + ASSERT_EQ(sender_port, asio_local_endpoint.port()); + received_messages++; } }); - // Create an asio UDP sender socket - asio::io_service io_service; - - const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); - asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); - std::string buffer_string = "Hello World"; for (int i = 0; i < num_packages_to_send; i++) { @@ -252,9 +265,10 @@ TEST(udpcap, SimpleReceiveWithBuffer) // Create an asio UDP sender socket asio::io_service io_service; - const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); + asio_socket.connect(endpoint); + const auto asio_local_endpoint = asio_socket.local_endpoint(); // Send "Hello World" without currently polling the socket std::string buffer_string = "Hello World"; @@ -263,15 +277,18 @@ TEST(udpcap, SimpleReceiveWithBuffer) std::this_thread::sleep_for(std::chrono::milliseconds(10)); // Receive the datagram - std::thread receive_thread([&udpcap_socket, &received_messages]() + std::thread receive_thread([&udpcap_socket, &received_messages, &asio_local_endpoint]() { + // Initialize variables for the sender's address and port Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR; + Udpcap::HostAddress sender_address; + uint16_t sender_port(0); // Create buffer with max udp datagram size std::vector received_datagram; received_datagram.resize(65536); - received_datagram.resize(udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), error)); + received_datagram.resize(udpcap_socket.receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error)); // No error must have occurred ASSERT_FALSE(bool(error)); @@ -280,6 +297,10 @@ TEST(udpcap, SimpleReceiveWithBuffer) ASSERT_FALSE(received_datagram.empty()); ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World"); + // Check if the sender's address and port are correct + ASSERT_EQ(sender_address.toString(), asio_local_endpoint.address().to_string()); + ASSERT_EQ(sender_port, asio_local_endpoint.port()); + received_messages++; }); @@ -318,8 +339,16 @@ TEST(udpcap, DelayedPackageReceiveMultiplePackages) ASSERT_TRUE(success); } + // Create an asio UDP sender socket + asio::io_service io_service; + const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); + asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); + asio_socket.connect(endpoint); + const auto asio_local_endpoint = asio_socket.local_endpoint(); + + // Receive datagrams in a separate thread - std::thread receive_thread([&udpcap_socket, &received_messages, num_packages_to_send, size_per_package, receive_delay]() + std::thread receive_thread([&udpcap_socket, &received_messages, num_packages_to_send, size_per_package, receive_delay, &asio_local_endpoint]() { while (true) { @@ -353,20 +382,15 @@ TEST(udpcap, DelayedPackageReceiveMultiplePackages) ASSERT_EQ(received_datagram.size(), size_per_package); received_messages++; + // Check the sender endpoint + ASSERT_EQ(sender_address.toString(), asio_local_endpoint.address().to_string()); + ASSERT_EQ(sender_port, asio_local_endpoint.port()); + // Wait a bit, so we force the udpcap socket to buffer the datagrams std::this_thread::sleep_for(receive_delay); } }); - // Create an asio UDP sender socket - asio::io_service io_service; - - const asio::ip::udp::endpoint endpoint(asio::ip::make_address("127.0.0.1"), 14000); - asio::ip::udp::socket asio_socket(io_service, endpoint.protocol()); - - // Capture the start time - auto start_time = std::chrono::steady_clock::now(); - // Send the buffers for (int i = 0; i < num_packages_to_send; i++) { @@ -379,11 +403,6 @@ TEST(udpcap, DelayedPackageReceiveMultiplePackages) // Check if the received message counter is equal to the sent messages ASSERT_EQ(received_messages.get(), num_packages_to_send); - // Capture the end time - auto end_time = std::chrono::steady_clock::now(); - - // TODO: check the entire delay - asio_socket.close(); udpcap_socket.close(); @@ -432,9 +451,6 @@ TEST(udpcap, Timeout) ASSERT_EQ(error, Udpcap::Error::TIMEOUT); ASSERT_EQ(received_bytes, 0); - // Print the used time in milliseconds to console - std::cout << "Time: " << std::chrono::duration_cast(end_time - start_time).count() << std::endl; - ASSERT_GE(std::chrono::duration_cast(end_time - start_time).count(), 100); // TODO: This sometimes fails. Check why! } @@ -582,7 +598,6 @@ TEST(udpcap, MulticastReceive) asio_socket.set_option(asio::ip::multicast::hops(1)); asio_socket.set_option(asio::ip::multicast::enable_loopback(true)); - // Receive datagrams in a separate thread for Socket1 (checks for 224.0.0.1) std::thread receive_thread1([&udpcap_socket1, &received_messages1]() { @@ -650,7 +665,7 @@ TEST(udpcap, MulticastReceive) break; } - + // Check if the received datagram is valid and contains "224.0.0.1" or "224.0.0.2" ASSERT_TRUE(std::string(received_datagram.data(), received_datagram.size()) == "224.0.0.1" || std::string(received_datagram.data(), received_datagram.size()) == "224.0.0.2"); @@ -677,8 +692,8 @@ TEST(udpcap, MulticastReceive) received_messages2.wait_for([](int value) { return value >= 2; }, std::chrono::milliseconds(500)); // Check if the received message counters - ASSERT_EQ(received_messages1.get(), 1) << "Make sure, your FIREWALL is DISABLED!!!"; - ASSERT_EQ(received_messages2.get(), 2) << "Make sure, your FIREWALL is DISABLED!!!"; + ASSERT_EQ(received_messages1.get(), 1) << "Make sure that your FIREWALL is DISABLED!!!"; + ASSERT_EQ(received_messages2.get(), 2) << "Make sure that your FIREWALL is DISABLED!!!"; // Close the sockets asio_socket.close(); @@ -690,6 +705,104 @@ TEST(udpcap, MulticastReceive) receive_thread2.join(); } -// TODO: Write a test that tests the Source Address and Source Port +TEST(udpcap, ManySockets) +{ + constexpr int num_udpcap_socket = 100; + constexpr char* ip_address = "127.0.0.1"; + constexpr uint16_t port = 14000; + + // Create an asio socket that sends datagrams to the ip address and port + asio::io_service io_service; + asio::ip::udp::socket asio_socket(io_service, asio::ip::udp::v4()); + asio::ip::udp::endpoint endpoint(asio::ip::make_address(ip_address), port); + asio_socket.connect(endpoint); + + // Thread that constantly pushes datagrams via the asio socket + std::thread send_thread([&asio_socket]() + { + std::string buffer_string = "Hello World"; + while(true) + { + asio::error_code ec; + asio_socket.send(asio::buffer(buffer_string), 0, ec); + if (ec) + { + break; + } + } + }); + + // Create num_udpcap_socket udpcap sockets + std::vector udpcap_sockets; + std::vector receive_threads; + + // Reserve space for the sockets + udpcap_sockets.reserve(num_udpcap_socket); -// TODO: Create many sockets in threads, wait for them and destroy them to see if there are any race conditions that lead to crashes + for (int i = 0; i < num_udpcap_socket; i++) + { + udpcap_sockets.emplace_back(); + ASSERT_TRUE(udpcap_sockets.back().isValid()); + const bool success = udpcap_sockets.back().bind(Udpcap::HostAddress(ip_address), port); + ASSERT_TRUE(success); + + // Create a receive thread that constantly receives datagrams + receive_threads.emplace_back([&udpcap_sockets, i]() + { + while (true) + { + // Initialize variables for the sender's address and port + Udpcap::HostAddress sender_address; + uint16_t sender_port(0); + Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR; + + // Allocate buffer with max udp datagram size + std::vector received_datagram; + received_datagram.resize(65536); + + // blocking receive + const size_t received_bytes = udpcap_sockets[i].receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error); + received_datagram.resize(received_bytes); + + if (error) + { + // Indicates that somebody closed the socket + ASSERT_EQ(error, Udpcap::Error(Udpcap::Error::ErrorCode::SOCKET_CLOSED)); + + // Check that the socket is closed + ASSERT_TRUE(udpcap_sockets[i].isClosed()); + + break; + } + else + { + // Check if the received datagram is valid and contains "Hello World" + ASSERT_FALSE(received_datagram.empty()); + ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World"); + } + } + }); + } + + // wait 10ms + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + + // Close the sockets + for (auto& udpcap_socket : udpcap_sockets) + { + udpcap_socket.close(); + } + + // Join the threads + for (auto& receive_thread : receive_threads) + { + receive_thread.join(); + } + + // Close the asio socket + asio_socket.close(); + + // Join the send thread + send_thread.join(); +} \ No newline at end of file diff --git a/udpcap/include/udpcap/udpcap_socket.h b/udpcap/include/udpcap/udpcap_socket.h index 902632f..e292e64 100644 --- a/udpcap/include/udpcap/udpcap_socket.h +++ b/udpcap/include/udpcap/udpcap_socket.h @@ -62,6 +62,11 @@ namespace Udpcap * MulticastLoopbackEnabled=true and joining a multicast group, this * implementation will receive loopback multicast traffic. Winsocks would * not do that (It's not clear to me why). + * + * Thread safety: + * - There must only be 1 thread calling receiveDatagram() at the same time + * - It is safe to call close() while another thread is calling receiveDatagram() + * - Other modifications to the socket must not be made while another thread is calling receiveDatagram() */ class UdpcapSocket { @@ -136,8 +141,19 @@ namespace Udpcap * the according information from the packet. If the given time elapses * before a datagram was available, no data is copied and 0 is returned. * - * TODO: Document which error occurs in which case - * + * Possible errors: + * OK if no error occured + * NPCAP_NOT_INITIALIZED if npcap has not been initialized + * NOT_BOUND if the socket hasn't been bound, yet + * SOCKET_CLOSED if the socket has been closed by the user + * TIMEOUT if the given timeout has elapsed and no datagram was available + * GNERIC_ERROR in cases of internal libpcap errors + * + * Thread safety: + * - This method must not be called from multiple threads at the same time + * - While one thread is calling this method, another thread may call close() + * - While one thread is calling this method, no modifications must be made to the socket (except close()) + * * @param data [out]: The destination memory * @param max_len [in]: The maximum bytes available at the destination * @param timeout_ms [in]: Maximum time to wait for a datagram in ms. If -1, the method will block until a datagram is available @@ -154,18 +170,15 @@ namespace Udpcap , uint16_t* source_port , Udpcap::Error& error); - // TODO: Copy documentation here UDPCAP_EXPORT size_t receiveDatagram(char* data , size_t max_len , long long timeout_ms , Udpcap::Error& error); - // TODO: Copy documentation here UDPCAP_EXPORT size_t receiveDatagram(char* data , size_t max_len , Udpcap::Error& error); - // TODO: Copy documentation here UDPCAP_EXPORT size_t receiveDatagram(char* data , size_t max_len , HostAddress* source_address @@ -217,6 +230,9 @@ namespace Udpcap /** * @brief Closes the socket + * + * Thread safety: + * - It is safe to call this method while another thread is calling receiveDatagram() */ UDPCAP_EXPORT void close(); diff --git a/udpcap/src/ip_reassembly.h b/udpcap/src/ip_reassembly.h index c31c56e..097eadc 100644 --- a/udpcap/src/ip_reassembly.h +++ b/udpcap/src/ip_reassembly.h @@ -37,7 +37,7 @@ namespace Udpcap ///////////////////////////////////////// /// Constructor & Destructor ///////////////////////////////////////// - public: // TODO: Document + public: /** * A c'tor for this class. * diff --git a/udpcap/src/udpcap_socket_private.cpp b/udpcap/src/udpcap_socket_private.cpp index 419ebb8..d32c907 100644 --- a/udpcap/src/udpcap_socket_private.cpp +++ b/udpcap/src/udpcap_socket_private.cpp @@ -404,7 +404,7 @@ namespace Udpcap } else if (wait_result == WAIT_TIMEOUT) { - // LOG_DEBUG("Receive error: WAIT_TIMEOUT"); + //LOG_DEBUG("Receive error: WAIT_TIMEOUT"); error = Udpcap::Error::TIMEOUT; return 0; } @@ -457,7 +457,7 @@ namespace Udpcap multicast_groups_.emplace(group_address); // Update the capture filters, so the devices will capture the multicast traffic - updateAllCaptureFilters(); // TODO: I probably need to protect the pcap_devices_ list with a mutex here + updateAllCaptureFilters(); if (multicast_loopback_enabled_) { @@ -493,7 +493,7 @@ namespace Udpcap multicast_groups_.erase(group_it); // Update all capture filtes - updateAllCaptureFilters(); // TODO: I probably need to protect the pcap_devices_ list with a mutex here + updateAllCaptureFilters(); return true; } @@ -514,7 +514,7 @@ namespace Udpcap kickstartLoopbackMulticast(); } - updateAllCaptureFilters(); // TODO: I probably need to protect the pcap_devices_ list with a mutex here + updateAllCaptureFilters(); } bool UdpcapSocketPrivate::isMulticastLoopbackEnabled() const @@ -524,9 +524,6 @@ namespace Udpcap void UdpcapSocketPrivate::close() { - // TODO: make close thread safe, so one thread can wait for data while another thread closes the socket - // TODO: 2024-01-30: Check if this now is actually thread safe - { // Lock the lists of open pcap devices in read-mode. We may use the handles, // but not modify the lists themselfes. This is in order to assure that the @@ -537,8 +534,8 @@ namespace Udpcap { // Lock the callback lock. While the callback is running, we cannot close // the pcap handle, as that may invalidate the data pointer. - const std::lock_guard pcap_callback_lock(pcap_devices_callback_mutex_); - pcap_devices_closed_ = true; //todo: must i protect this variable with the lists lock or the callback lock + const std::lock_guard pcap_devices_callback_lock(pcap_devices_callback_mutex_); + pcap_devices_closed_ = true; for (auto& pcap_dev : pcap_devices_) { LOG_DEBUG(std::string("Closing ") + pcap_dev.device_name_);