Skip to content

Commit

Permalink
Updated udpcap, so the communication now works
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianReimold committed Feb 27, 2024
1 parent afc5ad4 commit 390c43d
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 118 deletions.
16 changes: 8 additions & 8 deletions ecaludp/src/async_udpcap_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace ecaludp
, const std::function<void(ecaludp::Error, size_t)>& read_handler)
{
std::unique_lock<std::mutex> lock(wait_thread_trigger_mutex_);
std::cerr << "===Pushing async receive from parameters" << std::endl; // TODO REMOVE
async_receive_from_parameters_queue_.push_back({ buffer, max_buffer_size, &sender_address, &sender_port, read_handler });
wait_thread_trigger_cv_.notify_one();
}
Expand Down Expand Up @@ -75,6 +74,7 @@ namespace ecaludp
// Wait until there is somebody requesting some data. This is done by waiting for the callback queue to be non-empty.
{
std::unique_lock<std::mutex> lock(wait_thread_trigger_mutex_);

wait_thread_trigger_cv_.wait(lock, [this] { return is_closed || !async_receive_from_parameters_queue_.empty(); });

if (!async_receive_from_parameters_queue_.empty())
Expand All @@ -91,21 +91,21 @@ namespace ecaludp

if (udpcap_socket_.isBound())
{
std::cerr << "===Start receiving data" << std::endl; // TODO REMOVE
Udpcap::Error error = Udpcap::Error::GENERIC_ERROR;
size_t rec_bytes = udpcap_socket_.receiveDatagram(next_async_receive_from_parameters.buffer_
, next_async_receive_from_parameters.max_buffer_size_
, next_async_receive_from_parameters.sender_address_
, next_async_receive_from_parameters.sender_port_);
, next_async_receive_from_parameters.sender_port_
, error);

std::cerr << "===Received " << rec_bytes << " bytes from " << next_async_receive_from_parameters.sender_address_->toString() << ":" << std::to_string(*next_async_receive_from_parameters.sender_port_) << std::endl; // TODO REMOVE

if (next_async_receive_from_parameters.sender_address_->isValid())
if (error)
{
next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes);
// TODO: Properly translate from Udpcap::Error to ecaludp::Error
next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes);
}
else
{
next_async_receive_from_parameters.read_handler_(ecaludp::Error::GENERIC_ERROR, rec_bytes);
next_async_receive_from_parameters.read_handler_(ecaludp::Error::OK, rec_bytes);
}
}
else
Expand Down
1 change: 0 additions & 1 deletion ecaludp/src/async_udpcap_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace ecaludp
Udpcap::HostAddress localAddress() const { return udpcap_socket_.localAddress(); }
uint16_t localPort() const { return udpcap_socket_.localPort(); }
bool setReceiveBufferSize(int size) { return udpcap_socket_.setReceiveBufferSize(size); }
bool hasPendingDatagrams() const { return udpcap_socket_.hasPendingDatagrams(); }
bool joinMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.joinMulticastGroup(group_address); }
bool leaveMulticastGroup(const Udpcap::HostAddress& group_address) { return udpcap_socket_.leaveMulticastGroup(group_address); }
void setMulticastLoopbackEnabled(bool enabled) { udpcap_socket_.setMulticastLoopbackEnabled(enabled); }
Expand Down
11 changes: 5 additions & 6 deletions ecaludp/src/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,13 @@ namespace ecaludp

class recycle_shared_pool : public recycle::shared_pool<ecaludp::RawMemory, buffer_pool_lock_policy_>{};


Socket::Socket(asio::io_service& io_service, std::array<char, 4> magic_header_bytes)
: socket_ (io_service)
, datagram_buffer_pool_(new recycle_shared_pool()) // TODO: make_unique
, reassembly_v5_ (new ecaludp::v5::Reassembly())
, magic_header_bytes_ (magic_header_bytes)
: socket_ (io_service)
, datagram_buffer_pool_ (std::make_unique<ecaludp::recycle_shared_pool>())
, reassembly_v5_ (std::make_unique<ecaludp::v5::Reassembly>())
, magic_header_bytes_ (magic_header_bytes)
, max_udp_datagram_size_(1448)
, max_reassembly_age_ (std::chrono::seconds(5))
, max_reassembly_age_ (std::chrono::seconds(5))
{}

