diff --git a/ecaludp/src/async_udpcap_socket.cpp b/ecaludp/src/async_udpcap_socket.cpp index 9ee27e6..f3ffe9a 100644 --- a/ecaludp/src/async_udpcap_socket.cpp +++ b/ecaludp/src/async_udpcap_socket.cpp @@ -28,7 +28,6 @@ namespace ecaludp , const std::function& read_handler) { std::unique_lock lock(wait_thread_trigger_mutex_); - std::cerr << "===Pushing async receive from parameters" << std::endl; // TODO REMOVE async_receive_from_parameters_queue_.push_back({ buffer, max_buffer_size, &sender_address, &sender_port, read_handler }); wait_thread_trigger_cv_.notify_one(); } @@ -75,6 +74,7 @@ namespace ecaludp // Wait until there is somebody requesting some data. This is done by waiting for the callback queue to be non-empty. { std::unique_lock lock(wait_thread_trigger_mutex_); + wait_thread_trigger_cv_.wait(lock, [this] { return is_closed || !async_receive_from_parameters_queue_.empty(); }); if (!async_receive_from_parameters_queue_.empty()) @@ -91,21 +91,21 @@ namespace ecaludp if (udpcap_socket_.isBound()) { - std::cerr << "===Start receiving data" << std::endl; // TODO REMOVE + Udpcap::Error error = Udpcap::Error::GENERIC_ERROR; size_t rec_bytes = udpcap_socket_.receiveDatagram(next_async_receive_from_parameters.buffer_ , next_async_receive_from_parameters.max_buffer_size_ , next_async_receive_from_parameters.sender_address_ - , next_async_receive_from_parameters.sender_port_); + , next_async_receive_from_parameters.sender_port_ + , error); - std::cerr << "===Received " << rec_bytes << " bytes from " << next_async_receive_from_parameters.sender_address_->toString() << ":" << std::to_string(*next_async_receive_from_parameters.sender_port_) << std::endl; // TODO REMOVE - - if (next_async_receive_from_parameters.sender_address_->isValid()) + if (error) { - next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes); + // TODO: Properly translate from Udpcap::Error to ecaludp::Error + next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes); } else { - next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes); + next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes); } } else diff --git a/ecaludp/src/async_udpcap_socket.h b/ecaludp/src/async_udpcap_socket.h index 00fa972..94e9371 100644 --- a/ecaludp/src/async_udpcap_socket.h +++ b/ecaludp/src/async_udpcap_socket.h @@ -36,7 +36,6 @@ namespace ecaludp Udpcap::HostAddress localAddress() const { return udpcap_socket_.localAddress(); } uint16_t localPort() const { return udpcap_socket_.localPort(); } bool setReceiveBufferSize(int size) { return udpcap_socket_.setReceiveBufferSize(size); } - bool hasPendingDatagrams() const { return udpcap_socket_.hasPendingDatagrams(); } bool joinMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.joinMulticastGroup(group_address); } bool leaveMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.leaveMulticastGroup(group_address); } void setMulticastLoopbackEnabled(bool enabled) { udpcap_socket_.setMulticastLoopbackEnabled(enabled); } diff --git a/ecaludp/src/socket.cpp b/ecaludp/src/socket.cpp index 26fae85..961e1ac 100644 --- a/ecaludp/src/socket.cpp +++ b/ecaludp/src/socket.cpp @@ -77,14 +77,13 @@ namespace ecaludp class recycle_shared_pool : public recycle::shared_pool{}; - Socket::Socket(asio::io_service& io_service, std::array magic_header_bytes) - : socket_ (io_service) - , datagram_buffer_pool_(new recycle_shared_pool()) // TODO: make_unique - , reassembly_v5_ (new ecaludp::v5::Reassembly()) - , magic_header_bytes_ (magic_header_bytes) + : socket_ (io_service) + , datagram_buffer_pool_ (std::make_unique()) + , reassembly_v5_ (std::make_unique()) + , magic_header_bytes_ (magic_header_bytes) , max_udp_datagram_size_(1448) - , max_reassembly_age_ (std::chrono::seconds(5)) + , max_reassembly_age_ (std::chrono::seconds(5)) {} Socket::~Socket() = default; diff --git a/ecaludp/src/socket_udpcap.cpp b/ecaludp/src/socket_udpcap.cpp index c5e72c8..82f3758 100644 --- a/ecaludp/src/socket_udpcap.cpp +++ b/ecaludp/src/socket_udpcap.cpp @@ -54,7 +54,6 @@ namespace ecaludp bool SocketUdpcap::is_bound() const { return socket_->isBound(); } asio::ip::udp::endpoint SocketUdpcap::local_endpoint() { return asio::ip::udp::endpoint(asio::ip::make_address(socket_->localAddress().toString()), socket_->localPort()); } bool SocketUdpcap::set_receive_buffer_size(int size) { return socket_->setReceiveBufferSize(size); } - bool SocketUdpcap::has_pending_datagrams() const { return socket_->hasPendingDatagrams(); } bool SocketUdpcap::join_multicast_group(const asio::ip::address_v4& group_address) { return socket_->joinMulticastGroup(Udpcap::HostAddress(group_address.to_string())); } bool SocketUdpcap::leave_multicast_group(const asio::ip::address_v4& group_address) { return socket_->leaveMulticastGroup(Udpcap::HostAddress(group_address.to_string())); } void SocketUdpcap::set_multicast_loopback_enabled(bool enabled) { socket_->setMulticastLoopbackEnabled(enabled); } @@ -100,8 +99,6 @@ namespace ecaludp // resize the buffer to the actually received size buffer->resize(bytes_received); - std::cout << "Received " << bytes_received << " bytes from " << sender_address->toString() << ":" << *sender_port << std::endl; - // Convert sender address and port to asio auto sender_endpoint_of_this_datagram = std::make_shared(asio::ip::make_address(sender_address->toString()), *sender_port); @@ -161,14 +158,7 @@ namespace ecaludp // Check the version and invoke the correct handler if (header->version == 5) { - // TODO Remove - std::cerr << "===Start handling datagram from sender_endpoint " << sender_endpoint->address().to_string() << ":" << std::to_string(sender_endpoint->port()) << std::endl; finished_package = reassembly_v5_->handle_datagram(buffer, sender_endpoint, error); - - if (finished_package) - { - std::cerr << "==============FINISHED PACKAGE================" << std::endl; // TODO Remove - } } else if (header->version == 6) { diff --git a/tests/ecaludp_npcap_test/CMakeLists.txt b/tests/ecaludp_npcap_test/CMakeLists.txt index 31373df..5447d2a 100644 --- a/tests/ecaludp_npcap_test/CMakeLists.txt +++ b/tests/ecaludp_npcap_test/CMakeLists.txt @@ -25,8 +25,6 @@ set(sources src/ecaludp_npcap_socket_test.cpp ) -message (STATUS "todo remove=========== CMAKE_SIZEOF_VOID_P ${CMAKE_SIZEOF_VOID_P}") - add_executable(${PROJECT_NAME} ${sources}) target_link_libraries(${PROJECT_NAME} diff --git a/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp b/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp index 72611a2..46297bc 100644 --- a/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp +++ b/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp @@ -23,95 +23,28 @@ #include "atomic_signalable.h" -//TEST(EcalUdpNpcapSocket, RAII_unbound) -//{ -// // Create the socket and destroy it -// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); -//} -// -//TEST(EcalUdpNpcapSocket, RAII_bound) -//{ -// // Create the socket, bind and destroy it -// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); -// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); -// int todo_remove = 0; -//} -// -//TEST(EcalUdpNpcapSocket, RAII_close) -//{ -// // Create the socket, bind and close it -// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); -// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); -// receiver_socket.close(); -//} -// -//TEST(EcalUdpNpcapSocket, HelloWorldMessage) -//{ -// atomic_signalable received_messages(0); -// -// asio::io_context io_context; -// -// // Create the sockets -// ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'}); -// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); -// -// // Open the sender_socket -// { -// asio::error_code ec; -// sender_socket.open(asio::ip::udp::v4(), ec); -// ASSERT_EQ(ec, asio::error_code()); -// } -// -// // Bind the receiver_socket -// { -// bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); -// ASSERT_EQ(success, true); -// } -// -// auto work = std::make_unique(io_context); -// std::thread io_thread([&io_context]() { io_context.run(); }); -// -// std::shared_ptr sender_endpoint = std::make_shared(); -// std::shared_ptr message_to_send = std::make_shared("Hello World!"); -// -// // Wait for the next message -// receiver_socket.async_receive_from(*sender_endpoint -// , [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr& buffer, ecaludp::Error error) -// { -// // No error -// if (error) -// { -// FAIL(); -// } -// -// // compare the messages -// std::string received_string(static_cast(buffer->data()), buffer->size()); -// ASSERT_EQ(received_string, *message_to_send); -// -// // increment -// received_messages++; -// }); -// -// // Send a message -// sender_socket.async_send_to({ asio::buffer(*message_to_send) } -// , asio::ip::udp::endpoint(asio::ip::address_v4::loopback() -// , 14000) -// , [message_to_send](asio::error_code ec) -// { -// // No error -// ASSERT_EQ(ec, asio::error_code()); -// }); -// -// // Wait for the message to be received -// received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(100)); -// -// ASSERT_EQ(received_messages, 1); -// -// work.reset(); -// io_thread.join(); -//} +TEST(EcalUdpNpcapSocket, RAII_unbound) +{ + // Create the socket and destroy it + ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); +} -TEST(EcalUdpSocket, BigMessage) +TEST(EcalUdpNpcapSocket, RAII_bound) +{ + // Create the socket, bind and destroy it + ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); + receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); +} + +TEST(EcalUdpNpcapSocket, RAII_close) +{ + // Create the socket, bind and close it + ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); + receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); + receiver_socket.close(); +} + +TEST(EcalUdpNpcapSocket, HelloWorldMessage) { atomic_signalable received_messages(0); @@ -134,13 +67,78 @@ TEST(EcalUdpSocket, BigMessage) ASSERT_EQ(success, true); } - receiver_socket.set_receive_buffer_size(1024 * 1024 * 10); // 10 MB + auto work = std::make_unique(io_context); + std::thread io_thread([&io_context]() { io_context.run(); }); + + std::shared_ptr sender_endpoint = std::make_shared(); + std::shared_ptr message_to_send = std::make_shared("Hello World!"); + + // Wait for the next message + receiver_socket.async_receive_from(*sender_endpoint + , [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr& buffer, ecaludp::Error error) + { + // No error + if (error) + { + FAIL(); + } + + // compare the messages + std::string received_string(static_cast(buffer->data()), buffer->size()); + ASSERT_EQ(received_string, *message_to_send); + + // increment + received_messages++; + }); + + // Send a message + sender_socket.async_send_to({ asio::buffer(*message_to_send) } + , asio::ip::udp::endpoint(asio::ip::address_v4::loopback() + , 14000) + , [message_to_send](asio::error_code ec) + { + // No error + ASSERT_EQ(ec, asio::error_code()); + }); + + // Wait for the message to be received + received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(100)); + + ASSERT_EQ(received_messages, 1); + + work.reset(); + io_thread.join(); +} + +TEST(EcalUdpSocket, BigMessage) +{ + constexpr int message_size = 1024 * 100; + atomic_signalable received_messages(0); + + asio::io_context io_context; + + // Create the sockets + ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'}); + ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); + + // Open the sender_socket + { + asio::error_code ec; + sender_socket.open(asio::ip::udp::v4(), ec); + ASSERT_EQ(ec, asio::error_code()); + } + + // Bind the receiver_socket + { + bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); + ASSERT_EQ(success, true); + } auto work = std::make_unique(io_context); std::thread io_thread([&io_context]() { io_context.run(); }); std::shared_ptr sender_endpoint = std::make_shared(); - std::shared_ptr message_to_send = std::make_shared(1024 * 2, 'a'); + std::shared_ptr message_to_send = std::make_shared(message_size, 'a'); // Fill the message with random characters std::generate(message_to_send->begin(), message_to_send->end(), []() { return static_cast(std::rand()); }); diff --git a/thirdparty/udpcap b/thirdparty/udpcap index 4946dc2..25b05eb 160000 --- a/thirdparty/udpcap +++ b/thirdparty/udpcap @@ -1 +1 @@ -Subproject commit 4946dc27595179803f0deab72bad0fc9e0cf9515 +Subproject commit 25b05ebaa9bcc0ebc6599d24d6c486ec32dcbc3f