From afe771a07f1d0208c17f2733e9050055004272b3 Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Fri, 29 Jan 2021 23:24:24 +0100 Subject: [PATCH 1/3] Forwarder: Added forwarder plugin --- src/plugins/output/CMakeLists.txt | 1 + src/plugins/output/forwarder/CMakeLists.txt | 37 ++ src/plugins/output/forwarder/README.rst | 73 +++ src/plugins/output/forwarder/TODO.txt | 5 + .../doc/ipfixcol2-forwarder-output.7.rst | 20 + .../output/forwarder/src/Connection.cpp | 131 ++++++ src/plugins/output/forwarder/src/Connection.h | 119 +++++ .../output/forwarder/src/ConnectionBuffer.h | 221 +++++++++ .../forwarder/src/ConnectionManager.cpp | 164 +++++++ .../output/forwarder/src/ConnectionManager.h | 93 ++++ .../output/forwarder/src/ConnectionParams.h | 136 ++++++ src/plugins/output/forwarder/src/Forwarder.h | 436 ++++++++++++++++++ .../output/forwarder/src/IPFIXMessage.h | 107 +++++ .../output/forwarder/src/MessageBuilder.h | 150 ++++++ src/plugins/output/forwarder/src/SyncPipe.h | 92 ++++ src/plugins/output/forwarder/src/config.h | 282 +++++++++++ src/plugins/output/forwarder/src/main.cpp | 110 +++++ 17 files changed, 2177 insertions(+) create mode 100644 src/plugins/output/forwarder/CMakeLists.txt create mode 100644 src/plugins/output/forwarder/README.rst create mode 100644 src/plugins/output/forwarder/TODO.txt create mode 100644 src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst create mode 100644 src/plugins/output/forwarder/src/Connection.cpp create mode 100644 src/plugins/output/forwarder/src/Connection.h create mode 100644 src/plugins/output/forwarder/src/ConnectionBuffer.h create mode 100644 src/plugins/output/forwarder/src/ConnectionManager.cpp create mode 100644 src/plugins/output/forwarder/src/ConnectionManager.h create mode 100644 src/plugins/output/forwarder/src/ConnectionParams.h create mode 100644 src/plugins/output/forwarder/src/Forwarder.h create mode 100644 src/plugins/output/forwarder/src/IPFIXMessage.h create mode 100644 src/plugins/output/forwarder/src/MessageBuilder.h create mode 100644 src/plugins/output/forwarder/src/SyncPipe.h create mode 100644 src/plugins/output/forwarder/src/config.h create mode 100644 src/plugins/output/forwarder/src/main.cpp diff --git a/src/plugins/output/CMakeLists.txt b/src/plugins/output/CMakeLists.txt index aa123f2d..11681bdd 100644 --- a/src/plugins/output/CMakeLists.txt +++ b/src/plugins/output/CMakeLists.txt @@ -6,3 +6,4 @@ add_subdirectory(json-kafka) add_subdirectory(timecheck) add_subdirectory(viewer) add_subdirectory(ipfix) +add_subdirectory(forwarder) diff --git a/src/plugins/output/forwarder/CMakeLists.txt b/src/plugins/output/forwarder/CMakeLists.txt new file mode 100644 index 00000000..2bec16c3 --- /dev/null +++ b/src/plugins/output/forwarder/CMakeLists.txt @@ -0,0 +1,37 @@ +# Create a linkable module +add_library(forwarder-output MODULE + src/main.cpp + src/config.h + src/Forwarder.h + src/ConnectionManager.h + src/ConnectionManager.cpp + src/ConnectionParams.h + src/Connection.h + src/Connection.cpp + src/ConnectionBuffer.h + src/SyncPipe.h + src/IPFIXMessage.h + src/MessageBuilder.h +) + +install( + TARGETS forwarder-output + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/" +) + +if (ENABLE_DOC_MANPAGE) + # Build a manual page + set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-forwarder-output.7.rst") + set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-forwarder-output.7") + + add_custom_command(TARGET forwarder-output PRE_BUILD + COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE} + DEPENDS ${SRC_FILE} + VERBATIM + ) + + install( + FILES "${DST_FILE}" + DESTINATION "${INSTALL_DIR_MAN}/man7" + ) +endif() diff --git a/src/plugins/output/forwarder/README.rst b/src/plugins/output/forwarder/README.rst new file mode 100644 index 00000000..8dd97c38 --- /dev/null +++ b/src/plugins/output/forwarder/README.rst @@ -0,0 +1,73 @@ +Forwarder (output plugin) +========================== + +This plugin allows forwarding incoming IPFIX messages to other collector in various modes. + +It can be used to broadcast messages to multiple collectors (e.g. a main and a backup collector), +or to distribute messages across multiple collectors (e.g. for load balancing). + +Example configuration +--------------------- + +.. code-block:: xml + + + Forwarder + forwarder + + roundrobin + tcp + + + Subcollector 1 +
127.0.0.1
+ 4751 +
+ + Subcollector 2 +
localhost
+ 4752 +
+
+
+
+ +Parameters +---------- + +:``mode``: + The forwarding mode; round robin (messages are sent to one host at time and hosts are cycled through) or all (messages are broadcasted to all hosts) + [values: RoundRobin/All] + +:``protocol``: + The transport protocol to use + [values: TCP/UDP] + +:``templateRefreshIntervalSecs``: + Send templates again every N seconds (UDP only) + [value: number of seconds, default: 600] + +:``templateRefreshIntervalBytes``: + Send templates again every N bytes (UDP only) + [value: number of bytes, default: 5000000] + +:``reconnectIntervalSecs``: + Attempt to reconnect every N seconds in case the connection drops (TCP only) + [value: number of seconds, default: 10] + +:``hosts``: + The receiving hosts + + :``host``: + :``name``: + Optional identification of the host + [value: string, default:
:] + + :``address``: + The address of the host + [value: IPv4/IPv6 address or a hostname] + + :``port``: + The port to connect to + [value: port number] + diff --git a/src/plugins/output/forwarder/TODO.txt b/src/plugins/output/forwarder/TODO.txt new file mode 100644 index 00000000..37631f98 --- /dev/null +++ b/src/plugins/output/forwarder/TODO.txt @@ -0,0 +1,5 @@ +* Template withdrawals +* More effective way of handling template changes - currently all the templates are being sent again every time any change in templates is detected +* Message MTU +* Possible bug: when testing, a small number of data records seems to be lost (something like 20 out of 1,000,000) +* Connection buffer size \ No newline at end of file diff --git a/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst b/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst new file mode 100644 index 00000000..a5d6767d --- /dev/null +++ b/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst @@ -0,0 +1,20 @@ +============================ + ipfixcol2-forwarder-output +============================ + +-------------------------- +Forwarder (output plugin) +-------------------------- + +:Author: Michal Sedlak (xsedla0v@stud.fit.vutbr.cz) +:Date: 2021-01-28 +:Copyright: Copyright © 2021 CESNET, z.s.p.o. +:Version: 1.0 +:Manual section: 7 +:Manual group: IPFIXcol collector + +Description +----------- + +.. include:: ../README.rst + :start-line: 3 diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp new file mode 100644 index 00000000..2dcdec0a --- /dev/null +++ b/src/plugins/output/forwarder/src/Connection.cpp @@ -0,0 +1,131 @@ +/** + * \file src/plugins/output/forwarder/src/Connection.cpp + * \author Michal Sedlak + * \brief Buffered socket connection + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include "Connection.h" + +#include + +Connection::Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size) +: manager(manager) +, params(params) +, buffer(buffer_size) +{ +} + +bool +Connection::connect() +{ + if (sockfd >= 0) { + ::close(sockfd); + } + sockfd = params.make_socket(); + return sockfd >= 0; +} + +std::unique_lock +Connection::begin_write() +{ + return std::unique_lock(buffer_mutex); +} + +bool +Connection::write(void *data, long length) +{ + return buffer.write((uint8_t *)data, length); +} + +void +Connection::rollback_write() +{ + buffer.rollback(); +} + +long +Connection::writeable() +{ + return buffer.writeable(); +} + +void +Connection::commit_write() +{ + buffer.commit(); + manager.pipe.notify(); + has_data_to_send = buffer.readable(); +} + +bool +Connection::send_some() +{ + if (params.protocol == TransProto::Udp) { + while (1) { + fds_ipfix_msg_hdr ipfix_header; + if (!buffer.peek(ipfix_header)) { + return true; + } + auto message_length = ntohs(ipfix_header.length); + int ret = buffer.send_data(sockfd, message_length); + if (ret == 0 || !buffer.readable()) { + return true; + } else if (ret < 0) { + return false; + } + } + return true; + + } else if (params.protocol == TransProto::Tcp) { + return buffer.send_data(sockfd) >= 0; + } +} + +void +Connection::close() +{ + close_flag = true; + manager.pipe.notify(); +} + +Connection::~Connection() +{ + if (sockfd >= 0) { + ::close(sockfd); + } +} \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h new file mode 100644 index 00000000..7a3d6b37 --- /dev/null +++ b/src/plugins/output/forwarder/src/Connection.h @@ -0,0 +1,119 @@ +/** + * \file src/plugins/output/forwarder/src/Connection.h + * \author Michal Sedlak + * \brief Buffered socket connection + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include "ConnectionManager.h" +#include "ConnectionParams.h" +#include "ConnectionBuffer.h" + +#include +#include +#include +#include + +#include +#include +#include + +static constexpr int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; + +class ConnectionManager; + +class Connection +{ +friend class ConnectionManager; + +public: + /// Flag indicating that the connection was lost and the forwarder needs to resend templates etc. + /// The flag won't be reset when the connection is reestablished! + std::atomic connection_lost_flag { false }; + + Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size = DEFAULT_BUFFER_SIZE); + + bool + connect(); + + std::unique_lock + begin_write(); + + bool + write(void *data, long length); + + bool + send_some(); + + void + commit_write(); + + void + rollback_write(); + + long + writeable(); + + void + close(); + + ~Connection(); + +private: + /// The manager managing this connection + ConnectionManager &manager; + + /// The parameters to estabilish the connection + ConnectionParams params; + + /// The connection socket + int sockfd = -1; + + /// Buffer for the data to send and a mutex guarding it + /// (buffer will be accessed from sender thread and writer thread) + std::mutex buffer_mutex; + ConnectionBuffer buffer; + + /// Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time + /// (doesn't need to be atomic because we only set it while holding the mutex) + bool has_data_to_send = false; + + /// Flag indicating that the connection has been closed and can be disposed of after the data is sent + std::atomic close_flag { false }; +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/ConnectionBuffer.h b/src/plugins/output/forwarder/src/ConnectionBuffer.h new file mode 100644 index 00000000..9db928a0 --- /dev/null +++ b/src/plugins/output/forwarder/src/ConnectionBuffer.h @@ -0,0 +1,221 @@ +/** + * \file src/plugins/output/forwarder/src/ConnectionBuffer.h + * \author Michal Sedlak + * \brief Ring buffer used by connections + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include + +#include +#include +#include +#include + +class ConnectionBuffer +{ +public: + ConnectionBuffer(long capacity) + : capacity(capacity) + , buffer(capacity) + {} + + void + rollback() + { + write_offset = read_end_offset; + } + + void + commit() + { + read_end_offset = write_offset; + } + + long + writeable() + { + return writeable_from(write_offset); + } + + bool + write(uint8_t *data, long length) + { + long pos = raw_write_at(write_offset, data, length); + if (pos == -1) { + return false; + } + write_offset = pos; + return true; + } + + template + bool + write(T data) + { + return write((uint8_t *)&data, sizeof(T)); + } + + long + readable() + { + return read_offset > read_end_offset + ? capacity - read_offset + read_end_offset + : read_end_offset - read_offset; + } + + bool + peek(uint8_t *data, long length) + { + if (readable() < length) { + return false; + } + raw_read_at(read_offset, data, length); + return true; + } + + template + bool + peek(T &item) + { + return peek((uint8_t *)&item, sizeof(item)); + } + + int + send_data(int sockfd, long length = -1) + { + if (length == -1) { + length = readable(); + } + iovec iov[2] = {}; + iov[0].iov_len = std::min(cont_readable_from(read_offset), length); + iov[0].iov_base = &buffer[read_offset]; + iov[1].iov_len = length - iov[0].iov_len; + iov[1].iov_base = &buffer[0]; + msghdr msg_hdr = {}; + msg_hdr.msg_iov = iov; + msg_hdr.msg_iovlen = 2; + int ret = sendmsg(sockfd, &msg_hdr, MSG_DONTWAIT | MSG_NOSIGNAL); + if (ret < 0) { + return (errno == EWOULDBLOCK || errno == EAGAIN) ? 0 : ret; + } + read_offset = advance(read_offset, ret); + return ret; + } + +private: + long capacity; + long read_offset = 0; + long read_end_offset = 0; + long write_offset = 0; + std::vector buffer; + + long + advance(long pos, long n) + { + return (pos + n) % capacity; + } + + long + readable_from(long pos) + { + return pos > read_end_offset + ? capacity - pos + read_end_offset + : read_end_offset - pos; + } + + long + cont_readable_from(long pos) + { + return pos > read_end_offset + ? capacity - pos + : read_end_offset - pos; + } + + long + raw_read_at(long pos, uint8_t *data, long length) + { + if (readable_from(pos) < length) { + return -1; + } + long read1 = std::min(cont_readable_from(pos), length); + long read2 = length - read1; + memcpy(&data[0], &buffer[pos], read1); + memcpy(&data[read1], &buffer[advance(pos, read1)], read2); + return advance(pos, length); + } + + long + cont_writeable_from(long pos) + { + return read_offset > pos + ? read_offset - pos - 1 + : (read_offset == 0 ? capacity - pos - 1 : capacity - pos); + } + + long + writeable_from(long pos) + { + return read_offset > pos + ? read_offset - pos - 1 + : capacity - pos + read_offset - 1; + } + + long + raw_write_at(long pos, uint8_t *data, long length) + { + /// WARNING: Does not advance the write offset + if (writeable_from(pos) < length) { + return -1; + } + long write1 = std::min(length, cont_writeable_from(pos)); + long write2 = length - write1; + memcpy(&buffer[pos], &data[0], write1); + memcpy(&buffer[advance(pos, write1)], &data[write1], write2); + return advance(pos, length); + } + + template + bool + raw_write_at(long pos, T data) + { + return raw_write_at(pos, (uint8_t *)&data, sizeof(T)); + } + +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/ConnectionManager.cpp b/src/plugins/output/forwarder/src/ConnectionManager.cpp new file mode 100644 index 00000000..22c40189 --- /dev/null +++ b/src/plugins/output/forwarder/src/ConnectionManager.cpp @@ -0,0 +1,164 @@ +/** + * \file src/plugins/output/forwarder/src/ConnectionManager.cpp + * \author Michal Sedlak + * \brief Connection manager + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include "ConnectionManager.h" + +Connection & +ConnectionManager::add_client(ConnectionParams params) +{ + auto connection_ptr = std::unique_ptr(new Connection(*this, params)); + auto &connection = *connection_ptr; + std::lock_guard guard(mutex); + if (connection.connect()) { + active_connections.push_back(std::move(connection_ptr)); + } else { + reconnect_connections.push_back(std::move(connection_ptr)); + } + return connection; +} + +void +ConnectionManager::send_loop() +{ + int max_fd; + + fd_set pipe_fds; + FD_ZERO(&pipe_fds); + FD_SET(pipe.get_readfd(), &pipe_fds); + + fd_set socket_fds; + FD_ZERO(&socket_fds); + + auto watch_sock = [&](int fd) { + FD_SET(fd, &pipe_fds); + max_fd = std::max(max_fd, fd); + }; + + while (!exit_flag) { + max_fd = pipe.get_readfd(); + FD_ZERO(&socket_fds); + + { + std::lock_guard guard(mutex); + pipe.clear(); + auto it = active_connections.begin(); + while (it != active_connections.end()) { + auto &connection = **it; + if (connection.has_data_to_send) { + std::lock_guard guard(connection.buffer_mutex); + if (connection.send_some()) { + if (connection.buffer.readable()) { + watch_sock(connection.sockfd); + } + connection.has_data_to_send = connection.buffer.readable(); + } else { + connection.connection_lost_flag = true; + reconnect_connections.push_back(std::move(*it)); + it = active_connections.erase(it); + reconnect_cv.notify_one(); + continue; + } + } else { + if (connection.close_flag) { + it = active_connections.erase(it); + continue; + } + } + it++; + } + } + + select(max_fd + 1, &pipe_fds, &socket_fds, NULL, NULL); + } +} + +void +ConnectionManager::reconnect_loop() +{ + while (!exit_flag) { + auto lock = std::unique_lock(mutex); + auto it = reconnect_connections.begin(); + while (it != reconnect_connections.end()) { + auto &connection = **it; + if (connection.connect()) { + active_connections.push_back(std::move(*it)); + it = reconnect_connections.erase(it); + pipe.notify(); + } else { + if (connection.close_flag) { + it = reconnect_connections.erase(it); + continue; + } + it++; + } + } + + if (reconnect_connections.empty()) { + reconnect_cv.wait(lock); + } else { + reconnect_cv.wait_for(lock, std::chrono::seconds(reconnect_interval_secs)); + } + } +} + +void +ConnectionManager::start() +{ + send_thread = std::thread([this]() { send_loop(); }); + reconnect_thread = std::thread([this]() { reconnect_loop(); }); +} + +void +ConnectionManager::stop() +{ + exit_flag = true; + pipe.notify(); + reconnect_cv.notify_one(); + send_thread.join(); + reconnect_thread.join(); +} + +void +ConnectionManager::set_reconnect_interval(int secs) +{ + reconnect_interval_secs = secs; +} + diff --git a/src/plugins/output/forwarder/src/ConnectionManager.h b/src/plugins/output/forwarder/src/ConnectionManager.h new file mode 100644 index 00000000..752cedaf --- /dev/null +++ b/src/plugins/output/forwarder/src/ConnectionManager.h @@ -0,0 +1,93 @@ +/** + * \file src/plugins/output/forwarder/src/ConnectionManager.h + * \author Michal Sedlak + * \brief Connection manager + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include "ConnectionParams.h" +#include "Connection.h" +#include "SyncPipe.h" + +#include +#include +#include +#include +#include +#include +#include + +class Connection; + +static constexpr int DEFAULT_RECONNECT_INTERVAL_SECS = 5; + +class ConnectionManager +{ +friend class Connection; + +public: + Connection & + add_client(ConnectionParams params); + + void + start(); + + void + stop(); + + void + set_reconnect_interval(int secs); + +private: + int reconnect_interval_secs = DEFAULT_RECONNECT_INTERVAL_SECS; + std::mutex mutex; + std::vector> active_connections; + std::vector> reconnect_connections; + std::thread send_thread; + std::thread reconnect_thread; + std::condition_variable reconnect_cv; + std::atomic exit_flag { false }; + SyncPipe pipe; + + void + send_loop(); + + void + reconnect_loop(); +}; diff --git a/src/plugins/output/forwarder/src/ConnectionParams.h b/src/plugins/output/forwarder/src/ConnectionParams.h new file mode 100644 index 00000000..5f9fbfa7 --- /dev/null +++ b/src/plugins/output/forwarder/src/ConnectionParams.h @@ -0,0 +1,136 @@ +/** + * \file src/plugins/output/forwarder/src/ConnectionParams.h + * \author Michal Sedlak + * \brief Parameters for estabilishing connection + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +using unique_addrinfo = std::unique_ptr; + +enum class TransProto { Tcp, Udp }; + +struct ConnectionParams +{ + ConnectionParams(std::string address, std::string port, TransProto protocol) + : address(address) + , port(port) + , protocol(protocol) + { + } + + unique_addrinfo + resolve_address() + { + addrinfo *info; + addrinfo hints = {}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = (protocol == TransProto::Tcp ? SOCK_STREAM : SOCK_DGRAM); + hints.ai_protocol = (protocol == TransProto::Tcp ? IPPROTO_TCP : IPPROTO_UDP); + if (getaddrinfo(address.c_str(), port.c_str(), &hints, &info) == 0) { + return unique_addrinfo(info, &freeaddrinfo); + } else { + return unique_addrinfo(NULL, &freeaddrinfo); + } + } + + int + make_socket() + { + auto address_info = resolve_address(); + if (!address_info) { + return -1; + } + + int sockfd; + addrinfo *p; + + for (p = address_info.get(); p != NULL; p = p->ai_next) { + sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (sockfd < 0) { + continue; + } + + if (protocol == TransProto::Udp) { + sockaddr_in sa = {}; + sa.sin_family = AF_INET; + sa.sin_port = 0; + sa.sin_addr.s_addr = INADDR_ANY; + sa.sin_port = 0; + if (bind(sockfd, (sockaddr *)&sa, sizeof(sa)) != 0) { + close(sockfd); + continue; + } + } + + if (connect(sockfd, p->ai_addr, p->ai_addrlen) != 0) { + close(sockfd); + continue; + } + + break; + } + + if (!p) { + return -1; + } + + return sockfd; + } + + std::string + str() + { + return std::string(protocol == TransProto::Tcp ? "TCP" : "UDP") + ":" + + address + ":" + + port; + } + + std::string address; + std::string port; + TransProto protocol; +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h new file mode 100644 index 00000000..c73e6a13 --- /dev/null +++ b/src/plugins/output/forwarder/src/Forwarder.h @@ -0,0 +1,436 @@ +/** + * \file src/plugins/output/forwarder/src/Forwarder.h + * \author Michal Sedlak + * \brief Forwarder logic + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include "ConnectionManager.h" +#include "IPFIXMessage.h" +#include "MessageBuilder.h" + +#include +#include + +#include +#include + +enum class ForwardMode { SendToAll, RoundRobin }; + +struct Session; +struct Client; + +struct Odid +{ + Odid() {} // Default constructor needed because of std::map + + Odid(Session &session, uint32_t odid) + : session(&session) + , odid(odid) + {} + + Session *session; + uint32_t odid; + uint32_t seq_num = 0; + + const fds_tsnapshot_t *templates_snapshot = NULL; + std::time_t last_templates_send_time = 0; + unsigned bytes_since_templates_sent = 0; + + std::string + str(); + + void + reset_values() + { + seq_num = 0; + templates_snapshot = NULL; + last_templates_send_time = 0; + bytes_since_templates_sent = 0; + } +}; + +struct Session +{ + Session(Connection &connection, Client &client, std::string ident) + : connection(connection) + , client(client) + , ident(ident) + {} + + Connection &connection; + Client &client; + std::string ident; + + std::map odids; + + std::string + str(); +}; + +struct Client +{ + Client(ConnectionManager &connection_manager, ConnectionParams connection_params, std::string name = "") + : connection_manager(connection_manager) + , connection_params(connection_params) + , name(name) + {} + + ConnectionManager &connection_manager; + ConnectionParams connection_params; + std::string name; + std::map> sessions; + + std::string + str() + { + return name; + } +}; + +std::string +Odid::str() +{ + return session->ident + "(" + std::to_string(odid) + ") -> " + session->client.name; +} + +std::string +Session::str() +{ + return ident + " -> " + client.name; +} + +class Forwarder +{ +public: + Forwarder(ipx_ctx_t *log_ctx) + : log_ctx(log_ctx) + {} + + void + set_transport_protocol(TransProto transport_protocol) + { + this->transport_protocol = transport_protocol; + } + + void + set_forward_mode(ForwardMode forward_mode) + { + this->forward_mode = forward_mode; + } + + void + set_template_refresh_interval_secs(int number_of_seconds) + { + this->template_refresh_interval_secs = number_of_seconds; + } + + void + set_template_refresh_interval_bytes(int number_of_bytes) + { + this->template_refresh_interval_bytes = number_of_bytes; + } + + void + set_reconnect_interval(int secs) + { + connection_manager.set_reconnect_interval(secs); + } + + void + add_client(std::string address, std::string port, std::string name = "") + { + auto connection_params = ConnectionParams { address, port, transport_protocol }; + if (!connection_params.resolve_address()) { + throw "Cannot resolve address " + address; + } + if (name.empty()) { + name = connection_params.str(); + } + auto client = Client { connection_manager, connection_params, name }; + clients.push_back(std::move(client)); + IPX_CTX_INFO(log_ctx, "Added client %s @ %s\n", name.c_str(), connection_params.str().c_str()); + } + + void + on_session_message(ipx_msg_session_t *session_msg) + { + const ipx_session *session = ipx_msg_session_get_session(session_msg); + switch (ipx_msg_session_get_event(session_msg)) { + case IPX_MSG_SESSION_OPEN: + for (auto &client : clients) { + open_session(client, session); + } + break; + case IPX_MSG_SESSION_CLOSE: + for (auto &client : clients) { + close_session(client, session); + } + break; + } + } + + void + on_ipfix_message(ipx_msg_ipfix_t *ipfix_msg) + { + auto message = IPFIXMessage { ipfix_msg }; + switch (forward_mode) { + case ForwardMode::RoundRobin: + forward_round_robin(message); + break; + case ForwardMode::SendToAll: + forward_to_all(message); + break; + } + } + + void + start() + { + connection_manager.start(); + } + + void + stop() + { + connection_manager.stop(); + + IPX_CTX_INFO(log_ctx, "Total bytes forwarded: %ld", total_bytes); + IPX_CTX_INFO(log_ctx, "Dropped messages: %ld", dropped_messages); + IPX_CTX_INFO(log_ctx, "Dropped data records: %ld", dropped_data_records); + } + + ~Forwarder() + { + } + +private: + /// Logging context + ipx_ctx_t *log_ctx; + + /// Configuration + TransProto transport_protocol = TransProto::Tcp; + ForwardMode forward_mode = ForwardMode::SendToAll; + int template_refresh_interval_secs = 0; + int template_refresh_interval_bytes = 0; + + /// Mutating state + ConnectionManager connection_manager; + std::vector clients; + int rr_next_client = 0; + + /// Statistics + long dropped_messages = 0; + long dropped_data_records = 0; + long total_bytes = 0; + + void + open_session(Client &client, const ipx_session *session_info) + { + auto &connection = connection_manager.add_client(client.connection_params); + auto session_ptr = std::unique_ptr(new Session { connection, client, session_info->ident }); + IPX_CTX_INFO(log_ctx, "Opened session %s", session_ptr->str().c_str()); + client.sessions[session_info] = std::move(session_ptr); + } + + void + close_session(Client &client, const ipx_session *session_info) + { + auto &session = *client.sessions[session_info]; + session.connection.close(); + IPX_CTX_INFO(log_ctx, "Closed session %s", session.str().c_str()); + client.sessions.erase(session_info); + } + + void + forward_to_all(IPFIXMessage &message) + { + for (auto &client : clients) { + if (!forward_message(client, message)) { + dropped_messages += 1; + dropped_data_records += message.drec_count(); + } + } + } + + void + forward_round_robin(IPFIXMessage &message) + { + int i = 0; + while (1) { + auto &client = next_client(); + if (forward_message(client, message)) { + break; + } + + i++; + + // If we went through all the clients multiple times in a row and all the buffers are still full, + // let's just give up and move onto the next message. If we loop for too long we'll start losing messages! + if (i < (int)clients.size() * 10) { + dropped_messages += 1; + dropped_data_records += message.drec_count(); + break; + } + } + } + + /// Pick the next client in round-robin mode + Client & + next_client() + { + if (rr_next_client == (int)clients.size()) { + rr_next_client = 0; + } + return clients[rr_next_client++]; + } + + /// Send all templates from the templates snapshot obtained from the message + /// through the session connection and update the state accordingly + /// + /// \return true if there was enough space in the connection buffer, false otherwise + bool + send_templates(Session &session, Odid &odid, IPFIXMessage &message) + { + auto templates_snapshot = message.get_templates_snapshot(); + + auto header = *message.header(); + header.seq_num = htonl(odid.seq_num); + + MessageBuilder builder; + builder.begin_message(header); + + fds_tsnapshot_for(templates_snapshot, + [](const fds_template *tmplt, void *data) -> bool { + auto &builder = *(MessageBuilder *)data; + builder.write_template(tmplt); + return true; + }, &builder); + + builder.finalize_message(); + + auto lock = session.connection.begin_write(); + if (builder.message_length() > session.connection.writeable()) { + // IPX_CTX_WARNING(log_ctx, + // "[%s] Cannot send templates because buffer is full! (need %dB, have %ldB)", + // odid.str().c_str(), builder.message_length(), session.connection.writeable()); + return false; + } + session.connection.write(builder.message_data(), builder.message_length()); + session.connection.commit_write(); + + odid.templates_snapshot = templates_snapshot; + odid.bytes_since_templates_sent = 0; + odid.last_templates_send_time = std::time(NULL); + + total_bytes += builder.message_length(); + // IPX_CTX_INFO(log_ctx, "[%s] Sent templates", odid.str().c_str()); + + return true; + } + + bool + should_refresh_templates(Odid &odid) + { + if (transport_protocol != TransProto::Udp) { + return false; + } + auto time_since = (std::time(NULL) - odid.last_templates_send_time); + return (time_since > (unsigned)template_refresh_interval_secs) + || (odid.bytes_since_templates_sent > (unsigned)template_refresh_interval_bytes); + } + + bool + templates_changed(Odid &odid, IPFIXMessage &message) + { + auto templates_snapshot = message.get_templates_snapshot(); + return templates_snapshot && odid.templates_snapshot != templates_snapshot; + } + + /// Forward message to the client, including templates update if needed + /// + /// \return true if there was enough space in the connection buffer, false otherwise + bool + forward_message(Client &client, IPFIXMessage &message) + { + auto &session = *client.sessions[message.session()]; + + if (session.connection.connection_lost_flag) { + for (auto &p : session.odids) { + p.second.reset_values(); + } + session.connection.connection_lost_flag = false; + } + + if (session.odids.find(message.odid()) == session.odids.end()) { + session.odids[message.odid()] = Odid { session, message.odid() }; + IPX_CTX_INFO(log_ctx, "[%s] Seen new ODID %u", session.str().c_str(), message.odid()); + } + auto &odid = session.odids[message.odid()]; + + if (should_refresh_templates(odid) || templates_changed(odid, message)) { + if (!send_templates(session, odid, message)) { + return false; + } + } + + auto lock = session.connection.begin_write(); + if (message.length() > session.connection.writeable()) { + // IPX_CTX_WARNING(log_ctx, + // "[%s] Cannot forward message because buffer is full! (need %dB, have %ldB)", + // odid.str().c_str(), message.length(), session.connection.writeable()); + return false; + } + + auto header = *message.header(); + header.seq_num = htonl(odid.seq_num); + session.connection.write(&header, sizeof(header)); + session.connection.write(message.data() + sizeof(header), message.length() - sizeof(header)); + session.connection.commit_write(); + + // IPX_CTX_DEBUG(log_ctx, "[%s] Forwarded message", odid.str().c_str()); + + odid.bytes_since_templates_sent += message.length(); + odid.seq_num += message.drec_count(); + + total_bytes += message.length(); + + return true; + } +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/IPFIXMessage.h b/src/plugins/output/forwarder/src/IPFIXMessage.h new file mode 100644 index 00000000..3d2fce63 --- /dev/null +++ b/src/plugins/output/forwarder/src/IPFIXMessage.h @@ -0,0 +1,107 @@ +/** + * \file src/plugins/output/forwarder/src/IPFIXMessage.h + * \author Michal Sedlak + * \brief Simple IPFIX message wrapper + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include + +class IPFIXMessage +{ +public: + IPFIXMessage(ipx_msg_ipfix_t *msg) + : msg(msg) + {} + + const ipx_session * + session() + { + ipx_msg_ctx *msg_ctx = ipx_msg_ipfix_get_ctx(msg); + return msg_ctx->session; + } + + uint8_t * + data() + { + return ipx_msg_ipfix_get_packet(msg); + } + + fds_ipfix_msg_hdr * + header() + { + return (fds_ipfix_msg_hdr *)data(); + } + + uint16_t + length() + { + return ntohs(header()->length); + } + + uint32_t + seq_num() + { + return ntohl(header()->seq_num); + } + + int + drec_count() + { + return ipx_msg_ipfix_get_drec_cnt(msg); + } + + uint32_t + odid() + { + return ntohl(header()->odid); + } + + const fds_tsnapshot_t * + get_templates_snapshot() + { + if (drec_count() == 0) { + return NULL; + } + return ipx_msg_ipfix_get_drec(msg, 0)->rec.snap; + } + +private: + ipx_msg_ipfix_t *msg; +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/MessageBuilder.h b/src/plugins/output/forwarder/src/MessageBuilder.h new file mode 100644 index 00000000..941dfd9a --- /dev/null +++ b/src/plugins/output/forwarder/src/MessageBuilder.h @@ -0,0 +1,150 @@ +/** + * \file src/plugins/output/forwarder/src/MessageBuilder.h + * \author Michal Sedlak + * \brief IPFIX message builder + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include +#include + +#include +#include + +class MessageBuilder +{ +public: + void + begin_message(fds_ipfix_msg_hdr message_header) + { + write(&message_header, sizeof(message_header)); + } + + void + write_template(const fds_template *tmplt) + { + if (current_set_id != get_template_set_id(tmplt->type)) { + end_template_set(); + begin_template_set(tmplt->type); + } + + write(tmplt->raw.data, tmplt->raw.length); + current_set_length += tmplt->raw.length; + } + + void + finalize_message() + { + end_template_set(); + header()->length = htons(write_offset); + } + + uint8_t * + message_data() + { + return &buffer[0]; + } + + int + message_length() + { + return write_offset; + } + +private: + std::vector buffer; + int write_offset = 0; + int set_header_offset = -1; + int current_set_id = 0; + int current_set_length = 0; + + fds_ipfix_msg_hdr * + header() + { + return (fds_ipfix_msg_hdr *)&buffer[0]; + } + + fds_ipfix_set_hdr * + current_set_header() + { + return (fds_ipfix_set_hdr *)&buffer[set_header_offset]; + } + + void + write(void *data, int length) + { + if (((int)buffer.size() - write_offset) < length) { + buffer.resize(buffer.size() + 1024); + return write(data, length); + } + std::memcpy(&buffer[write_offset], data, length); + write_offset += length; + } + + uint16_t + get_template_set_id(fds_template_type template_type) + { + return (template_type == FDS_TYPE_TEMPLATE + ? FDS_IPFIX_SET_TMPLT : FDS_IPFIX_SET_OPTS_TMPLT); + } + + void + begin_template_set(fds_template_type template_type) + { + fds_ipfix_set_hdr set_header = {}; + set_header.flowset_id = htons(get_template_set_id(template_type)); + + set_header_offset = write_offset; + write(&set_header, sizeof(set_header)); + + current_set_length = sizeof(set_header); + current_set_id = get_template_set_id(template_type); + } + + void + end_template_set() + { + if (set_header_offset != -1) { + current_set_header()->length = htons(current_set_length); + } + current_set_id = 0; + current_set_length = 0; + set_header_offset = -1; + } +}; \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/SyncPipe.h b/src/plugins/output/forwarder/src/SyncPipe.h new file mode 100644 index 00000000..aed128f8 --- /dev/null +++ b/src/plugins/output/forwarder/src/SyncPipe.h @@ -0,0 +1,92 @@ +/** + * \file src/plugins/output/forwarder/src/SyncPipe.h + * \author Michal Sedlak + * \brief Pipe used for synchronization of threads (when waiting on select) + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include +#include + +#include + +class SyncPipe +{ +public: + SyncPipe() + { + int pipefd[2]; + if (pipe(pipefd) != 0) { + throw std::string("Cannot create pipe"); + } + readfd = pipefd[0]; + writefd = pipefd[1]; + + int flags = fcntl(readfd, F_GETFL, 0); + fcntl(readfd, F_SETFL, flags | O_NONBLOCK); + } + + void + notify() + { + write(writefd, "A", 1); + } + + void + clear() + { + char discard_buffer[128]; + while (1) { + int n = read(readfd, discard_buffer, 128); + if (n < 128) { + break; + } + } + } + + int + get_readfd() + { + return readfd; + } + +private: + int readfd; + int writefd; +}; diff --git a/src/plugins/output/forwarder/src/config.h b/src/plugins/output/forwarder/src/config.h new file mode 100644 index 00000000..6f28756a --- /dev/null +++ b/src/plugins/output/forwarder/src/config.h @@ -0,0 +1,282 @@ +/** + * \file src/plugins/output/forwarder/src/config.h + * \author Michal Sedlak + * \brief Plugin configuration + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#pragma once + +#include "Forwarder.h" + +#include +#include + +#include +#include + +/// DIY std::optional +template +class Maybe +{ +public: + Maybe() + : has_value_(false) + {} + + Maybe(T value) + : has_value_(true) + , value_(value) + {} + + bool + has_value() + { + return has_value_; + } + + T + value() + { + return value_; + } + +private: + bool has_value_; + T value_; +}; + + +/// Parses the XML config string and configures forwarder accordingly +void +parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwarder) +{ + /// + /// Config schema definition + /// + enum { + MODE, PROTOCOL, RECONNECT_INTERVAL_SECS, + TEMPLATE_REFRESH_INTERVAL_SECS, TEMPLATE_REFRESH_INTERVAL_BYTES, + HOSTS, HOST, NAME, ADDRESS, PORT + }; + + fds_xml_args host_schema[] = { + FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(PORT, "port", FDS_OPTS_T_STRING, 0), + FDS_OPTS_END + }; + + fds_xml_args hosts_schema[] = { + FDS_OPTS_NESTED(HOST, "host", host_schema, FDS_OPTS_P_MULTI), + FDS_OPTS_END + }; + + fds_xml_args params_schema[] = { + FDS_OPTS_ROOT("params"), + FDS_OPTS_ELEM(MODE, "mode", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(PROTOCOL, "protocol", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_SECS, "templateRefreshIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(RECONNECT_INTERVAL_SECS, "reconnectIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(HOSTS, "hosts", hosts_schema, 0), + FDS_OPTS_END + }; + + /// + /// Default parameter values + /// + const int default_template_refresh_interval_secs = 10 * 60; + const int default_template_refresh_interval_bytes = 5 * 1024 * 1024; + const int default_reconnect_interval_secs = 10; + + /// + /// Parsed parameters + /// + struct HostInfo { + std::string name; + std::string address; + std::string port; + }; + + std::string mode; + std::string protocol; + std::vector hosts; + Maybe template_refresh_interval_secs; + Maybe template_refresh_interval_bytes; + Maybe reconnect_interval_secs; + + /// + /// Parsing + /// + auto parser = std::unique_ptr(fds_xml_create(), &fds_xml_destroy); + if (!parser) { + throw std::string("Cannot create XML parser"); + } + + auto lower = [](std::string s) { + for (auto &c : s) { + c = std::tolower(c); + } + return s; + }; + + auto parser_error = [&]() { + throw std::string("XML parser error: ") + fds_xml_last_err(parser.get()); + }; + + if (fds_xml_set_args(parser.get(), params_schema) != FDS_OK) { + parser_error(); + } + + auto params_elem = fds_xml_parse_mem(parser.get(), xml_config, true); + if (!params_elem) { + parser_error(); + } + + auto process_host = [&](fds_xml_ctx_t *host_elem) { + HostInfo host; + const fds_xml_cont *content; + while (fds_xml_next(host_elem, &content) != FDS_EOC) { + switch (content->id) { + case NAME: + host.name = std::string(content->ptr_string); + break; + case ADDRESS: + host.address = std::string(content->ptr_string); + break; + case PORT: + host.port = std::string(content->ptr_string); + break; + } + } + hosts.push_back(host); + }; + + auto process_hosts = [&](fds_xml_ctx_t *hosts_elem) { + const fds_xml_cont *content; + while (fds_xml_next(hosts_elem, &content) != FDS_EOC) { + process_host(content->ptr_ctx); + } + }; + + const fds_xml_cont *content; + while (fds_xml_next(params_elem, &content) != FDS_EOC) { + switch (content->id) { + case MODE: + mode = std::string(content->ptr_string); + break; + case PROTOCOL: + protocol = std::string(content->ptr_string); + break; + case HOSTS: + process_hosts(content->ptr_ctx); + break; + case TEMPLATE_REFRESH_INTERVAL_SECS: + template_refresh_interval_secs = content->val_int; + break; + case TEMPLATE_REFRESH_INTERVAL_BYTES: + template_refresh_interval_bytes = content->val_int; + break; + case RECONNECT_INTERVAL_SECS: + reconnect_interval_secs = content->val_int; + break; + } + } + + /// + /// Check parameters and configure + /// + if (lower(mode) == "all" || lower(mode) == "send to all" || lower(mode) == "send-to-all") { + forwarder.set_forward_mode(ForwardMode::SendToAll); + } else if (lower(mode) == "roundrobin" || lower(mode) == "round robin" || lower(mode) == "round-robin") { + forwarder.set_forward_mode(ForwardMode::RoundRobin); + } else { + throw "Invalid mode '" + mode + "', possible values are: 'roundrobin', 'all'"; + } + + if (lower(protocol) == "udp") { + forwarder.set_transport_protocol(TransProto::Udp); + } else if (lower(protocol) == "tcp") { + forwarder.set_transport_protocol(TransProto::Tcp); + } else { + throw "Invalid protocol '" + protocol + "', possible values are: 'tcp', 'udp'"; + } + + if (template_refresh_interval_secs.has_value()) { + if (template_refresh_interval_secs.value() >= 0) { + if (lower(protocol) == "tcp") { + IPX_CTX_WARNING(log_ctx, "Templates refresh interval is set but transport protocol is TCP"); + } + forwarder.set_template_refresh_interval_secs(template_refresh_interval_secs.value()); + } else { + throw std::string("Invalid template refresh secs interval"); + } + } else { + forwarder.set_template_refresh_interval_secs(default_template_refresh_interval_secs); + } + + if (template_refresh_interval_bytes.has_value()) { + if (template_refresh_interval_bytes.value() >= 0) { + if (lower(protocol) == "tcp") { + IPX_CTX_WARNING(log_ctx, "Templates refresh interval is set but transport protocol is TCP"); + } + forwarder.set_template_refresh_interval_bytes(template_refresh_interval_bytes.value()); + } else { + throw std::string("Invalid template refresh bytes interval"); + } + } else { + forwarder.set_template_refresh_interval_secs(default_template_refresh_interval_bytes); + } + + if (template_refresh_interval_bytes.has_value()) { + if (reconnect_interval_secs.value() >= 0) { + if (lower(protocol) == "udp") { + IPX_CTX_WARNING(log_ctx, "Reconnect interval is set but transport protocol is UDP"); + } + forwarder.set_reconnect_interval(reconnect_interval_secs.value()); + } else { + throw std::string("Invalid reconnect interval"); + } + } else { + forwarder.set_template_refresh_interval_secs(default_reconnect_interval_secs); + } + + for (auto &host : hosts) { + forwarder.add_client(host.address, host.port, host.name); + } +} \ No newline at end of file diff --git a/src/plugins/output/forwarder/src/main.cpp b/src/plugins/output/forwarder/src/main.cpp new file mode 100644 index 00000000..adf5399c --- /dev/null +++ b/src/plugins/output/forwarder/src/main.cpp @@ -0,0 +1,110 @@ +/** + * \file src/plugins/output/forwarder/src/main.cpp + * \author Michal Sedlak + * \brief Main plugin entry point + * \date 2021 + */ + +/* Copyright (C) 2021 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include "Forwarder.h" +#include "config.h" + +#include + +#include +#include + +/// Plugin definition +IPX_API struct ipx_plugin_info ipx_plugin_info = { + "forwarder", + "Example output plugin.", + IPX_PT_OUTPUT, + 0, + "1.0.0", + "2.0.0" +}; + +int +ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config) +{ + auto forwarder = std::unique_ptr(new Forwarder(ctx)); + try { + parse_and_configure(ctx, xml_config, *forwarder); + } catch (std::string &error_message) { + IPX_CTX_ERROR(ctx, "%s", error_message.c_str()); + return IPX_ERR_FORMAT; + } + forwarder->start(); + + ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION; + ipx_ctx_subscribe(ctx, &mask, NULL); + ipx_ctx_private_set(ctx, forwarder.release()); + + return IPX_OK; +} + +void +ipx_plugin_destroy(ipx_ctx_t *ctx, void *priv) +{ + (void)ctx; + Forwarder *forwarder = (Forwarder *)(priv); + forwarder->stop(); + delete forwarder; +} + +int +ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg) +{ + Forwarder *forwarder = (Forwarder *)(priv); + try { + switch (ipx_msg_get_type(msg)) { + case IPX_MSG_IPFIX: + forwarder->on_ipfix_message(ipx_msg_base2ipfix(msg)); + break; + case IPX_MSG_SESSION: + forwarder->on_session_message(ipx_msg_base2session(msg)); + break; + default: assert(0); + } + } catch (std::string &error_message) { + IPX_CTX_ERROR(ctx, "%s", error_message.c_str()); + return IPX_ERR_DENIED; + } catch (std::bad_alloc &ex) { + IPX_CTX_ERROR(ctx, "Memory error"); + return IPX_ERR_NOMEM; + } + return IPX_OK; +} From 9772dd11597dfcb402272d8582e0bbc7f129f97b Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 3 Feb 2021 00:50:23 +0100 Subject: [PATCH 2/3] Forwarder: Config changes --- src/plugins/output/forwarder/README.rst | 4 ++ src/plugins/output/forwarder/src/Connection.h | 4 +- .../forwarder/src/ConnectionManager.cpp | 12 +++-- .../output/forwarder/src/ConnectionManager.h | 7 ++- src/plugins/output/forwarder/src/Forwarder.h | 6 +++ src/plugins/output/forwarder/src/config.h | 54 +++++++++++++------ 6 files changed, 65 insertions(+), 22 deletions(-) diff --git a/src/plugins/output/forwarder/README.rst b/src/plugins/output/forwarder/README.rst index 8dd97c38..64613329 100644 --- a/src/plugins/output/forwarder/README.rst +++ b/src/plugins/output/forwarder/README.rst @@ -43,6 +43,10 @@ Parameters The transport protocol to use [values: TCP/UDP] +:``connectionBufferSize``: + Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts) + [value: number of bytes, default: 4194304] + :``templateRefreshIntervalSecs``: Send templates again every N seconds (UDP only) [value: number of seconds, default: 600] diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h index 7a3d6b37..09af5a29 100644 --- a/src/plugins/output/forwarder/src/Connection.h +++ b/src/plugins/output/forwarder/src/Connection.h @@ -54,8 +54,6 @@ #include #include -static constexpr int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; - class ConnectionManager; class Connection @@ -67,7 +65,7 @@ friend class ConnectionManager; /// The flag won't be reset when the connection is reestablished! std::atomic connection_lost_flag { false }; - Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size = DEFAULT_BUFFER_SIZE); + Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size); bool connect(); diff --git a/src/plugins/output/forwarder/src/ConnectionManager.cpp b/src/plugins/output/forwarder/src/ConnectionManager.cpp index 22c40189..f55dafaf 100644 --- a/src/plugins/output/forwarder/src/ConnectionManager.cpp +++ b/src/plugins/output/forwarder/src/ConnectionManager.cpp @@ -44,7 +44,7 @@ Connection & ConnectionManager::add_client(ConnectionParams params) { - auto connection_ptr = std::unique_ptr(new Connection(*this, params)); + auto connection_ptr = std::unique_ptr(new Connection(*this, params, connection_buffer_size)); auto &connection = *connection_ptr; std::lock_guard guard(mutex); if (connection.connect()) { @@ -157,8 +157,14 @@ ConnectionManager::stop() } void -ConnectionManager::set_reconnect_interval(int secs) +ConnectionManager::set_reconnect_interval(int number_of_seconds) { - reconnect_interval_secs = secs; + reconnect_interval_secs = number_of_seconds; +} + +void +ConnectionManager::set_connection_buffer_size(long number_of_bytes) +{ + connection_buffer_size = number_of_bytes; } diff --git a/src/plugins/output/forwarder/src/ConnectionManager.h b/src/plugins/output/forwarder/src/ConnectionManager.h index 752cedaf..2fe73237 100644 --- a/src/plugins/output/forwarder/src/ConnectionManager.h +++ b/src/plugins/output/forwarder/src/ConnectionManager.h @@ -55,6 +55,7 @@ class Connection; +static constexpr long DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; static constexpr int DEFAULT_RECONNECT_INTERVAL_SECS = 5; class ConnectionManager @@ -72,9 +73,13 @@ friend class Connection; stop(); void - set_reconnect_interval(int secs); + set_reconnect_interval(int number_of_seconds); + + void + set_connection_buffer_size(long number_of_bytes); private: + long connection_buffer_size = DEFAULT_BUFFER_SIZE; int reconnect_interval_secs = DEFAULT_RECONNECT_INTERVAL_SECS; std::mutex mutex; std::vector> active_connections; diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h index c73e6a13..803e84e5 100644 --- a/src/plugins/output/forwarder/src/Forwarder.h +++ b/src/plugins/output/forwarder/src/Forwarder.h @@ -155,6 +155,12 @@ class Forwarder this->forward_mode = forward_mode; } + void + set_connection_buffer_size(long number_of_bytes) + { + connection_manager.set_connection_buffer_size(number_of_bytes); + } + void set_template_refresh_interval_secs(int number_of_seconds) { diff --git a/src/plugins/output/forwarder/src/config.h b/src/plugins/output/forwarder/src/config.h index 6f28756a..81298548 100644 --- a/src/plugins/output/forwarder/src/config.h +++ b/src/plugins/output/forwarder/src/config.h @@ -89,15 +89,23 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa /// Config schema definition /// enum { - MODE, PROTOCOL, RECONNECT_INTERVAL_SECS, - TEMPLATE_REFRESH_INTERVAL_SECS, TEMPLATE_REFRESH_INTERVAL_BYTES, - HOSTS, HOST, NAME, ADDRESS, PORT + MODE, + PROTOCOL, + RECONNECT_INTERVAL_SECS, + TEMPLATE_REFRESH_INTERVAL_SECS, + TEMPLATE_REFRESH_INTERVAL_BYTES, + CONNECTION_BUFFER_SIZE, + HOSTS, + HOST, + NAME, + ADDRESS, + PORT }; fds_xml_args host_schema[] = { - FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(PORT, "port", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NAME , "name" , FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM(PORT , "port" , FDS_OPTS_T_STRING, 0 ), FDS_OPTS_END }; @@ -107,22 +115,24 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa }; fds_xml_args params_schema[] = { - FDS_OPTS_ROOT("params"), - FDS_OPTS_ELEM(MODE, "mode", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(PROTOCOL, "protocol", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_SECS, "templateRefreshIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(RECONNECT_INTERVAL_SECS, "reconnectIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_NESTED(HOSTS, "hosts", hosts_schema, 0), + FDS_OPTS_ROOT ("params"), + FDS_OPTS_ELEM (MODE , "mode" , FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM (PROTOCOL , "protocol" , FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM (CONNECTION_BUFFER_SIZE , "connectionBufferSize" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_SECS , "templateRefreshIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (RECONNECT_INTERVAL_SECS , "reconnectIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(HOSTS , "hosts" , hosts_schema , 0 ), FDS_OPTS_END }; /// /// Default parameter values /// - const int default_template_refresh_interval_secs = 10 * 60; + const int default_template_refresh_interval_secs = 10 * 60; const int default_template_refresh_interval_bytes = 5 * 1024 * 1024; - const int default_reconnect_interval_secs = 10; + const int default_reconnect_interval_secs = 10; + const int default_connection_buffer_size = 4 * 1024 * 1024; /// /// Parsed parameters @@ -136,6 +146,7 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa std::string mode; std::string protocol; std::vector hosts; + Maybe connection_buffer_size; Maybe template_refresh_interval_secs; Maybe template_refresh_interval_bytes; Maybe reconnect_interval_secs; @@ -206,6 +217,9 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa case HOSTS: process_hosts(content->ptr_ctx); break; + case CONNECTION_BUFFER_SIZE: + connection_buffer_size = content->val_int; + break; case TEMPLATE_REFRESH_INTERVAL_SECS: template_refresh_interval_secs = content->val_int; break; @@ -237,6 +251,16 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa throw "Invalid protocol '" + protocol + "', possible values are: 'tcp', 'udp'"; } + if (connection_buffer_size.has_value()) { + if (connection_buffer_size.value() > 0) { + forwarder.set_connection_buffer_size(connection_buffer_size.value()); + } else { + throw std::string("Invalid connection buffer size"); + } + } else { + forwarder.set_connection_buffer_size(default_connection_buffer_size); + } + if (template_refresh_interval_secs.has_value()) { if (template_refresh_interval_secs.value() >= 0) { if (lower(protocol) == "tcp") { From 13cebfec9b23a70cf4822c30a4d15881223587c4 Mon Sep 17 00:00:00 2001 From: Michal Sedlak Date: Wed, 3 Feb 2021 00:51:03 +0100 Subject: [PATCH 3/3] Forwarder: Get rid of warning --- src/plugins/output/forwarder/src/Connection.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp index 2dcdec0a..b7d2c5b6 100644 --- a/src/plugins/output/forwarder/src/Connection.cpp +++ b/src/plugins/output/forwarder/src/Connection.cpp @@ -110,8 +110,7 @@ Connection::send_some() } } return true; - - } else if (params.protocol == TransProto::Tcp) { + } else { return buffer.send_data(sockfd) >= 0; } }