Skip to content

Commit

Permalink
Forwarder: Config changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sedmicha committed Feb 2, 2021
1 parent afe771a commit 9772dd1
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 22 deletions.
4 changes: 4 additions & 0 deletions src/plugins/output/forwarder/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Parameters
The transport protocol to use
[values: TCP/UDP]

:``connectionBufferSize``:
Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts)
[value: number of bytes, default: 4194304]

:``templateRefreshIntervalSecs``:
Send templates again every N seconds (UDP only)
[value: number of seconds, default: 600]
Expand Down
4 changes: 1 addition & 3 deletions src/plugins/output/forwarder/src/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
#include <mutex>
#include <cstdint>

static constexpr int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;

class ConnectionManager;

class Connection
Expand All @@ -67,7 +65,7 @@ friend class ConnectionManager;
/// The flag won't be reset when the connection is reestablished!
std::atomic<bool> connection_lost_flag { false };

Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size = DEFAULT_BUFFER_SIZE);
Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size);

bool
connect();
Expand Down
12 changes: 9 additions & 3 deletions src/plugins/output/forwarder/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
Connection &
ConnectionManager::add_client(ConnectionParams params)
{
auto connection_ptr = std::unique_ptr<Connection>(new Connection(*this, params));
auto connection_ptr = std::unique_ptr<Connection>(new Connection(*this, params, connection_buffer_size));
auto &connection = *connection_ptr;
std::lock_guard<std::mutex> guard(mutex);
if (connection.connect()) {
Expand Down Expand Up @@ -157,8 +157,14 @@ ConnectionManager::stop()
}

void
ConnectionManager::set_reconnect_interval(int secs)
ConnectionManager::set_reconnect_interval(int number_of_seconds)
{
reconnect_interval_secs = secs;
reconnect_interval_secs = number_of_seconds;
}

void
ConnectionManager::set_connection_buffer_size(long number_of_bytes)
{
connection_buffer_size = number_of_bytes;
}

7 changes: 6 additions & 1 deletion src/plugins/output/forwarder/src/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

class Connection;

static constexpr long DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
static constexpr int DEFAULT_RECONNECT_INTERVAL_SECS = 5;

class ConnectionManager
Expand All @@ -72,9 +73,13 @@ friend class Connection;
stop();

void
set_reconnect_interval(int secs);
set_reconnect_interval(int number_of_seconds);

void
set_connection_buffer_size(long number_of_bytes);

private:
long connection_buffer_size = DEFAULT_BUFFER_SIZE;
int reconnect_interval_secs = DEFAULT_RECONNECT_INTERVAL_SECS;
std::mutex mutex;
std::vector<std::unique_ptr<Connection>> active_connections;
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/output/forwarder/src/Forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ class Forwarder
this->forward_mode = forward_mode;
}

void
set_connection_buffer_size(long number_of_bytes)
{
connection_manager.set_connection_buffer_size(number_of_bytes);
}

void
set_template_refresh_interval_secs(int number_of_seconds)
{
Expand Down
54 changes: 39 additions & 15 deletions src/plugins/output/forwarder/src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,23 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa
/// Config schema definition
///
enum {
MODE, PROTOCOL, RECONNECT_INTERVAL_SECS,
TEMPLATE_REFRESH_INTERVAL_SECS, TEMPLATE_REFRESH_INTERVAL_BYTES,
HOSTS, HOST, NAME, ADDRESS, PORT
MODE,
PROTOCOL,
RECONNECT_INTERVAL_SECS,
TEMPLATE_REFRESH_INTERVAL_SECS,
TEMPLATE_REFRESH_INTERVAL_BYTES,
CONNECTION_BUFFER_SIZE,
HOSTS,
HOST,
NAME,
ADDRESS,
PORT
};

fds_xml_args host_schema[] = {
FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(PORT, "port", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(NAME , "name" , FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0 ),
FDS_OPTS_ELEM(PORT , "port" , FDS_OPTS_T_STRING, 0 ),
FDS_OPTS_END
};

Expand All @@ -107,22 +115,24 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa
};

fds_xml_args params_schema[] = {
FDS_OPTS_ROOT("params"),
FDS_OPTS_ELEM(MODE, "mode", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(PROTOCOL, "protocol", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_SECS, "templateRefreshIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT, FDS_OPTS_P_OPT),
FDS_OPTS_ELEM(RECONNECT_INTERVAL_SECS, "reconnectIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT),
FDS_OPTS_NESTED(HOSTS, "hosts", hosts_schema, 0),
FDS_OPTS_ROOT ("params"),
FDS_OPTS_ELEM (MODE , "mode" , FDS_OPTS_T_STRING, 0 ),
FDS_OPTS_ELEM (PROTOCOL , "protocol" , FDS_OPTS_T_STRING, 0 ),
FDS_OPTS_ELEM (CONNECTION_BUFFER_SIZE , "connectionBufferSize" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT),
FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_SECS , "templateRefreshIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT),
FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT , FDS_OPTS_P_OPT),
FDS_OPTS_ELEM (RECONNECT_INTERVAL_SECS , "reconnectIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT),
FDS_OPTS_NESTED(HOSTS , "hosts" , hosts_schema , 0 ),
FDS_OPTS_END
};

///
/// Default parameter values
///
const int default_template_refresh_interval_secs = 10 * 60;
const int default_template_refresh_interval_secs = 10 * 60;
const int default_template_refresh_interval_bytes = 5 * 1024 * 1024;
const int default_reconnect_interval_secs = 10;
const int default_reconnect_interval_secs = 10;
const int default_connection_buffer_size = 4 * 1024 * 1024;

///
/// Parsed parameters
Expand All @@ -136,6 +146,7 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa
std::string mode;
std::string protocol;
std::vector<HostInfo> hosts;
Maybe<int> connection_buffer_size;
Maybe<int> template_refresh_interval_secs;
Maybe<int> template_refresh_interval_bytes;
Maybe<int> reconnect_interval_secs;
Expand Down Expand Up @@ -206,6 +217,9 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa
case HOSTS:
process_hosts(content->ptr_ctx);
break;
case CONNECTION_BUFFER_SIZE:
connection_buffer_size = content->val_int;
break;
case TEMPLATE_REFRESH_INTERVAL_SECS:
template_refresh_interval_secs = content->val_int;
break;
Expand Down Expand Up @@ -237,6 +251,16 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa
throw "Invalid protocol '" + protocol + "', possible values are: 'tcp', 'udp'";
}

if (connection_buffer_size.has_value()) {
if (connection_buffer_size.value() > 0) {
forwarder.set_connection_buffer_size(connection_buffer_size.value());
} else {
throw std::string("Invalid connection buffer size");
}
} else {
forwarder.set_connection_buffer_size(default_connection_buffer_size);
}

if (template_refresh_interval_secs.has_value()) {
if (template_refresh_interval_secs.value() >= 0) {
if (lower(protocol) == "tcp") {
Expand Down

0 comments on commit 9772dd1

Please sign in to comment.