Skip to content

Commit

Permalink
Core: new handling of termination requests [WIP]
Browse files Browse the repository at this point in the history
TBD: disable passing IPFIX and Session messages to plugin instances in context.c
  • Loading branch information
Lukas955 committed Mar 27, 2020
1 parent cbc4418 commit d652fe3
Show file tree
Hide file tree
Showing 14 changed files with 1,405 additions and 91 deletions.
338 changes: 302 additions & 36 deletions src/core/configurator/configurator.cpp

Large diffs are not rendered by default.

137 changes: 91 additions & 46 deletions src/core/configurator/configurator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
* \file src/core/configurator/configurator.hpp
* \author Lukas Hutak <[email protected]>
* \brief Main pipeline configurator (header file)
* \date 2018
* \date 2018-2020
*/

/* Copyright (C) 2018 CESNET, z.s.p.o.
/* Copyright (C) 2018-2020 CESNET, z.s.p.o.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -52,33 +52,15 @@
#include "instance_outmgr.hpp"
#include "instance_output.hpp"
#include "plugin_mgr.hpp"
#include "controller.hpp"

extern "C" {
#include <ipfixcol2.h>
#include "../context.h"
}

/** Main configurator of the internal pipeline */
// Main configurator of the internal pipeline
class ipx_configurator {
private:
/** Size of ring buffers */
uint32_t ring_size;
/** Directory with definitions of Information Elements */
std::string iemgr_dir;

/** Manager of Information Elements */
fds_iemgr_t *iemgr;
/** Vector of running instances of input plugins */
std::vector<std::unique_ptr<ipx_instance_input> > running_inputs;
/** Vector of running instances of intermediate plugins */
std::vector<std::unique_ptr<ipx_instance_intermediate> > running_inter;
/** Vector of running instances of output plugins */
std::vector<std::unique_ptr<ipx_instance_output> > running_outputs;

void model_check(const ipx_config_model &model);
fds_iemgr_t *iemgr_load(const std::string dir);
enum ipx_verb_level verbosity_str2level(const std::string &verb);

public:
/** Minimal size of ring buffers between instances of plugins */
static constexpr uint32_t RING_MIN_SIZE = 128;
Expand All @@ -94,37 +76,100 @@ class ipx_configurator {
ipx_configurator(const ipx_configurator &) = delete;
ipx_configurator& operator=(const ipx_configurator &) = delete;

/** Plugin manager */
/** Plugin manager (can be modified by a user i.e. add paths, change behaviour, etc) */
ipx_plugin_mgr plugins;

/**
* \brief Start the configuration with a new model
*
* Check if the model is valid (consists of at least one input and one output instance,
* etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined).
* Then try to load all plugins, initialized them and, finally, start their execution threads.
*
* \param[in] model Configuration model (description of input/intermediate/output instances)
* \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load
* definitions of Information Elements, failed to initialize an instance, etc.
* @brief Define a path to the directory of Information Elements definitions
* @param[in] path Path
*/
void start(const ipx_config_model &model);

/**
* \brief Stop and destroy all instances of all plugins
*/
void stop();
void
iemgr_set_dir(const std::string &path);
/**
* @brief Define a size of ring buffers
* @param[in] size Size
*/
void
set_buffer_size(uint32_t size);

/**
* \brief Define a path to the directory of Information Elements definitions
* \param[in] path Path
*/
void iemgr_set_dir(const std::string &path);
/**
* \brief Define a size of ring buffers
* \param[in] size Size
* @brief Run the collector based on a configuration from the controller
*
* @note
* Return code depends on the way how the collector was terminated. For example, if the
* termination was caused by invalid configuration of a plugin, memory allocation during
* processing, etc. return code corresponds to EXIT_FAILURE value.
* However, if the collector was terminated by a plugin, which informed the configurator
* that there are no more flow data for processing, return value is EXIT_SUCCESS.
*
* @param[in] ctrl Configuration controller
* @return Return code of the application
*/
void set_buffer_size(uint32_t size);
int
run(ipx_controller *ctrl);

private:
/// State type
enum class STATUS {
RUNNING, ///< No configuration change in progress
STOP_SLOW, ///< Stop stop in progress
STOP_FAST, ///< Fast stop in progress
} m_state; ///< Configuration state

/** Size of ring buffers */
uint32_t m_ring_size;
/** Directory with definitions of Information Elements */
std::string m_iemgr_dir;

/** Manager of Information Elements */
fds_iemgr_t *m_iemgr;
/** Vector of running instances of input plugins */
std::vector<std::unique_ptr<ipx_instance_input> > m_running_inputs;
/** Vector of running instances of intermediate plugins */
std::vector<std::unique_ptr<ipx_instance_intermediate> > m_running_inter;
/** Vector of running instances of output plugins */
std::vector<std::unique_ptr<ipx_instance_output> > m_running_outputs;

// Internal functions
void
model_check(const ipx_config_model &model);
fds_iemgr_t *
iemgr_load(const std::string dir);
enum ipx_verb_level
verbosity_str2level(const std::string &verb);

void
startup(const ipx_config_model &model);
void
cleanup();

bool
termination_handle(const struct ipx_cpipe_req &req, ipx_controller *ctrl);
void
termination_send_msg();

void
termination_stop_all();
void
termination_stop_partly(const ipx_ctx_t *ctx);
};

#endif //IPFIXCOL_CONFIGURATOR_H

///**
// * \brief Start the configuration with a new model
// *
// * Check if the model is valid (consists of at least one input and one output instance,
// * etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined).
// * Then try to load all plugins, initialized them and, finally, start their execution threads.
// *
// * \param[in] model Configuration model (description of input/intermediate/output instances)
// * \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load
// * definitions of Information Elements, failed to initialize an instance, etc.
// */
//void start(const ipx_config_model &model);
//
///**
// * \brief Stop and destroy all instances of all plugins
// */
//void stop();
153 changes: 153 additions & 0 deletions src/core/configurator/controller.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* @file src/core/configurator/controller.hpp
* @author Lukas Hutak <[email protected]>
* @brief Abstract class for configuration controllers (header file)
* @date 2020
*
* Copyright(c) 2020 CESNET z.s.p.o.
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef IPFIXCOL2_CONTROLLER_BASE
#define IPFIXCOL2_CONTROLLER_BASE

#include <string>
#include <stdexcept>

#include "model.hpp"

extern "C" {
#include "../verbose.h"
#include "cpipe.h"
}

/**
* @brief Abstract class for configuration controllers
*
* The purpose of this class is to provide common API for configuration and notification
* about changes in the configuration of the collector. A controller implementation, which
* has to provide at least a configuration model getter, should be implemented as a wrapper over
* configuration stored in a file, database, SysRepo, etc.
*
* @note
* By default, the pipeline configurator, which is using this controller, automatically
* registers termination signal handlers for SIGINT and SIGTERM. If necessary, the controller
* is supposed to register additional handlers in start_before().
*/
class ipx_controller {
public:
/// Forward declaration of a controller error
class error;

/// Configuration status code
enum class OP_STATUS {
SUCCESS, ///< Configuration operation has been successfully applied
FAILED ///< Configuration operation has failed (no changes)
};

/**
* @brief Get configuration model of the collector
*
* The function is called to get the current configuration model during startup
* or reconfiguration procedure (if supported by the controller).
* @note It is guaranteed that this function is called for the first time after start_before()
* function.
* @return Configuration model
* @throw ipx_controller::error in case of failure (e.g. unable to get the model)
*/
virtual ipx_config_model
model_get() = 0;

/**
* @brief Function called before start of the collector
*
* The controller might initialize its internal structures (for example, establish connection
* to a database, configuration repository, etc.) and register additional signal handlers.
* @throw ipx_controller::error in case of failure
*/
virtual void
start_before()
{
IPX_DEBUG(m_name, "Configuration process has started...", '\0');
};

/**
* @brief Function called after start of the collector
* @note
* If startup fails, termination functions i.e. terminate_before() and terminate_after()
* are not called!
* @param[in] status Operation status (success/failure)
* @param[in] msg Human readable result (usually a description of what went wrong)
* @note The function should not throw any exception!
*/
virtual void
start_after(OP_STATUS status, std::string msg)
{
switch (status) {
case OP_STATUS::SUCCESS:
IPX_INFO(m_name, "Collector started successfully!", '\0');
break;
case OP_STATUS::FAILED:
IPX_ERROR(m_name, "Collector failed to start: %s", msg.c_str());
break;
}
};

/**
* @brief Function is called when a termination request is received
*
* The termination process itself cannot be stopped as there might be situations that
* cannot be solved by the controller, such as a memory allocation error in a instance
* of pipeline plugin etc.
*
* @note
* This function might be called multiple times before termination is complete. It can
* happen, for example, if another termination request is received before the termination
* process is complete.
* @note
* The function is called only if the collector has been previously successfully initialized.
* @note The function should not throw any exception!
* @param[in] msg Contains information about the reason/source of termination
*/
virtual void
terminate_on_request(const struct ipx_cpipe_req &req, std::string msg)
{
(void) req;
IPX_INFO(m_name, "Received a termination request (%s)!", msg.c_str());
};

/**
* @brief Function called after termination
*
* All plugin instances have been already stopped and their context has been destroyed.
* The controller might destroy its internal structures here.
* @note The function should not throw any exception!
*/
virtual void
terminate_after()
{
IPX_DEBUG(m_name, "Termination process completed!", '\0');
};

private:
const char *m_name = "Configurator";
};

/**
* @brief Controller custom error
*/
class ipx_controller::error : public std::runtime_error {
public:
/**
* \brief Manager error constructor (string constructor)
* \param[in] msg An error message
*/
explicit error(const std::string &msg) : std::runtime_error(msg) {};
/**
* \brief Manager error constructor (char constructor)
* \param[in] msg An error message
*/
explicit error(const char *msg) : std::runtime_error(msg) {};
};

#endif // IPFIXCOL2_CONTROLLER_BASE
Loading

0 comments on commit d652fe3

Please sign in to comment.