From afc5ad443fe9d2ab7bc808f8cff3ac2275e9755e Mon Sep 17 00:00:00 2001 From: Florian Reimold <11774314+FlorianReimold@users.noreply.github.com> Date: Tue, 30 Jan 2024 11:44:00 +0100 Subject: [PATCH] Started implementing npcap based receiver --- .gitmodules | 3 + CMakeLists.txt | 17 ++ ecaludp/CMakeLists.txt | 39 +++- .../ecaludp/socket_udpcap.h | 97 +++++++++ ecaludp/src/async_udpcap_socket.cpp | 117 ++++++++++ ecaludp/src/async_udpcap_socket.h | 83 +++++++ ecaludp/src/socket_udpcap.cpp | 191 ++++++++++++++++ tests/ecaludp_npcap_test/CMakeLists.txt | 44 ++++ .../src/atomic_signalable.h | 205 ++++++++++++++++++ .../src/ecaludp_npcap_socket_test.cpp | 183 ++++++++++++++++ tests/ecaludp_test/CMakeLists.txt | 4 +- thirdparty/build-udpcap.cmake | 26 +++ thirdparty/udpcap | 1 + thirdparty/udpcap-module/Findudpcap.cmake | 1 + 14 files changed, 1006 insertions(+), 5 deletions(-) create mode 100644 ecaludp/include_with_udpcap/ecaludp/socket_udpcap.h create mode 100644 ecaludp/src/async_udpcap_socket.cpp create mode 100644 ecaludp/src/async_udpcap_socket.h create mode 100644 ecaludp/src/socket_udpcap.cpp create mode 100644 tests/ecaludp_npcap_test/CMakeLists.txt create mode 100644 tests/ecaludp_npcap_test/src/atomic_signalable.h create mode 100644 tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp create mode 100644 thirdparty/build-udpcap.cmake create mode 160000 thirdparty/udpcap create mode 100644 thirdparty/udpcap-module/Findudpcap.cmake diff --git a/.gitmodules b/.gitmodules index 0a83e87..7e8cef8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "thirdparty/recycle"] path = thirdparty/recycle url = https://github.com/steinwurf/recycle.git +[submodule "thirdparty/udpcap"] + path = thirdparty/udpcap + url = https://github.com/eclipse-ecal/udpcap.git diff --git a/CMakeLists.txt b/CMakeLists.txt index af5a4ae..728363e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,9 @@ message(STATUS "Module Path: ${CMAKE_MODULE_PATH}") message(STATUS "Prefix Path: ${CMAKE_PREFIX_PATH}") # CMake Options +option(ECALUDP_ENABLE_UDPCAP + "Enable the NPCAP based socket emulation to receive UDP data without actually opening a socket." + OFF) option(ECALUDP_BUILD_SAMPLES "Build project samples." ON) @@ -42,6 +45,11 @@ option(ECALUDP_USE_BUILTIN_ASIO option(ECALUDP_USE_BUILTIN_RECYCLE "Use the builtin steinwurf::recycle submodule. If set to OFF, recycle must be available from somewhere else (e.g. system libs)." ON) +cmake_dependent_option(ECALUDP_USE_BUILTIN_UDPCAP + "Use the builtin udpcap submodule. Only needed if ECALUDP_WITH_UDPCAP is ON. If set to OFF, udpcap must be available from somewhere else (e.g. system libs). Setting this option to ON will also use the default dependencies of udpcap (npcap-sdk, pcapplusplus)." + ON # Default value if dependency is met + "ECALUDP_ENABLE_UDPCAP" # Dependency + OFF) # Default value if dependency is not met cmake_dependent_option(ECALUDP_USE_BUILTIN_GTEST "Use the builtin GoogleTest submodule. Only needed if ECALUDP_BUILD_TESTS is ON. If set to OFF, GoogleTest must be available from somewhere else (e.g. system libs)." ON # Default value if dependency is met @@ -63,6 +71,11 @@ if (ECALUDP_USE_BUILTIN_RECYCLE) include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-recycle.cmake") endif() +# Use builtin recycle +if (ECALUDP_USE_BUILTIN_UDPCAP) + include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-udpcap.cmake") +endif() + # Use builtin gtest if (ECALUDP_USE_BUILTIN_GTEST) include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-gtest.cmake") @@ -88,6 +101,10 @@ if (ECALUDP_BUILD_TESTS) enable_testing() add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/tests/ecaludp_test") + if (ECALUDP_ENABLE_UDPCAP) + add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/tests/ecaludp_npcap_test") + endif() + # Check if ecaludp is a static lib. We can only add the private tests for # static libs and object libs, as we need to have access to the private # implementation details. diff --git a/ecaludp/CMakeLists.txt b/ecaludp/CMakeLists.txt index b0a216e..6b0c4ed 100644 --- a/ecaludp/CMakeLists.txt +++ b/ecaludp/CMakeLists.txt @@ -28,18 +28,24 @@ set(CMAKE_VISIBILITY_INLINES_HIDDEN 1) find_package(asio REQUIRED) find_package(recycle REQUIRED) +message(STATUS "ECALUDP_ENABLE_UDPCAP: ${ECALUDP_ENABLE_UDPCAP}") +if(ECALUDP_ENABLE_UDPCAP) + find_package(udpcap REQUIRED) +endif() + # Include GenerateExportHeader that will create export macros for us include(GenerateExportHeader) -# Public API include directory +############################################### +# Normal library sources (non-npcap) +############################################### set (includes - include/ecaludp/error.h + include/ecaludp/error.h include/ecaludp/owning_buffer.h include/ecaludp/raw_memory.h include/ecaludp/socket.h ) -# Private source files set(sources src/socket.cpp src/protocol/datagram_builder_v5.cpp @@ -53,6 +59,20 @@ set(sources src/protocol/reassembly_v5.h ) +############################################### +# Sources for npcap enabled build +############################################### +list(APPEND includes + include_with_udpcap/ecaludp/socket_udpcap.h +) + +list(APPEND sources + src/socket_udpcap.cpp + src/async_udpcap_socket.cpp + src/async_udpcap_socket.h +) + + # Build as object library add_library (${PROJECT_NAME} ${ECALUDP_LIBRARY_TYPE} ${includes} @@ -79,12 +99,15 @@ target_link_libraries(${PROJECT_NAME} $ $<$:ws2_32> $<$:wsock32> + $<$:udpcap::udpcap> ) target_compile_definitions(${PROJECT_NAME} PRIVATE ASIO_STANDALONE _WIN32_WINNT=0x0601 + PUBLIC + $<$:ECALUDP_UDPCAP_ENABLED> ) # Check if ecaludp is a static lib. We can only add the private tests for @@ -117,6 +140,12 @@ target_include_directories(${PROJECT_NAME} "src/" ) +# udpcap enabled includes +target_include_directories(${PROJECT_NAME} + PUBLIC + $ +) + set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME ${PROJECT_NAME} FOLDER ecal/udp @@ -130,6 +159,10 @@ source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" FILES ) + + + + ################################################################################ ### Installation rules ################################################################################ diff --git a/ecaludp/include_with_udpcap/ecaludp/socket_udpcap.h b/ecaludp/include_with_udpcap/ecaludp/socket_udpcap.h new file mode 100644 index 0000000..882d349 --- /dev/null +++ b/ecaludp/include_with_udpcap/ecaludp/socket_udpcap.h @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include + +namespace ecaludp +{ + namespace v5 + { + class Reassembly; + } + + class recycle_shared_pool; + class AsyncUdpcapSocket; + + class SocketUdpcap + { + ///////////////////////////////////////////////////////////////// + // Constructor + ///////////////////////////////////////////////////////////////// + public: + ECALUDP_EXPORT SocketUdpcap(std::array magic_header_bytes); + + // Destructor + ECALUDP_EXPORT ~SocketUdpcap(); + + // Disable copy constructor and assignment operator + SocketUdpcap(const SocketUdpcap&) = delete; + SocketUdpcap& operator=(const SocketUdpcap&) = delete; + + // Disable move constructor and assignment operator + SocketUdpcap(SocketUdpcap&&) = delete; + SocketUdpcap& operator=(SocketUdpcap&&) = delete; + + ///////////////////////////////////////////////////////////////// + // Settings + ///////////////////////////////////////////////////////////////// + public: + ECALUDP_EXPORT void set_max_reassembly_age(std::chrono::steady_clock::duration max_reassembly_age); + ECALUDP_EXPORT std::chrono::steady_clock::duration get_max_reassembly_age() const; + + ///////////////////////////////////////////////////////////////// + // API "Passthrough" (and a bit conversion to asio types) + ///////////////////////////////////////////////////////////////// + public: + bool is_valid() const; + bool bind(const asio::ip::udp::endpoint& sender_endpoint); + bool is_bound() const; + asio::ip::udp::endpoint local_endpoint(); + bool set_receive_buffer_size(int size); + bool has_pending_datagrams() const; + bool join_multicast_group(const asio::ip::address_v4& group_address); + bool leave_multicast_group(const asio::ip::address_v4& group_address); + void set_multicast_loopback_enabled(bool enabled); + bool is_multicast_loopback_enabled() const; + void close(); + + ///////////////////////////////////////////////////////////////// + // Receiving + ///////////////////////////////////////////////////////////////// + public: + ECALUDP_EXPORT void async_receive_from(asio::ip::udp::endpoint& sender_endpoint + , const std::function&, ecaludp::Error)>& completion_handler); + + + private: + void receive_next_datagram_from(asio::ip::udp::endpoint& sender_endpoint + , const std::function&, ecaludp::Error)>& completion_handler); + + std::shared_ptr handle_datagram(const std::shared_ptr& buffer + , const std::shared_ptr& sender_endpoint + , ecaludp::Error& error); + + ///////////////////////////////////////////////////////////////// + // Member Variables + ///////////////////////////////////////////////////////////////// + private: + std::unique_ptr socket_; ///< The "socket" implementation + + std::unique_ptr datagram_buffer_pool_; ///< A reusable buffer pool for single datagrams (i.e. tyically 1500 byte fragments) + std::unique_ptr reassembly_v5_; ///< The reassembly for the eCAL v5 protocol + + std::array magic_header_bytes_; ///< The magic bytes that are expected to start each fragment. If the received datagram doesn't have those, it will be dropped immediatelly + std::chrono::steady_clock::duration max_reassembly_age_; ///< Fragments that are stored in the reassembly for longer than that period will be dropped. + }; +} \ No newline at end of file diff --git a/ecaludp/src/async_udpcap_socket.cpp b/ecaludp/src/async_udpcap_socket.cpp new file mode 100644 index 0000000..9ee27e6 --- /dev/null +++ b/ecaludp/src/async_udpcap_socket.cpp @@ -0,0 +1,117 @@ +#include "async_udpcap_socket.h" + +#include // TODO: Remove + +namespace ecaludp +{ + AsyncUdpcapSocket::AsyncUdpcapSocket() + : udpcap_socket_() + , is_closed(false) + {} + + AsyncUdpcapSocket::~AsyncUdpcapSocket() + { + close(); + // TODO: Stop the socket and the wait thread + if (wait_thread_) + { + wait_thread_->join(); + } + + // TODO: implement + } + + void AsyncUdpcapSocket::asyncReceiveFrom( char* buffer + , size_t max_buffer_size + , Udpcap::HostAddress& sender_address + , uint16_t& sender_port + , const std::function& read_handler) + { + std::unique_lock lock(wait_thread_trigger_mutex_); + std::cerr << "===Pushing async receive from parameters" << std::endl; // TODO REMOVE + async_receive_from_parameters_queue_.push_back({ buffer, max_buffer_size, &sender_address, &sender_port, read_handler }); + wait_thread_trigger_cv_.notify_one(); + } + ///////////////////////////////////////////////////// + // udpcap forwarded methods + ///////////////////////////////////////////////////// + bool AsyncUdpcapSocket::bind(const Udpcap::HostAddress& local_address, uint16_t local_port) + { + bool success = udpcap_socket_.bind(local_address, local_port); + + if (success) + { + if (wait_thread_ && wait_thread_->joinable()) + { + wait_thread_->join(); + } + + is_closed = false; + + wait_thread_ = std::make_unique(&AsyncUdpcapSocket::waitForData, this); + } + + return success; + } + + void AsyncUdpcapSocket::close() + { + udpcap_socket_.close(); + { + std::lock_guard lock(wait_thread_trigger_mutex_); + is_closed = true; + wait_thread_trigger_cv_.notify_one(); + } + } + + void AsyncUdpcapSocket::waitForData() + { + while (true) + { + //TODO: Revise this function, it is very important, but currently not safe. + + AsyncReceiveFromParameters next_async_receive_from_parameters{}; + + // Wait until there is somebody requesting some data. This is done by waiting for the callback queue to be non-empty. + { + std::unique_lock lock(wait_thread_trigger_mutex_); + wait_thread_trigger_cv_.wait(lock, [this] { return is_closed || !async_receive_from_parameters_queue_.empty(); }); + + if (!async_receive_from_parameters_queue_.empty()) + { + next_async_receive_from_parameters = async_receive_from_parameters_queue_.front(); + async_receive_from_parameters_queue_.pop_front(); + } + + if (async_receive_from_parameters_queue_.empty() && is_closed) + { + return; + } + } + + if (udpcap_socket_.isBound()) + { + std::cerr << "===Start receiving data" << std::endl; // TODO REMOVE + size_t rec_bytes = udpcap_socket_.receiveDatagram(next_async_receive_from_parameters.buffer_ + , next_async_receive_from_parameters.max_buffer_size_ + , next_async_receive_from_parameters.sender_address_ + , next_async_receive_from_parameters.sender_port_); + + std::cerr << "===Received " << rec_bytes << " bytes from " << next_async_receive_from_parameters.sender_address_->toString() << ":" << std::to_string(*next_async_receive_from_parameters.sender_port_) << std::endl; // TODO REMOVE + + if (next_async_receive_from_parameters.sender_address_->isValid()) + { + next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes); + } + else + { + next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes); + } + } + else + { + next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, 0); + } + } + } +} diff --git a/ecaludp/src/async_udpcap_socket.h b/ecaludp/src/async_udpcap_socket.h new file mode 100644 index 0000000..00fa972 --- /dev/null +++ b/ecaludp/src/async_udpcap_socket.h @@ -0,0 +1,83 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include + +namespace ecaludp +{ + class AsyncUdpcapSocket + { + ///////////////////////////////////////////////////// + // Constructor/Destructor + ///////////////////////////////////////////////////// + public: + AsyncUdpcapSocket(); + ~AsyncUdpcapSocket(); + + // Disable copy and move + AsyncUdpcapSocket(const AsyncUdpcapSocket&) = delete; + AsyncUdpcapSocket(AsyncUdpcapSocket&&) = delete; + AsyncUdpcapSocket& operator=(const AsyncUdpcapSocket&) = delete; + AsyncUdpcapSocket& operator=(AsyncUdpcapSocket&&) = delete; + + ///////////////////////////////////////////////////// + // udpcap forwarded methods + ///////////////////////////////////////////////////// + public: + bool isValid() const { return udpcap_socket_.isValid(); } + bool bind(const Udpcap::HostAddress& local_address, uint16_t local_port); // This also starts the wait thread for async receive + bool isBound() const { return udpcap_socket_.isBound(); } + Udpcap::HostAddress localAddress() const { return udpcap_socket_.localAddress(); } + uint16_t localPort() const { return udpcap_socket_.localPort(); } + bool setReceiveBufferSize(int size) { return udpcap_socket_.setReceiveBufferSize(size); } + bool hasPendingDatagrams() const { return udpcap_socket_.hasPendingDatagrams(); } + bool joinMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.joinMulticastGroup(group_address); } + bool leaveMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.leaveMulticastGroup(group_address); } + void setMulticastLoopbackEnabled(bool enabled) { udpcap_socket_.setMulticastLoopbackEnabled(enabled); } + bool isMulticastLoopbackEnabled() const { return udpcap_socket_.isMulticastLoopbackEnabled(); } + void close(); + + ///////////////////////////////////////////////////// + // Public Methods + ///////////////////////////////////////////////////// + public: + void asyncReceiveFrom( char* buffer + , size_t max_buffer_size + , Udpcap::HostAddress& sender_address + , uint16_t& sender_port + , const std::function& read_handler); + + ///////////////////////////////////////////////////// + // Wait thread function + ///////////////////////////////////////////////////// + private: + void waitForData(); + + ///////////////////////////////////////////////////// + // Member Variables + ///////////////////////////////////////////////////// + private: + struct AsyncReceiveFromParameters + { + char* buffer_; + size_t max_buffer_size_; + Udpcap::HostAddress* sender_address_; + uint16_t* sender_port_; + std::function read_handler_; + }; + + Udpcap::UdpcapSocket udpcap_socket_; + + std::unique_ptr wait_thread_; + std::mutex wait_thread_trigger_mutex_; + std::condition_variable wait_thread_trigger_cv_; + std::deque async_receive_from_parameters_queue_; + bool is_closed; + }; +} diff --git a/ecaludp/src/socket_udpcap.cpp b/ecaludp/src/socket_udpcap.cpp new file mode 100644 index 0000000..c5e72c8 --- /dev/null +++ b/ecaludp/src/socket_udpcap.cpp @@ -0,0 +1,191 @@ +#include + +#include "async_udpcap_socket.h" + +#include "protocol/header_common.h" +#include "protocol/reassembly_v5.h" + +#include // TODO: remove + +namespace ecaludp +{ + + struct buffer_pool_lock_policy_ + { + using mutex_type = std::mutex; + using lock_type = std::lock_guard; + }; + + class recycle_shared_pool : public recycle::shared_pool{}; + + ///////////////////////////////////////////////////////////////// + // Constructor + ///////////////////////////////////////////////////////////////// + SocketUdpcap::SocketUdpcap(std::array magic_header_bytes) + : socket_ (std::make_unique()) + , datagram_buffer_pool_(std::make_unique()) + , reassembly_v5_ (std::make_unique()) + , magic_header_bytes_ (magic_header_bytes) + , max_reassembly_age_ (std::chrono::seconds(5)) + {} + + // Destructor + SocketUdpcap::~SocketUdpcap() + {} + + ///////////////////////////////////////////////////////////////// + // Settings + ///////////////////////////////////////////////////////////////// + void SocketUdpcap::set_max_reassembly_age(std::chrono::steady_clock::duration max_reassembly_age) + { + max_reassembly_age_ = max_reassembly_age; + } + + std::chrono::steady_clock::duration SocketUdpcap::get_max_reassembly_age() const + { + return max_reassembly_age_; + } + + ///////////////////////////////////////////////////////////////// + // API "Passthrough" (and a bit conversion to asio types) + ///////////////////////////////////////////////////////////////// + bool SocketUdpcap::is_valid() const { return socket_->isValid(); } + bool SocketUdpcap::bind(const asio::ip::udp::endpoint& sender_endpoint) { return socket_->bind(Udpcap::HostAddress(sender_endpoint.address().to_string()), sender_endpoint.port()); } + bool SocketUdpcap::is_bound() const { return socket_->isBound(); } + asio::ip::udp::endpoint SocketUdpcap::local_endpoint() { return asio::ip::udp::endpoint(asio::ip::make_address(socket_->localAddress().toString()), socket_->localPort()); } + bool SocketUdpcap::set_receive_buffer_size(int size) { return socket_->setReceiveBufferSize(size); } + bool SocketUdpcap::has_pending_datagrams() const { return socket_->hasPendingDatagrams(); } + bool SocketUdpcap::join_multicast_group(const asio::ip::address_v4& group_address) { return socket_->joinMulticastGroup(Udpcap::HostAddress(group_address.to_string())); } + bool SocketUdpcap::leave_multicast_group(const asio::ip::address_v4& group_address) { return socket_->leaveMulticastGroup(Udpcap::HostAddress(group_address.to_string())); } + void SocketUdpcap::set_multicast_loopback_enabled(bool enabled) { socket_->setMulticastLoopbackEnabled(enabled); } + bool SocketUdpcap::is_multicast_loopback_enabled() const { return socket_->isMulticastLoopbackEnabled(); } + void SocketUdpcap::close() { socket_->close(); } + + ///////////////////////////////////////////////////////////////// + // Receiving + ///////////////////////////////////////////////////////////////// + + void SocketUdpcap::async_receive_from(asio::ip::udp::endpoint& sender_endpoint + , const std::function&, ecaludp::Error)>& completion_handler) + { + receive_next_datagram_from(sender_endpoint, completion_handler); + } + + void SocketUdpcap::receive_next_datagram_from(asio::ip::udp::endpoint& sender_endpoint + , const std::function&, ecaludp::Error)>& completion_handler) + + { + auto datagram_buffer = datagram_buffer_pool_->allocate(); + datagram_buffer->resize(65535, false); // Max UDP datagram size. Overprovisioning is not required here, so we safe some time and memory. + + auto buffer = datagram_buffer_pool_->allocate(); + + buffer->resize(65535); // max datagram size + + auto sender_address = std::make_shared(); + auto sender_port = std::make_shared(); + + socket_->asyncReceiveFrom(reinterpret_cast(buffer->data()) + , buffer->size() + , *sender_address + , *sender_port + , [this, buffer, completion_handler, sender_address, sender_port, &sender_endpoint](ecaludp::Error& error, std::size_t bytes_received) + { + if (error) + { + std::cerr << "Error receiving: " << error.ToString() << std::endl; // TODO: Remove + completion_handler(nullptr, error); + return; + } + // resize the buffer to the actually received size + buffer->resize(bytes_received); + + std::cout << "Received " << bytes_received << " bytes from " << sender_address->toString() << ":" << *sender_port << std::endl; + + // Convert sender address and port to asio + auto sender_endpoint_of_this_datagram = std::make_shared(asio::ip::make_address(sender_address->toString()), *sender_port); + + // Handle the datagram + ecaludp::Error datagam_handle_error = ecaludp::Error::ErrorCode::GENERIC_ERROR; + auto completed_package = this->handle_datagram(buffer, sender_endpoint_of_this_datagram, datagam_handle_error); + + // TODO: Remove + if (datagam_handle_error) + { + std::cerr << "Error handling datagram: " << datagam_handle_error.ToString() << std::endl; + return; + } + + if (completed_package != nullptr) + { + sender_endpoint = *sender_endpoint_of_this_datagram; + completion_handler(completed_package, datagam_handle_error); + } + else + { + // Receive the next datagram + receive_next_datagram_from(sender_endpoint, completion_handler); + } + }); + + } + + std::shared_ptr SocketUdpcap::handle_datagram(const std::shared_ptr& buffer + , const std::shared_ptr& sender_endpoint + , ecaludp::Error& error) + { + // TODO: This function is code duplication. + + // Clean the reassembly from fragments that are too old + reassembly_v5_->remove_old_packages(std::chrono::steady_clock::now() - max_reassembly_age_); + + // Start to parse the header + + if (buffer->size() < sizeof(ecaludp::HeaderCommon)) // Magic number + version + { + error = ecaludp::Error(ecaludp::Error::MALFORMED_DATAGRAM, "Datagram too small to contain common header (" + std::to_string(buffer->size()) + " bytes)"); + return nullptr; + } + + auto* header = reinterpret_cast(buffer->data()); + + // Check the magic number + if (strncmp(header->magic, magic_header_bytes_.data(), 4) != 0) + { + error = ecaludp::Error(ecaludp::Error::MALFORMED_DATAGRAM, "Wrong magic bytes"); + return nullptr; + } + + std::shared_ptr finished_package; + + // Check the version and invoke the correct handler + if (header->version == 5) + { + // TODO Remove + std::cerr << "===Start handling datagram from sender_endpoint " << sender_endpoint->address().to_string() << ":" << std::to_string(sender_endpoint->port()) << std::endl; + finished_package = reassembly_v5_->handle_datagram(buffer, sender_endpoint, error); + + if (finished_package) + { + std::cerr << "==============FINISHED PACKAGE================" << std::endl; // TODO Remove + } + } + else if (header->version == 6) + { + error = ecaludp::Error(Error::UNSUPPORTED_PROTOCOL_VERSION, std::to_string(header->version)); + //handle_datagram_v6(buffer); + } + else + { + error = ecaludp::Error(Error::UNSUPPORTED_PROTOCOL_VERSION, std::to_string(header->version)); + } + + if (error) + { + return nullptr; + } + + return finished_package; + } + +} \ No newline at end of file diff --git a/tests/ecaludp_npcap_test/CMakeLists.txt b/tests/ecaludp_npcap_test/CMakeLists.txt new file mode 100644 index 0000000..31373df --- /dev/null +++ b/tests/ecaludp_npcap_test/CMakeLists.txt @@ -0,0 +1,44 @@ +################################################################################ +# Copyright (c) 2024 Continental Corporation +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################ + +project(ecaludp_npcap_test) + +find_package(Threads REQUIRED) +find_package(GTest REQUIRED) +find_package(ecaludp REQUIRED) + +set(sources + src/atomic_signalable.h + src/ecaludp_npcap_socket_test.cpp +) + +message (STATUS "todo remove=========== CMAKE_SIZEOF_VOID_P ${CMAKE_SIZEOF_VOID_P}") + +add_executable(${PROJECT_NAME} ${sources}) + +target_link_libraries(${PROJECT_NAME} + PRIVATE + ecaludp::ecaludp + GTest::gtest_main) + +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) + +source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" FILES + ${sources} +) + +include(GoogleTest) +gtest_discover_tests(${PROJECT_NAME}) \ No newline at end of file diff --git a/tests/ecaludp_npcap_test/src/atomic_signalable.h b/tests/ecaludp_npcap_test/src/atomic_signalable.h new file mode 100644 index 0000000..237a59d --- /dev/null +++ b/tests/ecaludp_npcap_test/src/atomic_signalable.h @@ -0,0 +1,205 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include + +template +class atomic_signalable +{ +public: + atomic_signalable(T initial_value) : value(initial_value) {} + + atomic_signalable& operator=(const T new_value) + { + std::lock_guard lock(mutex); + value = new_value; + cv.notify_all(); + return *this; + } + + T operator++() + { + std::lock_guard lock(mutex); + T newValue = ++value; + cv.notify_all(); + return newValue; + } + + T operator++(T) + { + std::lock_guard lock(mutex); + T oldValue = value++; + cv.notify_all(); + return oldValue; + } + + T operator--() + { + std::lock_guard lock(mutex); + T newValue = --value; + cv.notify_all(); + return newValue; + } + + T operator--(T) + { + std::lock_guard lock(mutex); + T oldValue = value--; + cv.notify_all(); + return oldValue; + } + + T operator+=(const T& other) + { + std::lock_guard lock(mutex); + value += other; + cv.notify_all(); + return value; + } + + T operator-=(const T& other) + { + std::lock_guard lock(mutex); + value -= other; + cv.notify_all(); + return value; + } + + T operator*=(const T& other) + { + std::lock_guard lock(mutex); + value *= other; + cv.notify_all(); + return value; + } + + T operator/=(const T& other) + { + std::lock_guard lock(mutex); + value /= other; + cv.notify_all(); + return value; + } + + T operator%=(const T& other) + { + std::lock_guard lock(mutex); + value %= other; + cv.notify_all(); + return value; + } + + template + bool wait_for(Predicate predicate, std::chrono::milliseconds timeout) + { + std::unique_lock lock(mutex); + return cv.wait_for(lock, timeout, [&]() { return predicate(value); }); + } + + T get() const + { + std::lock_guard lock(mutex); + return value; + } + + bool operator==(T other) const + { + std::lock_guard lock(mutex); + return value == other; + } + + bool operator==(const atomic_signalable& other) const + { + std::lock_guard lock_this(mutex); + std::lock_guard lock_other(other.mutex); + return value == other.value; + } + + bool operator!=(T other) const + { + std::lock_guard lock(mutex); + return value != other; + } + + bool operator<(T other) const + { + std::lock_guard lock(mutex); + return value < other; + } + + bool operator<=(T other) const + { + std::lock_guard lock(mutex); + return value <= other; + } + + bool operator>(T other) const + { + std::lock_guard lock(mutex); + return value > other; + } + + bool operator>=(T other) const + { + std::lock_guard lock(mutex); + return value >= other; + } + +private: + T value; + std::condition_variable cv; + mutable std::mutex mutex; +}; + + +template +bool operator==(const T& other, const atomic_signalable& atomic) +{ + return atomic == other; +} + +template +bool operator!=(const T& other, const atomic_signalable& atomic) +{ + return atomic != other; +} + +template +bool operator<(const T& other, const atomic_signalable& atomic) +{ + return atomic > other; +} + +template +bool operator<=(const T& other, const atomic_signalable& atomic) +{ + return atomic >= other; +} + +template +bool operator>(const T& other, const atomic_signalable& atomic) +{ + return atomic < other; +} + +template +bool operator>=(const T& other, const atomic_signalable& atomic) +{ + return atomic <= other; +} diff --git a/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp b/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp new file mode 100644 index 0000000..72611a2 --- /dev/null +++ b/tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp @@ -0,0 +1,183 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include + +#include + +#include +#include + +#include "atomic_signalable.h" + +//TEST(EcalUdpNpcapSocket, RAII_unbound) +//{ +// // Create the socket and destroy it +// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); +//} +// +//TEST(EcalUdpNpcapSocket, RAII_bound) +//{ +// // Create the socket, bind and destroy it +// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); +// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); +// int todo_remove = 0; +//} +// +//TEST(EcalUdpNpcapSocket, RAII_close) +//{ +// // Create the socket, bind and close it +// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); +// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); +// receiver_socket.close(); +//} +// +//TEST(EcalUdpNpcapSocket, HelloWorldMessage) +//{ +// atomic_signalable received_messages(0); +// +// asio::io_context io_context; +// +// // Create the sockets +// ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'}); +// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); +// +// // Open the sender_socket +// { +// asio::error_code ec; +// sender_socket.open(asio::ip::udp::v4(), ec); +// ASSERT_EQ(ec, asio::error_code()); +// } +// +// // Bind the receiver_socket +// { +// bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); +// ASSERT_EQ(success, true); +// } +// +// auto work = std::make_unique(io_context); +// std::thread io_thread([&io_context]() { io_context.run(); }); +// +// std::shared_ptr sender_endpoint = std::make_shared(); +// std::shared_ptr message_to_send = std::make_shared("Hello World!"); +// +// // Wait for the next message +// receiver_socket.async_receive_from(*sender_endpoint +// , [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr& buffer, ecaludp::Error error) +// { +// // No error +// if (error) +// { +// FAIL(); +// } +// +// // compare the messages +// std::string received_string(static_cast(buffer->data()), buffer->size()); +// ASSERT_EQ(received_string, *message_to_send); +// +// // increment +// received_messages++; +// }); +// +// // Send a message +// sender_socket.async_send_to({ asio::buffer(*message_to_send) } +// , asio::ip::udp::endpoint(asio::ip::address_v4::loopback() +// , 14000) +// , [message_to_send](asio::error_code ec) +// { +// // No error +// ASSERT_EQ(ec, asio::error_code()); +// }); +// +// // Wait for the message to be received +// received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(100)); +// +// ASSERT_EQ(received_messages, 1); +// +// work.reset(); +// io_thread.join(); +//} + +TEST(EcalUdpSocket, BigMessage) +{ + atomic_signalable received_messages(0); + + asio::io_context io_context; + + // Create the sockets + ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'}); + ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'}); + + // Open the sender_socket + { + asio::error_code ec; + sender_socket.open(asio::ip::udp::v4(), ec); + ASSERT_EQ(ec, asio::error_code()); + } + + // Bind the receiver_socket + { + bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000)); + ASSERT_EQ(success, true); + } + + receiver_socket.set_receive_buffer_size(1024 * 1024 * 10); // 10 MB + + auto work = std::make_unique(io_context); + std::thread io_thread([&io_context]() { io_context.run(); }); + + std::shared_ptr sender_endpoint = std::make_shared(); + std::shared_ptr message_to_send = std::make_shared(1024 * 2, 'a'); + + // Fill the message with random characters + std::generate(message_to_send->begin(), message_to_send->end(), []() { return static_cast(std::rand()); }); + + // Wait for the next message + receiver_socket.async_receive_from(*sender_endpoint + , [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr& buffer, ecaludp::Error error) + { + // No error + if (error) + { + FAIL(); + } + + // compare the messages + std::string received_string(static_cast(buffer->data()), buffer->size()); + ASSERT_EQ(received_string, *message_to_send); + + // increment + received_messages++; + }); + + // Send a message + sender_socket.async_send_to({ asio::buffer(*message_to_send) } + , asio::ip::udp::endpoint(asio::ip::address_v4::loopback() + , 14000) + , [message_to_send](asio::error_code ec) + { + // No error + ASSERT_EQ(ec, asio::error_code()); + }); + + // Wait for the message to be received + received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(1000)); + + ASSERT_EQ(received_messages, 1); + + work.reset(); + io_thread.join(); +} \ No newline at end of file diff --git a/tests/ecaludp_test/CMakeLists.txt b/tests/ecaludp_test/CMakeLists.txt index 729f3a0..3ccd331 100644 --- a/tests/ecaludp_test/CMakeLists.txt +++ b/tests/ecaludp_test/CMakeLists.txt @@ -14,7 +14,7 @@ # SPDX-License-Identifier: Apache-2.0 ################################################################################ -project(ecal_udp_test) +project(ecaludp_test) find_package(Threads REQUIRED) find_package(GTest REQUIRED) @@ -29,7 +29,7 @@ add_executable(${PROJECT_NAME} ${sources}) target_link_libraries(${PROJECT_NAME} PRIVATE - ecaludp + ecaludp::ecaludp GTest::gtest_main) target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) diff --git a/thirdparty/build-udpcap.cmake b/thirdparty/build-udpcap.cmake new file mode 100644 index 0000000..d0d0d3b --- /dev/null +++ b/thirdparty/build-udpcap.cmake @@ -0,0 +1,26 @@ +# Check if Npcap / Pcap++ are available as pre-downloaded .zip files +if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/npcap/npcap-sdk.zip") + set(NPCAP_SDK_ARCHIVE_URL "${CMAKE_CURRENT_LIST_DIR}/npcap/npcap-sdk.zip") +endif() +if(EXISTS "${CMAKE_CURRENT_LIST_DIR}/npcap/pcapplusplus.zip") + set(PCAPPLUSPLUS_ARCHIVE_URL "${CMAKE_CURRENT_LIST_DIR}/npcap/pcapplusplus.zip") +endif() + +# Build as static library +set(BUILD_SHARED_LIBS_OLD ${BUILD_SHARED_LIBS}) +set(BUILD_SHARED_LIBS OFF) + +# Let udpcap pull the dependencies +set(UDPCAP_BUILD_SAMPLES OFF) +set(UDPCAP_THIRDPARTY_ENABLED ON) +set(UDPCAP_THIRDPARTY_USE_BUILTIN_ASIO OFF) + +# Add udpcap library from subdirectory +add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/udpcap" EXCLUDE_FROM_ALL) +add_library(udpcap::udpcap ALIAS udpcap) + +# Reset static / shared libs to old value +set(BUILD_SHARED_LIBS ${BUILD_SHARED_LIBS_OLD}) +unset(BUILD_SHARED_LIBS_OLD) + +list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/udpcap-module") \ No newline at end of file diff --git a/thirdparty/udpcap b/thirdparty/udpcap new file mode 160000 index 0000000..4946dc2 --- /dev/null +++ b/thirdparty/udpcap @@ -0,0 +1 @@ +Subproject commit 4946dc27595179803f0deab72bad0fc9e0cf9515 diff --git a/thirdparty/udpcap-module/Findudpcap.cmake b/thirdparty/udpcap-module/Findudpcap.cmake new file mode 100644 index 0000000..5a86153 --- /dev/null +++ b/thirdparty/udpcap-module/Findudpcap.cmake @@ -0,0 +1 @@ +set(udpcap_FOUND TRUE) \ No newline at end of file