Socket::~Socket() = default;
Expand Down
10 changes: 0 additions & 10 deletions ecaludp/src/socket_udpcap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ namespace ecaludp
bool SocketUdpcap::is_bound() const { return socket_->isBound(); }
asio::ip::udp::endpoint SocketUdpcap::local_endpoint() { return asio::ip::udp::endpoint(asio::ip::make_address(socket_->localAddress().toString()), socket_->localPort()); }
bool SocketUdpcap::set_receive_buffer_size(int size) { return socket_->setReceiveBufferSize(size); }
bool SocketUdpcap::has_pending_datagrams() const { return socket_->hasPendingDatagrams(); }
bool SocketUdpcap::join_multicast_group(const asio::ip::address_v4& group_address) { return socket_->joinMulticastGroup(Udpcap::HostAddress(group_address.to_string())); }
bool SocketUdpcap::leave_multicast_group(const asio::ip::address_v4& group_address) { return socket_->leaveMulticastGroup(Udpcap::HostAddress(group_address.to_string())); }
void SocketUdpcap::set_multicast_loopback_enabled(bool enabled) { socket_->setMulticastLoopbackEnabled(enabled); }
Expand Down Expand Up @@ -100,8 +99,6 @@ namespace ecaludp
// resize the buffer to the actually received size
buffer->resize(bytes_received);

std::cout << "Received " << bytes_received << " bytes from " << sender_address->toString() << ":" << *sender_port << std::endl;

// Convert sender address and port to asio
auto sender_endpoint_of_this_datagram = std::make_shared<asio::ip::udp::endpoint>(asio::ip::make_address(sender_address->toString()), *sender_port);

Expand Down Expand Up @@ -161,14 +158,7 @@ namespace ecaludp
// Check the version and invoke the correct handler
if (header->version == 5)
{
// TODO Remove
std::cerr << "===Start handling datagram from sender_endpoint " << sender_endpoint->address().to_string() << ":" << std::to_string(sender_endpoint->port()) << std::endl;
finished_package = reassembly_v5_->handle_datagram(buffer, sender_endpoint, error);

if (finished_package)
{
std::cerr << "==============FINISHED PACKAGE================" << std::endl; // TODO Remove
}
}
else if (header->version == 6)
{
Expand Down
2 changes: 0 additions & 2 deletions tests/ecaludp_npcap_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ set(sources
src/ecaludp_npcap_socket_test.cpp
)

message (STATUS "todo remove=========== CMAKE_SIZEOF_VOID_P ${CMAKE_SIZEOF_VOID_P}")

add_executable(${PROJECT_NAME} ${sources})

target_link_libraries(${PROJECT_NAME}
Expand Down
178 changes: 88 additions & 90 deletions tests/ecaludp_npcap_test/src/ecaludp_npcap_socket_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,95 +23,28 @@

#include "atomic_signalable.h"

//TEST(EcalUdpNpcapSocket, RAII_unbound)
//{
// // Create the socket and destroy it
// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
//}
//
//TEST(EcalUdpNpcapSocket, RAII_bound)
//{
// // Create the socket, bind and destroy it
// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
// int todo_remove = 0;
//}
//
//TEST(EcalUdpNpcapSocket, RAII_close)
//{
// // Create the socket, bind and close it
// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
// receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
// receiver_socket.close();
//}
//
//TEST(EcalUdpNpcapSocket, HelloWorldMessage)
//{
// atomic_signalable<int> received_messages(0);
//
// asio::io_context io_context;
//
// // Create the sockets
// ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'});
// ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
//
// // Open the sender_socket
// {
// asio::error_code ec;
// sender_socket.open(asio::ip::udp::v4(), ec);
// ASSERT_EQ(ec, asio::error_code());
// }
//
// // Bind the receiver_socket
// {
// bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
// ASSERT_EQ(success, true);
// }
//
// auto work = std::make_unique<asio::io_context::work>(io_context);
// std::thread io_thread([&io_context]() { io_context.run(); });
//
// std::shared_ptr<asio::ip::udp::endpoint> sender_endpoint = std::make_shared<asio::ip::udp::endpoint>();
// std::shared_ptr<std::string> message_to_send = std::make_shared<std::string>("Hello World!");
//
// // Wait for the next message
// receiver_socket.async_receive_from(*sender_endpoint
// , [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr<ecaludp::OwningBuffer>& buffer, ecaludp::Error error)
// {
// // No error
// if (error)
// {
// FAIL();
// }
//
// // compare the messages
// std::string received_string(static_cast<const char*>(buffer->data()), buffer->size());
// ASSERT_EQ(received_string, *message_to_send);
//
// // increment
// received_messages++;
// });
//
// // Send a message
// sender_socket.async_send_to({ asio::buffer(*message_to_send) }
// , asio::ip::udp::endpoint(asio::ip::address_v4::loopback()
// , 14000)
// , [message_to_send](asio::error_code ec)
// {
// // No error
// ASSERT_EQ(ec, asio::error_code());
// });
//
// // Wait for the message to be received
// received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(100));
//
// ASSERT_EQ(received_messages, 1);
//
// work.reset();
// io_thread.join();
//}
TEST(EcalUdpNpcapSocket, RAII_unbound)
{
// Create the socket and destroy it
ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
}

TEST(EcalUdpSocket, BigMessage)
TEST(EcalUdpNpcapSocket, RAII_bound)
{
// Create the socket, bind and destroy it
ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
}

TEST(EcalUdpNpcapSocket, RAII_close)
{
// Create the socket, bind and close it
ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});
receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
receiver_socket.close();
}

