Skip to content

Commit

Permalink
added missing files, removed obsolete files
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Apr 3, 2024
1 parent 98558e1 commit 6cffa12
Show file tree
Hide file tree
Showing 19 changed files with 570 additions and 1,497 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,28 @@
*/

/**
* @brief raw message buffer handling
* @brief UDP receiver attributes
**/

#include <cstddef>
#pragma once

#include <functional>
#include <string>
#include <vector>

namespace IO
namespace eCAL
{
namespace UDP
{
size_t CreateSampleBuffer(const std::string& sample_name_, const std::vector<char>& serialized_sample_, std::vector<char>& payload_);
struct SReceiverAttr
{
std::string address;
int port = 0;
bool broadcast = false;
bool loopback = true;
int rcvbuf = 1024 * 1024;
};

using TransmitCallbackT = std::function<size_t(const void*, const size_t)>;
size_t SendFragmentedMessage(char* buf_, size_t buf_len_, const TransmitCallbackT& transmit_cb_);
using HasSampleCallbackT = std::function<bool(const std::string& sample_name_)>;
using ApplySampleCallbackT = std::function<void(const char* serialized_sample_data_, size_t serialized_sample_size_)>;
}
}
226 changes: 226 additions & 0 deletions ecal/core/src/io/udp/ecal_udp_sample_receiver_asio.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @brief UDP sample receiver to receive messages of type eCAL::Sample
**/

#include "ecal_udp_sample_receiver_asio.h"

#include <array>
#include <iostream>

