diff --git a/src/plugins/output/forwarder/README.rst b/src/plugins/output/forwarder/README.rst index 8dd97c38..64613329 100644 --- a/src/plugins/output/forwarder/README.rst +++ b/src/plugins/output/forwarder/README.rst @@ -43,6 +43,10 @@ Parameters The transport protocol to use [values: TCP/UDP] +:``connectionBufferSize``: + Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts) + [value: number of bytes, default: 4194304] + :``templateRefreshIntervalSecs``: Send templates again every N seconds (UDP only) [value: number of seconds, default: 600] diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h index 7a3d6b37..09af5a29 100644 --- a/src/plugins/output/forwarder/src/Connection.h +++ b/src/plugins/output/forwarder/src/Connection.h @@ -54,8 +54,6 @@ #include #include -static constexpr int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; - class ConnectionManager; class Connection @@ -67,7 +65,7 @@ friend class ConnectionManager; /// The flag won't be reset when the connection is reestablished! std::atomic connection_lost_flag { false }; - Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size = DEFAULT_BUFFER_SIZE); + Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size); bool connect(); diff --git a/src/plugins/output/forwarder/src/ConnectionManager.cpp b/src/plugins/output/forwarder/src/ConnectionManager.cpp index 22c40189..f55dafaf 100644 --- a/src/plugins/output/forwarder/src/ConnectionManager.cpp +++ b/src/plugins/output/forwarder/src/ConnectionManager.cpp @@ -44,7 +44,7 @@ Connection & ConnectionManager::add_client(ConnectionParams params) { - auto connection_ptr = std::unique_ptr(new Connection(*this, params)); + auto connection_ptr = std::unique_ptr(new Connection(*this, params, connection_buffer_size)); auto &connection = *connection_ptr; std::lock_guard guard(mutex); if (connection.connect()) { @@ -157,8 +157,14 @@ ConnectionManager::stop() } void -ConnectionManager::set_reconnect_interval(int secs) +ConnectionManager::set_reconnect_interval(int number_of_seconds) { - reconnect_interval_secs = secs; + reconnect_interval_secs = number_of_seconds; +} + +void +ConnectionManager::set_connection_buffer_size(long number_of_bytes) +{ + connection_buffer_size = number_of_bytes; } diff --git a/src/plugins/output/forwarder/src/ConnectionManager.h b/src/plugins/output/forwarder/src/ConnectionManager.h index 752cedaf..2fe73237 100644 --- a/src/plugins/output/forwarder/src/ConnectionManager.h +++ b/src/plugins/output/forwarder/src/ConnectionManager.h @@ -55,6 +55,7 @@ class Connection; +static constexpr long DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; static constexpr int DEFAULT_RECONNECT_INTERVAL_SECS = 5; class ConnectionManager @@ -72,9 +73,13 @@ friend class Connection; stop(); void - set_reconnect_interval(int secs); + set_reconnect_interval(int number_of_seconds); + + void + set_connection_buffer_size(long number_of_bytes); private: + long connection_buffer_size = DEFAULT_BUFFER_SIZE; int reconnect_interval_secs = DEFAULT_RECONNECT_INTERVAL_SECS; std::mutex mutex; std::vector> active_connections; diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h index c73e6a13..803e84e5 100644 --- a/src/plugins/output/forwarder/src/Forwarder.h +++ b/src/plugins/output/forwarder/src/Forwarder.h @@ -155,6 +155,12 @@ class Forwarder this->forward_mode = forward_mode; } + void + set_connection_buffer_size(long number_of_bytes) + { + connection_manager.set_connection_buffer_size(number_of_bytes); + } + void set_template_refresh_interval_secs(int number_of_seconds) { diff --git a/src/plugins/output/forwarder/src/config.h b/src/plugins/output/forwarder/src/config.h index 6f28756a..81298548 100644 --- a/src/plugins/output/forwarder/src/config.h +++ b/src/plugins/output/forwarder/src/config.h @@ -89,15 +89,23 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa /// Config schema definition /// enum { - MODE, PROTOCOL, RECONNECT_INTERVAL_SECS, - TEMPLATE_REFRESH_INTERVAL_SECS, TEMPLATE_REFRESH_INTERVAL_BYTES, - HOSTS, HOST, NAME, ADDRESS, PORT + MODE, + PROTOCOL, + RECONNECT_INTERVAL_SECS, + TEMPLATE_REFRESH_INTERVAL_SECS, + TEMPLATE_REFRESH_INTERVAL_BYTES, + CONNECTION_BUFFER_SIZE, + HOSTS, + HOST, + NAME, + ADDRESS, + PORT }; fds_xml_args host_schema[] = { - FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(PORT, "port", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NAME , "name" , FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM(PORT , "port" , FDS_OPTS_T_STRING, 0 ), FDS_OPTS_END }; @@ -107,22 +115,24 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa }; fds_xml_args params_schema[] = { - FDS_OPTS_ROOT("params"), - FDS_OPTS_ELEM(MODE, "mode", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(PROTOCOL, "protocol", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_SECS, "templateRefreshIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(RECONNECT_INTERVAL_SECS, "reconnectIntervalSecs", FDS_OPTS_T_INT, FDS_OPTS_P_OPT), - FDS_OPTS_NESTED(HOSTS, "hosts", hosts_schema, 0), + FDS_OPTS_ROOT ("params"), + FDS_OPTS_ELEM (MODE , "mode" , FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM (PROTOCOL , "protocol" , FDS_OPTS_T_STRING, 0 ), + FDS_OPTS_ELEM (CONNECTION_BUFFER_SIZE , "connectionBufferSize" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_SECS , "templateRefreshIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (TEMPLATE_REFRESH_INTERVAL_BYTES, "templateRefreshIntervalBytes", FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_ELEM (RECONNECT_INTERVAL_SECS , "reconnectIntervalSecs" , FDS_OPTS_T_INT , FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(HOSTS , "hosts" , hosts_schema , 0 ), FDS_OPTS_END }; /// /// Default parameter values /// - const int default_template_refresh_interval_secs = 10 * 60; + const int default_template_refresh_interval_secs = 10 * 60; const int default_template_refresh_interval_bytes = 5 * 1024 * 1024; - const int default_reconnect_interval_secs = 10; + const int default_reconnect_interval_secs = 10; + const int default_connection_buffer_size = 4 * 1024 * 1024; /// /// Parsed parameters @@ -136,6 +146,7 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa std::string mode; std::string protocol; std::vector hosts; + Maybe connection_buffer_size; Maybe template_refresh_interval_secs; Maybe template_refresh_interval_bytes; Maybe reconnect_interval_secs; @@ -206,6 +217,9 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa case HOSTS: process_hosts(content->ptr_ctx); break; + case CONNECTION_BUFFER_SIZE: + connection_buffer_size = content->val_int; + break; case TEMPLATE_REFRESH_INTERVAL_SECS: template_refresh_interval_secs = content->val_int; break; @@ -237,6 +251,16 @@ parse_and_configure(ipx_ctx_t *log_ctx, const char *xml_config, Forwarder &forwa throw "Invalid protocol '" + protocol + "', possible values are: 'tcp', 'udp'"; } + if (connection_buffer_size.has_value()) { + if (connection_buffer_size.value() > 0) { + forwarder.set_connection_buffer_size(connection_buffer_size.value()); + } else { + throw std::string("Invalid connection buffer size"); + } + } else { + forwarder.set_connection_buffer_size(default_connection_buffer_size); + } + if (template_refresh_interval_secs.has_value()) { if (template_refresh_interval_secs.value() >= 0) { if (lower(protocol) == "tcp") {