Skip to content

Commit

Permalink
Merge pull request CESNET#47 from sedmicha/forwarder
Browse files Browse the repository at this point in the history
Forwarder output: define new output module
  • Loading branch information
Lukas955 authored Mar 14, 2021
2 parents 9bf092f + 13cebfe commit 936b1b6
Show file tree
Hide file tree
Showing 17 changed files with 2,219 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/plugins/output/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ add_subdirectory(json-kafka)
add_subdirectory(timecheck)
add_subdirectory(viewer)
add_subdirectory(ipfix)
add_subdirectory(forwarder)
37 changes: 37 additions & 0 deletions src/plugins/output/forwarder/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Create a linkable module
add_library(forwarder-output MODULE
src/main.cpp
src/config.h
src/Forwarder.h
src/ConnectionManager.h
src/ConnectionManager.cpp
src/ConnectionParams.h
src/Connection.h
src/Connection.cpp
src/ConnectionBuffer.h
src/SyncPipe.h
src/IPFIXMessage.h
src/MessageBuilder.h
)

install(
TARGETS forwarder-output
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
)

if (ENABLE_DOC_MANPAGE)
# Build a manual page
set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-forwarder-output.7.rst")
set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-forwarder-output.7")

add_custom_command(TARGET forwarder-output PRE_BUILD
COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE}
DEPENDS ${SRC_FILE}
VERBATIM
)

install(
FILES "${DST_FILE}"
DESTINATION "${INSTALL_DIR_MAN}/man7"
)
endif()
77 changes: 77 additions & 0 deletions src/plugins/output/forwarder/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
Forwarder (output plugin)
==========================

This plugin allows forwarding incoming IPFIX messages to other collector in various modes.

It can be used to broadcast messages to multiple collectors (e.g. a main and a backup collector),
or to distribute messages across multiple collectors (e.g. for load balancing).

Example configuration
---------------------

.. code-block:: xml
<output>
<name>Forwarder</name>
<plugin>forwarder</plugin>
<params>
<mode>roundrobin</mode>
<protocol>tcp</protocol>
<hosts>
<host>
<name>Subcollector 1</name>
<address>127.0.0.1</address>
<port>4751</port>
</host>
<host>
<name>Subcollector 2</name>
<address>localhost</address>
<port>4752</port>
</host>
</hosts>
</params>
</output>
Parameters
----------

:``mode``:
The forwarding mode; round robin (messages are sent to one host at time and hosts are cycled through) or all (messages are broadcasted to all hosts)
[values: RoundRobin/All]

:``protocol``:
The transport protocol to use
[values: TCP/UDP]

:``connectionBufferSize``:
Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts)
[value: number of bytes, default: 4194304]

:``templateRefreshIntervalSecs``:
Send templates again every N seconds (UDP only)
[value: number of seconds, default: 600]

:``templateRefreshIntervalBytes``:
Send templates again every N bytes (UDP only)
[value: number of bytes, default: 5000000]

:``reconnectIntervalSecs``:
Attempt to reconnect every N seconds in case the connection drops (TCP only)
[value: number of seconds, default: 10]

:``hosts``:
The receiving hosts

:``host``:
:``name``:
Optional identification of the host
[value: string, default: <address>:<port>]

:``address``:
The address of the host
[value: IPv4/IPv6 address or a hostname]

:``port``:
The port to connect to
[value: port number]

5 changes: 5 additions & 0 deletions src/plugins/output/forwarder/TODO.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
* Template withdrawals
* More effective way of handling template changes - currently all the templates are being sent again every time any change in templates is detected
* Message MTU
* Possible bug: when testing, a small number of data records seems to be lost (something like 20 out of 1,000,000)
* Connection buffer size
20 changes: 20 additions & 0 deletions src/plugins/output/forwarder/doc/ipfixcol2-forwarder-output.7.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
============================
ipfixcol2-forwarder-output
============================

--------------------------
Forwarder (output plugin)
--------------------------

:Author: Michal Sedlak ([email protected])
:Date: 2021-01-28
:Copyright: Copyright © 2021 CESNET, z.s.p.o.
:Version: 1.0
:Manual section: 7
:Manual group: IPFIXcol collector

Description
-----------

.. include:: ../README.rst
:start-line: 3
130 changes: 130 additions & 0 deletions src/plugins/output/forwarder/src/Connection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* \file src/plugins/output/forwarder/src/Connection.cpp
* \author Michal Sedlak <[email protected]>
* \brief Buffered socket connection
* \date 2021
*/