namespace eCAL
{
namespace UDP
{
CSampleReceiverAsio::CSampleReceiverAsio(const SReceiverAttr& attr_, HasSampleCallbackT has_sample_callback_, ApplySampleCallbackT apply_sample_callback_) :
m_has_sample_callback(std::move(has_sample_callback_)), m_apply_sample_callback(std::move(apply_sample_callback_)),
m_broadcast(attr_.broadcast)
{
// initialize io context
m_io_context = std::make_shared<asio::io_context>();
m_work = std::make_shared<asio::io_context::work>(*m_io_context);

InitializeSocket(attr_);

// join multicast group
AddMultiCastGroup(attr_.address.c_str());

// run the io context
m_io_thread = std::thread([this] { m_io_context->run(); });

// start receiving
Receive();
}

CSampleReceiverAsio::~CSampleReceiverAsio()
{
FinalizeSocket();

// stop io context
m_work.reset();
if (m_io_thread.joinable())
m_io_thread.join();
}

bool CSampleReceiverAsio::AddMultiCastGroup(const char* ipaddr_)
{
if (!m_broadcast)
{
asio::error_code ec;
m_socket->set_option(asio::ip::multicast::join_group(asio::ip::make_address(ipaddr_)), ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to join multicast group: " << ec.message() << std::endl;
return false;
}
}
return true;
}

bool CSampleReceiverAsio::RemMultiCastGroup(const char* ipaddr_)
{
if (!m_broadcast)
{
asio::error_code ec;
m_socket->set_option(asio::ip::multicast::leave_group(asio::ip::make_address(ipaddr_)), ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to leave multicast group: " << ec.message() << std::endl;
return false;
}
}
return true;
}

void CSampleReceiverAsio::InitializeSocket(const SReceiverAttr& attr_)
{
// create socket
m_socket = std::make_shared<ecaludp::Socket>(*m_io_context, std::array<char, 4>{'E', 'C', 'A', 'L'});

// open socket
const asio::ip::udp::endpoint listen_endpoint(asio::ip::udp::v4(), static_cast<unsigned short>(attr_.port));
{
asio::error_code ec;
m_socket->open(listen_endpoint.protocol(), ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to open socket: " << ec.message() << std::endl;
return;
}
}

// set socket reuse
{
asio::error_code ec;
m_socket->set_option(asio::ip::udp::socket::reuse_address(true), ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to set reuse-address option: " << ec.message() << std::endl;
}
}

// set loopback option
{
const asio::ip::multicast::enable_loopback loopback(attr_.loopback);
asio::error_code ec;
m_socket->set_option(loopback, ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to enable loopback: " << ec.message() << std::endl;
}
}

// set receive buffer size (default = 1 MB)
{
int rcvbuf = 1024 * 1024;
if (attr_.rcvbuf > 0) rcvbuf = attr_.rcvbuf;
const asio::socket_base::receive_buffer_size recbufsize(rcvbuf);
asio::error_code ec;
m_socket->set_option(recbufsize, ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to set receive buffer size: " << ec.message() << std::endl;
}
}
// bind socket
{
asio::error_code ec;
m_socket->bind(listen_endpoint, ec);
if (ec)
{
std::cerr << "CSampleReceiver: Unable to bind socket to " << listen_endpoint.address().to_string() << ":" << listen_endpoint.port() << ": " << ec.message() << std::endl;
return;
}
}
}

void CSampleReceiverAsio::FinalizeSocket()
{
// cancel async socket operations
m_socket->cancel();
}

void CSampleReceiverAsio::Receive()
{
m_socket->async_receive_from(m_sender_endpoint,
[this](const std::shared_ptr<ecaludp::OwningBuffer>& buffer, asio::error_code ec)
{
// triggered by m_socket->cancel in destructor
if (ec == asio::error::operation_aborted)
{
m_socket->close();
return;
}

if (ec)
{
std::cout << "CSampleReceiver: Error receiving: " << ec.message() << std::endl;
return;
}

// extract data from the buffer
auto receive_buffer = static_cast<const char*>(buffer->data());
bool processed = true;

// read sample_name size
unsigned short sample_name_size = 0;
memcpy(&sample_name_size, receive_buffer, 2);

// check for damaged data
if (sample_name_size > buffer->size())
{
std::cout << "CSampleReceiver: Received damaged data. Wrong sample name size." << std::endl;
processed = false;
}
else
{
// read sample_name
const std::string sample_name(receive_buffer + sizeof(sample_name_size));

// calculate payload offset
auto payload_offset = sizeof(sample_name_size) + sample_name_size;

// check for damaged data
if (payload_offset > buffer->size())
{
std::cout << "CSampleSender: Received damaged data. Wrong payload buffer offset." << std::endl;
processed = false;
}
else if (m_has_sample_callback(sample_name)) // if we are interested in the sample payload
{
// extract payload and its size
auto payload_buffer = receive_buffer + payload_offset;
auto payload_buffer_size = buffer->size() - payload_offset;

// apply the sample payload
m_apply_sample_callback(payload_buffer, payload_buffer_size);
}
}

// recursively call Receive() to continue listening for data
if (processed)
{
this->Receive();
}
});
}
}
}
63 changes: 63 additions & 0 deletions ecal/core/src/io/udp/ecal_udp_sample_receiver_asio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @brief UDP sample receiver to receive messages of type eCAL::Sample
**/

#pragma once

#include "io/udp/ecal_udp_receiver_attr.h"

#include <ecaludp/socket.h>
#include <memory>
#include <thread>

namespace eCAL
{
namespace UDP
{
class CSampleReceiverAsio
{
public:
CSampleReceiverAsio(const SReceiverAttr& attr_, HasSampleCallbackT has_sample_callback_, ApplySampleCallbackT apply_sample_callback_);
virtual ~CSampleReceiverAsio();

bool AddMultiCastGroup(const char* ipaddr_);
bool RemMultiCastGroup(const char* ipaddr_);

private:
void InitializeSocket(const SReceiverAttr& attr_);
void FinalizeSocket();

void Receive();

HasSampleCallbackT m_has_sample_callback;
ApplySampleCallbackT m_apply_sample_callback;
bool m_broadcast = false;

std::shared_ptr<asio::io_context> m_io_context;
std::shared_ptr<asio::io_context::work> m_work;
std::shared_ptr<ecaludp::Socket> m_socket;

asio::ip::udp::endpoint m_sender_endpoint;
std::thread m_io_thread;
};
}
}
Loading

0 comments on commit 6cffa12

Please sign in to comment.