diff --git a/src/core/configurator/configurator.cpp b/src/core/configurator/configurator.cpp index 6bf289c6..f253fc75 100644 --- a/src/core/configurator/configurator.cpp +++ b/src/core/configurator/configurator.cpp @@ -2,10 +2,10 @@ * \file src/core/configurator/configurator.cpp * \author Lukas Hutak * \brief Main pipeline configurator (source file) - * \date 2018 + * \date 2018-2020 */ -/* Copyright (C) 2018 CESNET, z.s.p.o. +/* Copyright (C) 2018-2020 CESNET, z.s.p.o. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -39,8 +39,10 @@ * */ +#include // STDOUT_FILENO #include #include +#include #include #include #include @@ -53,24 +55,80 @@ extern "C" { #include "../plugin_parser.h" #include "../plugin_output_mgr.h" #include "../verbose.h" +#include "../context.h" +#include "cpipe.h" } /** Component identification (for log) */ static const char *comp_str = "Configurator"; +/** + * @brief Terminating signal handler + * @param[in] sig Signal + */ +static void +termination_handler(int sig) +{ + (void) sig; + + // In case we change 'errno' (e.g. write()) + int errno_backup = errno; + static int cnt = 0; + + if (cnt > 0) { + static const char *msg = "Another termination signal detected. Quiting without cleanup...\n"; + write(STDOUT_FILENO, msg, strlen(msg)); + abort(); + } + cnt++; + + // Send a termination request to the configurator + int rc = ipx_cpipe_send_term(NULL, IPX_CPIPE_TYPE_TERM_FAST); + if (rc != IPX_OK) { + static const char *msg = "ERROR: Signal handler: failed to send a termination request"; + write(STDOUT_FILENO, msg, strlen(msg)); + } + + errno = errno_backup; +} + ipx_configurator::ipx_configurator() { - iemgr = nullptr; - ring_size = RING_DEF_SIZE; + m_iemgr = nullptr; + m_ring_size = RING_DEF_SIZE; + + // Create a configuration pipe + if (ipx_cpipe_init() != IPX_OK) { + throw std::runtime_error("Failed to initialize configurator (ipx_cpipe_init() failed)"); + } + + // Register default signal handlers + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sigemptyset(&sa.sa_mask); + sigaddset(&sa.sa_mask, SIGINT); + sigaddset(&sa.sa_mask, SIGTERM); + sa.sa_flags |= SA_RESTART; + sa.sa_handler = termination_handler; + if (sigaction(SIGTERM, &sa, NULL) == -1 || sigaction(SIGINT, &sa, NULL) == -1) { + throw std::runtime_error("Failed to register termination signal handlers!"); + } } ipx_configurator::~ipx_configurator() { - // First, stop all instances - stop(); + // Make sure that all threads are terminated + m_running_inputs.clear(); + m_running_inter.clear(); + m_running_outputs.clear(); - if (iemgr != nullptr) { - fds_iemgr_destroy(iemgr); + // TODO: Disable the signal handler + + // Destroy the configuration pipe + ipx_cpipe_destroy(); + + if (m_iemgr != nullptr) { + fds_iemgr_destroy(m_iemgr); } } @@ -153,7 +211,7 @@ ipx_configurator::verbosity_str2level(const std::string &verb) void ipx_configurator::iemgr_set_dir(const std::string &path) { - iemgr_dir = path; + m_iemgr_dir = path; } void @@ -164,18 +222,18 @@ ipx_configurator::set_buffer_size(uint32_t size) + std::to_string(RING_MIN_SIZE) + " records."); } - ring_size = size; + m_ring_size = size; } void -ipx_configurator::start(const ipx_config_model &model) +ipx_configurator::startup(const ipx_config_model &model) { // Check the model and prepare Information Elements (may throw an exception) model_check(model); - iemgr = iemgr_load(iemgr_dir); + m_iemgr = iemgr_load(m_iemgr_dir); IPX_INFO(comp_str, "Information Elements have been successfully loaded from '%s'.", - iemgr_dir.c_str()); + m_iemgr_dir.c_str()); // In case of an exception, smart pointers make sure that all instances are destroyed std::vector > outputs; @@ -185,21 +243,21 @@ ipx_configurator::start(const ipx_config_model &model) // Phase 1. Create all instances (i.e. find plugins) for (const auto &output : model.outputs) { ipx_plugin_mgr::plugin_ref *ref = plugins.plugin_get(IPX_PT_OUTPUT, output.plugin); - outputs.emplace_back(new ipx_instance_output(output.name, ref, ring_size)); + outputs.emplace_back(new ipx_instance_output(output.name, ref, m_ring_size)); } for (const auto &inter : model.inters) { ipx_plugin_mgr::plugin_ref *ref = plugins.plugin_get(IPX_PT_INTERMEDIATE, inter.plugin); - inters.emplace_back(new ipx_instance_intermediate(inter.name, ref, ring_size)); + inters.emplace_back(new ipx_instance_intermediate(inter.name, ref, m_ring_size)); } for (const auto &input : model.inputs) { ipx_plugin_mgr::plugin_ref *ref = plugins.plugin_get(IPX_PT_INPUT, input.plugin); - inputs.emplace_back(new ipx_instance_input(input.name, ref, ring_size)); + inputs.emplace_back(new ipx_instance_input(input.name, ref, m_ring_size)); } // Insert the output manager as the last intermediate plugin - ipx_instance_outmgr *output_manager = new ipx_instance_outmgr(ring_size); + ipx_instance_outmgr *output_manager = new ipx_instance_outmgr(m_ring_size); inters.emplace_back(output_manager); IPX_DEBUG(comp_str, "All plugins have been successfully loaded.", '\0'); @@ -232,20 +290,20 @@ ipx_configurator::start(const ipx_config_model &model) for (size_t i = 0; i < model.outputs.size(); ++i) { ipx_instance_output *instance = outputs[i].get(); const ipx_plugin_output &cfg = model.outputs[i]; - instance->init(cfg.params, iemgr, verbosity_str2level(cfg.verbosity)); + instance->init(cfg.params, m_iemgr, verbosity_str2level(cfg.verbosity)); } - output_manager->init(iemgr, ipx_verb_level_get()); + output_manager->init(m_iemgr, ipx_verb_level_get()); for (size_t i = 0; i < model.inters.size(); ++i) { ipx_instance_intermediate *instance = inters[i].get(); const ipx_plugin_inter &cfg = model.inters[i]; - instance->init(cfg.params, iemgr, verbosity_str2level(cfg.verbosity)); + instance->init(cfg.params, m_iemgr, verbosity_str2level(cfg.verbosity)); } for (size_t i = 0; i < model.inputs.size(); ++i) { ipx_instance_input *instance = inputs[i].get(); const ipx_plugin_input &cfg = model.inputs[i]; - instance->init(cfg.params, iemgr, verbosity_str2level(cfg.verbosity)); + instance->init(cfg.params, m_iemgr, verbosity_str2level(cfg.verbosity)); } IPX_DEBUG(comp_str, "All instances have been successfully initialized.", '\0'); @@ -288,36 +346,244 @@ ipx_configurator::start(const ipx_config_model &model) } IPX_DEBUG(comp_str, "All threads of instances has been successfully started.", '\0'); - running_inputs = std::move(inputs); - running_inter = std::move(inters); - running_outputs = std::move(outputs); + m_running_inputs = std::move(inputs); + m_running_inter = std::move(inters); + m_running_outputs = std::move(outputs); +} + +void ipx_configurator::cleanup() +{ + // Wait for termination (destructor of smart pointers will call instance destructor) + m_running_inputs.clear(); + m_running_inter.clear(); + m_running_outputs.clear(); + + IPX_DEBUG(comp_str, "Cleanup complete!", '\0'); +} + +bool +ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_controller *ctrl) +{ + // First of all, check if termination has been complete + if (req.type == IPX_CPIPE_TYPE_TERM_DONE) { + if (m_state == STATUS::STOP_SLOW || m_state == STATUS::STOP_FAST) { + // Termination process complete + return true; + } + + IPX_ERROR(comp_str, "Got a termination done notification, but the termination process is " + "not in progress!", '\0'); + return false; + } + + // Format a status message and notify the controller + assert(req.type == IPX_CPIPE_TYPE_TERM_SLOW || req.type == IPX_CPIPE_TYPE_TERM_FAST); + std::string msg = (req.type == IPX_CPIPE_TYPE_TERM_SLOW) ? "Slow" : "Fast"; + msg += " termination request has been received"; + if (req.ctx != nullptr) { + const struct ipx_plugin_info *info = ipx_ctx_plugininfo_get(req.ctx); + const char *plugin_type; + switch (info->type) { + case IPX_PT_INPUT: + plugin_type = "input plugin"; + break; + case IPX_PT_INTERMEDIATE: + plugin_type = "intermediate plugin"; + break; + case IPX_PT_OUTPUT: + plugin_type = "output plugin"; + break; + default: + plugin_type = ""; + break; + } + + msg += " from " + std::string(plugin_type) + " '" + std::string(info->name) + "' by " + + "instance '"+ ipx_ctx_name_get(req.ctx) + "'"; + } else { + msg += " from an external source"; + } + ctrl->terminate_on_request(req, msg); + + // Next state + STATUS next_state = (req.type == IPX_CPIPE_TYPE_TERM_FAST) + ? STATUS::STOP_FAST : STATUS::STOP_SLOW; + + /* Send a termination message (if it hasn't been send yet) and stop processing Data and Session + * request by selected instances (if any) */ + switch (m_state) { + case STATUS::RUNNING: + /* The first request to stop the collector... + * A termination message must be delivered to all input plugins and some plugins must + * immediatelly stop processing all IPFIX messages. + */ + termination_send_msg(); + // fall through + + case STATUS::STOP_SLOW: + /* Another request to stop the collector has been received... + * There might be more plugins that should immediatelly stop processing all IPFIX messages. + */ + if (next_state == STATUS::STOP_FAST) { + termination_stop_all(); + } else { + termination_stop_partly(req.ctx); + } + + m_state = next_state; + break; + + case STATUS::STOP_FAST: + /* Another request to stop the collector has been received... + * However, processing of IPFIX Messages by all plugins have been already stopped. + */ + break; + default: + assert(false && "Unimplemented behavior for the current state!"); + } + + return false; +} + +/** + * \brief Disable data processing by all plugins + * + * Calling the getter callbacks (input plugin instances) and processing callbacks (intermediate + * and output plugin instances) will be disabled and all plugin contexts will immediatelly + * drop IPFIX and Transport Session messages on arrival. Only garbage and configuration + * pipeline messages are still procesed. + */ +void +ipx_configurator::termination_stop_all() +{ + for (auto &it : m_running_inputs) { + it->set_processing(false); + }; + for (auto &it : m_running_inter) { + it->set_processing(false); + } + for (auto &it : m_running_outputs) { + it->set_processing(false); + } } -void ipx_configurator::stop() +/** + * \brief Disabled data processing by all plugins before the given context (including) + * + * Calling the getter callbacks (input plugin instances) and processing callbacks (intermediate + * and output plugin instances) of the plugins before the given context (including) will be + * disabled. Only garbage and configuration pipeline messages are still procesed by these + * particular instances. + * + * The plugin instances after the given context are untouched. + * \note If the \p ctx is nullptr, no plugins are immediatelly terminated! + * \param[in] ctx Context of the plugin which invoked the termination sequence + */ +void +ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx) { - if (running_inputs.empty()) { - // No instances -> nothing to do + if (!ctx) { + // Nothing to do... return; } - // Send a request to terminate to all input plugins - for (auto &input : running_inputs) { + // Stop all input plugins + const struct ipx_plugin_info *ctx_info = ipx_ctx_plugininfo_get(ctx); + for (auto &it : m_running_inputs) { + it->set_processing(false); + } + + if (ctx_info->type == IPX_PT_INPUT || ctx_info == &ipx_plugin_parser_info) { + // The termination has been invoked by an input plugin or its message parser + return; + } + + // Stop intermediate plugins + for (auto &it : m_running_inter) { + it->set_processing(false); + if (it->get_ctx() == ctx) { + return; + } + } + + // Stop output plugins + assert(ctx_info->type == IPX_PT_OUTPUT && "Output context expected!"); + for (auto &it : m_running_outputs) { + it->set_processing(false); + } +} + +/** + * \brief Send a termination message to all input plugins + * + * The termination message will call instance destructor and stop the instance thread + */ +void +ipx_configurator::termination_send_msg() +{ + for (auto &input : m_running_inputs) { ipx_msg_terminate_t *msg = ipx_msg_terminate_create(IPX_MSG_TERMINATE_INSTANCE); if (!msg) { IPX_ERROR(comp_str, "Failed to create a termination message. The plugins cannot be " "properly terminated! (%s:%d)", __FILE__, __LINE__); - continue; + abort(); } ipx_fpipe_write(input->get_feedback(), ipx_msg_terminate2base(msg)); } IPX_DEBUG(comp_str, "Requests to terminate the pipeline sent! Waiting for instances to " "terminate.", '\0'); +} - // Wait for termination (destructor of smart pointers will call instance destructor) - running_inputs.clear(); - running_inter.clear(); - running_outputs.clear(); +int +ipx_configurator::run(ipx_controller *ctrl) +{ + // Status variables + std::string msg = "Success"; + ipx_controller::OP_STATUS status = ipx_controller::OP_STATUS::SUCCESS; + + // Try to start the collector (get the configuration model, initialize and start plugins) + ctrl->start_before(); + try { + ipx_config_model model = ctrl->model_get(); + startup(model); + } catch (const std::exception &ex) { + msg = ex.what(); + status = ipx_controller::OP_STATUS::FAILED; + } - IPX_DEBUG(comp_str, "All instances successfully terminated.", '\0'); -} + ctrl->start_after(status, msg); + if (status == ipx_controller::OP_STATUS::FAILED) { + // Something went wrong -> bye + cleanup(); + return EXIT_FAILURE; + } + + // Collector is running -> process termination/reconfiguration requests + bool terminate = false; + + while (!terminate) { + struct ipx_cpipe_req req; + if (ipx_cpipe_receive(&req) != IPX_OK) { + // This is really bad -> we cannot even safely terminate the collector + IPX_ERROR(comp_str, "Configuration pipe is broken. Terminating..."); + abort(); + } + + switch (req.type) { + case IPX_CPIPE_TYPE_TERM_SLOW: + case IPX_CPIPE_TYPE_TERM_FAST: + case IPX_CPIPE_TYPE_TERM_DONE: + terminate = termination_handle(req, ctrl); + break; + default: + IPX_ERROR(comp_str, "Ignoring unknown configuration request!", '\0'); + continue; + } + }; + + // The collector has been terminated + cleanup(); + ctrl->terminate_after(); + return EXIT_SUCCESS; // TODO: determine by termination type +} \ No newline at end of file diff --git a/src/core/configurator/configurator.hpp b/src/core/configurator/configurator.hpp index 110c2f01..b17f0b09 100644 --- a/src/core/configurator/configurator.hpp +++ b/src/core/configurator/configurator.hpp @@ -2,10 +2,10 @@ * \file src/core/configurator/configurator.hpp * \author Lukas Hutak * \brief Main pipeline configurator (header file) - * \date 2018 + * \date 2018-2020 */ -/* Copyright (C) 2018 CESNET, z.s.p.o. +/* Copyright (C) 2018-2020 CESNET, z.s.p.o. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -52,33 +52,15 @@ #include "instance_outmgr.hpp" #include "instance_output.hpp" #include "plugin_mgr.hpp" +#include "controller.hpp" extern "C" { #include #include "../context.h" } -/** Main configurator of the internal pipeline */ +// Main configurator of the internal pipeline class ipx_configurator { -private: - /** Size of ring buffers */ - uint32_t ring_size; - /** Directory with definitions of Information Elements */ - std::string iemgr_dir; - - /** Manager of Information Elements */ - fds_iemgr_t *iemgr; - /** Vector of running instances of input plugins */ - std::vector > running_inputs; - /** Vector of running instances of intermediate plugins */ - std::vector > running_inter; - /** Vector of running instances of output plugins */ - std::vector > running_outputs; - - void model_check(const ipx_config_model &model); - fds_iemgr_t *iemgr_load(const std::string dir); - enum ipx_verb_level verbosity_str2level(const std::string &verb); - public: /** Minimal size of ring buffers between instances of plugins */ static constexpr uint32_t RING_MIN_SIZE = 128; @@ -94,37 +76,100 @@ class ipx_configurator { ipx_configurator(const ipx_configurator &) = delete; ipx_configurator& operator=(const ipx_configurator &) = delete; - /** Plugin manager */ + /** Plugin manager (can be modified by a user i.e. add paths, change behaviour, etc) */ ipx_plugin_mgr plugins; /** - * \brief Start the configuration with a new model - * - * Check if the model is valid (consists of at least one input and one output instance, - * etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined). - * Then try to load all plugins, initialized them and, finally, start their execution threads. - * - * \param[in] model Configuration model (description of input/intermediate/output instances) - * \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load - * definitions of Information Elements, failed to initialize an instance, etc. + * @brief Define a path to the directory of Information Elements definitions + * @param[in] path Path */ - void start(const ipx_config_model &model); - - /** - * \brief Stop and destroy all instances of all plugins - */ - void stop(); + void + iemgr_set_dir(const std::string &path); + /** + * @brief Define a size of ring buffers + * @param[in] size Size + */ + void + set_buffer_size(uint32_t size); - /** - * \brief Define a path to the directory of Information Elements definitions - * \param[in] path Path - */ - void iemgr_set_dir(const std::string &path); /** - * \brief Define a size of ring buffers - * \param[in] size Size + * @brief Run the collector based on a configuration from the controller + * + * @note + * Return code depends on the way how the collector was terminated. For example, if the + * termination was caused by invalid configuration of a plugin, memory allocation during + * processing, etc. return code corresponds to EXIT_FAILURE value. + * However, if the collector was terminated by a plugin, which informed the configurator + * that there are no more flow data for processing, return value is EXIT_SUCCESS. + * + * @param[in] ctrl Configuration controller + * @return Return code of the application */ - void set_buffer_size(uint32_t size); + int + run(ipx_controller *ctrl); + +private: + /// State type + enum class STATUS { + RUNNING, ///< No configuration change in progress + STOP_SLOW, ///< Stop stop in progress + STOP_FAST, ///< Fast stop in progress + } m_state; ///< Configuration state + + /** Size of ring buffers */ + uint32_t m_ring_size; + /** Directory with definitions of Information Elements */ + std::string m_iemgr_dir; + + /** Manager of Information Elements */ + fds_iemgr_t *m_iemgr; + /** Vector of running instances of input plugins */ + std::vector > m_running_inputs; + /** Vector of running instances of intermediate plugins */ + std::vector > m_running_inter; + /** Vector of running instances of output plugins */ + std::vector > m_running_outputs; + + // Internal functions + void + model_check(const ipx_config_model &model); + fds_iemgr_t * + iemgr_load(const std::string dir); + enum ipx_verb_level + verbosity_str2level(const std::string &verb); + + void + startup(const ipx_config_model &model); + void + cleanup(); + + bool + termination_handle(const struct ipx_cpipe_req &req, ipx_controller *ctrl); + void + termination_send_msg(); + + void + termination_stop_all(); + void + termination_stop_partly(const ipx_ctx_t *ctx); }; #endif //IPFIXCOL_CONFIGURATOR_H + +///** +// * \brief Start the configuration with a new model +// * +// * Check if the model is valid (consists of at least one input and one output instance, +// * etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined). +// * Then try to load all plugins, initialized them and, finally, start their execution threads. +// * +// * \param[in] model Configuration model (description of input/intermediate/output instances) +// * \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load +// * definitions of Information Elements, failed to initialize an instance, etc. +// */ +//void start(const ipx_config_model &model); +// +///** +// * \brief Stop and destroy all instances of all plugins +// */ +//void stop(); \ No newline at end of file diff --git a/src/core/configurator/controller.hpp b/src/core/configurator/controller.hpp new file mode 100644 index 00000000..74d732cb --- /dev/null +++ b/src/core/configurator/controller.hpp @@ -0,0 +1,153 @@ +/** + * @file src/core/configurator/controller.hpp + * @author Lukas Hutak + * @brief Abstract class for configuration controllers (header file) + * @date 2020 + * + * Copyright(c) 2020 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_CONTROLLER_BASE +#define IPFIXCOL2_CONTROLLER_BASE + +#include +#include + +#include "model.hpp" + +extern "C" { +#include "../verbose.h" +#include "cpipe.h" +} + +/** + * @brief Abstract class for configuration controllers + * + * The purpose of this class is to provide common API for configuration and notification + * about changes in the configuration of the collector. A controller implementation, which + * has to provide at least a configuration model getter, should be implemented as a wrapper over + * configuration stored in a file, database, SysRepo, etc. + * + * @note + * By default, the pipeline configurator, which is using this controller, automatically + * registers termination signal handlers for SIGINT and SIGTERM. If necessary, the controller + * is supposed to register additional handlers in start_before(). + */ +class ipx_controller { +public: + /// Forward declaration of a controller error + class error; + + /// Configuration status code + enum class OP_STATUS { + SUCCESS, ///< Configuration operation has been successfully applied + FAILED ///< Configuration operation has failed (no changes) + }; + + /** + * @brief Get configuration model of the collector + * + * The function is called to get the current configuration model during startup + * or reconfiguration procedure (if supported by the controller). + * @note It is guaranteed that this function is called for the first time after start_before() + * function. + * @return Configuration model + * @throw ipx_controller::error in case of failure (e.g. unable to get the model) + */ + virtual ipx_config_model + model_get() = 0; + + /** + * @brief Function called before start of the collector + * + * The controller might initialize its internal structures (for example, establish connection + * to a database, configuration repository, etc.) and register additional signal handlers. + * @throw ipx_controller::error in case of failure + */ + virtual void + start_before() + { + IPX_DEBUG(m_name, "Configuration process has started...", '\0'); + }; + + /** + * @brief Function called after start of the collector + * @note + * If startup fails, termination functions i.e. terminate_before() and terminate_after() + * are not called! + * @param[in] status Operation status (success/failure) + * @param[in] msg Human readable result (usually a description of what went wrong) + * @note The function should not throw any exception! + */ + virtual void + start_after(OP_STATUS status, std::string msg) + { + switch (status) { + case OP_STATUS::SUCCESS: + IPX_INFO(m_name, "Collector started successfully!", '\0'); + break; + case OP_STATUS::FAILED: + IPX_ERROR(m_name, "Collector failed to start: %s", msg.c_str()); + break; + } + }; + + /** + * @brief Function is called when a termination request is received + * + * The termination process itself cannot be stopped as there might be situations that + * cannot be solved by the controller, such as a memory allocation error in a instance + * of pipeline plugin etc. + * + * @note + * This function might be called multiple times before termination is complete. It can + * happen, for example, if another termination request is received before the termination + * process is complete. + * @note + * The function is called only if the collector has been previously successfully initialized. + * @note The function should not throw any exception! + * @param[in] msg Contains information about the reason/source of termination + */ + virtual void + terminate_on_request(const struct ipx_cpipe_req &req, std::string msg) + { + (void) req; + IPX_INFO(m_name, "Received a termination request (%s)!", msg.c_str()); + }; + + /** + * @brief Function called after termination + * + * All plugin instances have been already stopped and their context has been destroyed. + * The controller might destroy its internal structures here. + * @note The function should not throw any exception! + */ + virtual void + terminate_after() + { + IPX_DEBUG(m_name, "Termination process completed!", '\0'); + }; + +private: + const char *m_name = "Configurator"; +}; + +/** + * @brief Controller custom error + */ +class ipx_controller::error : public std::runtime_error { +public: + /** + * \brief Manager error constructor (string constructor) + * \param[in] msg An error message + */ + explicit error(const std::string &msg) : std::runtime_error(msg) {}; + /** + * \brief Manager error constructor (char constructor) + * \param[in] msg An error message + */ + explicit error(const char *msg) : std::runtime_error(msg) {}; +}; + +#endif // IPFIXCOL2_CONTROLLER_BASE diff --git a/src/core/configurator/controller_file.cpp b/src/core/configurator/controller_file.cpp new file mode 100644 index 00000000..49f26fba --- /dev/null +++ b/src/core/configurator/controller_file.cpp @@ -0,0 +1,416 @@ +/** + * @file src/core/configurator/controller_file.cpp + * @author Lukas Hutak + * @brief Configuration controller for file based configuration (source file) + * @date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include // unique_ptr +#include // realpath +#include // realpath +#include // fread, fseek +#include // stat + +#include "controller_file.hpp" + +/** Component identification (for log) */ +static const char *comp_str = "Configurator"; + +/** Types of XML configuration nodes */ +enum file_xml_nodes { + // List of plugin instances + LIST_INPUTS = 1, + LIST_INTER, + LIST_OUTPUT, + // Instances + INSTANCE_INPUT, + INSTANCE_INTER, + INSTANCE_OUTPUT, + // Input plugin parameters + IN_PLUGIN_NAME, + IN_PLUGIN_PLUGIN, + IN_PLUGIN_PARAMS, + IN_PLUGIN_VERBOSITY, + // Intermediate plugin parameters + INTER_PLUGIN_NAME, + INTER_PLUGIN_PLUGIN, + INTER_PLUGIN_PARAMS, + INTER_PLUGIN_VERBOSITY, + // Output plugin parameters + OUT_PLUGIN_NAME, + OUT_PLUGIN_PLUGIN, + OUT_PLUGIN_PARAMS, + OUT_PLUGIN_VERBOSITY, + OUT_PLUGIN_ODID_ONLY, + OUT_PLUGIN_ODID_EXCEPT, +}; + +/** + * \brief Definition of the \ node + * \note Presence of the all required parameters is checked during building of the model + */ +static const struct fds_xml_args args_instance_input[] = { + FDS_OPTS_ELEM(IN_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(IN_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(IN_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_RAW( IN_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/** + * \brief Definition of the \ node + * \note The configurator checks later if at least one instance is present + */ +static const struct fds_xml_args args_list_inputs[] = { + FDS_OPTS_NESTED(INSTANCE_INPUT, "input", args_instance_input, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_END +}; + +/** + * \brief Definition of the \ node + * \note Presence of the all required parameters is checked during building of the model + */ +static const struct fds_xml_args args_instance_inter[] = { + FDS_OPTS_ELEM(INTER_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(INTER_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(INTER_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_RAW( INTER_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/** Definition of the \ node */ +static const struct fds_xml_args args_list_inter[] = { + FDS_OPTS_NESTED(INSTANCE_INTER, "intermediate", args_instance_inter, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_END +}; + +/** + * \brief Definition of the \ node + * \note Presence of the all required parameters is checked during building of the model + */ +static const struct fds_xml_args args_instance_output[] = { + FDS_OPTS_ELEM(OUT_PLUGIN_NAME, "name", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(OUT_PLUGIN_PLUGIN, "plugin", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(OUT_PLUGIN_VERBOSITY, "verbosity", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(OUT_PLUGIN_ODID_EXCEPT, "odidExcept", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(OUT_PLUGIN_ODID_ONLY, "odidOnly", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_RAW( OUT_PLUGIN_PARAMS, "params", FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/** + * \brief Definition of the \ node + * \note The configurator checks later if at least one instance is present + */ +static const struct fds_xml_args args_list_output[] = { + FDS_OPTS_NESTED(INSTANCE_OUTPUT, "output", args_instance_output, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_END +}; + +/** + * \brief Definition of the main \ node + * \note + * Missing an input or output instance is check during starting of a new pipeline in the + * configurator. + */ +static const struct fds_xml_args args_main[] = { + FDS_OPTS_ROOT("ipfixcol2"), + FDS_OPTS_NESTED(LIST_INPUTS, "inputPlugins", args_list_inputs, FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(LIST_INTER, "intermediatePlugins", args_list_inter, FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(LIST_OUTPUT, "outputPlugins", args_list_output, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +// ------------------------------------------------------------------------------------------------- + +ipx_controller_file::ipx_controller_file(std::string path) + : m_path(path) +{ +} + +ipx_config_model +ipx_controller_file::model_get() +{ + return parse_file(m_path); +} + +ipx_config_model +ipx_controller_file::parse_file(const std::string &path) +{ + // Is it really a configuration file + struct stat file_info; + std::unique_ptr real_path(realpath(path.c_str(), nullptr), &free); + if (!real_path || stat(real_path.get(), &file_info) != 0) { + throw ipx_controller::error("Failed to get info about '" + path + "'. " + + "Check if the path exists and the application has permission to access it."); + } + + if ((file_info.st_mode & S_IFREG) == 0) { + throw ipx_controller::error("The specified path '" + path + "' doesn't lead to a valid " + "configuration file!"); + } + + // Load content of the configuration file + std::unique_ptr stream(fopen(path.c_str(), "r"), &fclose); + if (!stream) { + // Failed to open the file + throw ipx_controller::error("Unable to open the file '" + path + "'."); + } + + // Load whole content of the file + fseek(stream.get(), 0, SEEK_END); + long fsize = ftell(stream.get()); + if (fsize == -1) { + throw ipx_controller::error("Unable to get the size of the file '" + path + "'"); + } + + rewind(stream.get()); + std::unique_ptr fcontent(new char[fsize + 1]); + if (fread(fcontent.get(), fsize, 1, stream.get()) != 1) { + throw ipx_controller::error("Failed to load startup configuration."); + } + fcontent.get()[fsize] = '\0'; + + // Create a parser and try try to parse document + std::unique_ptr parser(fds_xml_create(), &fds_xml_destroy); + if (!parser) { + throw ipx_controller::error("fds_xml_create() failed!"); + } + + if (fds_xml_set_args(parser.get(), args_main) != FDS_OK) { + std::string err_msg = std::string(fds_xml_last_err(parser.get())); + throw ipx_controller::error("fds_xml_set_args() failed: " + err_msg); + } + + fds_xml_ctx_t *ctx = fds_xml_parse_mem(parser.get(), fcontent.get(), true); + if (!ctx) { + std::string err_msg = std::string(fds_xml_last_err(parser.get()); + throw ipx_controller::error("Failed to parse configuration: " + err_msg); + } + + ipx_config_model model; + const struct fds_xml_cont *content; + while(fds_xml_next(ctx, &content) != FDS_EOC) { + assert(content->type == FDS_OPTS_T_CONTEXT); + switch (content->id) { + case LIST_INPUTS: + parse_list_input(content->ptr_ctx, model); + break; + case LIST_INTER: + parse_list_inter(content->ptr_ctx, model); + break; + case LIST_OUTPUT: + parse_list_output(content->ptr_ctx, model); + break; + default: + // Unexpected XML node within startup ! + assert(false); + } + } + + return model; +} + +/** + * \brief Parse \ node and add the parsed input instances to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw ipx_controller::error if the parameters are not valid or missing + */ +void +ipx_controller_file::parse_list_input(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + unsigned int cnt = 0; + const struct fds_xml_cont *content; + + while (fds_xml_next(ctx, &content) != FDS_EOC) { + // Process an input plugin + assert(content->id == INSTANCE_INPUT); + cnt++; + + try { + parse_instance_input(content->ptr_ctx, model); + } catch (std::exception &ex) { + throw ipx_controller::error("Failed to parse the configuration of the " + + std::to_string(cnt) + ". input plugin (" + ex.what() + ")"); + } + } +} + +/** + * \brief Parse \ node and add the parsed intermediate instances to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw ipx_controller::error if the parameters are not valid or missing + */ +void ipx_controller_file::parse_list_inter(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + unsigned int cnt = 0; + const struct fds_xml_cont *content; + + while (fds_xml_next(ctx, &content) != FDS_EOC) { + // Process an intermediate plugin + assert(content->id == INSTANCE_INTER); + cnt++; + + try { + parse_instance_inter(content->ptr_ctx, model); + } catch (std::exception &ex) { + throw ipx_controller::error("Failed to parse the configuration of the " + + std::to_string(cnt) + ". intermediate plugin (" + ex.what() + ")"); + } + } +} + +/** + * \brief Parse \ node and add the parsed intermediate instances to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw ipx_controller::error if the parameters are not valid or missing + */ +void +ipx_controller_file::parse_list_output(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + unsigned int cnt = 0; + const struct fds_xml_cont *content; + + while (fds_xml_next(ctx, &content) != FDS_EOC) { + // Process an output plugin + assert(content->id == INSTANCE_OUTPUT); + cnt++; + + try { + parse_instance_output(content->ptr_ctx, model); + } catch (std::exception &ex) { + throw ipx_controller::error("Failed to parse the configuration of the " + + std::to_string(cnt) + ". output plugin (" + ex.what() + ")"); + } + } +} + +/** + * \brief Parse \ node and add the parsed input instance to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw std::invalid_argument if the parameters are not valid or missing + */ +void +ipx_controller_file::parse_instance_input(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + struct ipx_plugin_input input; + + const struct fds_xml_cont *content; + while (fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case IN_PLUGIN_NAME: + input.name = content->ptr_string; + break; + case IN_PLUGIN_PLUGIN: + input.plugin = content->ptr_string; + break; + case IN_PLUGIN_VERBOSITY: + input.verbosity = content->ptr_string; + break; + case IN_PLUGIN_PARAMS: + input.params = content->ptr_string; + break; + default: + // Unexpected XML node within ! + assert(false); + } + } + + model.add_instance(input); +} + + +/** + * \brief Parse \ node and add the parsed intermediate instance to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw std::invalid_argument if the parameters are not valid or missing + */ +void +ipx_controller_file::parse_instance_inter(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + struct ipx_plugin_inter inter; + + const struct fds_xml_cont *content; + while (fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case INTER_PLUGIN_NAME: + inter.name = content->ptr_string; + break; + case INTER_PLUGIN_PLUGIN: + inter.plugin = content->ptr_string; + break; + case INTER_PLUGIN_VERBOSITY: + inter.verbosity = content->ptr_string; + break; + case INTER_PLUGIN_PARAMS: + inter.params = content->ptr_string; + break; + default: + // "Unexpected XML node within !" + assert(false); + } + } + + model.add_instance(inter); +} + +/** + * \brief Parse \ node and add the parsed output instance to the model + * \param[in] ctx Parsed XML node + * \param[in] model Configuration model + * \throw std::invalid_argument if the parameters are not valid or missing + */ +void +ipx_controller_file::parse_instance_output(fds_xml_ctx_t *ctx, ipx_config_model &model) +{ + struct ipx_plugin_output output; + output.odid_type = IPX_ODID_FILTER_NONE; // default + bool odid_set = false; + + const struct fds_xml_cont *content; + while (fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case OUT_PLUGIN_NAME: + output.name = content->ptr_string; + break; + case OUT_PLUGIN_PLUGIN: + output.plugin = content->ptr_string; + break; + case OUT_PLUGIN_VERBOSITY: + output.verbosity= content->ptr_string; + break; + case OUT_PLUGIN_PARAMS: + output.params = content->ptr_string; + break; + case OUT_PLUGIN_ODID_EXCEPT: + if (!odid_set) { + output.odid_type = IPX_ODID_FILTER_EXCEPT; + output.odid_expression = content->ptr_string; + odid_set = true; + break; + } + throw std::invalid_argument("Multiple definitions of /!"); + case OUT_PLUGIN_ODID_ONLY: + if (!odid_set) { + output.odid_type = IPX_ODID_FILTER_ONLY; + output.odid_expression = content->ptr_string; + odid_set = true; + break; + } + throw std::invalid_argument("Multiple definitions of /!"); + default: + // Unexpected XML node within ! + assert(false); + } + } + + model.add_instance(output); +} \ No newline at end of file diff --git a/src/core/configurator/controller_file.hpp b/src/core/configurator/controller_file.hpp new file mode 100644 index 00000000..3a44247a --- /dev/null +++ b/src/core/configurator/controller_file.hpp @@ -0,0 +1,63 @@ +/** + * @file src/core/configurator/controller_file.hpp + * @author Lukas Hutak + * @brief Configuration controller for file based configuration (header file) + * @date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_CONTROLLER_FILE +#define IPFIXCOL2_CONTROLLER_FILE + +#include + +#include "model.hpp" +#include "controller.hpp" + +/** + * @brief Implementation of the controller based on a configuration file + * + * The controller is able to parse the file and create a configuration model based on it. + */ +class ipx_controller_file : public ipx_controller { +public: + /** + * @brief Controller constructor + * @param path Path to the configuration file + */ + ipx_controller_file(std::string path); + + /** + * @brief Parse the configuration file and create a model + * @return Configuration model + * @throw ipx_controller::error if the file is doesn't exist or it is malformed + */ + ipx_config_model + model_get() override; + +private: + /// Path to the configuration file + std::string m_path; + + // Internal functions + static ipx_config_model + parse_file(const std::string &path); + + static void + parse_list_input(fds_xml_ctx_t *ctx, ipx_config_model &model); + static void + parse_list_inter(fds_xml_ctx_t *ctx, ipx_config_model &model); + static void + parse_list_output(fds_xml_ctx_t *ctx, ipx_config_model &model); + + static void + parse_instance_input(fds_xml_ctx_t *ctx, ipx_config_model &model); + static void + parse_instance_inter(fds_xml_ctx_t *ctx, ipx_config_model &model); + static void + parse_instance_output(fds_xml_ctx_t *ctx, ipx_config_model &model); +}; + +#endif //IPFIXCOL2_CONTROLLER_FILE diff --git a/src/core/configurator/cpipe.c b/src/core/configurator/cpipe.c new file mode 100644 index 00000000..f9cae9b0 --- /dev/null +++ b/src/core/configurator/cpipe.c @@ -0,0 +1,142 @@ +/** + * @file src/core/configurator/cpipe.c + * @author Lukas Hutak + * @brief Configuration request pipe (source file) + * @date 2019 + * + * Copyright(c) 2020 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + + +#include +#include +#include +#include // PIPE_BUF + +#include + +#include "cpipe.h" +#include "../verbose.h" + +/// Invalid file descriptor value +#define INVALID_FD (-1) +/// Configuration pipe - cpipe_fd[0] for read, cpipe_fd[1] for write +static int cpipe_fd[2] = {INVALID_FD, INVALID_FD}; +/// Identification of the module (just for log!) +static const char *module = "Configuration pipe"; + +// Size of the request must allow atomic write! (see write() for help) +static_assert(sizeof(struct ipx_cpipe_req) <= PIPE_BUF); + +int +ipx_cpipe_init() +{ + assert(cpipe_fd[0] == INVALID_FD && cpipe_fd[1] == INVALID_FD && "Already initialized!"); + int rc; + const char *err_str; + + // Create a pipe + rc = pipe(cpipe_fd); + if (rc != 0) { + ipx_strerror(errno, err_str); + IPX_ERROR(module, "pipe() failed: %s", err_str); + return IPX_ERR_DENIED; + } + + // Make write end non-blocking + int flags = fcntl(cpipe_fd[1], F_GETFL); + if (flags == -1) { + ipx_strerror(errno, err_str); + IPX_ERROR(module, "fcntl(..., F_GETFL) failed: %s", err_str); + ipx_cpipe_destroy(); + return IPX_ERR_DENIED; + } + flags |= O_NONBLOCK; + if (fcntl(cpipe_fd[1], F_SETFL, flags) == -1) { + ipx_strerror(errno, err_str); + IPX_ERROR(module, "fcntl(..., F_SETFL) failed: %s", err_str); + ipx_cpipe_destroy(); + return IPX_ERR_DENIED; + } + + return IPX_OK; +} + +void +ipx_cpipe_destroy() +{ + for (int i = 0; i < 2; ++i) { + if (cpipe_fd[i] == INVALID_FD) { + continue; + } + + close(cpipe_fd[i]); + cpipe_fd[i] = INVALID_FD; + } +} + +int +ipx_cpipe_receive(struct ipx_cpipe_req *msg) +{ + const size_t buffer_size = sizeof(*msg); + size_t buffer_read = 0; + + errno = 0; + while (buffer_read < buffer_size) { + uint8_t *ptr = ((uint8_t *) msg) + buffer_read; + ssize_t rc = read(cpipe_fd[0], ptr, buffer_size - buffer_read); + if (rc > 0) { + buffer_read += (size_t) rc; + continue; + } + + if (rc == -1 && errno == EINTR) { + // Try again + continue; + } + + // Unable to read + if (rc == 0) { + IPX_ERROR(module, "read() failed (write end-point is probably closed)", '\0'); + } else { + const char *err_str; + ipx_strerror(errno, err_str); + IPX_ERROR(module, "read() failed: %s", err_str); + } + + return IPX_ERR_DENIED; + } + + return IPX_OK; +} + +int +ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type) +{ + // WARNING: Keep on mind that this function can be called from signal handler! + + // In case we change 'errno' (e.g. write()) + int errno_backup = errno; + + if (type != IPX_CPIPE_TYPE_TERM_SLOW + && type != IPX_CPIPE_TYPE_TERM_FAST + && type != IPX_CPIPE_TYPE_TERM_DONE) { + return IPX_ERR_ARG; + } + + // Prepare a request + struct ipx_cpipe_req req; + memset(&req, 0, sizeof(req)); + req.type = type; + req.ctx = ctx; + + // Send it + int rc = write(cpipe_fd[1], &req, sizeof(req)); + // Just in case, write of this size is atomic so following must be always true + assert((rc == -1 || rc == sizeof(req)) && "Non-atomic write() is not allowed!"); + + errno = errno_backup; + return (rc == -1) ? IPX_ERR_DENIED : IPX_OK; +} + diff --git a/src/core/configurator/cpipe.h b/src/core/configurator/cpipe.h new file mode 100644 index 00000000..fb258ee6 --- /dev/null +++ b/src/core/configurator/cpipe.h @@ -0,0 +1,162 @@ +/** + * @file src/core/configurator/cpipe.h + * @author Lukas Hutak + * @brief Configuration request pipe (header file) + * @date 2019 + * + * Copyright(c) 2020 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_CONFIGURATOR_PIPE_H +#define IPFIXCOL2_CONFIGURATOR_PIPE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "../context.h" + +/* + * Configuration pipeline + * + * The purpose of this pipe is to allow, for example, plugin instances and + * signal handlers to send requests to terminate or reconfigure the collector. + * + * Since the pipe MUST be accessible from signal handlers, the pipe is + * implemented as a global variable. + */ + +/// Type of configuration request +enum ipx_cpipe_type { + /** + * @brief Slow termination request + * + * Request to slowly terminate the collector. Usually this request should be send when + * there are NO more data to process. For example: + * - When an input plugin is reading flow data from a file and end-of-file has been reached. + * All loaded flow data in the processing pipeline will be processed and plugin instances, + * which completed processing, will be gradually terminated. + * - When an intermediate plugin is configured to stop after processing specified amount + * of flow records (e.g. filter plugin which should stop after matching 20 flow records). + * All flow records in the pipeline AFTER this plugin will be fully processed, however, + * flow records in the pipeline before this plugin instance are not interesting anymore and + * should be dropped. + * + * As a reaction to this request the configurator will send a termination message + * (type #IPX_MSG_TERMINATE_INSTANCE) to the processing pipeline. + * + * Moreover, if the context of a plugin instance, which sent the request, is not NULL, + * the configurator will also force stop calling processing functions (i.e. ipx_plugin_get() + * and ipx_plugin_process()) of plugin instances BEFORE (including) the calling instance + * as their data are no longer required. + * + * @note + * The plugin context (if not NULL) in the request will be used to inform a configuration + * controller about the source of termination. + */ + IPX_CPIPE_TYPE_TERM_SLOW, + /** + * @brief Fast termination request + * + * Request to terminate the collector as fast as possible. Usually this request should be + * send in case of a fatal failure of any plugin instance (by a particular instance thread) + * or on a user request to terminate the collector (e.g. SIGINT/SIGTERM signals). + * + * As a reaction to this request the configurator will force stop calling processing functions + * of ALL plugin instances (i.e. ipx_plugin_get() and ipx_plugin_process() callbacks) and send + * a termination message (type #IPX_MSG_TERMINATE_INSTANCE) to the processing pipeline. + * Unprocessed flow data in processing pipeline will be ignored. + * + * @note + * The plugin context (if not NULL) in the request will be used to inform a configuration + * controller about the source of termination. + */ + IPX_CPIPE_TYPE_TERM_FAST, + /** + * @brief Termination complete notification (internal only!) + * + * The request MUST be send after termination of all plugin instances to notify + * the configurator that it is safe to perform final cleanup. Usually, this request is + * automatically send when a termination message (i.e. ipx_msg_terminate_t), which was send + * by the configurator to the processing pipeline as a response to a previous + * #IPX_CPIPE_TYPE_TERM_SLOW or #IPX_CPIPE_TYPE_TERM_FAST request, is destroyed. + * + * Sending this request before termination of all plugin instances is considered as fatal. + */ + IPX_CPIPE_TYPE_TERM_DONE ///< Terminate request - complete + + // Proposed types for the future runtime reconfiguration + // IPX_CPIPE_TYPE_RECONF_START, + // IPX_CPIPE_TYPE_RECONF_DONE +}; + +/// Configuration request +struct ipx_cpipe_req { + /// Type of configuration message + enum ipx_cpipe_type type; + /// Plugin context which send the request (can be NULL) + ipx_ctx_t *ctx; + + /// Place for additional data +}; + +/** + * @brief Initialize internal configuration pipe + * + * @note + * This function MUST be called exactly ONCE before start of the configuration process. + * @return #IPX_OK on success + * @return #IPX_ERR_DENIED if initialization fails (e.g. unable to create a pipe, etc.) + */ +int +ipx_cpipe_init(); + +/** + * @brief Destroy internal configuration pipe + * + * @note + * This function MUST be called exactly ONCE after termination of the configuration process. + */ +void +ipx_cpipe_destroy(); + +/** + * @brief Get a request from the configuration pipe + * + * The function blocks until a request is received. If the waiting is interrupted (for example, + * by a signal handler), waiting for the request is silently restarted. + * + * @warning + * The function MUST be called only from the configurator! (as reading is not atomic operation) + * @param[out] msg Received message + * @return #IPX_OK on success and @p msg is filled + * @return #IPX_ERR_DENIED on a fatal error and the content of @p msg is undefined + * + */ +int +ipx_cpipe_receive(struct ipx_cpipe_req *msg); + +/** + * @brief Send a new termination request + * + * See the description of #IPX_CPIPE_TYPE_TERM_SLOW and #IPX_CPIPE_TYPE_TERM_FAST requests for + * more details. + * + * @note + * The function is safe to be called from a signal handler! In this case parameter @p ctx + * should be set to NULL. + * @param[in] ctx Plugin context (which is sending request) or NULL + * @param[in] type Type of termination request + * @return #IPX_OK on success + * @return #IPX_ERR_ARG if the @p type is not termination request i.e. IPX_CPIPE_TYPE_TERM_* + * @return #IPX_ERR_DENIED if the request failed to be sent + */ +int +ipx_cpipe_send_term(ipx_ctx_t *ctx, enum ipx_cpipe_type type); + +#ifdef __cplusplus +} +#endif + +#endif //IPFIXCOL2_CONFIGURATOR_PIPE_H diff --git a/src/core/configurator/instance.hpp b/src/core/configurator/instance.hpp index 89b9242e..51a404bc 100644 --- a/src/core/configurator/instance.hpp +++ b/src/core/configurator/instance.hpp @@ -146,6 +146,17 @@ class ipx_instance { extensions_resolve(ipx_cfg_extensions *ext_mgr) { ext_mgr->update_instance(_ctx); } + + /** + * \brief Enable/disable processing of data messages (IPFIX and Transport Session) + * \note By default, data processing is enabled. + * \see ipx_ctx_processing() for more details + * \param[in] en Enable/disable processing + */ + virtual void + set_processing(bool en) { + ipx_ctx_processing(_ctx, en); + } }; #endif //IPFIXCOL_INSTANCE_H diff --git a/src/core/configurator/instance_input.cpp b/src/core/configurator/instance_input.cpp index 07c484f1..085a28f3 100644 --- a/src/core/configurator/instance_input.cpp +++ b/src/core/configurator/instance_input.cpp @@ -177,4 +177,11 @@ ipx_instance_input::extensions_resolve(ipx_cfg_extensions *ext_mgr) { ext_mgr->update_instance(_ctx); ext_mgr->update_instance(_parser_ctx); +} + +void +ipx_instance_input::set_processing(bool en) +{ + ipx_ctx_processing(_ctx, en); + ipx_ctx_processing(_parser_ctx, en); } \ No newline at end of file diff --git a/src/core/configurator/instance_input.hpp b/src/core/configurator/instance_input.hpp index 9b78f8fe..e024ca16 100644 --- a/src/core/configurator/instance_input.hpp +++ b/src/core/configurator/instance_input.hpp @@ -160,6 +160,15 @@ class ipx_instance_input : public ipx_instance { */ void extensions_resolve(ipx_cfg_extensions *ext_mgr) override; + + /** + * \brief Enable/disable processing of data messages (IPFIX and Transport Session) + * \note By default, data processing is enabled. + * \see ipx_ctx_processing() for more details + * \param[in] en Enable/disable processing + */ + void + set_processing(bool en) override; }; #endif //IPFIXCOL_INSTANCE_INPUT_HPP diff --git a/src/core/configurator/instance_intermediate.hpp b/src/core/configurator/instance_intermediate.hpp index ef02256c..d7987601 100644 --- a/src/core/configurator/instance_intermediate.hpp +++ b/src/core/configurator/instance_intermediate.hpp @@ -152,6 +152,14 @@ class ipx_instance_intermediate : public ipx_instance { * \param[in] intermediate Intermediate plugin to receive our messages */ virtual void connect_to(ipx_instance_intermediate &intermediate); + + /** + * \brief Get the plugin context (read only) + */ + virtual const ipx_ctx_t * + get_ctx() { + return _ctx; + } }; #endif //IPFIXCOL_INSTANCE_INTERMEDIATE_HPP diff --git a/src/core/context.c b/src/core/context.c index 167b5a8b..8123aaa2 100644 --- a/src/core/context.c +++ b/src/core/context.c @@ -492,6 +492,12 @@ ipx_ctx_ext_consumer(ipx_ctx_t *ctx, const char *type, const char *name, ipx_ctx return IPX_OK; } +IPX_API const struct ipx_plugin_info * +ipx_ctx_plugininfo_get(const ipx_ctx_t *ctx) +{ + return ctx->plugin_cbs->info; +} + // ------------------------------------------------------------------------------------------------- /** diff --git a/src/core/context.h b/src/core/context.h index 5e1b418c..d72864a7 100644 --- a/src/core/context.h +++ b/src/core/context.h @@ -264,6 +264,22 @@ ipx_ctx_verb_set(ipx_ctx_t *ctx, enum ipx_verb_level verb); IPX_API int ipx_ctx_term_cnt_set(ipx_ctx_t *ctx, unsigned int cnt); +/** + * \brief Enable/disable data processing + * + * If disabled, plugins are not allowed to process IPFIX and Transport Session messages. + * In case of intermediate and output plugins, these types of messages are dropped on arrival. + * In case of input plugins, the getter function will not be called anymore. + * However, all plugins are still normally processing termination and garbage messages. + * + * \note + * By default, data processing is enabled. + * \param[in] ctx Plugin context + * \param[in] en Enable/disable processing + */ +IPX_API void +ipx_ctx_processing(ipx_ctx_t *ctx, bool en); + /** * \brief Get registered extensions and dependencies * @@ -279,4 +295,12 @@ ipx_ctx_term_cnt_set(ipx_ctx_t *ctx, unsigned int cnt); IPX_API void ipx_ctx_ext_defs(ipx_ctx_t *ctx, struct ipx_ctx_ext **arr, size_t *arr_size); +/** + * @brief Get plugin description (name, version, etc.) + * @param[in] ctx Plugin context + * @return Pointer to the plugin info (loaded directly from the plugin) + */ +IPX_API const struct ipx_plugin_info * +ipx_ctx_plugininfo_get(const ipx_ctx_t *ctx); + #endif // IPFIXCOL_CONTEXT_INTERNAL_H diff --git a/src/core/main.cpp b/src/core/main.cpp index 97487dd4..a0a21fb9 100644 --- a/src/core/main.cpp +++ b/src/core/main.cpp @@ -52,6 +52,7 @@ #include #include "configurator/config_file.hpp" #include "configurator/configurator.hpp" +#include "configurator/controller_file.hpp" extern "C" { #include "verbose.h" @@ -204,7 +205,7 @@ int main(int argc, char *argv[]) const char *ring_size = nullptr; bool daemon_en = false; bool list_only = false; - ipx_configurator conf; + ipx_configurator configurator; // Parse configuration int opt; @@ -230,7 +231,7 @@ int main(int argc, char *argv[]) daemon_en = true; break; case 'p': // Plugin search path - conf.plugins.path_add(std::string(optarg)); + configurator.plugins.path_add(std::string(optarg)); break; case 'e': // Redefine path to Information Elements definition cfg_iedir = optarg; @@ -242,7 +243,7 @@ int main(int argc, char *argv[]) ring_size = optarg; break; case 'u': // Disable automatic plugin unload - conf.plugins.auto_unload(false); + configurator.plugins.auto_unload(false); break; default: // ? std::cerr << "Unknown parameter '" << static_cast(optopt) << "'!" << std::endl; @@ -261,11 +262,11 @@ int main(int argc, char *argv[]) } // Always use the default directory for looking for plugins, but with the lowest priority - conf.plugins.path_add(IPX_DEFAULT_PLUGINS_DIR); - conf.iemgr_set_dir(cfg_iedir); + configurator.plugins.path_add(IPX_DEFAULT_PLUGINS_DIR); + configurator.iemgr_set_dir(cfg_iedir); if (list_only) { - conf.plugins.plugin_list(); + configurator.plugins.plugin_list(); return EXIT_SUCCESS; } @@ -280,7 +281,7 @@ int main(int argc, char *argv[]) } } - if (ring_size != nullptr && ring_size_change(conf, ring_size) != IPX_OK) { + if (ring_size != nullptr && ring_size_change(configurator, ring_size) != IPX_OK) { // Failed to set the size return EXIT_FAILURE; } @@ -290,10 +291,11 @@ int main(int argc, char *argv[]) pid_file = nullptr; // Prevent removing the file } - // Pass control to the parser of the configuration file + // Create a configuration controller and use it to start the collector int rc; try { - rc = ipx_config_file(conf, std::string(cfg_startup)); + ipx_controller_file ctrl_file(cfg_startup); + rc = configurator.run(&ctrl_file); } catch (std::exception &ex) { std::cerr << "An unexpected error has occurred: " << ex.what() << std::endl; return EXIT_FAILURE;