TEST(EcalUdpNpcapSocket, HelloWorldMessage)
{
atomic_signalable<int> received_messages(0);

Expand All @@ -134,13 +67,78 @@ TEST(EcalUdpSocket, BigMessage)
ASSERT_EQ(success, true);
}

receiver_socket.set_receive_buffer_size(1024 * 1024 * 10); // 10 MB
auto work = std::make_unique<asio::io_context::work>(io_context);
std::thread io_thread([&io_context]() { io_context.run(); });

std::shared_ptr<asio::ip::udp::endpoint> sender_endpoint = std::make_shared<asio::ip::udp::endpoint>();
std::shared_ptr<std::string> message_to_send = std::make_shared<std::string>("Hello World!");

// Wait for the next message
receiver_socket.async_receive_from(*sender_endpoint
, [sender_endpoint, &received_messages, message_to_send](const std::shared_ptr<ecaludp::OwningBuffer>& buffer, ecaludp::Error error)
{
// No error
if (error)
{
FAIL();
}

// compare the messages
std::string received_string(static_cast<const char*>(buffer->data()), buffer->size());
ASSERT_EQ(received_string, *message_to_send);

// increment
received_messages++;
});

// Send a message
sender_socket.async_send_to({ asio::buffer(*message_to_send) }
, asio::ip::udp::endpoint(asio::ip::address_v4::loopback()
, 14000)
, [message_to_send](asio::error_code ec)
{
// No error
ASSERT_EQ(ec, asio::error_code());
});

// Wait for the message to be received
received_messages.wait_for([](int received_messages) { return received_messages == 1; }, std::chrono::milliseconds(100));

ASSERT_EQ(received_messages, 1);

work.reset();
io_thread.join();
}

TEST(EcalUdpSocket, BigMessage)
{
constexpr int message_size = 1024 * 100;
atomic_signalable<int> received_messages(0);

asio::io_context io_context;

// Create the sockets
ecaludp::Socket sender_socket (io_context, {'E', 'C', 'A', 'L'});
ecaludp::SocketUdpcap receiver_socket({'E', 'C', 'A', 'L'});

// Open the sender_socket
{
asio::error_code ec;
sender_socket.open(asio::ip::udp::v4(), ec);
ASSERT_EQ(ec, asio::error_code());
}

// Bind the receiver_socket
{
bool success = receiver_socket.bind(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 14000));
ASSERT_EQ(success, true);
}

auto work = std::make_unique<asio::io_context::work>(io_context);
std::thread io_thread([&io_context]() { io_context.run(); });

std::shared_ptr<asio::ip::udp::endpoint> sender_endpoint = std::make_shared<asio::ip::udp::endpoint>();
std::shared_ptr<std::string> message_to_send = std::make_shared<std::string>(1024 * 2, 'a');
std::shared_ptr<std::string> message_to_send = std::make_shared<std::string>(message_size, 'a');

// Fill the message with random characters
std::generate(message_to_send->begin(), message_to_send->end(), []() { return static_cast<char>(std::rand()); });
Expand Down

0 comments on commit 390c43d

Please sign in to comment.