Skip to content

Commit

Permalink
Started implementing npcap based receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold committed Jan 30, 2024
1 parent 655bbf1 commit afc5ad4
Show file tree
Hide file tree
Showing 14 changed files with 1,006 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
[submodule "thirdparty/recycle"]
path = thirdparty/recycle
url = https://github.com/steinwurf/recycle.git
[submodule "thirdparty/udpcap"]
path = thirdparty/udpcap
url = https://github.com/eclipse-ecal/udpcap.git
17 changes: 17 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ message(STATUS "Module Path: ${CMAKE_MODULE_PATH}")
message(STATUS "Prefix Path: ${CMAKE_PREFIX_PATH}")

# CMake Options
option(ECALUDP_ENABLE_UDPCAP
"Enable the NPCAP based socket emulation to receive UDP data without actually opening a socket."
OFF)
option(ECALUDP_BUILD_SAMPLES
"Build project samples."
ON)
Expand All @@ -42,6 +45,11 @@ option(ECALUDP_USE_BUILTIN_ASIO
option(ECALUDP_USE_BUILTIN_RECYCLE
"Use the builtin steinwurf::recycle submodule. If set to OFF, recycle must be available from somewhere else (e.g. system libs)."
ON)
cmake_dependent_option(ECALUDP_USE_BUILTIN_UDPCAP
"Use the builtin udpcap submodule. Only needed if ECALUDP_WITH_UDPCAP is ON. If set to OFF, udpcap must be available from somewhere else (e.g. system libs). Setting this option to ON will also use the default dependencies of udpcap (npcap-sdk, pcapplusplus)."
ON # Default value if dependency is met
"ECALUDP_ENABLE_UDPCAP" # Dependency
OFF) # Default value if dependency is not met
cmake_dependent_option(ECALUDP_USE_BUILTIN_GTEST
"Use the builtin GoogleTest submodule. Only needed if ECALUDP_BUILD_TESTS is ON. If set to OFF, GoogleTest must be available from somewhere else (e.g. system libs)."
ON # Default value if dependency is met
Expand All @@ -63,6 +71,11 @@ if (ECALUDP_USE_BUILTIN_RECYCLE)
include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-recycle.cmake")
endif()

# Use builtin recycle
if (ECALUDP_USE_BUILTIN_UDPCAP)
include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-udpcap.cmake")
endif()

# Use builtin gtest
if (ECALUDP_USE_BUILTIN_GTEST)
include("${CMAKE_CURRENT_LIST_DIR}/thirdparty/build-gtest.cmake")
Expand All @@ -88,6 +101,10 @@ if (ECALUDP_BUILD_TESTS)
enable_testing()
add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/tests/ecaludp_test")

if (ECALUDP_ENABLE_UDPCAP)
add_subdirectory("${CMAKE_CURRENT_LIST_DIR}/tests/ecaludp_npcap_test")
endif()

# Check if ecaludp is a static lib. We can only add the private tests for
# static libs and object libs, as we need to have access to the private
# implementation details.
Expand Down
39 changes: 36 additions & 3 deletions ecaludp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,24 @@ set(CMAKE_VISIBILITY_INLINES_HIDDEN 1)
find_package(asio REQUIRED)
find_package(recycle REQUIRED)

message(STATUS "ECALUDP_ENABLE_UDPCAP: ${ECALUDP_ENABLE_UDPCAP}")
if(ECALUDP_ENABLE_UDPCAP)
find_package(udpcap REQUIRED)
endif()

# Include GenerateExportHeader that will create export macros for us
include(GenerateExportHeader)

# Public API include directory
###############################################
# Normal library sources (non-npcap)
###############################################
set (includes
include/ecaludp/error.h
include/ecaludp/error.h
include/ecaludp/owning_buffer.h
include/ecaludp/raw_memory.h
include/ecaludp/socket.h
)

# Private source files
set(sources
src/socket.cpp
src/protocol/datagram_builder_v5.cpp
Expand All @@ -53,6 +59,20 @@ set(sources
src/protocol/reassembly_v5.h
)

###############################################
# Sources for npcap enabled build
###############################################
list(APPEND includes
include_with_udpcap/ecaludp/socket_udpcap.h
)

list(APPEND sources
src/socket_udpcap.cpp
src/async_udpcap_socket.cpp
src/async_udpcap_socket.h
)


# Build as object library
add_library (${PROJECT_NAME} ${ECALUDP_LIBRARY_TYPE}
${includes}
Expand All @@ -79,12 +99,15 @@ target_link_libraries(${PROJECT_NAME}
$<BUILD_INTERFACE:steinwurf::recycle>
$<$<BOOL:${WIN32}>:ws2_32>
$<$<BOOL:${WIN32}>:wsock32>
$<$<BOOL:${ECALUDP_ENABLE_UDPCAP}>:udpcap::udpcap>
)

target_compile_definitions(${PROJECT_NAME}
PRIVATE
ASIO_STANDALONE
_WIN32_WINNT=0x0601
PUBLIC
$<$<BOOL:${ECALUDP_ENABLE_UDPCAP}>:ECALUDP_UDPCAP_ENABLED>
)

# Check if ecaludp is a static lib. We can only add the private tests for
Expand Down Expand Up @@ -117,6 +140,12 @@ target_include_directories(${PROJECT_NAME}
"src/"
)

# udpcap enabled includes
target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include_with_udpcap>
)

set_target_properties(${PROJECT_NAME} PROPERTIES
OUTPUT_NAME ${PROJECT_NAME}
FOLDER ecal/udp
Expand All @@ -130,6 +159,10 @@ source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" FILES
)






################################################################################
### Installation rules
################################################################################
Expand Down
97 changes: 97 additions & 0 deletions ecaludp/include_with_udpcap/ecaludp/socket_udpcap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include <array>
#include <cstddef>
#include <functional>
#include <memory>
#include <chrono>
#include <thread>

#include <asio.hpp>

#include <ecaludp/error.h>
#include <ecaludp/owning_buffer.h>
#include <ecaludp/raw_memory.h>
#include <vector>

#include <ecaludp/ecaludp_export.h>

namespace ecaludp
{
namespace v5
{
class Reassembly;
}

class recycle_shared_pool;
class AsyncUdpcapSocket;

class SocketUdpcap
{
/////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////
public:
ECALUDP_EXPORT SocketUdpcap(std::array<char, 4> magic_header_bytes);

// Destructor
ECALUDP_EXPORT ~SocketUdpcap();

// Disable copy constructor and assignment operator
SocketUdpcap(const SocketUdpcap&) = delete;
SocketUdpcap& operator=(const SocketUdpcap&) = delete;

// Disable move constructor and assignment operator
SocketUdpcap(SocketUdpcap&&) = delete;
SocketUdpcap& operator=(SocketUdpcap&&) = delete;

/////////////////////////////////////////////////////////////////
// Settings
/////////////////////////////////////////////////////////////////
public:
ECALUDP_EXPORT void set_max_reassembly_age(std::chrono::steady_clock::duration max_reassembly_age);
ECALUDP_EXPORT std::chrono::steady_clock::duration get_max_reassembly_age() const;

/////////////////////////////////////////////////////////////////
// API "Passthrough" (and a bit conversion to asio types)
/////////////////////////////////////////////////////////////////
public:
bool is_valid() const;
bool bind(const asio::ip::udp::endpoint& sender_endpoint);
bool is_bound() const;
asio::ip::udp::endpoint local_endpoint();
bool set_receive_buffer_size(int size);
bool has_pending_datagrams() const;
bool join_multicast_group(const asio::ip::address_v4& group_address);
bool leave_multicast_group(const asio::ip::address_v4& group_address);
void set_multicast_loopback_enabled(bool enabled);
bool is_multicast_loopback_enabled() const;
void close();

/////////////////////////////////////////////////////////////////
// Receiving
/////////////////////////////////////////////////////////////////
public:
ECALUDP_EXPORT void async_receive_from(asio::ip::udp::endpoint& sender_endpoint
, const std::function<void(const std::shared_ptr<ecaludp::OwningBuffer>&, ecaludp::Error)>& completion_handler);


private:
void receive_next_datagram_from(asio::ip::udp::endpoint& sender_endpoint
, const std::function<void(const std::shared_ptr<ecaludp::OwningBuffer>&, ecaludp::Error)>& completion_handler);

std::shared_ptr<ecaludp::OwningBuffer> handle_datagram(const std::shared_ptr<ecaludp::RawMemory>& buffer
, const std::shared_ptr<asio::ip::udp::endpoint>& sender_endpoint
, ecaludp::Error& error);

/////////////////////////////////////////////////////////////////
// Member Variables
/////////////////////////////////////////////////////////////////
private:
std::unique_ptr<ecaludp::AsyncUdpcapSocket> socket_; ///< The "socket" implementation

std::unique_ptr<recycle_shared_pool> datagram_buffer_pool_; ///< A reusable buffer pool for single datagrams (i.e. tyically 1500 byte fragments)
std::unique_ptr<ecaludp::v5::Reassembly> reassembly_v5_; ///< The reassembly for the eCAL v5 protocol

std::array<char, 4> magic_header_bytes_; ///< The magic bytes that are expected to start each fragment. If the received datagram doesn't have those, it will be dropped immediatelly
std::chrono::steady_clock::duration max_reassembly_age_; ///< Fragments that are stored in the reassembly for longer than that period will be dropped.
};
}
117 changes: 117 additions & 0 deletions ecaludp/src/async_udpcap_socket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include "async_udpcap_socket.h"

#include <iostream> // TODO: Remove

namespace ecaludp
{
AsyncUdpcapSocket::AsyncUdpcapSocket()
: udpcap_socket_()
, is_closed(false)
{}

AsyncUdpcapSocket::~AsyncUdpcapSocket()
{
close();
// TODO: Stop the socket and the wait thread
if (wait_thread_)
{
wait_thread_->join();
}

// TODO: implement
}

void AsyncUdpcapSocket::asyncReceiveFrom( char* buffer
, size_t max_buffer_size
, Udpcap::HostAddress& sender_address
, uint16_t& sender_port
, const std::function<void(ecaludp::Error, size_t)>& read_handler)
{
std::unique_lock<std::mutex> lock(wait_thread_trigger_mutex_);
std::cerr << "===Pushing async receive from parameters" << std::endl; // TODO REMOVE
async_receive_from_parameters_queue_.push_back({ buffer, max_buffer_size, &sender_address, &sender_port, read_handler });
wait_thread_trigger_cv_.notify_one();
}
/////////////////////////////////////////////////////
// udpcap forwarded methods
/////////////////////////////////////////////////////
bool AsyncUdpcapSocket::bind(const Udpcap::HostAddress& local_address, uint16_t local_port)
{
bool success = udpcap_socket_.bind(local_address, local_port);

if (success)
{
if (wait_thread_ && wait_thread_->joinable())
{
wait_thread_->join();
}

is_closed = false;

wait_thread_ = std::make_unique<std::thread>(&AsyncUdpcapSocket::waitForData, this);
}

return success;
}

void AsyncUdpcapSocket::close()
{
udpcap_socket_.close();
{
std::lock_guard<std::mutex> lock(wait_thread_trigger_mutex_);
is_closed = true;
wait_thread_trigger_cv_.notify_one();
}
}

void AsyncUdpcapSocket::waitForData()
{
while (true)
{
//TODO: Revise this function, it is very important, but currently not safe.

AsyncReceiveFromParameters next_async_receive_from_parameters{};

// Wait until there is somebody requesting some data. This is done by waiting for the callback queue to be non-empty.
{
std::unique_lock<std::mutex> lock(wait_thread_trigger_mutex_);
wait_thread_trigger_cv_.wait(lock, [this] { return is_closed || !async_receive_from_parameters_queue_.empty(); });

if (!async_receive_from_parameters_queue_.empty())
{
next_async_receive_from_parameters = async_receive_from_parameters_queue_.front();
async_receive_from_parameters_queue_.pop_front();
}

if (async_receive_from_parameters_queue_.empty() && is_closed)
{
return;
}
}

if (udpcap_socket_.isBound())
{
std::cerr << "===Start receiving data" << std::endl; // TODO REMOVE
size_t rec_bytes = udpcap_socket_.receiveDatagram(next_async_receive_from_parameters.buffer_
, next_async_receive_from_parameters.max_buffer_size_
, next_async_receive_from_parameters.sender_address_
, next_async_receive_from_parameters.sender_port_);

std::cerr << "===Received " << rec_bytes << " bytes from " << next_async_receive_from_parameters.sender_address_->toString() << ":" << std::to_string(*next_async_receive_from_parameters.sender_port_) << std::endl; // TODO REMOVE

if (next_async_receive_from_parameters.sender_address_->isValid())
{
next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes);
}
else
{
next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes);
}
}
else
{
next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, 0);
}
}
}
}
Loading

0 comments on commit afc5ad4

Please sign in to comment.