From 14d6b6f94d7f7c58ee8dc4f89961b50aaa4e1e90 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Sun, 26 Apr 2020 17:33:52 +0200 Subject: [PATCH] Core: fix early processing termination of NetFlow/IPFIX Parser when a slow stop is triggered by an Input plugin When a slow stop was triggered by an input plugin (for example, when End-of-file has been reached) it also immediatelly disabled data processing (i.e. IPFIX, Session) by the following NetFlow/IPFIX message parser and therefore some messeges in the input queue were unprocessed/ignored. --- src/core/configurator/configurator.cpp | 17 +++++++++++++---- src/core/configurator/instance_input.cpp | 6 ++++++ src/core/configurator/instance_input.hpp | 15 ++++++++++++++- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/core/configurator/configurator.cpp b/src/core/configurator/configurator.cpp index 512224e4..fd8c458d 100644 --- a/src/core/configurator/configurator.cpp +++ b/src/core/configurator/configurator.cpp @@ -373,8 +373,8 @@ ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_contro // First of all, check if termination process has been completed if (req.type == IPX_CPIPE_TYPE_TERM_DONE) { if (m_state != STATUS::STOP_SLOW && m_state != STATUS::STOP_FAST) { - IPX_ERROR(comp_str, "Got a termination done notification, but the termination process " - "is not in progress!", '\0'); + IPX_ERROR(comp_str, "[internal] Got a termination done notification, but the " + "termination process is not in progress!", '\0'); return false; } @@ -473,6 +473,7 @@ ipx_configurator::termination_stop_all() { for (auto &it : m_running_inputs) { it->set_processing(false); + it->set_parser_processing(false); }; for (auto &it : m_running_inter) { it->set_processing(false); @@ -508,8 +509,16 @@ ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx) it->set_processing(false); } - if (ctx_info->type == IPX_PT_INPUT || ctx_info == &ipx_plugin_parser_info) { - // The termination has been invoked by an input plugin or its message parser + if (ctx_info->type == IPX_PT_INPUT) { + return; + } + + // Stop all NetFlow/IPFIX message parsers + for (auto &it : m_running_inputs) { + it->set_parser_processing(false); + } + + if (ctx_info == &ipx_plugin_parser_info) { return; } diff --git a/src/core/configurator/instance_input.cpp b/src/core/configurator/instance_input.cpp index 6dd44b08..46a4e248 100644 --- a/src/core/configurator/instance_input.cpp +++ b/src/core/configurator/instance_input.cpp @@ -183,5 +183,11 @@ void ipx_instance_input::set_processing(bool en) { ipx_ctx_processing_set(_ctx, en); + +} + +void +ipx_instance_input::set_parser_processing(bool en) +{ ipx_ctx_processing_set(_parser_ctx, en); } \ No newline at end of file diff --git a/src/core/configurator/instance_input.hpp b/src/core/configurator/instance_input.hpp index e024ca16..2069a8c5 100644 --- a/src/core/configurator/instance_input.hpp +++ b/src/core/configurator/instance_input.hpp @@ -162,13 +162,26 @@ class ipx_instance_input : public ipx_instance { extensions_resolve(ipx_cfg_extensions *ext_mgr) override; /** - * \brief Enable/disable processing of data messages (IPFIX and Transport Session) + * \brief Enable/disable processing of data messages by the plugin (IPFIX and Transport Session) + * + * \warning This doesn't affect NetFlow/IPFIX Message parser, see set_parser_processing() * \note By default, data processing is enabled. * \see ipx_ctx_processing() for more details * \param[in] en Enable/disable processing */ void set_processing(bool en) override; + + /** + * \brief Enable/disable processing of data messages by the parser (IPFIX and Transport Session) + * + * \warning This doesn't affect the input plugin, see set_processing() + * \note By default, data processing is enabled. + * \see ipx_ctx_processing() for more details + * \param[in] en Enable/disable processing + */ + void + set_parser_processing(bool en); }; #endif //IPFIXCOL_INSTANCE_INPUT_HPP