From 086e9540dc26b9d2c1710c3770f5d44a2aaf58e0 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Mon, 30 Mar 2020 18:38:15 +0200 Subject: [PATCH] Core: new handling of termination requests --- include/ipfixcol2/plugins.h | 2 + src/core/CMakeLists.txt | 6 +- src/core/configurator/config_file.cpp | 519 ---------------------- src/core/configurator/config_file.hpp | 62 --- src/core/configurator/configurator.cpp | 74 +-- src/core/configurator/configurator.hpp | 2 + src/core/configurator/controller_file.cpp | 5 +- src/core/configurator/instance.hpp | 2 +- src/core/configurator/instance_input.cpp | 4 +- src/core/context.c | 113 ++++- src/core/context.h | 16 +- src/core/main.cpp | 1 - src/core/message_base.c | 2 +- src/core/message_terminate.c | 4 +- src/core/message_terminate.h | 12 +- 15 files changed, 162 insertions(+), 662 deletions(-) delete mode 100644 src/core/configurator/config_file.cpp delete mode 100644 src/core/configurator/config_file.hpp diff --git a/include/ipfixcol2/plugins.h b/include/ipfixcol2/plugins.h index b0e070cf..2b6326e0 100644 --- a/include/ipfixcol2/plugins.h +++ b/include/ipfixcol2/plugins.h @@ -264,6 +264,8 @@ ipx_plugin_get(ipx_ctx_t *ctx, void *cfg); * \return #IPX_OK on success * \return #IPX_ERR_DENIED if a fatal memory allocation error has occurred and/or the plugin cannot * continue to work properly (the collector will exit). + * \return #IPX_ERR_EOF if the plugin has reached expected goal (e.g. number of processed records). + * This function will not be called anymore and the collector will shut down. */ IPX_API int ipx_plugin_process(ipx_ctx_t *ctx, void *cfg, ipx_msg_t *msg); diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 7cbc83a2..edd1b105 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -1,9 +1,11 @@ # Core source files set(CORE_SOURCE - configurator/config_file.cpp - configurator/config_file.hpp configurator/configurator.cpp configurator/configurator.hpp + configurator/controller_file.cpp + configurator/controller_file.hpp + configurator/cpipe.c + configurator/cpipe.h configurator/extensions.cpp configurator/extensions.hpp configurator/instance.hpp diff --git a/src/core/configurator/config_file.cpp b/src/core/configurator/config_file.cpp deleted file mode 100644 index 5c2a6ca2..00000000 --- a/src/core/configurator/config_file.cpp +++ /dev/null @@ -1,519 +0,0 @@ -/** - * \file src/core/configurator/config_file.cpp - * \author Lukas Hutak - * \brief Parser of configuration file (source file) - * \date 2018 - */ - -/* Copyright (C) 2018 CESNET, z.s.p.o. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * 3. Neither the name of the Company nor the names of its contributors - * may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * ALTERNATIVELY, provided that this notice is retained in full, this - * product may be distributed under the terms of the GNU General Public - * License (GPL) version 2 or later, in which case the provisions - * of the GPL apply INSTEAD OF those given above. - * - * This software is provided ``as is'', and any express or implied - * warranties, including, but not limited to, the implied warranties of - * merchantability and fitness for a particular purpose are disclaimed. - * In no event shall the company or contributors be liable for any - * direct, indirect, incidental, special, exemplary, or consequential - * damages (including, but not limited to, procurement of substitute - * goods or services; loss of use, data, or profits; or business - * interruption) however caused and on any theory of liability, whether - * in contract, strict liability, or tort (including negligence or - * otherwise) arising in any way out of the use of this software, even - * if advised of the possibility of such damage. - * - */ - -#include -#include -#include -#include - -#include -#include -#include "config_file.hpp" -#include "configurator.hpp" -#include "model.hpp" - -#include -#include -#include -#include -#include -#include - -extern "C" { -#include "../utils.h" -#include "../verbose.h" -} - -/** Component identification (for log) */ -static const char *comp_str = "Configurator"; - -/** Types of XML configuration nodes */ -enum file_xml_nodes { - // List of plugin instances - LIST_INPUTS = 1, - LIST_INTER, - LIST_OUTPUT, - // Instances - INSTANCE_INPUT, - INSTANCE_INTER, - INSTANCE_OUTPUT, - // Input plugin parameters - IN_PLUGIN_NAME, - IN_PLUGIN_PLUGIN, - IN_PLUGIN_PARAMS, - IN_PLUGIN_VERBOSITY, - // Intermediate plugin parameters - INTER_PLUGIN_NAME, - INTER_PLUGIN_PLUGIN, - INTER_PLUGIN_PARAMS, - INTER_PLUGIN_VERBOSITY, - // Output plugin parameters - OUT_PLUGIN_NAME, - OUT_PLUGIN_PLUGIN, - OUT_PLUGIN_PARAMS, - OUT_PLUGIN_VERBOSITY, - OUT_PLUGIN_ODID_ONLY, - OUT_PLUGIN_ODID_EXCEPT, -}; - -/** - * \brief Definition of the \ node - * \note Presence of the all required parameters is checked during building of the model - */ -static const struct fds_xml_args args_instance_input[] = { - FDS_OPTS_ELEM(IN_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(IN_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(IN_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_RAW( IN_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), - FDS_OPTS_END -}; - -/** - * \brief Definition of the \ node - * \note The configurator checks later if at least one instance is present - */ -static const struct fds_xml_args args_list_inputs[] = { - FDS_OPTS_NESTED(INSTANCE_INPUT, "input", args_instance_input, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), - FDS_OPTS_END -}; - -/** - * \brief Definition of the \ node - * \note Presence of the all required parameters is checked during building of the model - */ -static const struct fds_xml_args args_instance_inter[] = { - FDS_OPTS_ELEM(INTER_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(INTER_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(INTER_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_RAW( INTER_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), - FDS_OPTS_END -}; - -/** Definition of the \ node */ -static const struct fds_xml_args args_list_inter[] = { - FDS_OPTS_NESTED(INSTANCE_INTER, "intermediate", args_instance_inter, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), - FDS_OPTS_END -}; - -/** - * \brief Definition of the \ node - * \note Presence of the all required parameters is checked during building of the model - */ -static const struct fds_xml_args args_instance_output[] = { - FDS_OPTS_ELEM(OUT_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(OUT_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(OUT_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(OUT_PLUGIN_ODID_EXCEPT, "odidExcept", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_ELEM(OUT_PLUGIN_ODID_ONLY, "odidOnly", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), - FDS_OPTS_RAW( OUT_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), - FDS_OPTS_END -}; - -/** - * \brief Definition of the \ node - * \note The configurator checks later if at least one instance is present - */ -static const struct fds_xml_args args_list_output[] = { - FDS_OPTS_NESTED(INSTANCE_OUTPUT, "output", args_instance_output, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), - FDS_OPTS_END -}; - -/** - * \brief Definition of the main \ node - * \note - * Missing an input or output instance is check during starting of a new pipeline in the - * configurator. - */ -static const struct fds_xml_args args_main[] = { - FDS_OPTS_ROOT("ipfixcol2"), - FDS_OPTS_NESTED(LIST_INPUTS, "inputPlugins", args_list_inputs, FDS_OPTS_P_OPT), - FDS_OPTS_NESTED(LIST_INTER, "intermediatePlugins", args_list_inter, FDS_OPTS_P_OPT), - FDS_OPTS_NESTED(LIST_OUTPUT, "outputPlugins", args_list_output, FDS_OPTS_P_OPT), - FDS_OPTS_END -}; - -/** - * \brief Terminating signal handler - */ -void termination_handler(int sig) -{ - (void) sig; - static const char *msg = "Another termination signal detected. Quiting without cleanup...\n"; - write(STDOUT_FILENO, msg, strlen(msg)); - abort(); -} - -/** - * \brief Parse \ node and add the parsed input instance to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_instance_input(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - struct ipx_plugin_input input; - - const struct fds_xml_cont *content; - while (fds_xml_next(ctx, &content) != FDS_EOC) { - switch (content->id) { - case IN_PLUGIN_NAME: - input.name = content->ptr_string; - break; - case IN_PLUGIN_PLUGIN: - input.plugin = content->ptr_string; - break; - case IN_PLUGIN_VERBOSITY: - input.verbosity = content->ptr_string; - break; - case IN_PLUGIN_PARAMS: - input.params = content->ptr_string; - break; - default: - // Unexpected XML node within ! - assert(false); - } - } - - model.add_instance(input); -} - -/** - * \brief Parse \ node and add the parsed input instances to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_list_input(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - unsigned int cnt = 0; - const struct fds_xml_cont *content; - - while (fds_xml_next(ctx, &content) != FDS_EOC) { - // Process an input plugin - assert(content->id == INSTANCE_INPUT); - cnt++; - - try { - file_parse_instance_input(content->ptr_ctx, model); - } catch (std::exception &ex) { - throw std::runtime_error("Failed to parse the configuration of the " - + std::to_string(cnt) + ". input plugin: " + ex.what()); - } - } -} - -/** - * \brief Parse \ node and add the parsed intermediate instance to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_instance_inter(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - struct ipx_plugin_inter inter; - - const struct fds_xml_cont *content; - while (fds_xml_next(ctx, &content) != FDS_EOC) { - switch (content->id) { - case INTER_PLUGIN_NAME: - inter.name = content->ptr_string; - break; - case INTER_PLUGIN_PLUGIN: - inter.plugin = content->ptr_string; - break; - case INTER_PLUGIN_VERBOSITY: - inter.verbosity = content->ptr_string; - break; - case INTER_PLUGIN_PARAMS: - inter.params = content->ptr_string; - break; - default: - // "Unexpected XML node within !" - assert(false); - } - } - - model.add_instance(inter); -} - -/** - * \brief Parse \ node and add the parsed intermediate instances to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_list_inter(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - unsigned int cnt = 0; - const struct fds_xml_cont *content; - - while (fds_xml_next(ctx, &content) != FDS_EOC) { - // Process an intermediate plugin - assert(content->id == INSTANCE_INTER); - cnt++; - - try { - file_parse_instance_inter(content->ptr_ctx, model); - } catch (std::exception &ex) { - throw std::runtime_error("Failed to parse the configuration of the " - + std::to_string(cnt) + ". intermediate plugin: " + ex.what()); - } - } -} - -/** - * \brief Parse \ node and add the parsed output instance to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_instance_output(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - struct ipx_plugin_output output; - output.odid_type = IPX_ODID_FILTER_NONE; // default - bool odid_set = false; - - const struct fds_xml_cont *content; - while (fds_xml_next(ctx, &content) != FDS_EOC) { - switch (content->id) { - case OUT_PLUGIN_NAME: - output.name = content->ptr_string; - break; - case OUT_PLUGIN_PLUGIN: - output.plugin = content->ptr_string; - break; - case OUT_PLUGIN_VERBOSITY: - output.verbosity= content->ptr_string; - break; - case OUT_PLUGIN_PARAMS: - output.params = content->ptr_string; - break; - case OUT_PLUGIN_ODID_EXCEPT: - if (!odid_set) { - output.odid_type = IPX_ODID_FILTER_EXCEPT; - output.odid_expression = content->ptr_string; - odid_set = true; - break; - } - throw std::runtime_error("Multiple definitions of /!"); - case OUT_PLUGIN_ODID_ONLY: - if (!odid_set) { - output.odid_type = IPX_ODID_FILTER_ONLY; - output.odid_expression = content->ptr_string; - odid_set = true; - break; - } - throw std::runtime_error("Multiple definitions of /!"); - default: - // Unexpected XML node within ! - assert(false); - } - } - - model.add_instance(output); -} - -/** - * \brief Parse \ node and add the parsed intermediate instances to the model - * \param[in] ctx Parsed XML node - * \param[in] model Configuration model - * \throw invalid_argument if the parameters are not valid or missing - */ -static void -file_parse_list_output(fds_xml_ctx_t *ctx, ipx_config_model &model) -{ - unsigned int cnt = 0; - const struct fds_xml_cont *content; - - while (fds_xml_next(ctx, &content) != FDS_EOC) { - // Process an output plugin - assert(content->id == INSTANCE_OUTPUT); - cnt++; - - try { - file_parse_instance_output(content->ptr_ctx, model); - } catch (std::exception &ex) { - throw std::runtime_error("Failed to parse the configuration of the " - + std::to_string(cnt) + ". output plugin: " + ex.what()); - } - } -} - -/** - * \brief Parse startup configuration file - * - * - * \param[in] path Path to the startup file - * \return Parsed model - * \throw runtime_error if the file doesn't exists, it's not available or it is malformed - * \throw invalid_argument if some parameters are not valid or missing - */ -static ipx_config_model -file_parse_model(const std::string &path) -{ - // Is it really a configuration file - struct stat file_info; - std::unique_ptr real_path(realpath(path.c_str(), nullptr), &free); - if (!real_path || stat(real_path.get(), &file_info) != 0) { - throw std::runtime_error("Failed to get info about '" + path + "'. Check if the path " - "exists and the application has permission to access it."); - } - - if ((file_info.st_mode & S_IFREG) == 0) { - throw std::runtime_error("The specified path '" + path + "' doesn't lead to a valid " - "configuration file!"); - } - - // Load content of the configuration file - std::unique_ptr stream(fopen(path.c_str(), "r"), &fclose); - if (!stream) { - // Failed to open the file - std::string err_msg = "Unable to open the file '" + path + "'"; - throw std::runtime_error(err_msg); - } - - // Load whole content of the file - fseek(stream.get(), 0, SEEK_END); - long fsize = ftell(stream.get()); - if (fsize == -1) { - throw std::runtime_error("Unable to get the size of the file '" + path + "'"); - } - - rewind(stream.get()); - std::unique_ptr fcontent(new char[fsize + 1]); - if (fread(fcontent.get(), fsize, 1, stream.get()) != 1) { - throw std::runtime_error("Failed to load startup configuration."); - } - fcontent.get()[fsize] = '\0'; - - // Create a parser and try try to parse document - std::unique_ptr parser(fds_xml_create(), &fds_xml_destroy); - if (!parser) { - throw std::runtime_error("fds_xml_create() failed!"); - } - - if (fds_xml_set_args(parser.get(), args_main) != FDS_OK) { - throw std::runtime_error("fds_xml_set_args() failed: " - + std::string(fds_xml_last_err(parser.get()))); - } - - fds_xml_ctx_t *ctx = fds_xml_parse_mem(parser.get(), fcontent.get(), true); - if (!ctx) { - throw std::runtime_error("Failed to parse configuration: " - + std::string(fds_xml_last_err(parser.get()))); - } - - ipx_config_model model; - const struct fds_xml_cont *content; - while(fds_xml_next(ctx, &content) != FDS_EOC) { - assert(content->type == FDS_OPTS_T_CONTEXT); - switch (content->id) { - case LIST_INPUTS: - file_parse_list_input(content->ptr_ctx, model); - break; - case LIST_INTER: - file_parse_list_inter(content->ptr_ctx, model); - break; - case LIST_OUTPUT: - file_parse_list_output(content->ptr_ctx, model); - break; - default: - // Unexpected XML node within startup ! - assert(false); - } - } - - return model; -} - -int -ipx_config_file(ipx_configurator &conf, const std::string &path) -{ - // Try to parse the configuration model and start the pipeline - ipx_config_model model; - try { - model = file_parse_model(path); - conf.start(model); - } catch (const std::exception &ex) { - IPX_ERROR(comp_str, "%s", ex.what()); - return EXIT_FAILURE; - } - - // Wait for a termination signal - sigset_t mask_new, mask_old; - sigemptyset(&mask_new); - sigaddset(&mask_new, SIGINT); - sigaddset(&mask_new, SIGTERM); - pthread_sigmask(SIG_BLOCK, &mask_new, &mask_old); - - while (true) { - int sig; - if (sigwait(&mask_new, &sig) != 0) { // Waits for _pending_ signals - IPX_WARNING(comp_str, "sigwait() failed.", '\0'); - continue; - } - - if (sig == SIGINT || sig == SIGTERM) { - break; - } - } - - IPX_INFO(comp_str, "Received a termination signal.", '\0'); - pthread_sigmask(SIG_SETMASK, &mask_old, NULL); - - // Register a handler that terminates the collector if it is not responding - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sigemptyset(&sa.sa_mask); - sa.sa_handler = termination_handler; - if (sigaction(SIGTERM, &sa, NULL) == -1 || sigaction(SIGINT, &sa, NULL) == -1) { - IPX_ERROR(comp_str, "Failed to register termination signal handlers!", '\0'); - } - - // Stop the pipeline - conf.stop(); - return EXIT_SUCCESS; -} \ No newline at end of file diff --git a/src/core/configurator/config_file.hpp b/src/core/configurator/config_file.hpp deleted file mode 100644 index e674a888..00000000 --- a/src/core/configurator/config_file.hpp +++ /dev/null @@ -1,62 +0,0 @@ -/** - * \file src/core/configurator/config_file.hpp - * \author Lukas Hutak - * \brief Parser of configuration file (header file) - * \date 2018 - */ - -/* Copyright (C) 2018 CESNET, z.s.p.o. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * 3. Neither the name of the Company nor the names of its contributors - * may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * ALTERNATIVELY, provided that this notice is retained in full, this - * product may be distributed under the terms of the GNU General Public - * License (GPL) version 2 or later, in which case the provisions - * of the GPL apply INSTEAD OF those given above. - * - * This software is provided ``as is'', and any express or implied - * warranties, including, but not limited to, the implied warranties of - * merchantability and fitness for a particular purpose are disclaimed. - * In no event shall the company or contributors be liable for any - * direct, indirect, incidental, special, exemplary, or consequential - * damages (including, but not limited to, procurement of substitute - * goods or services; loss of use, data, or profits; or business - * interruption) however caused and on any theory of liability, whether - * in contract, strict liability, or tort (including negligence or - * otherwise) arising in any way out of the use of this software, even - * if advised of the possibility of such damage. - * - */ - -#ifndef IPFIXCOL_CONFIG_FILE_H -#define IPFIXCOL_CONFIG_FILE_H - -#include -#include "configurator.hpp" - -/** - * \brief Pass control to the file parser - * - * The function tries to load and parse a startup configuration from a file defined by \p path. - * The parsed configuration is passed to the configurator \p conf and the pipeline is established. - * - * \note The function blocks until the collector is supposed to run. - * \param[in] conf Collector configurator - * \param[in] path Startup file - * \return EXIT_SUCCESS or EXIT_FAILURE - */ -IPX_API int -ipx_config_file(ipx_configurator &conf, const std::string &path); - -#endif //IPFIXCOL_CONFIG_FILE_H diff --git a/src/core/configurator/configurator.cpp b/src/core/configurator/configurator.cpp index f253fc75..4bd704c4 100644 --- a/src/core/configurator/configurator.cpp +++ b/src/core/configurator/configurator.cpp @@ -118,11 +118,17 @@ ipx_configurator::ipx_configurator() ipx_configurator::~ipx_configurator() { // Make sure that all threads are terminated - m_running_inputs.clear(); - m_running_inter.clear(); - m_running_outputs.clear(); + cleanup(); - // TODO: Disable the signal handler + // Disable the signal handler + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sigemptyset(&sa.sa_mask); + sigaddset(&sa.sa_mask, SIGINT); + sigaddset(&sa.sa_mask, SIGTERM); + sa.sa_handler = SIG_DFL; + sigaction(SIGTERM, &sa, NULL); + sigaction(SIGINT, &sa, NULL); // Destroy the configuration pipe ipx_cpipe_destroy(); @@ -364,16 +370,25 @@ void ipx_configurator::cleanup() bool ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_controller *ctrl) { - // First of all, check if termination has been complete + // First of all, check if termination process has been completed if (req.type == IPX_CPIPE_TYPE_TERM_DONE) { - if (m_state == STATUS::STOP_SLOW || m_state == STATUS::STOP_FAST) { - // Termination process complete - return true; + if (m_state != STATUS::STOP_SLOW && m_state != STATUS::STOP_FAST) { + IPX_ERROR(comp_str, "Got a termination done notification, but the termination process " + "is not in progress!", '\0'); + return false; + } + + if (m_term_sent == 0) { + IPX_ERROR(comp_str, "[internal] Unexpected termination message", '\0'); + abort(); + } + + if (--m_term_sent != 0) { + // There is still at least one termination message in the pipeline + return false; } - IPX_ERROR(comp_str, "Got a termination done notification, but the termination process is " - "not in progress!", '\0'); - return false; + return true; } // Format a status message and notify the controller @@ -415,17 +430,17 @@ ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_contro case STATUS::RUNNING: /* The first request to stop the collector... * A termination message must be delivered to all input plugins and some plugins must - * immediatelly stop processing all IPFIX messages. + * immediately stop processing all IPFIX messages. */ termination_send_msg(); // fall through case STATUS::STOP_SLOW: /* Another request to stop the collector has been received... - * There might be more plugins that should immediatelly stop processing all IPFIX messages. - */ + * There might be more plugins that should immediately stop processing all IPFIX messages. + */ if (next_state == STATUS::STOP_FAST) { - termination_stop_all(); + termination_stop_all(); } else { termination_stop_partly(req.ctx); } @@ -447,11 +462,11 @@ ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_contro /** * \brief Disable data processing by all plugins - * + * * Calling the getter callbacks (input plugin instances) and processing callbacks (intermediate - * and output plugin instances) will be disabled and all plugin contexts will immediatelly + * and output plugin instances) will be disabled and all plugin contexts will immediately * drop IPFIX and Transport Session messages on arrival. Only garbage and configuration - * pipeline messages are still procesed. + * pipeline messages are still processed. */ void ipx_configurator::termination_stop_all() @@ -469,21 +484,21 @@ ipx_configurator::termination_stop_all() /** * \brief Disabled data processing by all plugins before the given context (including) - * + * * Calling the getter callbacks (input plugin instances) and processing callbacks (intermediate * and output plugin instances) of the plugins before the given context (including) will be - * disabled. Only garbage and configuration pipeline messages are still procesed by these + * disabled. Only garbage and configuration pipeline messages are still processed by these * particular instances. - * + * * The plugin instances after the given context are untouched. - * \note If the \p ctx is nullptr, no plugins are immediatelly terminated! + * \note If the \p ctx is nullptr, no plugins are immediately terminated! * \param[in] ctx Context of the plugin which invoked the termination sequence */ void ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx) { if (!ctx) { - // Nothing to do... + // Nothing to do... return; } @@ -492,7 +507,7 @@ ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx) for (auto &it : m_running_inputs) { it->set_processing(false); } - + if (ctx_info->type == IPX_PT_INPUT || ctx_info == &ipx_plugin_parser_info) { // The termination has been invoked by an input plugin or its message parser return; @@ -516,11 +531,14 @@ ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx) /** * \brief Send a termination message to all input plugins * - * The termination message will call instance destructor and stop the instance thread + * The termination message will call instance destructor and stop the instance thread. + * This function must be called only once! */ void ipx_configurator::termination_send_msg() { + assert(m_term_sent == 0 && "The termination message counter must be zero!"); + for (auto &input : m_running_inputs) { ipx_msg_terminate_t *msg = ipx_msg_terminate_create(IPX_MSG_TERMINATE_INSTANCE); if (!msg) { @@ -533,6 +551,7 @@ ipx_configurator::termination_send_msg() IPX_DEBUG(comp_str, "Requests to terminate the pipeline sent! Waiting for instances to " "terminate.", '\0'); + m_term_sent = m_running_inputs.size(); } int @@ -560,8 +579,8 @@ ipx_configurator::run(ipx_controller *ctrl) } // Collector is running -> process termination/reconfiguration requests + m_state = STATUS::RUNNING; bool terminate = false; - while (!terminate) { struct ipx_cpipe_req req; if (ipx_cpipe_receive(&req) != IPX_OK) { @@ -583,7 +602,6 @@ ipx_configurator::run(ipx_controller *ctrl) }; // The collector has been terminated - cleanup(); ctrl->terminate_after(); - return EXIT_SUCCESS; // TODO: determine by termination type + return EXIT_SUCCESS; } \ No newline at end of file diff --git a/src/core/configurator/configurator.hpp b/src/core/configurator/configurator.hpp index b17f0b09..e957f573 100644 --- a/src/core/configurator/configurator.hpp +++ b/src/core/configurator/configurator.hpp @@ -129,6 +129,8 @@ class ipx_configurator { std::vector > m_running_inter; /** Vector of running instances of output plugins */ std::vector > m_running_outputs; + /** Number of sent termination messages */ + size_t m_term_sent = 0; // Internal functions void diff --git a/src/core/configurator/controller_file.cpp b/src/core/configurator/controller_file.cpp index 49f26fba..688e52c0 100644 --- a/src/core/configurator/controller_file.cpp +++ b/src/core/configurator/controller_file.cpp @@ -16,9 +16,6 @@ #include "controller_file.hpp" -/** Component identification (for log) */ -static const char *comp_str = "Configurator"; - /** Types of XML configuration nodes */ enum file_xml_nodes { // List of plugin instances @@ -187,7 +184,7 @@ ipx_controller_file::parse_file(const std::string &path) fds_xml_ctx_t *ctx = fds_xml_parse_mem(parser.get(), fcontent.get(), true); if (!ctx) { - std::string err_msg = std::string(fds_xml_last_err(parser.get()); + std::string err_msg = std::string(fds_xml_last_err(parser.get())); throw ipx_controller::error("Failed to parse configuration: " + err_msg); } diff --git a/src/core/configurator/instance.hpp b/src/core/configurator/instance.hpp index 51a404bc..dee1deb1 100644 --- a/src/core/configurator/instance.hpp +++ b/src/core/configurator/instance.hpp @@ -155,7 +155,7 @@ class ipx_instance { */ virtual void set_processing(bool en) { - ipx_ctx_processing(_ctx, en); + ipx_ctx_processing_set(_ctx, en); } }; diff --git a/src/core/configurator/instance_input.cpp b/src/core/configurator/instance_input.cpp index 085a28f3..6dd44b08 100644 --- a/src/core/configurator/instance_input.cpp +++ b/src/core/configurator/instance_input.cpp @@ -182,6 +182,6 @@ ipx_instance_input::extensions_resolve(ipx_cfg_extensions *ext_mgr) void ipx_instance_input::set_processing(bool en) { - ipx_ctx_processing(_ctx, en); - ipx_ctx_processing(_parser_ctx, en); + ipx_ctx_processing_set(_ctx, en); + ipx_ctx_processing_set(_parser_ctx, en); } \ No newline at end of file diff --git a/src/core/context.c b/src/core/context.c index 8123aaa2..fc4dc8dd 100644 --- a/src/core/context.c +++ b/src/core/context.c @@ -40,6 +40,7 @@ */ #include +#include #include #include #include @@ -53,6 +54,7 @@ #include "fpipe.h" #include "ring.h" #include "message_ipfix.h" +#include "configurator/cpipe.h" /** Identification of this component (for log) */ const char *comp_str = "Context"; @@ -92,6 +94,8 @@ struct ipx_ctx { enum ipx_ctx_state state; /** Thread identification (valid only if state == #IPX_CS_RUNNING) */ pthread_t thread_id; + /** Enable data processing by the plugin (enabled by default) */ + bool en_processing; struct { /** @@ -178,6 +182,7 @@ ipx_ctx_create(const char *name, const struct ipx_ctx_callbacks *callbacks) ctx->permissions = 0; // No permissions ctx->plugin_cbs = callbacks; ctx->state = IPX_CS_NEW; + ctx->en_processing = true; ctx->cfg_system.vlevel = ipx_verb_level_get(); ctx->cfg_system.rec_size = IPX_MSG_IPFIX_BASE_REC_SIZE; @@ -388,6 +393,25 @@ ipx_ctx_ext_defs(ipx_ctx_t *ctx, struct ipx_ctx_ext **arr, size_t *arr_size) *arr_size = ctx->cfg_extension.items_cnt; } +void +ipx_ctx_processing_set(ipx_ctx_t *ctx, bool en) +{ + __atomic_store_n(&ctx->en_processing, en, __ATOMIC_RELAXED); +} + +/** + * @brief Get data processing status + * + * @see ipx_ctx_processing_set() + * @param[in] ctx Plugin context + * @return true/false + */ +static bool +ipx_ctx_processing_get(const ipx_ctx_t *ctx) +{ + return __atomic_load_n(&ctx->en_processing, __ATOMIC_RELAXED); +} + // ------------------------------------------------------------------------------------------------- /** @@ -758,6 +782,44 @@ ipx_ctx_init(ipx_ctx_t *ctx, const char *params) return IPX_OK; } +/** + * @brief Common handle function for the getter and process callbacks + * + * If the plugin function fails, it will send a request to stop the collector as fast as possible + * and disabled data processing of the instance. + * If the plugin function reports end-of-stream/file, it will send "slow" termination request + * (i.e. remaining data further in the pipeline will be processed) and disable data processing + * of the instance. + * + * @param[in] ctx Plugin context + * @param[in] rc Return code from the function + */ +static void +thread_handle_rc(struct ipx_ctx *ctx, int rc) +{ + switch (rc) { + case IPX_OK: + break; + case IPX_ERR_EOF: + // No more data -> stop the collector + IPX_CTX_DEBUG(ctx, "The instance has signalized end-of-file/stream.", '\0'); + ipx_ctx_processing_set(ctx, false); + ipx_cpipe_send_term(ctx, IPX_CPIPE_TYPE_TERM_SLOW); + break; + case IPX_ERR_DENIED: + // Fatal error -> stop the collector as fast as possible + IPX_CTX_ERROR(ctx, "ipx_plugin_get()/ipx_plugin_process() failed! The collector cannot " + "work properly anymore!", '\0'); + ipx_ctx_processing_set(ctx, false); + ipx_cpipe_send_term(ctx, IPX_CPIPE_TYPE_TERM_FAST); + break; + default: + IPX_CTX_ERROR(ctx, "ipx_plugin_get()/ipx_plugin_process() returned unexpected return " + "code (%d)! Ignoring.", rc); + break; + } +} + /** * \brief Try to receive a request from the feedback pipe and process it * @@ -836,7 +898,6 @@ thread_input(void *arg) IPX_CTX_DEBUG(ctx, "Instance thread of the input plugin '%s' has started!", plugin_name); bool terminate = false; - while (!terminate) { int rc = thread_input_process_pipe(ctx); if (rc == IPX_ERR_EOF) { @@ -845,8 +906,14 @@ thread_input(void *arg) continue; } + if (!ipx_ctx_processing_get(ctx)) { + // Processing is disabled -> wait for message from the feedback pipe + continue; + } + // Try to get a new IPFIX message - ctx->plugin_cbs->get(ctx, ctx->cfg_plugin.private); // TODO: check return value + rc = ctx->plugin_cbs->get(ctx, ctx->cfg_plugin.private); + thread_handle_rc(ctx, rc); } IPX_CTX_DEBUG(ctx, "Instance thread of the input plugin '%s' has been terminated!", @@ -876,8 +943,6 @@ thread_intermediate(void *arg) enum ipx_msg_type msg_type; bool terminate = false; - bool process_en = true; // enable message processing - while (!terminate) { // Get a new message for the buffer msg_ptr = ipx_ring_pop(ctx->pipeline.src); @@ -892,28 +957,34 @@ thread_intermediate(void *arg) // Drop the message, we are still waiting for another termination request IPX_CTX_DEBUG(ctx, "Termination message dropped. Waiting for %u remaining input " "plugin(s) to terminate.", ctx->cfg_system.term_msg_cnt); - ipx_msg_termiante_destroy(terminate_msg); + ipx_msg_terminate_destroy(terminate_msg); continue; } - // It's time to stop processing - process_en = false; if (type == IPX_MSG_TERMINATE_INSTANCE) { terminate = true; } } - if ((process_en && (msg_type & ctx->cfg_system.msg_mask_selected) != 0) - || ctx->type == IPX_PT_OUTPUT_MGR) { // Always pass all messages to the output manager - // Process the message - ctx->plugin_cbs->process(ctx, ctx->cfg_plugin.private, msg_ptr); // TODO:check return value + if (!ipx_ctx_processing_get(ctx) + && (msg_type == IPX_MSG_IPFIX || msg_type == IPX_MSG_SESSION)) { + // Data processing is disabled -> drop IPFIX and Session messages + ipx_msg_destroy(msg_ptr); + continue; + } + + bool msg_for_plugin = (msg_type & ctx->cfg_system.msg_mask_selected) != 0; + if ((ipx_ctx_processing_get(ctx) || ctx->type == IPX_PT_OUTPUT_MGR) && msg_for_plugin) { + // Pass data to the plugin + int rc = ctx->plugin_cbs->process(ctx, ctx->cfg_plugin.private, msg_ptr); + thread_handle_rc(ctx, rc); processed = true; } + // The message hasn't been processed by the plugin if (!processed && terminate != true) { /* Not processed by the instance, pass the message. - * Note: Termination message is passed after intermediate instance destructor! - */ + * Note: Termination message is passed after intermediate instance destructor! */ assert(ctx->type != IPX_PT_OUTPUT_MGR); ipx_ring_push(ctx->pipeline.dst, msg_ptr); } @@ -953,26 +1024,22 @@ thread_output(void *arg) IPX_CTX_DEBUG(ctx, "Instance thread of the output plugin '%s' has started!", plugin_name); bool terminate = false; - bool process_en = true; // enable message processing - while (!terminate) { // Get a new message for the buffer ipx_msg_t *msg_ptr = ipx_ring_pop(ctx->pipeline.src); enum ipx_msg_type msg_type = ipx_msg_get_type(msg_ptr); + bool msg_for_plugin = (msg_type & ctx->cfg_system.msg_mask_selected) != 0; - if (process_en && (msg_type & ctx->cfg_system.msg_mask_selected) != 0) { - // Process the message - ctx->plugin_cbs->process(ctx, ctx->cfg_plugin.private, msg_ptr); // TODO:check return value + if (ipx_ctx_processing_get(ctx) && msg_for_plugin) { + // Process the message by the plugin + int rc = ctx->plugin_cbs->process(ctx, ctx->cfg_plugin.private, msg_ptr); + thread_handle_rc(ctx, rc); } if (msg_type == IPX_MSG_TERMINATE) { ipx_msg_terminate_t *terminate_msg = ipx_msg_base2terminate(msg_ptr); enum ipx_msg_terminate_type type = ipx_msg_terminate_get_type(terminate_msg); - - if (type == IPX_MSG_TERMINATE_PROCESSING) { - // We received a request to stop passing message to the instance - process_en = false; - } else { + if (type == IPX_MSG_TERMINATE_INSTANCE) { // We received a request to terminate the instance terminate = true; } diff --git a/src/core/context.h b/src/core/context.h index d72864a7..0e37e482 100644 --- a/src/core/context.h +++ b/src/core/context.h @@ -266,19 +266,21 @@ ipx_ctx_term_cnt_set(ipx_ctx_t *ctx, unsigned int cnt); /** * \brief Enable/disable data processing - * - * If disabled, plugins are not allowed to process IPFIX and Transport Session messages. - * In case of intermediate and output plugins, these types of messages are dropped on arrival. - * In case of input plugins, the getter function will not be called anymore. - * However, all plugins are still normally processing termination and garbage messages. - * + * + * If disabled, the plugin is not allowed to process IPFIX and Session messages i.e. the + * getter (input plugins) or processing function (intermediate and output plugins) will not + * be called on a message arrival. Moreover, IPFIX and Session Messages will be dropped as + * the plugin cannot process them. Other message types (termination, garbage, etc) will be + * processed by the thread controller and automatically passed to the following plugin(s) + * - usually valid only for intermediate plugins. + * * \note * By default, data processing is enabled. * \param[in] ctx Plugin context * \param[in] en Enable/disable processing */ IPX_API void -ipx_ctx_processing(ipx_ctx_t *ctx, bool en); +ipx_ctx_processing_set(ipx_ctx_t *ctx, bool en); /** * \brief Get registered extensions and dependencies diff --git a/src/core/main.cpp b/src/core/main.cpp index a0a21fb9..1c133b9e 100644 --- a/src/core/main.cpp +++ b/src/core/main.cpp @@ -50,7 +50,6 @@ #include #include #include -#include "configurator/config_file.hpp" #include "configurator/configurator.hpp" #include "configurator/controller_file.hpp" diff --git a/src/core/message_base.c b/src/core/message_base.c index 31deacca..a2110a02 100644 --- a/src/core/message_base.c +++ b/src/core/message_base.c @@ -65,7 +65,7 @@ ipx_msg_destroy(ipx_msg_t *msg) ipx_msg_garbage_destroy(ipx_msg_base2garbage(msg)); break; case IPX_MSG_TERMINATE: - ipx_msg_termiante_destroy(ipx_msg_base2terminate(msg)); + ipx_msg_terminate_destroy(ipx_msg_base2terminate(msg)); break; } } diff --git a/src/core/message_terminate.c b/src/core/message_terminate.c index 3a6eccac..6bc8b963 100644 --- a/src/core/message_terminate.c +++ b/src/core/message_terminate.c @@ -43,6 +43,7 @@ #include #include "message_terminate.h" #include "message_base.h" +#include "configurator/cpipe.h" /** \brief Structure of a terminate message */ struct ipx_msg_terminate { @@ -72,9 +73,10 @@ ipx_msg_terminate_create(enum ipx_msg_terminate_type type) } void -ipx_msg_termiante_destroy(ipx_msg_terminate_t *msg) +ipx_msg_terminate_destroy(ipx_msg_terminate_t *msg) { ipx_msg_header_destroy((ipx_msg_t *) msg); + ipx_cpipe_send_term(NULL, IPX_CPIPE_TYPE_TERM_DONE); free(msg); } diff --git a/src/core/message_terminate.h b/src/core/message_terminate.h index bcadba24..2c9a5170 100644 --- a/src/core/message_terminate.h +++ b/src/core/message_terminate.h @@ -53,16 +53,6 @@ typedef struct ipx_msg_terminate ipx_msg_terminate_t; /** Type of instance termination */ enum ipx_msg_terminate_type { - /** - * \brief Stop processing of all messages (except Terminate messages) - * - * After receiving this message, a context MUST stop processing all IPFIX and Transport Session - * messages and let them through to other plugins until #IPX_MSG_TERMINATE_INSTANCE is received. - * This type of termination is usually used if a instance is not able to work correctly and - * the collector is about to shut down. The corrupted instance use this type of message to - * signalize this situation to other plugins. - */ - IPX_MSG_TERMINATE_PROCESSING, /** * \brief Stop instance * @@ -89,7 +79,7 @@ ipx_msg_terminate_create(enum ipx_msg_terminate_type type); * \param[in] msg Pointer to the message */ IPX_API void -ipx_msg_termiante_destroy(ipx_msg_terminate_t *msg); +ipx_msg_terminate_destroy(ipx_msg_terminate_t *msg); /** * \brief Get the termination type