diff --git a/src/plugins/output/CMakeLists.txt b/src/plugins/output/CMakeLists.txt
index aa123f2d..11681bdd 100644
--- a/src/plugins/output/CMakeLists.txt
+++ b/src/plugins/output/CMakeLists.txt
@@ -6,3 +6,4 @@ add_subdirectory(json-kafka)
add_subdirectory(timecheck)
add_subdirectory(viewer)
add_subdirectory(ipfix)
+add_subdirectory(forwarder)
diff --git a/src/plugins/output/forwarder/CMakeLists.txt b/src/plugins/output/forwarder/CMakeLists.txt
new file mode 100644
index 00000000..2bec16c3
--- /dev/null
+++ b/src/plugins/output/forwarder/CMakeLists.txt
@@ -0,0 +1,37 @@
+# Create a linkable module
+add_library(forwarder-output MODULE
+ src/main.cpp
+ src/config.h
+ src/Forwarder.h
+ src/ConnectionManager.h
+ src/ConnectionManager.cpp
+ src/ConnectionParams.h
+ src/Connection.h
+ src/Connection.cpp
+ src/ConnectionBuffer.h
+ src/SyncPipe.h
+ src/IPFIXMessage.h
+ src/MessageBuilder.h
+)
+
+install(
+ TARGETS forwarder-output
+ LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
+)
+
+if (ENABLE_DOC_MANPAGE)
+ # Build a manual page
+ set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-forwarder-output.7.rst")
+ set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-forwarder-output.7")
+
+ add_custom_command(TARGET forwarder-output PRE_BUILD
+ COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE}
+ DEPENDS ${SRC_FILE}
+ VERBATIM
+ )
+
+ install(
+ FILES "${DST_FILE}"
+ DESTINATION "${INSTALL_DIR_MAN}/man7"
+ )
+endif()
diff --git a/src/plugins/output/forwarder/README.rst b/src/plugins/output/forwarder/README.rst
new file mode 100644
index 00000000..64613329
--- /dev/null
+++ b/src/plugins/output/forwarder/README.rst
@@ -0,0 +1,77 @@
+Forwarder (output plugin)
+==========================
+
+This plugin allows forwarding incoming IPFIX messages to other collector in various modes.
+
+It can be used to broadcast messages to multiple collectors (e.g. a main and a backup collector),
+or to distribute messages across multiple collectors (e.g. for load balancing).
+
+Example configuration
+---------------------
+
+.. code-block:: xml
+
+
+
+Parameters
+----------
+
+:``mode``:
+ The forwarding mode; round robin (messages are sent to one host at time and hosts are cycled through) or all (messages are broadcasted to all hosts)
+ [values: RoundRobin/All]
+
+:``protocol``:
+ The transport protocol to use
+ [values: TCP/UDP]
+
+:``connectionBufferSize``:
+ Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts)
+ [value: number of bytes, default: 4194304]
+
+:``templateRefreshIntervalSecs``:
+ Send templates again every N seconds (UDP only)
+ [value: number of seconds, default: 600]
+
+:``templateRefreshIntervalBytes``:
+ Send templates again every N bytes (UDP only)
+ [value: number of bytes, default: 5000000]
+
+:``reconnectIntervalSecs``:
+ Attempt to reconnect every N seconds in case the connection drops (TCP only)
+ [value: number of seconds, default: 10]
+
+:``hosts``:
+ The receiving hosts
+
+ :``host``:
+ :``name``:
+ Optional identification of the host
+ [value: string, default:
:]
+
+ :``address``:
+ The address of the host
+ [value: IPv4/IPv6 address or a hostname]
+
+ :``port``:
+ The port to connect to
+ [value: port number]
+
diff --git a/src/plugins/output/forwarder/TODO.txt b/src/plugins/output/forwarder/TODO.txt
new file mode 100644
index 00000000..37631f98
--- /dev/null
+++ b/src/plugins/output/forwarder/TODO.txt
@@ -0,0 +1,5 @@
+* Template withdrawals
+* More effective way of handling template changes - currently all the templates are being sent again every time any change in templates is detected
+* Message MTU
+* Possible bug: when testing, a small number of data records seems to be lost (something like 20 out of 1,000,000)
+* Connection buffer size
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst b/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst
new file mode 100644
index 00000000..a5d6767d
--- /dev/null
+++ b/src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst
@@ -0,0 +1,20 @@
+============================
+ ipfixcol2-forwarder-output
+============================
+
+--------------------------
+Forwarder (output plugin)
+--------------------------
+
+:Author: Michal Sedlak (xsedla0v@stud.fit.vutbr.cz)
+:Date: 2021-01-28
+:Copyright: Copyright © 2021 CESNET, z.s.p.o.
+:Version: 1.0
+:Manual section: 7
+:Manual group: IPFIXcol collector
+
+Description
+-----------
+
+.. include:: ../README.rst
+ :start-line: 3
diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp
new file mode 100644
index 00000000..b7d2c5b6
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Connection.cpp
@@ -0,0 +1,130 @@
+/**
+ * \file src/plugins/output/forwarder/src/Connection.cpp
+ * \author Michal Sedlak
+ * \brief Buffered socket connection
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#include "Connection.h"
+
+#include
+
+Connection::Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size)
+: manager(manager)
+, params(params)
+, buffer(buffer_size)
+{
+}
+
+bool
+Connection::connect()
+{
+ if (sockfd >= 0) {
+ ::close(sockfd);
+ }
+ sockfd = params.make_socket();
+ return sockfd >= 0;
+}
+
+std::unique_lock
+Connection::begin_write()
+{
+ return std::unique_lock(buffer_mutex);
+}
+
+bool
+Connection::write(void *data, long length)
+{
+ return buffer.write((uint8_t *)data, length);
+}
+
+void
+Connection::rollback_write()
+{
+ buffer.rollback();
+}
+
+long
+Connection::writeable()
+{
+ return buffer.writeable();
+}
+
+void
+Connection::commit_write()
+{
+ buffer.commit();
+ manager.pipe.notify();
+ has_data_to_send = buffer.readable();
+}
+
+bool
+Connection::send_some()
+{
+ if (params.protocol == TransProto::Udp) {
+ while (1) {
+ fds_ipfix_msg_hdr ipfix_header;
+ if (!buffer.peek(ipfix_header)) {
+ return true;
+ }
+ auto message_length = ntohs(ipfix_header.length);
+ int ret = buffer.send_data(sockfd, message_length);
+ if (ret == 0 || !buffer.readable()) {
+ return true;
+ } else if (ret < 0) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return buffer.send_data(sockfd) >= 0;
+ }
+}
+
+void
+Connection::close()
+{
+ close_flag = true;
+ manager.pipe.notify();
+}
+
+Connection::~Connection()
+{
+ if (sockfd >= 0) {
+ ::close(sockfd);
+ }
+}
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h
new file mode 100644
index 00000000..09af5a29
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Connection.h
@@ -0,0 +1,117 @@
+/**
+ * \file src/plugins/output/forwarder/src/Connection.h
+ * \author Michal Sedlak
+ * \brief Buffered socket connection
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#pragma once
+
+#include "ConnectionManager.h"
+#include "ConnectionParams.h"
+#include "ConnectionBuffer.h"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+class ConnectionManager;
+
+class Connection
+{
+friend class ConnectionManager;
+
+public:
+ /// Flag indicating that the connection was lost and the forwarder needs to resend templates etc.
+ /// The flag won't be reset when the connection is reestablished!
+ std::atomic connection_lost_flag { false };
+
+ Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size);
+
+ bool
+ connect();
+
+ std::unique_lock
+ begin_write();
+
+ bool
+ write(void *data, long length);
+
+ bool
+ send_some();
+
+ void
+ commit_write();
+
+ void
+ rollback_write();
+
+ long
+ writeable();
+
+ void
+ close();
+
+ ~Connection();
+
+private:
+ /// The manager managing this connection
+ ConnectionManager &manager;
+
+ /// The parameters to estabilish the connection
+ ConnectionParams params;
+
+ /// The connection socket
+ int sockfd = -1;
+
+ /// Buffer for the data to send and a mutex guarding it
+ /// (buffer will be accessed from sender thread and writer thread)
+ std::mutex buffer_mutex;
+ ConnectionBuffer buffer;
+
+ /// Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time
+ /// (doesn't need to be atomic because we only set it while holding the mutex)
+ bool has_data_to_send = false;
+
+ /// Flag indicating that the connection has been closed and can be disposed of after the data is sent
+ std::atomic close_flag { false };
+};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/ConnectionBuffer.h b/src/plugins/output/forwarder/src/ConnectionBuffer.h
new file mode 100644
index 00000000..9db928a0
--- /dev/null
+++ b/src/plugins/output/forwarder/src/ConnectionBuffer.h
@@ -0,0 +1,221 @@
+/**
+ * \file src/plugins/output/forwarder/src/ConnectionBuffer.h
+ * \author Michal Sedlak
+ * \brief Ring buffer used by connections
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#pragma once
+
+#include
+
+#include
+#include
+#include
+#include
+
+class ConnectionBuffer
+{
+public:
+ ConnectionBuffer(long capacity)
+ : capacity(capacity)
+ , buffer(capacity)
+ {}
+
+ void
+ rollback()
+ {
+ write_offset = read_end_offset;
+ }
+
+ void
+ commit()
+ {
+ read_end_offset = write_offset;
+ }
+
+ long
+ writeable()
+ {
+ return writeable_from(write_offset);
+ }
+
+ bool
+ write(uint8_t *data, long length)
+ {
+ long pos = raw_write_at(write_offset, data, length);
+ if (pos == -1) {
+ return false;
+ }
+ write_offset = pos;
+ return true;
+ }
+
+ template
+ bool
+ write(T data)
+ {
+ return write((uint8_t *)&data, sizeof(T));
+ }
+
+ long
+ readable()
+ {
+ return read_offset > read_end_offset
+ ? capacity - read_offset + read_end_offset
+ : read_end_offset - read_offset;
+ }
+
+ bool
+ peek(uint8_t *data, long length)
+ {
+ if (readable() < length) {
+ return false;
+ }
+ raw_read_at(read_offset, data, length);
+ return true;
+ }
+
+ template
+ bool
+ peek(T &item)
+ {
+ return peek((uint8_t *)&item, sizeof(item));
+ }
+
+ int
+ send_data(int sockfd, long length = -1)
+ {
+ if (length == -1) {
+ length = readable();
+ }
+ iovec iov[2] = {};
+ iov[0].iov_len = std::min(cont_readable_from(read_offset), length);
+ iov[0].iov_base = &buffer[read_offset];
+ iov[1].iov_len = length - iov[0].iov_len;
+ iov[1].iov_base = &buffer[0];
+ msghdr msg_hdr = {};
+ msg_hdr.msg_iov = iov;
+ msg_hdr.msg_iovlen = 2;
+ int ret = sendmsg(sockfd, &msg_hdr, MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (ret < 0) {
+ return (errno == EWOULDBLOCK || errno == EAGAIN) ? 0 : ret;
+ }
+ read_offset = advance(read_offset, ret);
+ return ret;
+ }
+
+private:
+ long capacity;
+ long read_offset = 0;
+ long read_end_offset = 0;
+ long write_offset = 0;
+ std::vector buffer;
+
+ long
+ advance(long pos, long n)
+ {
+ return (pos + n) % capacity;
+ }
+
+ long
+ readable_from(long pos)
+ {
+ return pos > read_end_offset
+ ? capacity - pos + read_end_offset
+ : read_end_offset - pos;
+ }
+
+ long
+ cont_readable_from(long pos)
+ {
+ return pos > read_end_offset
+ ? capacity - pos
+ : read_end_offset - pos;
+ }
+
+ long
+ raw_read_at(long pos, uint8_t *data, long length)
+ {
+ if (readable_from(pos) < length) {
+ return -1;
+ }
+ long read1 = std::min(cont_readable_from(pos), length);
+ long read2 = length - read1;
+ memcpy(&data[0], &buffer[pos], read1);
+ memcpy(&data[read1], &buffer[advance(pos, read1)], read2);
+ return advance(pos, length);
+ }
+
+ long
+ cont_writeable_from(long pos)
+ {
+ return read_offset > pos
+ ? read_offset - pos - 1
+ : (read_offset == 0 ? capacity - pos - 1 : capacity - pos);
+ }
+
+ long
+ writeable_from(long pos)
+ {
+ return read_offset > pos
+ ? read_offset - pos - 1
+ : capacity - pos + read_offset - 1;
+ }
+
+ long
+ raw_write_at(long pos, uint8_t *data, long length)
+ {
+ /// WARNING: Does not advance the write offset
+ if (writeable_from(pos) < length) {
+ return -1;
+ }
+ long write1 = std::min(length, cont_writeable_from(pos));
+ long write2 = length - write1;
+ memcpy(&buffer[pos], &data[0], write1);
+ memcpy(&buffer[advance(pos, write1)], &data[write1], write2);
+ return advance(pos, length);
+ }
+
+ template
+ bool
+ raw_write_at(long pos, T data)
+ {
+ return raw_write_at(pos, (uint8_t *)&data, sizeof(T));
+ }
+
+};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/ConnectionManager.cpp b/src/plugins/output/forwarder/src/ConnectionManager.cpp
new file mode 100644
index 00000000..f55dafaf
--- /dev/null
+++ b/src/plugins/output/forwarder/src/ConnectionManager.cpp
@@ -0,0 +1,170 @@
+/**
+ * \file src/plugins/output/forwarder/src/ConnectionManager.cpp
+ * \author Michal Sedlak
+ * \brief Connection manager
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#include "ConnectionManager.h"
+
+Connection &
+ConnectionManager::add_client(ConnectionParams params)
+{
+ auto connection_ptr = std::unique_ptr(new Connection(*this, params, connection_buffer_size));
+ auto &connection = *connection_ptr;
+ std::lock_guard guard(mutex);
+ if (connection.connect()) {
+ active_connections.push_back(std::move(connection_ptr));
+ } else {
+ reconnect_connections.push_back(std::move(connection_ptr));
+ }
+ return connection;
+}
+
+void
+ConnectionManager::send_loop()
+{
+ int max_fd;
+
+ fd_set pipe_fds;
+ FD_ZERO(&pipe_fds);
+ FD_SET(pipe.get_readfd(), &pipe_fds);
+
+ fd_set socket_fds;
+ FD_ZERO(&socket_fds);
+
+ auto watch_sock = [&](int fd) {
+ FD_SET(fd, &pipe_fds);
+ max_fd = std::max(max_fd, fd);
+ };
+
+ while (!exit_flag) {
+ max_fd = pipe.get_readfd();
+ FD_ZERO(&socket_fds);
+
+ {
+ std::lock_guard guard(mutex);
+ pipe.clear();
+ auto it = active_connections.begin();
+ while (it != active_connections.end()) {
+ auto &connection = **it;
+ if (connection.has_data_to_send) {
+ std::lock_guard guard(connection.buffer_mutex);
+ if (connection.send_some()) {
+ if (connection.buffer.readable()) {
+ watch_sock(connection.sockfd);
+ }
+ connection.has_data_to_send = connection.buffer.readable();
+ } else {
+ connection.connection_lost_flag = true;
+ reconnect_connections.push_back(std::move(*it));
+ it = active_connections.erase(it);
+ reconnect_cv.notify_one();
+ continue;
+ }
+ } else {
+ if (connection.close_flag) {
+ it = active_connections.erase(it);
+ continue;
+ }
+ }
+ it++;
+ }
+ }
+
+ select(max_fd + 1, &pipe_fds, &socket_fds, NULL, NULL);
+ }
+}
+
+void
+ConnectionManager::reconnect_loop()
+{
+ while (!exit_flag) {
+ auto lock = std::unique_lock(mutex);
+ auto it = reconnect_connections.begin();
+ while (it != reconnect_connections.end()) {
+ auto &connection = **it;
+ if (connection.connect()) {
+ active_connections.push_back(std::move(*it));
+ it = reconnect_connections.erase(it);
+ pipe.notify();
+ } else {
+ if (connection.close_flag) {
+ it = reconnect_connections.erase(it);
+ continue;
+ }
+ it++;
+ }
+ }
+
+ if (reconnect_connections.empty()) {
+ reconnect_cv.wait(lock);
+ } else {
+ reconnect_cv.wait_for(lock, std::chrono::seconds(reconnect_interval_secs));
+ }
+ }
+}
+
+void
+ConnectionManager::start()
+{
+ send_thread = std::thread([this]() { send_loop(); });
+ reconnect_thread = std::thread([this]() { reconnect_loop(); });
+}
+
+void
+ConnectionManager::stop()
+{
+ exit_flag = true;
+ pipe.notify();
+ reconnect_cv.notify_one();
+ send_thread.join();
+ reconnect_thread.join();
+}
+
+void
+ConnectionManager::set_reconnect_interval(int number_of_seconds)
+{
+ reconnect_interval_secs = number_of_seconds;
+}
+
+void
+ConnectionManager::set_connection_buffer_size(long number_of_bytes)
+{
+ connection_buffer_size = number_of_bytes;
+}
+
diff --git a/src/plugins/output/forwarder/src/ConnectionManager.h b/src/plugins/output/forwarder/src/ConnectionManager.h
new file mode 100644
index 00000000..2fe73237
--- /dev/null
+++ b/src/plugins/output/forwarder/src/ConnectionManager.h
@@ -0,0 +1,98 @@
+/**
+ * \file src/plugins/output/forwarder/src/ConnectionManager.h
+ * \author Michal Sedlak
+ * \brief Connection manager
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#pragma once
+
+#include "ConnectionParams.h"
+#include "Connection.h"
+#include "SyncPipe.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+class Connection;
+
+static constexpr long DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+static constexpr int DEFAULT_RECONNECT_INTERVAL_SECS = 5;
+
+class ConnectionManager
+{
+friend class Connection;
+
+public:
+ Connection &
+ add_client(ConnectionParams params);
+
+ void
+ start();
+
+ void
+ stop();
+
+ void
+ set_reconnect_interval(int number_of_seconds);
+
+ void
+ set_connection_buffer_size(long number_of_bytes);
+
+private:
+ long connection_buffer_size = DEFAULT_BUFFER_SIZE;
+ int reconnect_interval_secs = DEFAULT_RECONNECT_INTERVAL_SECS;
+ std::mutex mutex;
+ std::vector> active_connections;
+ std::vector> reconnect_connections;
+ std::thread send_thread;
+ std::thread reconnect_thread;
+ std::condition_variable reconnect_cv;
+ std::atomic exit_flag { false };
+ SyncPipe pipe;
+
+ void
+ send_loop();
+
+ void
+ reconnect_loop();
+};
diff --git a/src/plugins/output/forwarder/src/ConnectionParams.h b/src/plugins/output/forwarder/src/ConnectionParams.h
new file mode 100644
index 00000000..5f9fbfa7
--- /dev/null
+++ b/src/plugins/output/forwarder/src/ConnectionParams.h
@@ -0,0 +1,136 @@
+/**
+ * \file src/plugins/output/forwarder/src/ConnectionParams.h
+ * \author Michal Sedlak
+ * \brief Parameters for estabilishing connection
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+using unique_addrinfo = std::unique_ptr;
+
+enum class TransProto { Tcp, Udp };
+
+struct ConnectionParams
+{
+ ConnectionParams(std::string address, std::string port, TransProto protocol)
+ : address(address)
+ , port(port)
+ , protocol(protocol)
+ {
+ }
+
+ unique_addrinfo
+ resolve_address()
+ {
+ addrinfo *info;
+ addrinfo hints = {};
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = (protocol == TransProto::Tcp ? SOCK_STREAM : SOCK_DGRAM);
+ hints.ai_protocol = (protocol == TransProto::Tcp ? IPPROTO_TCP : IPPROTO_UDP);
+ if (getaddrinfo(address.c_str(), port.c_str(), &hints, &info) == 0) {
+ return unique_addrinfo(info, &freeaddrinfo);
+ } else {
+ return unique_addrinfo(NULL, &freeaddrinfo);
+ }
+ }
+
+ int
+ make_socket()
+ {
+ auto address_info = resolve_address();
+ if (!address_info) {
+ return -1;
+ }
+
+ int sockfd;
+ addrinfo *p;
+
+ for (p = address_info.get(); p != NULL; p = p->ai_next) {
+ sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
+ if (sockfd < 0) {
+ continue;
+ }
+
+ if (protocol == TransProto::Udp) {
+ sockaddr_in sa = {};
+ sa.sin_family = AF_INET;
+ sa.sin_port = 0;
+ sa.sin_addr.s_addr = INADDR_ANY;
+ sa.sin_port = 0;
+ if (bind(sockfd, (sockaddr *)&sa, sizeof(sa)) != 0) {
+ close(sockfd);
+ continue;
+ }
+ }
+
+ if (connect(sockfd, p->ai_addr, p->ai_addrlen) != 0) {
+ close(sockfd);
+ continue;
+ }
+
+ break;
+ }
+
+ if (!p) {
+ return -1;
+ }
+
+ return sockfd;
+ }
+
+ std::string
+ str()
+ {
+ return std::string(protocol == TransProto::Tcp ? "TCP" : "UDP") + ":"
+ + address + ":"
+ + port;
+ }
+
+ std::string address;
+ std::string port;
+ TransProto protocol;
+};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h
new file mode 100644
index 00000000..803e84e5
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Forwarder.h
@@ -0,0 +1,442 @@
+/**
+ * \file src/plugins/output/forwarder/src/Forwarder.h
+ * \author Michal Sedlak
+ * \brief Forwarder logic
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#pragma once
+
+#include "ConnectionManager.h"
+#include "IPFIXMessage.h"
+#include "MessageBuilder.h"
+
+#include
+#include
+
+#include