From cee8e4898e528bf9b056d15c8a78139c117f26de Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Fri, 21 Jun 2019 16:20:12 +0200 Subject: [PATCH 1/4] FDS output: initial version of the new plugin --- src/plugins/output/fds/CMakeLists.txt | 31 ++ src/plugins/output/fds/README.rst | 69 +++++ .../output/fds/doc/ipfixcol2-fds-output.7.rst | 20 ++ src/plugins/output/fds/src/Config.cpp | 190 ++++++++++++ src/plugins/output/fds/src/Config.hpp | 64 ++++ src/plugins/output/fds/src/Exception.hpp | 34 +++ src/plugins/output/fds/src/Storage.cpp | 286 ++++++++++++++++++ src/plugins/output/fds/src/Storage.hpp | 106 +++++++ src/plugins/output/fds/src/fds.cpp | 127 ++++++++ 9 files changed, 927 insertions(+) create mode 100644 src/plugins/output/fds/CMakeLists.txt create mode 100644 src/plugins/output/fds/README.rst create mode 100644 src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst create mode 100644 src/plugins/output/fds/src/Config.cpp create mode 100644 src/plugins/output/fds/src/Config.hpp create mode 100644 src/plugins/output/fds/src/Exception.hpp create mode 100644 src/plugins/output/fds/src/Storage.cpp create mode 100644 src/plugins/output/fds/src/Storage.hpp create mode 100644 src/plugins/output/fds/src/fds.cpp diff --git a/src/plugins/output/fds/CMakeLists.txt b/src/plugins/output/fds/CMakeLists.txt new file mode 100644 index 00000000..2c2a6a83 --- /dev/null +++ b/src/plugins/output/fds/CMakeLists.txt @@ -0,0 +1,31 @@ +# Create a linkable module +add_library(fds-output MODULE + src/Config.cpp + src/Config.hpp + src/Exception.hpp + src/fds.cpp + src/Storage.cpp + src/Storage.hpp +) + +install( + TARGETS fds-output + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/" +) + +if (ENABLE_DOC_MANPAGE) + # Build a manual page + set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-fds-output.7.rst") + set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-fds-output.7") + + add_custom_command(TARGET fds-output PRE_BUILD + COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE} + DEPENDS ${SRC_FILE} + VERBATIM + ) + + install( + FILES "${DST_FILE}" + DESTINATION "${INSTALL_DIR_MAN}/man7" + ) +endif() diff --git a/src/plugins/output/fds/README.rst b/src/plugins/output/fds/README.rst new file mode 100644 index 00000000..09b57efe --- /dev/null +++ b/src/plugins/output/fds/README.rst @@ -0,0 +1,69 @@ +Flow Data Storage (output plugin) +================================= + +The plugin converts and stores IPFIX Data Records into FDS file format. The file +is based on IPFIX, therefore, it provides highly-effective way for long-term +storage and stores complete flow records (including all Enterprise-specific +fields, biflow, etc.) together with identification of the flow exporters who +exported these records. + +All data are stored into flat files, which are automatically rotated and renamed +every N minutes (by default 5 minutes). + +Example configuration +--------------------- + +.. code-block:: xml + + + FDS output + fds + + /tmp/ipfixcol2/fds/ + none + + 300 + yes + + + + +Parameters +---------- + +:``storagePath``: + The path element specifies the storage directory for data files. Keep on + mind that the path must exist in your system. Otherwise, no files are stored. + All files will be stored based on the configuration using the following + template: ``/YYYY/MM/DD/flows..fds`` where ``YYYY/MM/DD`` + means year/month/day and ```` represents a UTC timestamp in + format ``YYMMDDhhmmss``. + +:``compression``: + Data compression helps to significantly reduce size of output files. + Following compression algorithms are available: + + :``none``: Compression disabled [default] + :``lz4``: LZ4 compression (very fast, slightly worse compression ration) + :``zstd``: ZSTD compression (slightly slower, good compression ration) + +:``dumpInterval``: + Configuration of output files rotation. + + :``timeWindow``: + Specifies time interval in seconds to rotate files i.e. close the current + file and create a new one. [default: 300] + + :``align``: + Align file rotation with next N minute interval. For example, if enabled + and window size is 5 minutes long, files will be created at 0, 5, 10, etc. + [values: yes/no, default: yes] + +:``asyncIO``: + Allows to use asynchronous I/O for writing to the file. Usually when parts + of the file are being written, the process is blocked on synchronous I/O + and waits for the operation to complete. However, asynchronous I/O allows + the plugin to simultaneously write to file and process flow records, which + significantly improves overall performance. (Note: a pool of service + threads shared among instances of FDS plugin might be created). + [values: true/false, default: true] diff --git a/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst b/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst new file mode 100644 index 00000000..a978730e --- /dev/null +++ b/src/plugins/output/fds/doc/ipfixcol2-fds-output.7.rst @@ -0,0 +1,20 @@ +====================== + ipfixcol2-fds-output +====================== + +--------------------------------- +Flow Data Storage (output plugin) +--------------------------------- + +:Author: Lukáš Huták (lukas.hutak@cesnet.cz) +:Date: 2019-07-01 +:Copyright: Copyright © 2019 CESNET, z.s.p.o. +:Version: 2.0 +:Manual section: 7 +:Manual group: IPFIXcol collector + +Description +----------- + +.. include:: ../README.rst + :start-line: 3 diff --git a/src/plugins/output/fds/src/Config.cpp b/src/plugins/output/fds/src/Config.cpp new file mode 100644 index 00000000..a06d61e2 --- /dev/null +++ b/src/plugins/output/fds/src/Config.cpp @@ -0,0 +1,190 @@ +/** + * \file src/plugins/output/fds/src/Config.cpp + * \author Lukas Hutak + * \brief Parser of XML configuration (source file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "Config.hpp" +#include +#include + +/* + * + * ... + * ... + * + * ... + * ... + * + * ... + * + */ + +/// XML nodes +enum params_xml_nodes { + NODE_STORAGE = 1, + NODE_COMPRESS, + NODE_DUMP, + NODE_ASYNCIO, + + DUMP_WINDOW, + DUMP_ALIGN +}; + +/// Definition of the \ node +static const struct fds_xml_args args_dump[] = { + FDS_OPTS_ELEM(DUMP_WINDOW, "timeWindow", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(DUMP_ALIGN, "align", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/// Definition of the \ node +static const struct fds_xml_args args_params[] = { + FDS_OPTS_ROOT("params"), + FDS_OPTS_ELEM(NODE_STORAGE, "storagePath", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NODE_COMPRESS, "compression", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_NESTED(NODE_DUMP, "dumpInterval", args_dump, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(NODE_ASYNCIO, "asyncIO", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +Config::Config(const char *params) +{ + set_default(); + + // Create XML parser + std::unique_ptr xml(fds_xml_create(), &fds_xml_destroy); + if (!xml) { + throw std::runtime_error("Failed to create an XML parser!"); + } + + if (fds_xml_set_args(xml.get(), args_params) != FDS_OK) { + throw std::runtime_error("Failed to parse the description of an XML document!"); + } + + fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(xml.get(), params, true); + if (!params_ctx) { + std::string err = fds_xml_last_err(xml.get()); + throw std::runtime_error("Failed to parse the configuration: " + err); + } + + // Parse parameters and check configuration + try { + parse_root(params_ctx); + validate(); + } catch (std::exception &ex) { + throw std::runtime_error("Failed to parse the configuration: " + std::string(ex.what())); + } +} + +/** + * @brief Set default parameters + */ +void +Config::set_default() +{ + m_path.clear(); + m_calg = calg::NONE; + m_async = true; + + m_window.align = true; + m_window.size = WINDOW_SIZE; +} + +/** + * @brief Check if the configuration is valid + * @throw runtime_error if the configuration breaks some rules + */ +void +Config::validate() +{ + if (m_path.empty()) { + throw std::runtime_error("Storage path cannot be empty!"); + } + + if (m_window.size == 0) { + throw std::runtime_error("Window size cannot be zero!"); + } +} + +/** + * @brief Process \ node + * @param[in] ctx XML context to process + * @throw runtime_error if the parser fails + */ +void +Config::parse_root(fds_xml_ctx_t *ctx) +{ + const struct fds_xml_cont *content; + while (fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case NODE_STORAGE: + // Storage path + assert(content->type == FDS_OPTS_T_STRING); + m_path = content->ptr_string; + break; + case NODE_COMPRESS: + // Compression method + assert(content->type == FDS_OPTS_T_STRING); + if (strcasecmp(content->ptr_string, "none") == 0) { + m_calg = calg::NONE; + } else if (strcasecmp(content->ptr_string, "lz4") == 0) { + m_calg = calg::LZ4; + } else if (strcasecmp(content->ptr_string, "zstd") == 0) { + m_calg = calg::ZSTD; + } else { + const std::string inv_str = content->ptr_string; + throw std::runtime_error("Unknown compression algorithm '" + inv_str + "'"); + } + break; + case NODE_ASYNCIO: + // Asynchronous I/O + assert(content->type == FDS_OPTS_T_BOOL); + m_async = content->val_bool; + break; + case NODE_DUMP: + // Dump window + assert(content->type == FDS_OPTS_T_CONTEXT); + parse_dump(content->ptr_ctx); + break; + default: + // Internal error + throw std::runtime_error("Unknown XML node"); + } + } +} + +/** + * @brief Auxiliary function for parsing \ options + * @param[in] ctx XML context to process + * @throw runtime_error if the parser fails + */ +void +Config::parse_dump(fds_xml_ctx_t *ctx) +{ + const struct fds_xml_cont *content; + while(fds_xml_next(ctx, &content) != FDS_EOC) { + switch (content->id) { + case DUMP_WINDOW: + // Window size + assert(content->type == FDS_OPTS_T_UINT); + if (content->val_uint > UINT32_MAX) { + throw std::runtime_error("Window size is too long!"); + } + m_window.size = static_cast(content->val_uint); + break; + case DUMP_ALIGN: + // Window alignment + assert(content->type == FDS_OPTS_T_BOOL); + m_window.align = content->val_bool; + break; + default: + // Internal error + throw std::runtime_error("Unknown XML node"); + } + } +} \ No newline at end of file diff --git a/src/plugins/output/fds/src/Config.hpp b/src/plugins/output/fds/src/Config.hpp new file mode 100644 index 00000000..876ef466 --- /dev/null +++ b/src/plugins/output/fds/src/Config.hpp @@ -0,0 +1,64 @@ +/** + * \file src/plugins/output/fds/src/Config.hpp + * \author Lukas Hutak + * \brief Parser of XML configuration (header file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_CONFIG_HPP +#define IPFIXCOL2_FDS_CONFIG_HPP + +#include +#include + +/** + * @brief Plugin configuration parser + */ +class Config { +public: + /** + * @brief Parse configuration of the plugin + * @param[in] params XML parameters to parse + * @throw runtime_exception on error + */ + Config(const char *params); + ~Config() = default; + + enum class calg { + NONE, ///< Do not use compression + LZ4, ///< LZ4 compression + ZSTD ///< ZSTD compression + }; + + /// Storage path + std::string m_path; + /// Compression algorithm + calg m_calg; + /// Asynchronous I/O enabled + bool m_async; + + struct { + bool align; ///< Enable/disable window alignment + uint32_t size; ///< Time window size + } m_window; ///< Window alignment + +private: + /// Default window size + static const uint32_t WINDOW_SIZE = 300U; + + void + set_default(); + void + validate(); + + void + parse_root(fds_xml_ctx_t *ctx); + void + parse_dump(fds_xml_ctx_t *ctx); +}; + + +#endif // IPFIXCOL2_FDS_CONFIG_HPP diff --git a/src/plugins/output/fds/src/Exception.hpp b/src/plugins/output/fds/src/Exception.hpp new file mode 100644 index 00000000..f5325c5f --- /dev/null +++ b/src/plugins/output/fds/src/Exception.hpp @@ -0,0 +1,34 @@ +/** + * \file src/plugins/output/fds/src/Exception.hpp + * \author Lukas Hutak + * \brief Plugin specific exception (header file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_EXCEPTION_HPP +#define IPFIXCOL2_FDS_EXCEPTION_HPP + +#include +#include + +/// Plugin specific exception +class FDS_exception : public std::runtime_error { +public: + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const std::string &str) : std::runtime_error(str) {}; + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const char *str) : std::runtime_error(str) {}; + // Default destructor + ~FDS_exception() = default; +}; + +#endif //IPFIXCOL2_FDS_EXCEPTION_HPP diff --git a/src/plugins/output/fds/src/Storage.cpp b/src/plugins/output/fds/src/Storage.cpp new file mode 100644 index 00000000..64655a66 --- /dev/null +++ b/src/plugins/output/fds/src/Storage.cpp @@ -0,0 +1,286 @@ +/** + * \file src/plugins/output/fds/src/Storage.cpp + * \author Lukas Hutak + * \brief FDS file storage (source file) + * \date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include "Storage.hpp" + +Storage::Storage(ipx_ctx_t *ctx, const Config &cfg) : m_ctx(ctx), m_path(cfg.m_path) +{ + // Check if the directory exists + struct stat file_info; + memset(&file_info, 0, sizeof(file_info)); + if (stat(m_path.c_str(), &file_info) != 0 || !S_ISDIR(file_info.st_mode)) { + throw FDS_exception("Directory '" + m_path + "' doesn't exist or search permission is denied"); + } + + // Prepare flags for FDS file + m_flags = 0; + switch (cfg.m_calg) { + case Config::calg::LZ4: + m_flags |= FDS_FILE_LZ4; + break; + case Config::calg::ZSTD: + m_flags |= FDS_FILE_ZSTD; + break; + default: + break; + } + + if (!cfg.m_async) { + m_flags |= FDS_FILE_NOASYNC; + } + + m_flags |= FDS_FILE_APPEND; +} + +void +Storage::window_new(time_t ts) +{ + // Close the current window if exists + window_close(); + + // Open new file + const std::string new_file = filename_gen(ts); + std::unique_ptr new_file_cpy(strdup(new_file.c_str()), &free); + + char *dir2create; + if (!new_file_cpy || (dir2create = dirname(new_file_cpy.get())) == nullptr) { + throw FDS_exception("Failed to generate name of an output directory!"); + } + + if (ipx_utils_mkdir(dir2create, IPX_UTILS_MKDIR_DEF) != FDS_OK) { + throw FDS_exception("Failed to create directory '" + std::string(dir2create) + "'"); + } + + m_file.reset(fds_file_init()); + if (!m_file) { + throw FDS_exception("Failed to create FDS file handler!"); + } + + if (fds_file_open(m_file.get(), new_file.c_str(), m_flags) != FDS_OK) { + m_file.reset(); + const char *err_msg = "something"; //fds_file_error(m_file.get()); + throw FDS_exception("Failed to create file '" + new_file + "': " + err_msg); + } +} + +void +Storage::window_close() +{ + m_file.reset(); + m_session2params.clear(); +} + +void +Storage::process_msg(ipx_msg_ipfix_t *msg) +{ + if (!m_file) { + IPX_CTX_DEBUG(m_ctx, "Ignoring IPFIX Message due to undefined output file!", '\0'); + return; + } + + // Specify a Transport Session context + struct ipx_msg_ctx *msg_ctx = ipx_msg_ipfix_get_ctx(msg); + session_ctx &file_ctx = session_get(msg_ctx->session); + + auto hdr_ptr = reinterpret_cast(ipx_msg_ipfix_get_packet(msg)); + assert(ntohs(hdr_ptr->version) == FDS_IPFIX_VERSION && "Unexpected packet version"); + const uint32_t exp_time = ntohl(hdr_ptr->export_time); + + if (fds_file_write_ctx(m_file.get(), file_ctx.id, msg_ctx->odid, exp_time) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to configure file writer: " + std::string(err_msg)); + } + + // For each Data Record in the file + const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg); + for (uint32_t i = 0; i < rec_cnt; ++i) { + ipx_ipfix_record *rec_ptr = ipx_msg_ipfix_get_drec(msg, i); + + // Insert the record // TODO: improve me! + const struct fds_template *rec_tmplt = rec_ptr->rec.tmplt; + uint16_t tmplt_id = rec_tmplt->id; + enum fds_template_type t_type; + const uint8_t *t_data; + uint16_t t_size; + + int rc = fds_file_write_tmplt_get(m_file.get(), tmplt_id, &t_type, &t_data, &t_size); + if (rc != FDS_OK && rc != FDS_ERR_NOTFOUND) { + // Something bad happened + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("fds_file_write_tmplt_get() failed: " + std::string(err_msg)); + } + + if (rc == FDS_ERR_NOTFOUND || t_type != rec_tmplt->type || t_size != rec_tmplt->raw.length + || memcmp(t_data, rec_tmplt->raw.data, rec_tmplt->raw.length) != 0) { + // Template not defined or the template are different + t_type = rec_tmplt->type; + t_data = rec_tmplt->raw.data; + t_size = rec_tmplt->raw.length; + + if (fds_file_write_tmplt_add(m_file.get(), t_type, t_data, t_size) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to add a template: " + std::string(err_msg)); + } + } + + // FIXME: check subTemplateList & subTemplateMultiList templates + + // Write the Data record + const uint8_t *rec_data = rec_ptr->rec.data; + const uint16_t rec_size = rec_ptr->rec.size; + if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg)); + } + } +} + +/** + * @brief Create a filename based for a user defined timestamp + * @note The timestamp will be expressed in Coordinated Universal Time (UTC) + * + * @param[in] ts Timestamp of the file + * @return New filename + * @throw FDS_exception if formatting functions fail. + */ +std::string +Storage::filename_gen(const time_t &ts) +{ + const char pattern[] = "%Y/%m/%d/flows.%Y%m%d%H%M%S.fds"; + constexpr size_t buffer_size = 64; + char buffer_data[buffer_size]; + + struct tm utc_time; + if (!gmtime_r(&ts, &utc_time)) { + throw FDS_exception("gmtime_r() failed"); + } + + if (strftime(buffer_data, buffer_size, pattern, &utc_time) == 0) { + throw FDS_exception("strftime() failed"); + } + + std::string new_path = m_path; + if (new_path.back() != '/') { + new_path += '/'; + } + + return new_path + buffer_data; +} + +/** + * @brief Convert IPv4 address to IPv4-mapped IPv6 address + * + * New IPv6 address has value corresponding to "::FFFF:\" + * @warning Size of @p in must be at least 4 bytes and @p out at least 16 bytes! + * @param[in] in IPv4 address + * @param[out] out IPv4-mapped IPv6 address + */ +void +Storage::ipv4toipv6(const uint8_t *in, uint8_t *out) +{ + memset(out, 0, 16U); + *(uint16_t *) &out[10] = UINT16_MAX; // 0xFFFF + memcpy(&out[12], in, 4U); // Copy IPv4 address +} + +/** + * @brief Get file identification of a Transport Session + * + * If the identification doesn't exist, the function will add it to the file and create + * a new internal record for it. + * + * @param[in] sptr Transport Session to find + * @return Internal description + * @throw FDS_exception if the function failed to add a new Transport Session + */ +struct Storage::session_ctx & +Storage::session_get(const struct ipx_session *sptr) +{ + auto res_it = m_session2params.find(sptr); + if (res_it != m_session2params.end()) { + // Found + return res_it->second; + } + + // Not found -> register a new session + assert(m_file != nullptr && "File must be opened!"); + + struct fds_file_session new_session; + fds_file_sid_t new_sid; + + session_ipx2fds(sptr, &new_session); + if (fds_file_session_add(m_file.get(), &new_session, &new_sid) != FDS_OK) { + const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to register Transport Session '" + std::string(sptr->ident) + + "': " + err_msg); + } + + // Create a new session + struct session_ctx &ctx = m_session2params[sptr]; + ctx.id = new_sid; + return ctx; +} + +/** + * @brief Convert IPFIXcol representation of a Transport Session to FDS representation + * @param[in] ipx_desc IPFIXcol specific representation + * @param[out] fds_desc FDS specific representation + * @throw FDS_exception if conversion fails due to unsupported Transport Session type + */ +void +Storage::session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc) +{ + // Initialize Transport Session structure + memset(fds_desc, 0, sizeof *fds_desc); + + // Extract protocol type and description + const struct ipx_session_net *net_desc = nullptr; + switch (ipx_desc->type) { + case FDS_SESSION_UDP: + net_desc = &ipx_desc->udp.net; + fds_desc->proto = FDS_FILE_SESSION_UDP; + break; + case FDS_SESSION_TCP: + net_desc = &ipx_desc->tcp.net; + fds_desc->proto = FDS_FILE_SESSION_TCP; + break; + case FDS_SESSION_SCTP: + net_desc = &ipx_desc->sctp.net; + fds_desc->proto = FDS_FILE_SESSION_SCTP; + break; + default: + throw FDS_exception("Not implemented Transport Session type!"); + } + + // Convert ports + fds_desc->port_src = net_desc->port_src; + fds_desc->port_dst = net_desc->port_dst; + + // Convert IP addresses + if (net_desc->l3_proto == AF_INET) { + // IPv4 address + static_assert(sizeof(net_desc->addr_src.ipv4) >= 4U, "Invalid size"); + static_assert(sizeof(net_desc->addr_dst.ipv4) >= 4U, "Invalid size"); + ipv4toipv6(reinterpret_cast(&net_desc->addr_src.ipv4), fds_desc->ip_src); + ipv4toipv6(reinterpret_cast(&net_desc->addr_dst.ipv4), fds_desc->ip_dst); + } else { + // IPv6 address + static_assert(sizeof(fds_desc->ip_src) <= sizeof(net_desc->addr_src.ipv6), "Invalid size"); + static_assert(sizeof(fds_desc->ip_dst) <= sizeof(net_desc->addr_dst.ipv6), "Invalid size"); + memcpy(&fds_desc->ip_src, &net_desc->addr_src.ipv6, sizeof fds_desc->ip_src); + memcpy(&fds_desc->ip_dst, &net_desc->addr_dst.ipv6, sizeof fds_desc->ip_dst); + } +} + diff --git a/src/plugins/output/fds/src/Storage.hpp b/src/plugins/output/fds/src/Storage.hpp new file mode 100644 index 00000000..7c022a00 --- /dev/null +++ b/src/plugins/output/fds/src/Storage.hpp @@ -0,0 +1,106 @@ +/** + * \file src/plugins/output/fds/src/Storage.hpp + * \author Lukas Hutak + * \brief FDS file storage (header file) + * \date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPFIXCOL2_FDS_STORAGE_HPP +#define IPFIXCOL2_FDS_STORAGE_HPP + +#include +#include +#include +#include +#include + +#include "Exception.hpp" +#include "Config.hpp" + +/// Flow storage file +class Storage { +public: + /** + * @brief Create a flow storage file + * + * @note + * Output file for the current window MUST be specified using new_window() function. + * Otherwise, no flow records are stored. + * + * @param[in] ctx Plugin context (only for log) + * @param[in] cfg Configuration + * @throw FDS_exception if @p path directory doesn't exist in the system + */ + Storage(ipx_ctx_t *ctx, const Config &cfg); + virtual ~Storage() = default; + + // Disable copy constructors + Storage(const Storage &other) = delete; + Storage &operator=(const Storage &other) = delete; + + /** + * @brief Create a new time window + * + * @note Previous window is automatically closed, if exists. + * @param[in] ts Timestamp of the window + * @throw FDS_exception if the new window cannot be created + */ + void + window_new(time_t ts); + + /** + * @brief Close the current time window + * @note + * This can be also useful if a fatal error has occurred and we should not add more flow + * records to the file. + * @note + * No more Data Records will be added until a new window is created! + */ + void + window_close(); + + /** + * @brief Process IPFIX message + * + * Process all IPFIX Data Records in the message and store them to the file. + * @note If a time window is not opened, no Data Records are stored and no exception is thrown. + * @param[in] msg Message to process + * @throw FDS_exception if processing fails + */ + void + process_msg(ipx_msg_ipfix_t *msg); + +private: + /// Description parameters of a Transport Session + struct session_ctx { + /// FDS file ID + fds_file_sid_t id; + }; + + /// Plugin context only for logging! + ipx_ctx_t *m_ctx; + /// Storage path + std::string m_path; + /// Flags for opening file + uint32_t m_flags; + + /// Output FDS file + std::unique_ptr m_file = {nullptr, &fds_file_close}; + /// Mapping of Transport Sessions to FDS specific parameters + std::map m_session2params; + + std::string + filename_gen(const time_t &ts); + static void + ipv4toipv6(const uint8_t *in, uint8_t *out); + struct session_ctx & + session_get(const struct ipx_session *sptr); + void + session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc); +}; + + +#endif // IPFIXCOL2_FDS_STORAGE_HPP diff --git a/src/plugins/output/fds/src/fds.cpp b/src/plugins/output/fds/src/fds.cpp new file mode 100644 index 00000000..c0ecedbe --- /dev/null +++ b/src/plugins/output/fds/src/fds.cpp @@ -0,0 +1,127 @@ +/** + * \file src/plugins/output/dummy/dummy.c + * \author Lukas Hutak + * \brief Example output plugin for IPFIXcol 2 + * \date 2018 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include + +#include "Config.hpp" +#include "Storage.hpp" + +/// Plugin description +IPX_API struct ipx_plugin_info ipx_plugin_info = { + // Plugin identification name + "fds", + // Brief description of plugin + "Flow Data Storage output plugin", + // Plugin type + IPX_PT_OUTPUT, + // Configuration flags (reserved for future use) + 0, + // Plugin version string (like "1.2.3") + "2.0.0", + // Minimal IPFIXcol version string (like "1.2.3") + "2.0.0" +}; + +/// Instance +struct Instance { + /// Parsed configuration + std::unique_ptr config_ptr = nullptr; + /// Storage file + std::unique_ptr storage_ptr = nullptr; + /// Start of the current window + time_t window_start = 0; +}; + +static void +window_check(struct Instance &inst) +{ + const Config &cfg = *inst.config_ptr; + + // Decide whether close file and create a new time window + time_t now = time(NULL); + if (difftime(now, inst.window_start) < cfg.m_window.size) { + // Nothing to do + return; + } + + if (cfg.m_window.align) { + const uint32_t window_size = cfg.m_window.size; + now /= window_size; + now *= window_size; + } + + inst.window_start = now; + inst.storage_ptr->window_new(now); +} + +int +ipx_plugin_init(ipx_ctx_t *ctx, const char *params) +{ + try { + // Parse configuration, try to create a storage and time window + std::unique_ptr instance(new Instance); + instance->config_ptr.reset(new Config(params)); + instance->storage_ptr.reset(new Storage(ctx, *instance->config_ptr)); + window_check(*instance); + // Everything seems OK + ipx_ctx_private_set(ctx, instance.release()); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "Initialization failed: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!", '\0'); + return IPX_ERR_DENIED; + } + + return IPX_OK; +} + +void +ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg) +{ + (void) ctx; // Suppress warnings + + try { + auto inst = reinterpret_cast(cfg); + inst->storage_ptr.reset(); + inst->config_ptr.reset(); + delete inst; + } catch (...) { + IPX_CTX_ERROR(ctx, "Something bad happened during plugin destruction"); + } +} + +int +ipx_plugin_process(ipx_ctx_t *ctx, void *cfg, ipx_msg_t *msg) +{ + auto *inst = reinterpret_cast(cfg); + + try { + // Check if the current time window should be closed + window_check(*inst); + ipx_msg_ipfix_t *msg_ipfix = ipx_msg_base2ipfix(msg); + inst->storage_ptr->process_msg(msg_ipfix); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "%s", ex.what()); + inst->storage_ptr->window_close(); + } catch (std::exception &ex) { + IPX_CTX_ERROR(ctx, "Unexpected error has occurred: %s", ex.what()); + inst->storage_ptr->window_close(); + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!"); + inst->storage_ptr->window_close(); + } + + return IPX_OK; +} From 8002be6c7456ee888a82e270e48a25aae56f0027 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Fri, 21 Jun 2019 16:22:20 +0200 Subject: [PATCH 2/4] Utils: add a new section with utilities useful for plugins --- include/CMakeLists.txt | 1 + include/ipfixcol2.h | 1 + include/ipfixcol2/utils.h | 70 ++++++++++++++++++++++++++++++ src/core/utils.c | 91 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 include/ipfixcol2/utils.h diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index e14eacfc..bc1eef5e 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -16,6 +16,7 @@ set(SUB_HEADERS ipfixcol2/message_session.h ipfixcol2/plugins.h ipfixcol2/session.h + ipfixcol2/utils.h ipfixcol2/verbose.h "${PROJECT_BINARY_DIR}/include/ipfixcol2/api.h" ) diff --git a/include/ipfixcol2.h b/include/ipfixcol2.h index 6ca3b2f4..d40fefbe 100644 --- a/include/ipfixcol2.h +++ b/include/ipfixcol2.h @@ -63,6 +63,7 @@ #include #include +#include #include diff --git a/include/ipfixcol2/utils.h b/include/ipfixcol2/utils.h new file mode 100644 index 00000000..e65fd143 --- /dev/null +++ b/include/ipfixcol2/utils.h @@ -0,0 +1,70 @@ +/** + * @file include/ipfixcol2/utils.h + * @author Lukas Hutak + * @brief Auxiliary utilities for plugins (header file) + * @date June 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef IPX_UTILS_H +#define IPX_UTILS_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include // mkdir file permissions + +/** +* \defgroup ipxSource Transport session identification +* \ingroup publicAPIs +* \brief Transport session interface +* +* Data types and API functions for identification and management of Transport +* session identification. The Exporting Process uses the Transport Session to +* send messages from multiple _independent_ Observation Domains to the +* Collecting Process. Moreover, in case of SCTP session messages are also send +* over _independent_ streams. +* +* Following structures represents Transport session between Exporting process +* and Collecting Process. However, proper processing of flows also requires +* distinguishing Observation Domain IDs and Stream identifications out of +* scope of these structures. +* +* @{ +*/ + +/** + * @brief Default file permission of newly created directories + * @note Read/write/execute for a user and his group, read/execute for others. + */ +#define IPX_UTILS_MKDIR_DEF (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) + +/** + * @brief Create recursively a directory + * + * @note + * File permission @p mode only affects newly created directories. In other + * words, if a directory (or subdirectory) already exists, file permission + * bits @p mode are not applied. + * @note + * The function is implemented as "recursive" wrapper over standard mkdir + * function. See man 3 mkdir for more information. + * @param[in] path Full directory path to create + * @param[in] mode The file permission bits of the new directories + * (see default value #IPX_UTILS_MKDIR_DEF) + * @return #IPX_OK on success + * @return #IPX_ERR_DENIED otherwise and errno is set appropriately. + */ +IPX_API int +ipx_utils_mkdir(const char *path, mode_t mode); + +/**@}*/ + +#ifdef __cplusplus +} +#endif +#endif // IPX_UTILS_H diff --git a/src/core/utils.c b/src/core/utils.c index 56aad98f..1d9049a6 100644 --- a/src/core/utils.c +++ b/src/core/utils.c @@ -43,10 +43,15 @@ #undef _GNU_SOURCE #define _POSIX_C_SOURCE 200809L + +#include +#include +#include #include -#include + #include "utils.h" + int ipx_strerror_fn(int errnum, char *buffer, size_t buffer_size) { @@ -57,4 +62,86 @@ ipx_strerror_fn(int errnum, char *buffer, size_t buffer_size) snprintf(buffer, buffer_size, "strerror_r() failed: Unable process error code %d!", errnum); return IPX_ERR_ARG; -} \ No newline at end of file +} + +int +ipx_utils_mkdir(const char *path, mode_t mode) +{ + const char ch_slash = '/'; + bool add_slash = false; + + // Check the parameter + size_t len = strlen(path); + if (path[len - 1] != ch_slash) { + len++; // We have to add another slash + add_slash = true; + } + + if (len > PATH_MAX - 1) { + errno = ENAMETOOLONG; + return IPX_ERR_DENIED; + } + + // Make a copy + char *path_cpy = malloc((len + 1) * sizeof(char)); // +1 for '\0' + if (!path_cpy) { + errno = ENOMEM; + return IPX_ERR_DENIED; + } + + strcpy(path_cpy, path); + if (add_slash) { + path_cpy[len - 1] = ch_slash; + path_cpy[len] = '\0'; + } + + struct stat info; + char *pos; + + // Create directories from the beginning + for (pos = path_cpy + 1; *pos; pos++) { + // Find a slash + if (*pos != ch_slash) { + continue; + } + + *pos = '\0'; // Temporarily truncate pathname + + // Check if a subdirectory exists + if (stat(path_cpy, &info) == 0) { + // Check if the "info" is about directory + if (!S_ISDIR(info.st_mode)) { + free(path_cpy); + errno = ENOTDIR; + return IPX_ERR_DENIED; + } + + // Fix the pathname and continue with the next subdirectory + *pos = ch_slash; + continue; + } + + // Errno is filled by stat() + if (errno != ENOENT) { + int errno_cpy = errno; + free(path_cpy); + errno = errno_cpy; + return IPX_ERR_DENIED; + } + + // Required directory doesn't exist -> create new one + if (mkdir(path_cpy, mode) != 0 && errno != EEXIST) { + // Failed (by the way, EEXIST because of race condition i.e. + // multiple applications creating the same folder) + int errno_cpy = errno; + free(path_cpy); + errno = errno_cpy; + return IPX_ERR_DENIED; + } + + *pos = ch_slash; + } + + free(path_cpy); + return IPX_OK; +} From 9b23fd8cc30b7b710056df027d370c96f0df7836 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Wed, 3 Jul 2019 17:03:00 +0200 Subject: [PATCH 3/4] FDS output: improved detection of Template modifications Detection is newly based on detection of snapshot changes. If a template snapshot of a Data Record is different (compared to a previous Data Record from the same source and ODID), all Templates in the new snapshot are compared to Templates already added to the file. New or redefined Templates are added and Templates that not defined in the new snapshot are removed. --- src/plugins/output/fds/src/Storage.cpp | 199 +++++++++++++++++++++---- src/plugins/output/fds/src/Storage.hpp | 20 ++- 2 files changed, 188 insertions(+), 31 deletions(-) diff --git a/src/plugins/output/fds/src/Storage.cpp b/src/plugins/output/fds/src/Storage.cpp index 64655a66..30db3d25 100644 --- a/src/plugins/output/fds/src/Storage.cpp +++ b/src/plugins/output/fds/src/Storage.cpp @@ -8,6 +8,8 @@ * SPDX-License-Identifier: BSD-3-Clause */ +#include +#include #include #include #include @@ -69,9 +71,9 @@ Storage::window_new(time_t ts) } if (fds_file_open(m_file.get(), new_file.c_str(), m_flags) != FDS_OK) { + std::string err_msg = fds_file_error(m_file.get()); m_file.reset(); - const char *err_msg = "something"; //fds_file_error(m_file.get()); - throw FDS_exception("Failed to create file '" + new_file + "': " + err_msg); + throw FDS_exception("Failed to create/append file '" + new_file + "': " + err_msg); } } @@ -100,51 +102,188 @@ Storage::process_msg(ipx_msg_ipfix_t *msg) if (fds_file_write_ctx(m_file.get(), file_ctx.id, msg_ctx->odid, exp_time) != FDS_OK) { const char *err_msg = fds_file_error(m_file.get()); - throw FDS_exception("Failed to configure file writer: " + std::string(err_msg)); + throw FDS_exception("Failed to configure the writer: " + std::string(err_msg)); } + // Get info about the last seen Template snapshot + struct snap_info &snap_last = file_ctx.odid2snap[msg_ctx->odid]; + // For each Data Record in the file const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg); for (uint32_t i = 0; i < rec_cnt; ++i) { ipx_ipfix_record *rec_ptr = ipx_msg_ipfix_get_drec(msg, i); - // Insert the record // TODO: improve me! - const struct fds_template *rec_tmplt = rec_ptr->rec.tmplt; - uint16_t tmplt_id = rec_tmplt->id; - enum fds_template_type t_type; - const uint8_t *t_data; - uint16_t t_size; + // Check if the templates has been changed (detected by change of template snapshots) + if (rec_ptr->rec.snap != snap_last.ptr) { + const char *session_name = msg_ctx->session->ident; + uint32_t session_odid = msg_ctx->odid; + IPX_CTX_DEBUG(m_ctx, "Template snapshot of '%s' [ODID %" PRIu32 "] has been changed. " + "Updating template definitions...", session_name, session_odid); - int rc = fds_file_write_tmplt_get(m_file.get(), tmplt_id, &t_type, &t_data, &t_size); - if (rc != FDS_OK && rc != FDS_ERR_NOTFOUND) { - // Something bad happened + tmplts_update(snap_last, rec_ptr->rec.snap); + } + + // Write the Data Record + const uint8_t *rec_data = rec_ptr->rec.data; + uint16_t rec_size = rec_ptr->rec.size; + uint16_t tmplt_id = rec_ptr->rec.tmplt->id; + + if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) { const char *err_msg = fds_file_error(m_file.get()); + throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg)); + } + } +} + +/// Auxiliary data structure used in the snapshot iterator +struct tmplt_update_data { + /// Status of template processing + bool is_ok; + /// Plugin context (only for log!) + ipx_ctx_t *ctx; + + /// FDS file with specified context + fds_file_t *file; + /// Set of processed Templates in the snapshot + std::set ids; +}; + +/** + * @brief Callback function for updating definition of an IPFIX (Options) Template + * + * The function checks if the same Template is already defined in the current context of the file. + * If the Template is not present or it's different, the new Template definition is added to the + * file. + * @param[in] tmplt Template to process + * @param[in] data Auxiliary data structure \ref tmplt_update_data + * @return On success returns true. Otherwise returns false. + */ +static bool +tmplt_update_cb(const struct fds_template *tmplt, void *data) +{ + // Template type, raw data and size + enum fds_template_type t_type; + const uint8_t *t_data; + uint16_t t_size; + + auto info = reinterpret_cast(data); + + // No exceptions can be thrown in the C callback! + try { + uint16_t t_id = tmplt->id; + info->ids.emplace(t_id); + + // Get definition of the Template specified in the file + int res = fds_file_write_tmplt_get(info->file, t_id, &t_type, &t_data, &t_size); + + if (res != FDS_OK && res != FDS_ERR_NOTFOUND) { + // Something bad happened + const char *err_msg = fds_file_error(info->file); throw FDS_exception("fds_file_write_tmplt_get() failed: " + std::string(err_msg)); } - if (rc == FDS_ERR_NOTFOUND || t_type != rec_tmplt->type || t_size != rec_tmplt->raw.length - || memcmp(t_data, rec_tmplt->raw.data, rec_tmplt->raw.length) != 0) { - // Template not defined or the template are different - t_type = rec_tmplt->type; - t_data = rec_tmplt->raw.data; - t_size = rec_tmplt->raw.length; - - if (fds_file_write_tmplt_add(m_file.get(), t_type, t_data, t_size) != FDS_OK) { - const char *err_msg = fds_file_error(m_file.get()); - throw FDS_exception("Failed to add a template: " + std::string(err_msg)); - } + // Should we add/redefine the definition of the Template + if (res == FDS_OK + && tmplt->type == t_type + && tmplt->raw.length == t_size + && memcmp(tmplt->raw.data, t_data, t_size) == 0) { + // The same -> nothing to do + return info->is_ok; } - // FIXME: check subTemplateList & subTemplateMultiList templates + // Add the definition (i.e. templates are different or the template hasn't been defined) + IPX_CTX_DEBUG(info->ctx, "Adding/updating definition of Template ID %" PRIu16, t_id); - // Write the Data record - const uint8_t *rec_data = rec_ptr->rec.data; - const uint16_t rec_size = rec_ptr->rec.size; - if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) { - const char *err_msg = fds_file_error(m_file.get()); - throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg)); + t_type = tmplt->type; + t_data = tmplt->raw.data; + t_size = tmplt->raw.length; + + if (fds_file_write_tmplt_add(info->file, t_type, t_data, t_size) != FDS_OK) { + const char *err_msg = fds_file_error(info->file); + throw FDS_exception("fds_file_write_tmplt_add() failed: " + std::string(err_msg)); + } + + } catch (std::exception &ex) { + // Exceptions + IPX_CTX_ERROR(info->ctx, "Failure during update of Template ID %" PRIu16 ": %s", tmplt->id, + ex.what()); + info->is_ok = false; + } catch (...) { + // Other exceptions + IPX_CTX_ERROR(info->ctx, "Unknown exception thrown during template definition update", '\0'); + info->is_ok = false; + } + + return info->is_ok; +} + +/** + * @brief Update Template definitions for the current Transport Session and ODID + * + * The function compares Templates in the \p snap with Template definitions previously defined + * for the currently selected combination of Transport Session and ODID. For each different or + * previously undefined Template, its definition is added or updated. Definitions of Templates + * that were available in the previous snapshot but not available in the new one are removed. + * + * Finally, information (pointer, IDs) in \p info are updated to reflect the performed update. + * @warning + * Template definitions are always unique for a combination of Transport Session and ODID, + * therefore, appropriate file writer context MUST already set using fds_file_writer_ctx(). + * Parameters \p info and \p snap MUST also belong the same unique combination. + * @param[in] info Information about the last update of Templates (old snapshot ref. + list of IDs) + * @param[in] snap New Template snapshot with all valid Template definitions + */ +void +Storage::tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap) +{ + assert(info.ptr != snap && "Snapshots should be different"); + + // Prepare data for the callback function + struct tmplt_update_data data; + data.is_ok = true; + data.ctx = m_ctx; + data.file = m_file.get(); + data.ids.clear(); + + // Update templates + fds_tsnapshot_for(snap, &tmplt_update_cb, &data); + + // Check if the update failed + if (!data.is_ok) { + throw FDS_exception("Failed to update Template definitions"); + } + + // Check if there are any Template IDs that have been removed + std::set &ids_old = info.tmplt_ids; + std::set &ids_new = data.ids; + std::set ids2remove; + // Old Template IDs - New Templates IDs = Template IDs to remove + std::set_difference(ids_old.begin(), ids_old.end(), ids_new.begin(), ids_new.end(), + std::inserter(ids2remove, ids2remove.begin())); + + // Remove old templates that are not available in the new snapshot + for (uint16_t tid : ids2remove) { + IPX_CTX_DEBUG(m_ctx, "Removing definition of Template ID %" PRIu16, tid); + + int rc = fds_file_write_tmplt_remove(m_file.get(), tid); + if (rc == FDS_OK) { + continue; + } + + // Something bad happened + if (rc != FDS_ERR_NOTFOUND) { + std::string err_msg = fds_file_error(m_file.get()); + throw FDS_exception("fds_file_write_tmplt_remove() failed: " + err_msg); } + + // Weird, but not critical + IPX_CTX_WARNING(m_ctx, "Failed to remove undefined Template ID %" PRIu16 ". " + "Weird, this should not happen.", tid); } + + // Update information about the last update of Templates + info.ptr = snap; + std::swap(info.tmplt_ids, data.ids); } /** diff --git a/src/plugins/output/fds/src/Storage.hpp b/src/plugins/output/fds/src/Storage.hpp index 7c022a00..29b0528e 100644 --- a/src/plugins/output/fds/src/Storage.hpp +++ b/src/plugins/output/fds/src/Storage.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -74,10 +75,25 @@ class Storage { process_msg(ipx_msg_ipfix_t *msg); private: + /// Information about Templates in a snapshot + struct snap_info { + /// Last seen snapshot (might be already freed, do NOT dereference!) + const fds_tsnapshot_t *ptr; + /// Set of Template IDs in the snapshot + std::set tmplt_ids; + + snap_info() { + ptr = nullptr; + tmplt_ids.clear(); + } + }; + /// Description parameters of a Transport Session struct session_ctx { - /// FDS file ID + /// Session ID used in the FDS file fds_file_sid_t id; + /// Last seen snapshot for a specific ODID of the Transport Session + std::map odid2snap; }; /// Plugin context only for logging! @@ -100,6 +116,8 @@ class Storage { session_get(const struct ipx_session *sptr); void session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc); + void + tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap); }; From 1e8d2c41ccad87ab5cf6284d00b21086a86e565b Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Wed, 3 Jul 2019 17:09:46 +0200 Subject: [PATCH 4/4] FDS output: update dependency of collector version, improve handling of exceptions during a message process --- src/plugins/output/fds/src/fds.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/plugins/output/fds/src/fds.cpp b/src/plugins/output/fds/src/fds.cpp index c0ecedbe..292e17ab 100644 --- a/src/plugins/output/fds/src/fds.cpp +++ b/src/plugins/output/fds/src/fds.cpp @@ -30,7 +30,7 @@ IPX_API struct ipx_plugin_info ipx_plugin_info = { // Plugin version string (like "1.2.3") "2.0.0", // Minimal IPFIXcol version string (like "1.2.3") - "2.0.0" + "2.1.0" }; /// Instance @@ -106,6 +106,7 @@ int ipx_plugin_process(ipx_ctx_t *ctx, void *cfg, ipx_msg_t *msg) { auto *inst = reinterpret_cast(cfg); + bool failed = false; try { // Check if the current time window should be closed @@ -114,12 +115,19 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *cfg, ipx_msg_t *msg) inst->storage_ptr->process_msg(msg_ipfix); } catch (const FDS_exception &ex) { IPX_CTX_ERROR(ctx, "%s", ex.what()); - inst->storage_ptr->window_close(); + failed = true; } catch (std::exception &ex) { IPX_CTX_ERROR(ctx, "Unexpected error has occurred: %s", ex.what()); - inst->storage_ptr->window_close(); + failed = true; } catch (...) { IPX_CTX_ERROR(ctx, "Unknown error has occurred!"); + failed = true; + } + + if (failed) { + IPX_CTX_ERROR(ctx, "Due to the previous error(s), the output file is possibly corrupted. " + "Therefore, no flow records are stored until a new file is automatically opened " + "after current window expiration."); inst->storage_ptr->window_close(); }