diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index e14eacfc..bc1eef5e 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -16,6 +16,7 @@ set(SUB_HEADERS ipfixcol2/message_session.h ipfixcol2/plugins.h ipfixcol2/session.h + ipfixcol2/utils.h ipfixcol2/verbose.h "${PROJECT_BINARY_DIR}/include/ipfixcol2/api.h" ) diff --git a/include/ipfixcol2.h b/include/ipfixcol2.h index 6ca3b2f4..d40fefbe 100644 --- a/include/ipfixcol2.h +++ b/include/ipfixcol2.h @@ -63,6 +63,7 @@ #include #include +#include #include diff --git a/include/ipfixcol2/utils.h b/include/ipfixcol2/utils.h new file mode 100644 index 00000000..e65fd143 --- /dev/null +++ b/include/ipfixcol2/utils.h @@ -0,0 +1,70 @@ +/** + * @file include/ipfixcol2/utils.h + * @author Lukas Hutak + * @brief Auxiliary utilities for plugins (header file) + * @date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPX_UTILS_H +#define IPX_UTILS_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include // mkdir file permissions + +/** +* \defgroup ipxSource Transport session identification +* \ingroup publicAPIs +* \brief Transport session interface +* +* Data types and API functions for identification and management of Transport +* session identification. The Exporting Process uses the Transport Session to +* send messages from multiple _independent_ Observation Domains to the +* Collecting Process. Moreover, in case of SCTP session messages are also send +* over _independent_ streams. +* +* Following structures represents Transport session between Exporting process +* and Collecting Process. However, proper processing of flows also requires +* distinguishing Observation Domain IDs and Stream identifications out of +* scope of these structures. +* +* @{ +*/ + +/** + * @brief Default file permission of newly created directories + * @note Read/write/execute for a user and his group, read/execute for others. + */ +#define IPX_UTILS_MKDIR_DEF (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) + +/** + * @brief Create recursively a directory + * + * @note + * File permission @p mode only affects newly created directories. In other + * words, if a directory (or subdirectory) already exists, file permission + * bits @p mode are not applied. + * @note + * The function is implemented as "recursive" wrapper over standard mkdir + * function. See man 3 mkdir for more information. + * @param[in] path Full directory path to create + * @param[in] mode The file permission bits of the new directories + * (see default value #IPX_UTILS_MKDIR_DEF) + * @return #IPX_OK on success + * @return #IPX_ERR_DENIED otherwise and errno is set appropriately. + */ +IPX_API int +ipx_utils_mkdir(const char *path, mode_t mode); + +/**@}*/ + +#ifdef __cplusplus +} +#endif +#endif // IPX_UTILS_H diff --git a/src/core/utils.c b/src/core/utils.c index 56aad98f..1d9049a6 100644 --- a/src/core/utils.c +++ b/src/core/utils.c @@ -43,10 +43,15 @@ #undef _GNU_SOURCE #define _POSIX_C_SOURCE 200809L + +#include +#include +#include #include -#include + #include "utils.h" + int ipx_strerror_fn(int errnum, char *buffer, size_t buffer_size) { @@ -57,4 +62,86 @@ ipx_strerror_fn(int errnum, char *buffer, size_t buffer_size) snprintf(buffer, buffer_size, "strerror_r() failed: Unable process error code %d!", errnum); return IPX_ERR_ARG; -} \ No newline at end of file +} + +int +ipx_utils_mkdir(const char *path, mode_t mode) +{ + const char ch_slash = '/'; + bool add_slash = false; + + // Check the parameter + size_t len = strlen(path); + if (path[len - 1] != ch_slash) { + len++; // We have to add another slash + add_slash = true; + } + + if (len > PATH_MAX - 1) { + errno = ENAMETOOLONG; + return IPX_ERR_DENIED; + } + + // Make a copy + char *path_cpy = malloc((len + 1) * sizeof(char)); // +1 for '\0' + if (!path_cpy) { + errno = ENOMEM; + return IPX_ERR_DENIED; + } + + strcpy(path_cpy, path); + if (add_slash) { + path_cpy[len - 1] = ch_slash; + path_cpy[len] = '\0'; + } + + struct stat info; + char *pos; + + // Create directories from the beginning + for (pos = path_cpy + 1; *pos; pos++) { + // Find a slash + if (*pos != ch_slash) { + continue; + } + + *pos = '\0'; // Temporarily truncate pathname + + // Check if a subdirectory exists + if (stat(path_cpy, &info) == 0) { + // Check if the "info" is about directory + if (!S_ISDIR(info.st_mode)) { + free(path_cpy); + errno = ENOTDIR; + return IPX_ERR_DENIED; + } + + // Fix the pathname and continue with the next subdirectory + *pos = ch_slash; + continue; + } + + // Errno is filled by stat() + if (errno != ENOENT) { + int errno_cpy = errno; + free(path_cpy); + errno = errno_cpy; + return IPX_ERR_DENIED; + } + + // Required directory doesn't exist -> create new one + if (mkdir(path_cpy, mode) != 0 && errno != EEXIST) { + // Failed (by the way, EEXIST because of race condition i.e. + // multiple applications creating the same folder) + int errno_cpy = errno; + free(path_cpy); + errno = errno_cpy; + return IPX_ERR_DENIED; + } + + *pos = ch_slash; + } + + free(path_cpy); + return IPX_OK; +} diff --git a/src/plugins/output/fds/CMakeLists.txt b/src/plugins/output/fds/CMakeLists.txt new file mode 100644 index 00000000..2c2a6a83 --- /dev/null +++ b/src/plugins/output/fds/CMakeLists.txt @@ -0,0 +1,31 @@ +# Create a linkable module +add_library(fds-output MODULE + src/Config.cpp + src/Config.hpp + src/Exception.hpp + src/fds.cpp + src/Storage.cpp + src/Storage.hpp +) + +install( + TARGETS fds-output + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/" +) + +if (ENABLE_DOC_MANPAGE) + # Build a manual page + set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-fds-output.7.rst") + set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-fds-output.7") + + add_custom_command(TARGET fds-output PRE_BUILD + COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE} + DEPENDS ${SRC_FILE} + VERBATIM + ) + + install( + FILES "${DST_FILE}" + DESTINATION "${INSTALL_DIR_MAN}/man7" + ) +endif() diff --git a/src/plugins/output/fds/README.rst b/src/plugins/output/fds/README.rst new file mode 100644 index 00000000..09b57efe --- /dev/null +++ b/src/plugins/output/fds/README.rst @@ -0,0 +1,69 @@ +Flow Data Storage (output plugin) +================================= + +The plugin converts and stores IPFIX Data Records into FDS file format. The file +is based on IPFIX, therefore, it provides highly-effective way for long-term +storage and stores complete flow records (including all Enterprise-specific +fields, biflow, etc.) together with identification of the flow exporters who +exported these records. + +All data are stored into flat files, which are automatically rotated and renamed +every N minutes (by default 5 minutes). + +Example configuration +--------------------- + +.. code-block:: xml + + + FDS output + fds + + /tmp/ipfixcol2/fds/ + none + + 300 + yes + + + + +Parameters +---------- + +:``storagePath``: + The path element specifies the storage directory for data files. Keep on + mind that the path must exist in your system. Otherwise, no files are stored. + All files will be stored based on the configuration using the following + template: ``/YYYY/MM/DD/flows..fds`` where ``YYYY/MM/DD`` + means year/month/day and ```` represents a UTC timestamp in + format ``YYMMDDhhmmss``. + +:``compression``: + Data compression helps to significantly reduce size of output files. + Following compression algorithms are available: + + :``none``: Compression disabled [default] + :``lz4``: LZ4 compression (very fast, slightly worse compression ration) + :``zstd``: ZSTD compression (slightly slower, good compression ration) + +:``dumpInterval``: + Configuration of output files rotation. + + :``timeWindow``: + Specifies time interval in seconds to rotate files i.e. close the current + file and create a new one. [default: 300] + + :``align``: + Align file rotation with next N minute interval. For example, if enabled + and window size is 5 minutes long, files will be created at 0, 5, 10, etc. + [values: yes/no, default: yes] + +:``asyncIO``: + Allows to use asynchronous I/O for writing to the file. Usually when parts + of the file are being written, the process is blocked on synchronous I/O + and waits for the operation to complete. However, asynchronous I/O allows + the plugin to simultaneously write to file and process flow records, which + significantly improves overall performance. (Note: a pool of service + threads shared among instances of FDS plugin might be created). + [values: true/false, default: true] diff --git a/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst b/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst new file mode 100644 index 00000000..a978730e --- /dev/null +++ b/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst @@ -0,0 +1,20 @@ +====================== + ipfixcol2-fds-output +====================== + +--------------------------------- +Flow Data Storage (output plugin) +--------------------------------- + +:Author: Lukáš Huták (lukas.hutak@cesnet.cz) +:Date: 2019-07-01 +:Copyright: Copyright © 2019 CESNET, z.s.p.o. +:Version: 2.0 +:Manual section: 7 +:Manual group: IPFIXcol collector + +Description +----------- + +.. include:: ../README.rst + :start-line: 3 diff --git a/src/plugins/output/fds/src/Config.cpp b/src/plugins/output/fds/src/Config.cpp new file mode 100644 index 00000000..a06d61e2 --- /dev/null +++ b/src/plugins/output/fds/src/Config.cpp @@ -0,0 +1,190 @@ +/** + * \file src/plugins/output/fds/src/Config.cpp + * \author Lukas Hutak + * \brief Parser of XML configuration (source file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "Config.hpp" +#include +#include + +/* + * + * ... + * ... + * + * ... + * ... + * + * ... + * + */ + +/// XML nodes +enum params_xml_nodes { + NODE_STORAGE = 1, + NODE_COMPRESS, + NODE_DUMP, + NODE_ASYNCIO, + + DUMP_WINDOW, + DUMP_ALIGN +}; + +/// Definition of the \ node +static const struct fds_xml_args args_dump[] = { + FDS_OPTS_ELEM(DUMP_WINDOW, "timeWindow", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(DUMP_ALIGN, "align", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/// Definition of the \ node +static const struct fds_xml_args args_params[] = { + FDS_OPTS_ROOT("params"), + FDS_OPTS_ELEM(NODE_STORAGE, "storagePath", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NODE_COMPRESS, "compression", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(NODE_DUMP, "dumpInterval", args_dump, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(NODE_ASYNCIO, "asyncIO", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +Config::Config(const char *params) +{ + set_default(); + + // Create XML parser + std::unique_ptr xml(fds_xml_create(), &fds_xml_destroy); + if (!xml) { + throw std::runtime_error("Failed to create an XML parser!"); + } + + if (fds_xml_set_args(xml.get(), args_params) != FDS_OK) { + throw std::runtime_error("Failed to parse the description of an XML document!"); + } + + fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(xml.get(), params, true); + if (!params_ctx) { + std::string err = fds_xml_last_err(xml.get()); + throw std::runtime_error("Failed to parse the configuration: " + err); + } + + // Parse parameters and check configuration + try { + parse_root(params_ctx); + validate(); + } catch (std::exception &ex) { + throw std::runtime_error("Failed to parse the configuration: " + std::string(ex.what())); + } +} + +/** + * @brief Set default parameters + */ +void +Config::set_default() +{ + m_path.clear(); + m_calg = calg::NONE; + m_async = true; + + m_window.align = true; + m_window.size = WINDOW_SIZE; +} + +/** + * @brief Check if the configuration is valid + * @throw runtime_error if the configuration breaks some rules + */ +void +Config::validate() +{ + if (m_path.empty()) { + throw std::runtime_error("Storage path cannot be empty!"); + } + + if (m_window.size == 0) { + throw std::runtime_error("Window size cannot be zero!"); + } +} + +/** + * @brief Process \ node + * @param[in] ctx XML context to process + * @throw runtime_error if the parser fails + */ +void +Config::parse_root(fds_xml_ctx_t *ctx) +{ + const struct fds_xml_cont *content; + while (fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case NODE_STORAGE: + // Storage path + assert(content->type == FDS_OPTS_T_STRING); + m_path = content->ptr_string; + break; + case NODE_COMPRESS: + // Compression method + assert(content->type == FDS_OPTS_T_STRING); + if (strcasecmp(content->ptr_string, "none") == 0) { + m_calg = calg::NONE; + } else if (strcasecmp(content->ptr_string, "lz4") == 0) { + m_calg = calg::LZ4; + } else if (strcasecmp(content->ptr_string, "zstd") == 0) { + m_calg = calg::ZSTD; + } else { + const std::string inv_str = content->ptr_string; + throw std::runtime_error("Unknown compression algorithm '" + inv_str + "'"); + } + break; + case NODE_ASYNCIO: + // Asynchronous I/O + assert(content->type == FDS_OPTS_T_BOOL); + m_async = content->val_bool; + break; + case NODE_DUMP: + // Dump window + assert(content->type == FDS_OPTS_T_CONTEXT); + parse_dump(content->ptr_ctx); + break; + default: + // Internal error + throw std::runtime_error("Unknown XML node"); + } + } +} + +/** + * @brief Auxiliary function for parsing \ options + * @param[in] ctx XML context to process + * @throw runtime_error if the parser fails + */ +void +Config::parse_dump(fds_xml_ctx_t *ctx) +{ + const struct fds_xml_cont *content; + while(fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case DUMP_WINDOW: + // Window size + assert(content->type == FDS_OPTS_T_UINT); + if (content->val_uint > UINT32_MAX) { + throw std::runtime_error("Window size is too long!"); + } + m_window.size = static_cast(content->val_uint); + break; + case DUMP_ALIGN: + // Window alignment + assert(content->type == FDS_OPTS_T_BOOL); + m_window.align = content->val_bool; + break; + default: + // Internal error + throw std::runtime_error("Unknown XML node"); + } + } +} \ No newline at end of file diff --git a/src/plugins/output/fds/src/Config.hpp b/src/plugins/output/fds/src/Config.hpp new file mode 100644 index 00000000..876ef466 --- /dev/null +++ b/src/plugins/output/fds/src/Config.hpp @@ -0,0 +1,64 @@ +/** + * \file src/plugins/output/fds/src/Config.hpp + * \author Lukas Hutak + * \brief Parser of XML configuration (header file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_CONFIG_HPP +#define IPFIXCOL2_FDS_CONFIG_HPP + +#include +#include + +/** + * @brief Plugin configuration parser + */ +class Config { +public: + /** + * @brief Parse configuration of the plugin + * @param[in] params XML parameters to parse + * @throw runtime_exception on error + */ + Config(const char *params); + ~Config() = default; + + enum class calg { + NONE, ///< Do not use compression + LZ4, ///< LZ4 compression + ZSTD ///< ZSTD compression + }; + + /// Storage path + std::string m_path; + /// Compression algorithm + calg m_calg; + /// Asynchronous I/O enabled + bool m_async; + + struct { + bool align; ///< Enable/disable window alignment + uint32_t size; ///< Time window size + } m_window; ///< Window alignment + +private: + /// Default window size + static const uint32_t WINDOW_SIZE = 300U; + + void + set_default(); + void + validate(); + + void + parse_root(fds_xml_ctx_t *ctx); + void + parse_dump(fds_xml_ctx_t *ctx); +}; + + +#endif // IPFIXCOL2_FDS_CONFIG_HPP diff --git a/src/plugins/output/fds/src/Exception.hpp b/src/plugins/output/fds/src/Exception.hpp new file mode 100644 index 00000000..f5325c5f --- /dev/null +++ b/src/plugins/output/fds/src/Exception.hpp @@ -0,0 +1,34 @@ +/** + * \file src/plugins/output/fds/src/Exception.hpp + * \author Lukas Hutak + * \brief Plugin specific exception (header file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_EXCEPTION_HPP +#define IPFIXCOL2_FDS_EXCEPTION_HPP + +#include +#include + +/// Plugin specific exception +class FDS_exception : public std::runtime_error { +public: + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const std::string &str) : std::runtime_error(str) {}; + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const char *str) : std::runtime_error(str) {}; + // Default destructor + ~FDS_exception() = default; +}; + +#endif //IPFIXCOL2_FDS_EXCEPTION_HPP diff --git a/src/plugins/output/fds/src/Storage.cpp b/src/plugins/output/fds/src/Storage.cpp new file mode 100644 index 00000000..30db3d25 --- /dev/null +++ b/src/plugins/output/fds/src/Storage.cpp @@ -0,0 +1,425 @@ +/** + * \file src/plugins/output/fds/src/Storage.cpp + * \author Lukas Hutak + * \brief FDS file storage (source file) + * \date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include "Storage.hpp" + +Storage::Storage(ipx_ctx_t *ctx, const Config &cfg) : m_ctx(ctx), m_path(cfg.m_path) +{ + // Check if the directory exists + struct stat file_info; + memset(&file_info, 0, sizeof(file_info)); + if (stat(m_path.c_str(), &file_info) != 0 || !S_ISDIR(file_info.st_mode)) { + throw FDS_exception("Directory '" + m_path + "' doesn't exist or search permission is denied"); + } + + // Prepare flags for FDS file + m_flags = 0; + switch (cfg.m_calg) { + case Config::calg::LZ4: + m_flags |= FDS_FILE_LZ4; + break; + case Config::calg::ZSTD: + m_flags |= FDS_FILE_ZSTD; + break; + default: + break; + } + + if (!cfg.m_async) { + m_flags |= FDS_FILE_NOASYNC; + } + + m_flags |= FDS_FILE_APPEND; +} + +void +Storage::window_new(time_t ts) +{ + // Close the current window if exists + window_close(); + + // Open new file + const std::string new_file = filename_gen(ts); + std::unique_ptr new_file_cpy(strdup(new_file.c_str()), &free); + + char *dir2create; + if (!new_file_cpy || (dir2create = dirname(new_file_cpy.get())) == nullptr) { + throw FDS_exception("Failed to generate name of an output directory!"); + } + + if (ipx_utils_mkdir(dir2create, IPX_UTILS_MKDIR_DEF) != FDS_OK) { + throw FDS_exception("Failed to create directory '" + std::string(dir2create) + "'"); + } + + m_file.reset(fds_file_init()); + if (!m_file) { + throw FDS_exception("Failed to create FDS file handler!"); + } + + if (fds_file_open(m_file.get(), new_file.c_str(), m_flags) != FDS_OK) { + std::string err_msg = fds_file_error(m_file.get()); + m_file.reset(); + throw FDS_exception("Failed to create/append file '" + new_file + "': " + err_msg); + } +} + +void +Storage::window_close() +{ + m_file.reset(); + m_session2params.clear(); +} + +void +Storage::process_msg(ipx_msg_ipfix_t *msg) +{ + if (!m_file) { + IPX_CTX_DEBUG(m_ctx, "Ignoring IPFIX Message due to undefined output file!", '\0'); + return; + } + + // Specify a Transport Session context + struct ipx_msg_ctx *msg_ctx = ipx_msg_ipfix_get_ctx(msg); + session_ctx &file_ctx = session_get(msg_ctx->session); + + auto hdr_ptr = reinterpret_cast(ipx_msg_ipfix_get_packet(msg)); + assert(ntohs(hdr_ptr->version) == FDS_IPFIX_VERSION && "Unexpected packet version"); + const uint32_t exp_time = ntohl(hdr_ptr->export_time); + + if (fds_file_write_ctx(m_file.get(), file_ctx.id, msg_ctx->odid, exp_time) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to configure the writer: " + std::string(err_msg)); + } + + // Get info about the last seen Template snapshot + struct snap_info &snap_last = file_ctx.odid2snap[msg_ctx->odid]; + + // For each Data Record in the file + const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg); + for (uint32_t i = 0; i < rec_cnt; ++i) { + ipx_ipfix_record *rec_ptr = ipx_msg_ipfix_get_drec(msg, i); + + // Check if the templates has been changed (detected by change of template snapshots) + if (rec_ptr->rec.snap != snap_last.ptr) { + const char *session_name = msg_ctx->session->ident; + uint32_t session_odid = msg_ctx->odid; + IPX_CTX_DEBUG(m_ctx, "Template snapshot of '%s' [ODID %" PRIu32 "] has been changed. " + "Updating template definitions...", session_name, session_odid); + + tmplts_update(snap_last, rec_ptr->rec.snap); + } + + // Write the Data Record + const uint8_t *rec_data = rec_ptr->rec.data; + uint16_t rec_size = rec_ptr->rec.size; + uint16_t tmplt_id = rec_ptr->rec.tmplt->id; + + if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg)); + } + } +} + +/// Auxiliary data structure used in the snapshot iterator +struct tmplt_update_data { + /// Status of template processing + bool is_ok; + /// Plugin context (only for log!) + ipx_ctx_t *ctx; + + /// FDS file with specified context + fds_file_t *file; + /// Set of processed Templates in the snapshot + std::set ids; +}; + +/** + * @brief Callback function for updating definition of an IPFIX (Options) Template + * + * The function checks if the same Template is already defined in the current context of the file. + * If the Template is not present or it's different, the new Template definition is added to the + * file. + * @param[in] tmplt Template to process + * @param[in] data Auxiliary data structure \ref tmplt_update_data + * @return On success returns true. Otherwise returns false. + */ +static bool +tmplt_update_cb(const struct fds_template *tmplt, void *data) +{ + // Template type, raw data and size + enum fds_template_type t_type; + const uint8_t *t_data; + uint16_t t_size; + + auto info = reinterpret_cast(data); + + // No exceptions can be thrown in the C callback! + try { + uint16_t t_id = tmplt->id; + info->ids.emplace(t_id); + + // Get definition of the Template specified in the file + int res = fds_file_write_tmplt_get(info->file, t_id, &t_type, &t_data, &t_size); + + if (res != FDS_OK && res != FDS_ERR_NOTFOUND) { + // Something bad happened + const char *err_msg = fds_file_error(info->file); + throw FDS_exception("fds_file_write_tmplt_get() failed: " + std::string(err_msg)); + } + + // Should we add/redefine the definition of the Template + if (res == FDS_OK + && tmplt->type == t_type + && tmplt->raw.length == t_size + && memcmp(tmplt->raw.data, t_data, t_size) == 0) { + // The same -> nothing to do + return info->is_ok; + } + + // Add the definition (i.e. templates are different or the template hasn't been defined) + IPX_CTX_DEBUG(info->ctx, "Adding/updating definition of Template ID %" PRIu16, t_id); + + t_type = tmplt->type; + t_data = tmplt->raw.data; + t_size = tmplt->raw.length; + + if (fds_file_write_tmplt_add(info->file, t_type, t_data, t_size) != FDS_OK) { + const char *err_msg = fds_file_error(info->file); + throw FDS_exception("fds_file_write_tmplt_add() failed: " + std::string(err_msg)); + } + + } catch (std::exception &ex) { + // Exceptions + IPX_CTX_ERROR(info->ctx, "Failure during update of Template ID %" PRIu16 ": %s", tmplt->id, + ex.what()); + info->is_ok = false; + } catch (...) { + // Other exceptions + IPX_CTX_ERROR(info->ctx, "Unknown exception thrown during template definition update", '\0'); + info->is_ok = false; + } + + return info->is_ok; +} + +/** + * @brief Update Template definitions for the current Transport Session and ODID + * + * The function compares Templates in the \p snap with Template definitions previously defined + * for the currently selected combination of Transport Session and ODID. For each different or + * previously undefined Template, its definition is added or updated. Definitions of Templates + * that were available in the previous snapshot but not available in the new one are removed. + * + * Finally, information (pointer, IDs) in \p info are updated to reflect the performed update. + * @warning + * Template definitions are always unique for a combination of Transport Session and ODID, + * therefore, appropriate file writer context MUST already set using fds_file_writer_ctx(). + * Parameters \p info and \p snap MUST also belong the same unique combination. + * @param[in] info Information about the last update of Templates (old snapshot ref. + list of IDs) + * @param[in] snap New Template snapshot with all valid Template definitions + */ +void +Storage::tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap) +{ + assert(info.ptr != snap && "Snapshots should be different"); + + // Prepare data for the callback function + struct tmplt_update_data data; + data.is_ok = true; + data.ctx = m_ctx; + data.file = m_file.get(); + data.ids.clear(); + + // Update templates + fds_tsnapshot_for(snap, &tmplt_update_cb, &data); + + // Check if the update failed + if (!data.is_ok) { + throw FDS_exception("Failed to update Template definitions"); + } + + // Check if there are any Template IDs that have been removed + std::set &ids_old = info.tmplt_ids; + std::set &ids_new = data.ids; + std::set ids2remove; + // Old Template IDs - New Templates IDs = Template IDs to remove + std::set_difference(ids_old.begin(), ids_old.end(), ids_new.begin(), ids_new.end(), + std::inserter(ids2remove, ids2remove.begin())); + + // Remove old templates that are not available in the new snapshot + for (uint16_t tid : ids2remove) { + IPX_CTX_DEBUG(m_ctx, "Removing definition of Template ID %" PRIu16, tid); + + int rc = fds_file_write_tmplt_remove(m_file.get(), tid); + if (rc == FDS_OK) { + continue; + } + + // Something bad happened + if (rc != FDS_ERR_NOTFOUND) { + std::string err_msg = fds_file_error(m_file.get()); + throw FDS_exception("fds_file_write_tmplt_remove() failed: " + err_msg); + } + + // Weird, but not critical + IPX_CTX_WARNING(m_ctx, "Failed to remove undefined Template ID %" PRIu16 ". " + "Weird, this should not happen.", tid); + } + + // Update information about the last update of Templates + info.ptr = snap; + std::swap(info.tmplt_ids, data.ids); +} + +/** + * @brief Create a filename based for a user defined timestamp + * @note The timestamp will be expressed in Coordinated Universal Time (UTC) + * + * @param[in] ts Timestamp of the file + * @return New filename + * @throw FDS_exception if formatting functions fail. + */ +std::string +Storage::filename_gen(const time_t &ts) +{ + const char pattern[] = "%Y/%m/%d/flows.%Y%m%d%H%M%S.fds"; + constexpr size_t buffer_size = 64; + char buffer_data[buffer_size]; + + struct tm utc_time; + if (!gmtime_r(&ts, &utc_time)) { + throw FDS_exception("gmtime_r() failed"); + } + + if (strftime(buffer_data, buffer_size, pattern, &utc_time) == 0) { + throw FDS_exception("strftime() failed"); + } + + std::string new_path = m_path; + if (new_path.back() != '/') { + new_path += '/'; + } + + return new_path + buffer_data; +} + +/** + * @brief Convert IPv4 address to IPv4-mapped IPv6 address + * + * New IPv6 address has value corresponding to "::FFFF:\" + * @warning Size of @p in must be at least 4 bytes and @p out at least 16 bytes! + * @param[in] in IPv4 address + * @param[out] out IPv4-mapped IPv6 address + */ +void +Storage::ipv4toipv6(const uint8_t *in, uint8_t *out) +{ + memset(out, 0, 16U); + *(uint16_t *) &out[10] = UINT16_MAX; // 0xFFFF + memcpy(&out[12], in, 4U); // Copy IPv4 address +} + +/** + * @brief Get file identification of a Transport Session + * + * If the identification doesn't exist, the function will add it to the file and create + * a new internal record for it. + * + * @param[in] sptr Transport Session to find + * @return Internal description + * @throw FDS_exception if the function failed to add a new Transport Session + */ +struct Storage::session_ctx & +Storage::session_get(const struct ipx_session *sptr) +{ + auto res_it = m_session2params.find(sptr); + if (res_it != m_session2params.end()) { + // Found + return res_it->second; + } + + // Not found -> register a new session + assert(m_file != nullptr && "File must be opened!"); + + struct fds_file_session new_session; + fds_file_sid_t new_sid; + + session_ipx2fds(sptr, &new_session); + if (fds_file_session_add(m_file.get(), &new_session, &new_sid) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to register Transport Session '" + std::string(sptr->ident) + + "': " + err_msg); + } + + // Create a new session + struct session_ctx &ctx = m_session2params[sptr]; + ctx.id = new_sid; + return ctx; +} + +/** + * @brief Convert IPFIXcol representation of a Transport Session to FDS representation + * @param[in] ipx_desc IPFIXcol specific representation + * @param[out] fds_desc FDS specific representation + * @throw FDS_exception if conversion fails due to unsupported Transport Session type + */ +void +Storage::session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc) +{ + // Initialize Transport Session structure + memset(fds_desc, 0, sizeof *fds_desc); + + // Extract protocol type and description + const struct ipx_session_net *net_desc = nullptr; + switch (ipx_desc->type) { + case FDS_SESSION_UDP: + net_desc = &ipx_desc->udp.net; + fds_desc->proto = FDS_FILE_SESSION_UDP; + break; + case FDS_SESSION_TCP: + net_desc = &ipx_desc->tcp.net; + fds_desc->proto = FDS_FILE_SESSION_TCP; + break; + case FDS_SESSION_SCTP: + net_desc = &ipx_desc->sctp.net; + fds_desc->proto = FDS_FILE_SESSION_SCTP; + break; + default: + throw FDS_exception("Not implemented Transport Session type!"); + } + + // Convert ports + fds_desc->port_src = net_desc->port_src; + fds_desc->port_dst = net_desc->port_dst; + + // Convert IP addresses + if (net_desc->l3_proto == AF_INET) { + // IPv4 address + static_assert(sizeof(net_desc->addr_src.ipv4) >= 4U, "Invalid size"); + static_assert(sizeof(net_desc->addr_dst.ipv4) >= 4U, "Invalid size"); + ipv4toipv6(reinterpret_cast(&net_desc->addr_src.ipv4), fds_desc->ip_src); + ipv4toipv6(reinterpret_cast(&net_desc->addr_dst.ipv4), fds_desc->ip_dst); + } else { + // IPv6 address + static_assert(sizeof(fds_desc->ip_src) <= sizeof(net_desc->addr_src.ipv6), "Invalid size"); + static_assert(sizeof(fds_desc->ip_dst) <= sizeof(net_desc->addr_dst.ipv6), "Invalid size"); + memcpy(&fds_desc->ip_src, &net_desc->addr_src.ipv6, sizeof fds_desc->ip_src); + memcpy(&fds_desc->ip_dst, &net_desc->addr_dst.ipv6, sizeof fds_desc->ip_dst); + } +} + diff --git a/src/plugins/output/fds/src/Storage.hpp b/src/plugins/output/fds/src/Storage.hpp new file mode 100644 index 00000000..29b0528e --- /dev/null +++ b/src/plugins/output/fds/src/Storage.hpp @@ -0,0 +1,124 @@ +/** + * \file src/plugins/output/fds/src/Storage.hpp + * \author Lukas Hutak + * \brief FDS file storage (header file) + * \date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_STORAGE_HPP +#define IPFIXCOL2_FDS_STORAGE_HPP + +#include +#include +#include +#include +#include +#include + +#include "Exception.hpp" +#include "Config.hpp" + +/// Flow storage file +class Storage { +public: + /** + * @brief Create a flow storage file + * + * @note + * Output file for the current window MUST be specified using new_window() function. + * Otherwise, no flow records are stored. + * + * @param[in] ctx Plugin context (only for log) + * @param[in] cfg Configuration + * @throw FDS_exception if @p path directory doesn't exist in the system + */ + Storage(ipx_ctx_t *ctx, const Config &cfg); + virtual ~Storage() = default; + + // Disable copy constructors + Storage(const Storage &other) = delete; + Storage &operator=(const Storage &other) = delete; + + /** + * @brief Create a new time window + * + * @note Previous window is automatically closed, if exists. + * @param[in] ts Timestamp of the window + * @throw FDS_exception if the new window cannot be created + */ + void + window_new(time_t ts); + + /** + * @brief Close the current time window + * @note + * This can be also useful if a fatal error has occurred and we should not add more flow + * records to the file. + * @note + * No more Data Records will be added until a new window is created! + */ + void + window_close(); + + /** + * @brief Process IPFIX message + * + * Process all IPFIX Data Records in the message and store them to the file. + * @note If a time window is not opened, no Data Records are stored and no exception is thrown. + * @param[in] msg Message to process + * @throw FDS_exception if processing fails + */ + void + process_msg(ipx_msg_ipfix_t *msg); + +private: + /// Information about Templates in a snapshot + struct snap_info { + /// Last seen snapshot (might be already freed, do NOT dereference!) + const fds_tsnapshot_t *ptr; + /// Set of Template IDs in the snapshot + std::set tmplt_ids; + + snap_info() { + ptr = nullptr; + tmplt_ids.clear(); + } + }; + + /// Description parameters of a Transport Session + struct session_ctx { + /// Session ID used in the FDS file + fds_file_sid_t id; + /// Last seen snapshot for a specific ODID of the Transport Session + std::map odid2snap; + }; + + /// Plugin context only for logging! + ipx_ctx_t *m_ctx; + /// Storage path + std::string m_path; + /// Flags for opening file + uint32_t m_flags; + + /// Output FDS file + std::unique_ptr m_file = {nullptr, &fds_file_close}; + /// Mapping of Transport Sessions to FDS specific parameters + std::map m_session2params; + + std::string + filename_gen(const time_t &ts); + static void + ipv4toipv6(const uint8_t *in, uint8_t *out); + struct session_ctx & + session_get(const struct ipx_session *sptr); + void + session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc); + void + tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap); +}; + + +#endif // IPFIXCOL2_FDS_STORAGE_HPP diff --git a/src/plugins/output/fds/src/fds.cpp b/src/plugins/output/fds/src/fds.cpp new file mode 100644 index 00000000..292e17ab --- /dev/null +++ b/src/plugins/output/fds/src/fds.cpp @@ -0,0 +1,135 @@ +/** + * \file src/plugins/output/dummy/dummy.c + * \author Lukas Hutak + * \brief Example output plugin for IPFIXcol 2 + * \date 2018 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include + +#include "Config.hpp" +#include "Storage.hpp" + +/// Plugin description +IPX_API struct ipx_plugin_info ipx_plugin_info = { + // Plugin identification name + "fds", + // Brief description of plugin + "Flow Data Storage output plugin", + // Plugin type + IPX_PT_OUTPUT, + // Configuration flags (reserved for future use) + 0, + // Plugin version string (like "1.2.3") + "2.0.0", + // Minimal IPFIXcol version string (like "1.2.3") + "2.1.0" +}; + +/// Instance +struct Instance { + /// Parsed configuration + std::unique_ptr config_ptr = nullptr; + /// Storage file + std::unique_ptr storage_ptr = nullptr; + /// Start of the current window + time_t window_start = 0; +}; + +static void +window_check(struct Instance &inst) +{ + const Config &cfg = *inst.config_ptr; + + // Decide whether close file and create a new time window + time_t now = time(NULL); + if (difftime(now, inst.window_start) < cfg.m_window.size) { + // Nothing to do + return; + } + + if (cfg.m_window.align) { + const uint32_t window_size = cfg.m_window.size; + now /= window_size; + now *= window_size; + } + + inst.window_start = now; + inst.storage_ptr->window_new(now); +} + +int +ipx_plugin_init(ipx_ctx_t *ctx, const char *params) +{ + try { + // Parse configuration, try to create a storage and time window + std::unique_ptr instance(new Instance); + instance->config_ptr.reset(new Config(params)); + instance->storage_ptr.reset(new Storage(ctx, *instance->config_ptr)); + window_check(*instance); + // Everything seems OK + ipx_ctx_private_set(ctx, instance.release()); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "Initialization failed: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!", '\0'); + return IPX_ERR_DENIED; + } + + return IPX_OK; +} + +void +ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg) +{ + (void) ctx; // Suppress warnings + + try { + auto inst = reinterpret_cast(cfg); + inst->storage_ptr.reset(); + inst->config_ptr.reset(); + delete inst; + } catch (...) { + IPX_CTX_ERROR(ctx, "Something bad happened during plugin destruction"); + } +} + +int +ipx_plugin_process(ipx_ctx_t *ctx, void *cfg, ipx_msg_t *msg) +{ + auto *inst = reinterpret_cast(cfg); + bool failed = false; + + try { + // Check if the current time window should be closed + window_check(*inst); + ipx_msg_ipfix_t *msg_ipfix = ipx_msg_base2ipfix(msg); + inst->storage_ptr->process_msg(msg_ipfix); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "%s", ex.what()); + failed = true; + } catch (std::exception &ex) { + IPX_CTX_ERROR(ctx, "Unexpected error has occurred: %s", ex.what()); + failed = true; + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!"); + failed = true; + } + + if (failed) { + IPX_CTX_ERROR(ctx, "Due to the previous error(s), the output file is possibly corrupted. " + "Therefore, no flow records are stored until a new file is automatically opened " + "after current window expiration."); + inst->storage_ptr->window_close(); + } + + return IPX_OK; +}