/* Copyright (C) 2021 CESNET, z.s.p.o.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name of the Company nor the names of its contributors
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* ALTERNATIVELY, provided that this notice is retained in full, this
* product may be distributed under the terms of the GNU General Public
* License (GPL) version 2 or later, in which case the provisions
* of the GPL apply INSTEAD OF those given above.
*
* This software is provided ``as is'', and any express or implied
* warranties, including, but not limited to, the implied warranties of
* merchantability and fitness for a particular purpose are disclaimed.
* In no event shall the company or contributors be liable for any
* direct, indirect, incidental, special, exemplary, or consequential
* damages (including, but not limited to, procurement of substitute
* goods or services; loss of use, data, or profits; or business
* interruption) however caused and on any theory of liability, whether
* in contract, strict liability, or tort (including negligence or
* otherwise) arising in any way out of the use of this software, even
* if advised of the possibility of such damage.
*
*/

#include "Connection.h"

#include <libfds.h>

Connection::Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size)
: manager(manager)
, params(params)
, buffer(buffer_size)
{
}

bool
Connection::connect()
{
if (sockfd >= 0) {
::close(sockfd);
}
sockfd = params.make_socket();
return sockfd >= 0;
}

std::unique_lock<std::mutex>
Connection::begin_write()
{
return std::unique_lock<std::mutex>(buffer_mutex);
}

bool
Connection::write(void *data, long length)
{
return buffer.write((uint8_t *)data, length);
}

void
Connection::rollback_write()
{
buffer.rollback();
}

long
Connection::writeable()
{
return buffer.writeable();
}

void
Connection::commit_write()
{
buffer.commit();
manager.pipe.notify();
has_data_to_send = buffer.readable();
}

bool
Connection::send_some()
{
if (params.protocol == TransProto::Udp) {
while (1) {
fds_ipfix_msg_hdr ipfix_header;
if (!buffer.peek(ipfix_header)) {
return true;
}
auto message_length = ntohs(ipfix_header.length);
int ret = buffer.send_data(sockfd, message_length);
if (ret == 0 || !buffer.readable()) {
return true;
} else if (ret < 0) {
return false;
}
}
return true;
} else {
return buffer.send_data(sockfd) >= 0;
}
}

void
Connection::close()
{
close_flag = true;
manager.pipe.notify();
}

Connection::~Connection()
{
if (sockfd >= 0) {
::close(sockfd);
}
}
117 changes: 117 additions & 0 deletions src/plugins/output/forwarder/src/Connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* \file src/plugins/output/forwarder/src/Connection.h
* \author Michal Sedlak <[email protected]>
* \brief Buffered socket connection
* \date 2021
*/

/* Copyright (C) 2021 CESNET, z.s.p.o.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name of the Company nor the names of its contributors
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* ALTERNATIVELY, provided that this notice is retained in full, this
* product may be distributed under the terms of the GNU General Public
* License (GPL) version 2 or later, in which case the provisions
* of the GPL apply INSTEAD OF those given above.
*
* This software is provided ``as is'', and any express or implied
* warranties, including, but not limited to, the implied warranties of
* merchantability and fitness for a particular purpose are disclaimed.
* In no event shall the company or contributors be liable for any
* direct, indirect, incidental, special, exemplary, or consequential
* damages (including, but not limited to, procurement of substitute
* goods or services; loss of use, data, or profits; or business
* interruption) however caused and on any theory of liability, whether
* in contract, strict liability, or tort (including negligence or
* otherwise) arising in any way out of the use of this software, even
* if advised of the possibility of such damage.
*
*/

#pragma once

#include "ConnectionManager.h"
#include "ConnectionParams.h"
#include "ConnectionBuffer.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

#include <atomic>
#include <mutex>
#include <cstdint>

class ConnectionManager;

class Connection
{
friend class ConnectionManager;

public:
/// Flag indicating that the connection was lost and the forwarder needs to resend templates etc.
/// The flag won't be reset when the connection is reestablished!
std::atomic<bool> connection_lost_flag { false };

Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size);

bool
connect();

std::unique_lock<std::mutex>
begin_write();

bool
write(void *data, long length);

bool
send_some();

void
commit_write();

void
rollback_write();

long
writeable();

void
close();

~Connection();

private:
/// The manager managing this connection
ConnectionManager &manager;

/// The parameters to estabilish the connection
ConnectionParams params;

/// The connection socket
int sockfd = -1;

/// Buffer for the data to send and a mutex guarding it
/// (buffer will be accessed from sender thread and writer thread)
std::mutex buffer_mutex;
ConnectionBuffer buffer;

/// Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time
/// (doesn't need to be atomic because we only set it while holding the mutex)
bool has_data_to_send = false;

/// Flag indicating that the connection has been closed and can be disposed of after the data is sent
std::atomic<bool> close_flag { false };
};
Loading

0 comments on commit 936b1b6

Please sign in to comment.