From 750a09630112d183b8282e6b0c718472140283a3 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Mon, 4 May 2020 23:48:37 +0200 Subject: [PATCH] FDS input: initial version of FDS File reader --- src/plugins/input/CMakeLists.txt | 5 +- src/plugins/input/fds/Builder.cpp | 263 ++++++++++ src/plugins/input/fds/Builder.hpp | 119 +++++ src/plugins/input/fds/CMakeLists.txt | 33 ++ src/plugins/input/fds/Exception.hpp | 34 ++ src/plugins/input/fds/README.rst | 43 ++ src/plugins/input/fds/Reader.cpp | 484 ++++++++++++++++++ src/plugins/input/fds/Reader.hpp | 149 ++++++ src/plugins/input/fds/config.c | 188 +++++++ src/plugins/input/fds/config.h | 83 +++ .../input/fds/doc/ipfixcol2-fds-input.7.rst | 20 + src/plugins/input/fds/fds.cpp | 300 +++++++++++ 12 files changed, 1719 insertions(+), 2 deletions(-) create mode 100644 src/plugins/input/fds/Builder.cpp create mode 100644 src/plugins/input/fds/Builder.hpp create mode 100644 src/plugins/input/fds/CMakeLists.txt create mode 100644 src/plugins/input/fds/Exception.hpp create mode 100644 src/plugins/input/fds/README.rst create mode 100644 src/plugins/input/fds/Reader.cpp create mode 100644 src/plugins/input/fds/Reader.hpp create mode 100644 src/plugins/input/fds/config.c create mode 100644 src/plugins/input/fds/config.h create mode 100644 src/plugins/input/fds/doc/ipfixcol2-fds-input.7.rst create mode 100644 src/plugins/input/fds/fds.cpp diff --git a/src/plugins/input/CMakeLists.txt b/src/plugins/input/CMakeLists.txt index 532f235d..2ed5dff8 100644 --- a/src/plugins/input/CMakeLists.txt +++ b/src/plugins/input/CMakeLists.txt @@ -1,5 +1,6 @@ # List of input plugins to build and install add_subdirectory(dummy) -add_subdirectory(ipfix) add_subdirectory(tcp) -add_subdirectory(udp) \ No newline at end of file +add_subdirectory(udp) +add_subdirectory(ipfix) +add_subdirectory(fds) \ No newline at end of file diff --git a/src/plugins/input/fds/Builder.cpp b/src/plugins/input/fds/Builder.cpp new file mode 100644 index 00000000..a4b35116 --- /dev/null +++ b/src/plugins/input/fds/Builder.cpp @@ -0,0 +1,263 @@ +/** + * \file src/plugins/input/fds/Builder.cpp + * \author Lukas Hutak + * \brief IPFIX Message builder (implementation) + * \date May 2020 + */ + +#include +#include + +#include "Builder.hpp" +#include "Exception.hpp" + + +Builder::Builder(uint16_t size) +{ + struct fds_ipfix_msg_hdr *hdr_ptr; + + if (size < FDS_IPFIX_MSG_HDR_LEN) { + throw FDS_exception("[internal] Invalid size of a message to generate!"); + } + + m_msg.reset((uint8_t *) malloc(size)); + if (!m_msg) { + throw FDS_exception("Memory allocation error " + std::string(__PRETTY_FUNCTION__)); + } + + // Fill the message header (size will be filled on release) + hdr_ptr = reinterpret_cast(m_msg.get()); + hdr_ptr->version = htons(FDS_IPFIX_VERSION); + hdr_ptr->odid = 0; + hdr_ptr->seq_num = 0; + hdr_ptr->export_time = 0; + + // Positions + m_msg_alloc = size; + m_msg_valid = FDS_IPFIX_MSG_HDR_LEN; + m_set_offset = 0; + m_set_id = 0; // invalid +} + +void +Builder::resize(uint16_t size) +{ + uint8_t *new_ptr = (uint8_t *) realloc(m_msg.get(), size); + if (!new_ptr) { + throw FDS_exception("Memory allocation error " + std::string(__PRETTY_FUNCTION__)); + } + + m_msg.release(); // To avoid calling free() + m_msg.reset(new_ptr); + m_msg_alloc = size; + + if (m_msg_valid > m_msg_alloc) { + // The message has been trimmed! + m_msg_valid = m_msg_alloc; + } + + if (m_set_offset + FDS_IPFIX_SET_HDR_LEN > m_msg_alloc) { + // The current offset is out of range + m_set_offset = 0; + m_set_id = 0; + } +} + +bool +Builder::empty() +{ + return (!m_msg || m_msg_valid == FDS_IPFIX_MSG_HDR_LEN); +} + +uint8_t * +Builder::release() +{ + // Close the current set (if any) + fset_close(); + + // Update IPFIX Message header (size) + struct fds_ipfix_msg_hdr *hdr_ptr; + hdr_ptr = reinterpret_cast(m_msg.get()); + hdr_ptr->length = htons(m_msg_valid); + + m_msg_alloc = 0; + m_msg_valid = 0; + return m_msg.release(); +} + +/** + * @brief Create a new Set + * + * The previous Set is always closed even if the ID is the same. + * @param[in] sid Set ID + * @throw FDS_exception if the Message is full and the Set cannot be created + */ +void +Builder::fset_new(uint16_t sid) +{ + // Close the previous set (if any) + fset_close(); + + // Initialize a new IPFIX Set + if (FDS_IPFIX_SET_HDR_LEN > m_msg_alloc - m_msg_valid) { + throw FDS_exception("[internal] Insufficient space for Set in an IPFIX Message"); + } + + m_set_offset = m_msg_valid; + auto *set_ptr = reinterpret_cast(&m_msg.get()[m_set_offset]); + set_ptr->flowset_id = htons(sid); + m_msg_valid += FDS_IPFIX_SET_HDR_LEN; + m_set_size = FDS_IPFIX_SET_HDR_LEN; + m_set_id = sid; +} + +/** + * @brief Close the current Set (if any) + */ +void +Builder::fset_close() +{ + if (m_set_offset == 0) { + return; + } + + auto *set_ptr = reinterpret_cast(&m_msg.get()[m_set_offset]); + set_ptr->length = htons(m_set_size); + m_set_offset = 0; + m_set_id = 0; +} + +void +Builder::set_etime(uint32_t time) +{ + if (!m_msg) { + throw FDS_exception("[internal] IPFIX Message is not allocated!"); + } + + // Update IPFIX Message header (export time) + struct fds_ipfix_msg_hdr *hdr_ptr; + hdr_ptr = reinterpret_cast(m_msg.get()); + hdr_ptr->export_time = htonl(time); +} + +void +Builder::set_odid(uint32_t odid) +{ + if (!m_msg) { + throw FDS_exception("[internal] IPFIX Message is not allocated!"); + } + + // Update IPFIX Message header (export time) + struct fds_ipfix_msg_hdr *hdr_ptr; + hdr_ptr = reinterpret_cast(m_msg.get()); + hdr_ptr->odid = htonl(odid); +} + +void +Builder::set_seqnum(uint32_t seq_num) +{ + if (!m_msg) { + throw FDS_exception("[internal] IPFIX Message is not allocated!"); + } + + // Update IPFIX Message header (export time) + struct fds_ipfix_msg_hdr *hdr_ptr; + hdr_ptr = reinterpret_cast(m_msg.get()); + hdr_ptr->seq_num = htonl(seq_num); +} + +bool +Builder::add_template(const struct fds_template *tmplt) +{ + uint16_t tmplt_len = tmplt->raw.length; + uint16_t size_req = tmplt_len; + uint16_t set_id; + + switch (tmplt->type) { + case FDS_TYPE_TEMPLATE: + set_id = FDS_IPFIX_SET_TMPLT; + break; + case FDS_TYPE_TEMPLATE_OPTS: + set_id = FDS_IPFIX_SET_OPTS_TMPLT; + break; + default: + throw FDS_exception("[internal] Unexpected Template type cannot be used!"); + } + + if (m_set_offset == 0 || set_id != m_set_id) { + // New (Options) Template Set must be created + fset_close(); + size_req += FDS_IPFIX_SET_HDR_LEN; + } + + if (size_req > m_msg_alloc - m_msg_valid) { + // Unable to add + return false; + } + + if (m_set_offset == 0) { + fset_new(set_id); + } + + memcpy(&m_msg.get()[m_msg_valid], tmplt->raw.data, tmplt_len); + m_msg_valid += tmplt_len; + m_set_size += tmplt_len; + return true; +} + + +bool +Builder::add_record(const struct fds_drec *rec) +{ + uint16_t size_req = rec->size; + if (m_set_offset == 0 || rec->tmplt->id != m_set_id) { + // New Data Set must be created + fset_close(); + size_req += FDS_IPFIX_SET_HDR_LEN; + } + + if (size_req > m_msg_alloc - m_msg_valid) { + // Unable to add + return false; + } + + if (m_set_offset == 0) { + fset_new(rec->tmplt->id); + } + + memcpy(&m_msg.get()[m_msg_valid], rec->data, rec->size); + m_msg_valid += rec->size; + m_set_size += rec->size; + return true; +} + +bool +Builder::add_withdrawals() +{ + struct fds_ipfix_trec *rec_ptr = nullptr; + uint16_t size_req = 2U * FDS_IPFIX_WDRL_ALLSET_LEN; + + if (size_req > m_msg_alloc - m_msg_valid) { + return false; + } + + // All Templates Withdrawal + fset_new(FDS_IPFIX_SET_TMPLT); + rec_ptr = reinterpret_cast(&m_msg.get()[m_msg_valid]); + rec_ptr->template_id = htons(FDS_IPFIX_SET_TMPLT); + rec_ptr->count = htons(0); + m_msg_valid += 4U; // Only 4 bytes as specified in RFC 7011, 8.1 + m_set_size += 4U; + fset_close(); + + // All Options Template Withdrawal + fset_new(FDS_IPFIX_SET_OPTS_TMPLT); + rec_ptr = reinterpret_cast(&m_msg.get()[m_msg_valid]); + rec_ptr->template_id = htons(FDS_IPFIX_SET_OPTS_TMPLT); + rec_ptr->count = htons(0); + m_msg_valid += 4U; // Only 4 bytes as specified in RFC 7011, 8.1 + m_set_size += 4U; + fset_close(); + + return true; +} diff --git a/src/plugins/input/fds/Builder.hpp b/src/plugins/input/fds/Builder.hpp new file mode 100644 index 00000000..aa95e1a2 --- /dev/null +++ b/src/plugins/input/fds/Builder.hpp @@ -0,0 +1,119 @@ +/** + * \file src/plugins/input/fds/Builder.hpp + * \author Lukas Hutak + * \brief IPFIX Message builder + * \date May 2020 + */ + +#ifndef FDS_BUILDER_HPP +#define FDS_BUILDER_HPP + +#include +#include + +/// IPFIX Message builder +class Builder { +private: + /// Memory of IPFIX Message to generate (can be nullptr) + std::unique_ptr m_msg = {nullptr, &free}; + /// Allocated size (bytes) + uint16_t m_msg_alloc; + /// Filled size (bytes) + uint16_t m_msg_valid; + + /// Currently edited Flow Set (zero == invalid) + uint16_t m_set_offset; + /// Set ID of the current Flow Set + uint16_t m_set_id; + /// Size of the current IPFIX Set + uint16_t m_set_size; + + void + fset_new(uint16_t sid); + void + fset_close(); + +public: + /** + * @brief Create an IPFIX Message generator + * + * By default, ODID, Sequence Number, and Export Time are set to zeros. + * @param[in] size Maximal size of the message (allocation size) + */ + Builder(uint16_t size); + ~Builder() = default; + Builder(Builder &&other) = default; + + /** + * @brief Change maximal size of the message + * + * If the size is less than the size of the currently built message, the + * message is trimmed! + * @param[in] size Maximal size (i.e. allocation size) + */ + void + resize(uint16_t size); + /** + * @brief Test if the builder contains an IPFIX Message without content + * + * @note The builder is also considered as empty after release(). + * @return True/false + */ + bool + empty(); + /** + * @brief Release the generated IPFIX Message + * @warning After releasing, the class functions MUST NOT be used anymore! + * @return Pointer to the message (real size is part of the Message) + */ + uint8_t * + release(); + + /** + * @brief Set Export Time of the IPFIX Message + * @param[in] time Export Time + */ + void + set_etime(uint32_t time); + /** + * @brief Set Observation Domain ID (ODID) of the IPFIX Message + * @param[in] odid ODID + */ + void + set_odid(uint32_t odid); + /** + * @brief Set Sequence Number of the IPFIX Message + * @param[in] seq_num Sequence Number + */ + void + set_seqnum(uint32_t seq_num); + + /** + * @brief Add an (Options) Template Record + * @param[in] tmplt IPFIX (Options) Template + * @return True, if the Template has been successfully added + * @return False, if the Message is already full + */ + bool + add_template(const struct fds_template *tmplt); + /** + * @brief Add a Data Record + * @param[in] rec IPFIX Data Record + * @return True, if the Record has been successfully added + * @return False, if the Message is already full + */ + bool + add_record(const struct fds_drec *rec); + /** + * @brief Add an All (Options) Template Withdrawals (only TCP, SCTP, and File sessions) + * @note + * After calling the function, all previous (Options) Templates are considered to + * be invalid. + * @return True, if the Withdrawals has been successfully added + * @return False, if the Message is already full + */ + bool + add_withdrawals(); +}; + +#endif // FDS_BUILDER_HPP \ No newline at end of file diff --git a/src/plugins/input/fds/CMakeLists.txt b/src/plugins/input/fds/CMakeLists.txt new file mode 100644 index 00000000..b29b5dc3 --- /dev/null +++ b/src/plugins/input/fds/CMakeLists.txt @@ -0,0 +1,33 @@ +# Create a linkable module +add_library(fds-input MODULE + Builder.cpp + Builder.hpp + config.c + config.h + Exception.hpp + fds.cpp + Reader.cpp + Reader.hpp +) + +install( + TARGETS fds-input + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/" +) + +if (ENABLE_DOC_MANPAGE) + # Build a manual page + set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-fds-input.7.rst") + set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-fds-input.7") + + add_custom_command(TARGET fds-input PRE_BUILD + COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE} + DEPENDS ${SRC_FILE} + VERBATIM + ) + + install( + FILES "${DST_FILE}" + DESTINATION "${INSTALL_DIR_MAN}/man7" + ) +endif() diff --git a/src/plugins/input/fds/Exception.hpp b/src/plugins/input/fds/Exception.hpp new file mode 100644 index 00000000..b51febc2 --- /dev/null +++ b/src/plugins/input/fds/Exception.hpp @@ -0,0 +1,34 @@ +/** + * \file src/plugins/input/fds/Exception.hpp + * \author Lukas Hutak + * \brief Plugin specific exception (header file) + * \date 2019 + * + * Copyright(c) 2019 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef FDS_EXCEPTION_HPP +#define FDS_EXCEPTION_HPP + +#include +#include + +/// Plugin specific exception +class FDS_exception : public std::runtime_error { +public: + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const std::string &str) : std::runtime_error(str) {}; + /** + * @brief Constructor + * @param[in] str Error message + */ + FDS_exception(const char *str) : std::runtime_error(str) {}; + // Default destructor + ~FDS_exception() = default; +}; + +#endif // FDS_EXCEPTION_HPP diff --git a/src/plugins/input/fds/README.rst b/src/plugins/input/fds/README.rst new file mode 100644 index 00000000..bb592403 --- /dev/null +++ b/src/plugins/input/fds/README.rst @@ -0,0 +1,43 @@ +FDS File (input plugin) +========================= + +The plugin reads flow data from one or more files in FDS File format. It is possible to +use it to load flow records previously stored using FDS output plugin. + +Unlike UDP and TCP input plugins which infinitely waits for data from NetFlow/IPFIX +exporters, the plugin will terminate the collector after all files are processed. + +Example configuration +--------------------- + +.. code-block:: xml + + + FDS File + fds + + /tmp/flow/file.ipfix + + + +Parameters +---------- + +:``path``: + Path to file(s) in IPFIX File format. It is possible to use asterisk instead of + a filename/directory, tilde character (i.e. "~") instead of the home directory of + the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix"). + Directories and non-IPFIX Files that match the file pattern are skipped/ignored. + +:``msgSize``: + Maximum size of the internally generated IPFIX Messages to which the content + of the file is inserted. [default: 32768, min: 512] + +:``asyncIO``: + Allows to use asynchronous I/O for reading the file. Usually when parts + of the file are being read, the process is blocked on synchronous I/O + and waits for the operation to complete. However, asynchronous I/O allows + the plugin to simultaneously read the file and process flow records, which + significantly improves overall performance. (Note: a pool of service + threads shared among instances of FDS plugin might be created). + [values: true/false, default: true] diff --git a/src/plugins/input/fds/Reader.cpp b/src/plugins/input/fds/Reader.cpp new file mode 100644 index 00000000..6e4bab4c --- /dev/null +++ b/src/plugins/input/fds/Reader.cpp @@ -0,0 +1,484 @@ +/** + * \file src/plugins/input/fds/Reader.hpp + * \author Lukas Hutak + * \brief FDS file reader (implementation) + * \date May 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include +#include +#include + +#include "Builder.hpp" +#include "Exception.hpp" +#include "Reader.hpp" + + +Reader::Reader(ipx_ctx_t *ctx, const fds_config *cfg, const char *path) + : m_ctx(ctx), m_cfg(cfg) +{ + uint32_t flags = FDS_FILE_READ; + flags |= (m_cfg->async) ? 0 : FDS_FILE_NOASYNC; + + m_file.reset(fds_file_init()); + if (!m_file) { + throw FDS_exception("fds_file_init() failed!"); + } + + if (fds_file_open(m_file.get(), path, flags) != FDS_OK) { + throw FDS_exception("Unable to open file '" + std::string(path)); + } +} + +Reader::~Reader() +{ + // Send notification about closing of all Transport Sessions + for (auto &it : m_sessions) { + session_close(it.second.info); + it.second.info = nullptr; + } +} + +/** + * @brief Get a Transport Session description given by FDS (Transport) Session ID + * + * The Session description is extracted from the FDS File and converted to particular + * IPFIXcol structure and returned. + * @param[in] sid Session ID + * @return Transport Session + * @throw FDS_exception in case of a failure (e.g. invalid ID, memory allocation error) + */ +struct ipx_session * +Reader::session_from_sid(fds_file_sid_t sid) +{ + const struct fds_file_session *desc; + struct ipx_session *session; + struct ipx_session_net session_net; + + if (fds_file_session_get(m_file.get(), sid, &desc) != FDS_OK) { + throw FDS_exception("Unable to get Transport Session with ID " + std::to_string(sid)); + } + + // Convert FDS structure to IPFIXcol structure + memset(&session_net, 0, sizeof(session_net)); + session_net.port_src = desc->port_src; + session_net.port_dst = desc->port_dst; + if (IN6_IS_ADDR_V4MAPPED(desc->ip_src) && IN6_IS_ADDR_V4MAPPED(desc->ip_dst)) { + session_net.l3_proto = AF_INET; + session_net.addr_src.ipv4 = *reinterpret_cast(&desc->ip_src[12]); + session_net.addr_dst.ipv4 = *reinterpret_cast(&desc->ip_dst[12]); + } else { + session_net.l3_proto = AF_INET6; + session_net.addr_src.ipv6 = *reinterpret_cast(&desc->ip_src[0]); + session_net.addr_dst.ipv6 = *reinterpret_cast(&desc->ip_dst[0]); + } + + switch (desc->proto) { + case FDS_FILE_SESSION_UDP: + session = ipx_session_new_udp(&session_net, UINT16_MAX, UINT16_MAX); + break; + case FDS_FILE_SESSION_TCP: + session = ipx_session_new_tcp(&session_net); + break; + case FDS_FILE_SESSION_SCTP: + session = ipx_session_new_sctp(&session_net); + break; + case FDS_FILE_SESSION_UNKNOWN: + session = ipx_session_new_file(("UnknownSID<" + std::to_string(sid) + ">").c_str()); + break; + default: + throw FDS_exception("Unknown FDS session type: " + std::to_string(desc->proto)); + } + + if (!session) { + throw FDS_exception("Failed to create a Transport Session " + "(probably a memory allocation error)"); + } + + return session; +} + +/** + * @brief Notify other plugins about a new Transport Session + * + * A new Session Message is generated and send to other plugins in the pipeline. + * @param[in] ts Transport Session + * @throw FDS_exception in case of failure + */ +void +Reader::session_open(struct ipx_session *ts) +{ + struct ipx_msg_session *msg; + + // Notify plugins further in the pipeline about the new session + msg = ipx_msg_session_create(ts, IPX_MSG_SESSION_OPEN); + if (!msg) { + throw FDS_exception("Failed to create a Transport Session notification"); + } + + if (ipx_ctx_msg_pass(m_ctx, ipx_msg_session2base(msg)) != IPX_OK) { + ipx_msg_session_destroy(msg); + throw FDS_exception("Failed to pass a Transport Session notification"); + } +} + +/** + * @brief Notify other plugins about a close of a Transport Session + * + * @warning + * User MUST stop using the Session as it is send in a garbage message to the + * pipeline and it will be automatically freed later. + * @param[in] ts Transport Session + * @throw FDS_exception in case of failure + */ +void +Reader::session_close(struct ipx_session *ts) +{ + ipx_msg_session_t *msg_session; + ipx_msg_garbage_t *msg_garbage; + ipx_msg_garbage_cb garbage_cb = (ipx_msg_garbage_cb) &ipx_session_destroy; + + msg_session = ipx_msg_session_create(ts, IPX_MSG_SESSION_CLOSE); + if (!msg_session) { + throw FDS_exception("Failed to create a Transport Session notification"); + } + + if (ipx_ctx_msg_pass(m_ctx, ipx_msg_session2base(msg_session)) != IPX_OK) { + ipx_msg_session_destroy(msg_session); + throw FDS_exception("Failed to pass a Transport Session notification"); + } + + msg_garbage = ipx_msg_garbage_create(ts, garbage_cb); + if (!msg_garbage) { + /* Memory leak... We cannot destroy the session as it can be used + * by other plugins further in the pipeline. */ + throw FDS_exception("Failed to create a garbage message with a Transport Session"); + } + + if (ipx_ctx_msg_pass(m_ctx, ipx_msg_garbage2base(msg_garbage)) != IPX_OK) { + /* Memory leak... We cannot destroy the message as it also destroys + * the session structure. */ + throw FDS_exception("Failed to pass a garbage message with a Transport Session"); + } +} + +/// Auxiliary data for snapshot interator callback +struct tmplt_cb_data { + std::vector msg_vec; ///< Vector of generated IPFIX Messages + uint16_t msg_size; ///< Allocation size of IPFIX Messages + uint32_t odid; ///< Observation Domain ID + uint32_t exp_time; ///< Export Time + uint32_t seq_num; ///< Sequence number + + bool is_ok; ///< Failure indicator +}; + +/** + * Snapshot iterator callback + * + * The purpose of this function is to add an (Options) Template to the current + * IPFIX Message or create a new one, if doesn't exist or it is full. + * + * @note + * The callback assumes that there is already at least one IPFIX builder prepared! + * @param[in] tmplt Template to add + * @param[in] data Callback data + * @return True on success + * @return False in case of a fatal failure + */ +static bool +tmplt_cb_func(const struct fds_template *tmplt, void *data) noexcept +{ + auto *cb_data = reinterpret_cast(data); + + // Callback MUST NOT throw an exception! + try { + // Try to insert (Options) Template to the current IPFIX Message + Builder *msg_ptr = &cb_data->msg_vec.back(); + if (msg_ptr->add_template(tmplt)) { + return true; // Success + } + + // Create a new IPFIX Message (the previous one is full) + cb_data->msg_vec.emplace_back(cb_data->msg_size); + msg_ptr = &cb_data->msg_vec.back(); + if (msg_ptr->add_template(tmplt)) { + return true; // Success + } + + // The (Options) Template doesn't fit into an empty IPFIX Message + msg_ptr->resize(UINT16_MAX); + if (msg_ptr->add_template(tmplt)) { + return true; // Success + } + + // This is really bad + cb_data->is_ok = false; + return false; + } catch (...) { + cb_data->is_ok = false; + return false; + } +} + +/** + * Generate and send one or more IPFIX Messages with all (Options) Templates + * + * Extract all (Options) Templates from a Template Snapshot, generate IPFIX Messages + * and sent the to the pipeline. + * @param[in] ts Transport Session + * @param[in] tsnap Template snapshot + * @param[in] odid Observation Domain ID (of the IPFIX Message(s)) + * @param[in] exp_time Export Time (of the IPFIX Message(s)) + * @param[in] seq_num Sequence Number (of the IPFIX Message(s)) + */ +void +Reader::send_templates(const struct ipx_session *ts, const fds_tsnapshot_t *tsnap, + uint32_t odid, uint32_t exp_time, uint32_t seq_num) +{ + // Prepare data for an iterator callback + struct tmplt_cb_data cb_data; + cb_data.msg_size = m_cfg->msize; + cb_data.odid = odid; + cb_data.exp_time = exp_time; + cb_data.seq_num = seq_num; + cb_data.is_ok = true; + + // Create an emptry IPFIX Message builder + cb_data.msg_vec.emplace_back(cb_data.msg_size); + + if (ts->type != FDS_SESSION_UDP) { + // Withdraw all (Options) Templates first + cb_data.msg_vec.back().add_withdrawals(); + } + + // Generate one or more IPFIX Messages with (Options) Templates + fds_tsnapshot_for(tsnap, &tmplt_cb_func, &cb_data); + if (!cb_data.is_ok) { + throw FDS_exception("Failed to generate IPFIX Message(s) with Templates!"); + } + + for (auto &msg : cb_data.msg_vec) { + // Update IPFIX Message header + msg.set_etime(exp_time); + msg.set_odid(odid); + msg.set_seqnum(seq_num); + + // Send it + send_ipfix(msg.release(), ts, odid); + } +} + +/** + * Send an IPFIX Message to the pipeline + * + * @note + * The function takes responsibility for the Message. Therefore, in case of + * failure, the Message will be freed. + * @param[in] msg Raw IPFIX Message to send + * @param[in] ts Transport Session + * @param[in] odid Observation Domain ID (of the message) + * @throw FDS_exception in case of failure + */ +void +Reader::send_ipfix(uint8_t *msg, const struct ipx_session *ts, uint32_t odid) +{ + uint16_t msg_size = ntohs(reinterpret_cast(msg)->length); + ipx_msg_ipfix_t *msg_ptr; + struct ipx_msg_ctx msg_ctx; + + msg_ctx.session = ts; + msg_ctx.odid = odid; + msg_ctx.stream = 0; // stream is not stored in the file + + msg_ptr = ipx_msg_ipfix_create(m_ctx, &msg_ctx, msg, msg_size); + if (!msg_ptr) { + free(msg); + throw FDS_exception("Failed to allocate an IPFIX Message!"); + } + + // Send it to the pipeline + if (ipx_ctx_msg_pass(m_ctx, ipx_msg_ipfix2base(msg_ptr)) != IPX_OK) { + ipx_msg_ipfix_destroy(msg_ptr); + throw FDS_exception("Failed to pass an IPFIX Message!"); + } +} + +/** + * @brief Get the next Data Record to process + * + * @warning + * The function will return still the same record until @p m_unproc is set to false! + * @param[out] rec Pointer to the Data Record + * @param[out] ctx Pointer to the context of the Data Record (ODID, Export Time, etc.) + * @return #IPX_OK on success + * @return #IPX_ERR_EOF if there are no more records in the file + * @throw FDS_exception in case of failure + */ +int +Reader::record_get(const struct fds_drec **rec, const struct fds_file_read_ctx **ctx) +{ + int ret; + + if (m_unproc) { + // Return previously unprocessed Data Record + *rec = &m_unproc_data; + *ctx = &m_unproc_ctx; + return IPX_OK; + } + + ret = fds_file_read_rec(m_file.get(), &m_unproc_data, &m_unproc_ctx); + switch (ret) { + case FDS_OK: // Success + break; + case FDS_EOC: // End of file + return IPX_ERR_EOF; + default: + throw FDS_exception("fds_file_read_rec() failed: " + + std::string(fds_file_error(m_file.get()))); + } + + *rec = &m_unproc_data; + *ctx = &m_unproc_ctx; + m_unproc = true; + return IPX_OK; +} + +int +Reader::send_batch() +{ + Builder new_msg(m_cfg->msize); + fds_file_sid_t msg_sid; + uint32_t msg_odid; + uint32_t msg_etime; + uint32_t msg_seqnum; + uint16_t rec_cnt = 0; + + struct Session *ptr_session = nullptr; + struct ODID *ptr_odid = nullptr; + + const struct fds_drec *drec; + const struct fds_file_read_ctx *dctx; + + // Get the first Data Record + switch (record_get(&drec, &dctx)) { + case IPX_OK: + break; + case IPX_ERR_EOF: + return IPX_ERR_EOF; + default: + throw FDS_exception("[internal] record_get() returned unexpected value!"); + } + + // Prepare contextual information for the IPFIX Message to generate + msg_sid = dctx->sid; + msg_odid = dctx->odid; + msg_etime = dctx->exp_time; + + ptr_session = &m_sessions[msg_sid]; + ptr_odid = &ptr_session->odids[msg_odid]; + msg_seqnum = ptr_odid->seq_num; + + // Make sure that Session is already defined and Templates are ok... + if (!ptr_session->info) { + ptr_session->info = session_from_sid(msg_sid); + IPX_CTX_DEBUG(m_ctx, "New TS '%s' detected!", ptr_session->info->ident); + session_open(ptr_session->info); + } + + if (ptr_odid->tsnap != drec->snap) { + IPX_CTX_DEBUG(m_ctx, "Sending all (Options) Templates of '%s:%" PRIu32 "'", + ptr_session->info->ident, msg_odid); + send_templates(ptr_session->info, drec->snap, msg_odid, msg_etime, msg_seqnum); + ptr_odid->tsnap = drec->snap; + } + + // Try to insert the first Data Record to the IPFIX Message + if (!new_msg.add_record(drec)) { + // The Data Record doesn't fit into an empty IPFIX Message! + new_msg.resize(UINT16_MAX); + + // Try again + if (!new_msg.add_record(drec)) { + throw FDS_exception("[internal] Failed to insert a Data Record into an IPFIX Message!"); + } + } + + // Consider the Data Record as successfully processed! + m_unproc = false; + rec_cnt += 1; + + /* Since we know that FDS File stores all Data Records in blocks, where each block + * contains records from the same context (i.e. Transport Session and ODID) and + * the records are described by the same templates, we can assume that consecutive + * read operations will be most likely return Data Records which share the same features + * and, therefore, they can be added to the same IPFIX Message. */ + while (true) { + int ret = record_get(&drec, &dctx); + if (ret != IPX_OK) { + // Probably end of file... + break; + } + + if (msg_sid != dctx->sid + || msg_odid != dctx->odid + || msg_etime != dctx->exp_time // due to relative timestamps in Data Record + || drec->snap != ptr_odid->tsnap) { + // The Data Record doesn't belong to this IPFIX Message... flush it! + break; + } + + if (!new_msg.add_record(drec)) { + // The IPFIX Message is full + break; + } + + m_unproc = false; + rec_cnt++; + } + + // Update IPFIX Message header and send it! + new_msg.set_etime(msg_etime); + new_msg.set_odid(msg_odid); + new_msg.set_seqnum(msg_seqnum); + ptr_odid->seq_num += rec_cnt; + + send_ipfix(new_msg.release(), ptr_session->info, msg_odid); + IPX_CTX_DEBUG(m_ctx, "New IPFIX Message with %" PRIu16 " records from '%s:%" PRIu32 "' sent!", + rec_cnt, ptr_session->info->ident, msg_odid); + return IPX_OK; +} diff --git a/src/plugins/input/fds/Reader.hpp b/src/plugins/input/fds/Reader.hpp new file mode 100644 index 00000000..4b42a1b2 --- /dev/null +++ b/src/plugins/input/fds/Reader.hpp @@ -0,0 +1,149 @@ +/** + * \file src/plugins/input/fds/Reader.hpp + * \author Lukas Hutak + * \brief FDS file reader + * \date May 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef FDS_READER_HPP +#define FDS_READER_HPP + +#include +#include +#include +#include +#include + +#include "config.h" + +/// Observation Domain ID (contextual information) +struct ODID { + /// Sequence number of the next IPFIX Message + uint32_t seq_num; + /// Template Snapshot of the last record (only for detection of template changes) + const fds_tsnapshot_t *tsnap; + + // Default constructor + ODID() : seq_num(0), tsnap(nullptr) {}; +}; + +/// Transport Session (contextual information) +struct Session { + /// Transport Session identification + struct ipx_session *info; + /// Context for each Observation Domain ID (key: ODID) + std::map odids; + + // Default constructor + Session() : info(nullptr) {} +}; + +/// FDS File reader +class Reader { +public: + /** + * @brief Instance constructor + * + * Open a FDS File and initialize the reader + * @param[in] ctx Plugin context (for log and message passing) + * @param[in] cfg Parsed plugin configuration + * @param[in] path File to read + * @throw FDS_exception in case of failure (e.g. invalid file) + */ + Reader(ipx_ctx_t *ctx, const fds_config *cfg, const char *path); + /** + * @brief Instance destructor + * @note Close the file and send "close" notifications of all Transport Sessions + */ + ~Reader(); + + /** + * @brief Generate and send one IPFIX Message with Data Records from the file + * + * The IPFIX Message will contain Data Records that comes from the same + * Transport Session and shares the same features (i.e. ODID and Export Time). + * The generate Message will contain one or more Data Records. + * + * If the reader detects a new Transport Session, it will automatically create + * its particular instances and send an "open" notification to the processing + * pipeline. + * + * If one or more (Options) Templates must be defined, the function will also + * generate and pass one or more IPFIX Messages with the (Options) Templates. + * + * @return #IPX_OK on success + * @return #IPX_ERR_EOF if the are not more records in the current file + * @throw FDS_exception in case of a failure. + */ + int + send_batch(); + +private: + /// Plugin context (log and passing messages) + ipx_ctx_t *m_ctx; + /// Plugin configuration + const fds_config *m_cfg; + /// File handler (of the file current file) + std::unique_ptr m_file = {nullptr, &fds_file_close}; + /// Transport Sessions (from the current file) + std::map m_sessions; + + /// Signalization of an unprocessed Data Record + bool m_unproc = false; + /// Content of the unprocessed Data Record + struct fds_drec m_unproc_data; + /// Context of the unprocessed Data Record + struct fds_file_read_ctx m_unproc_ctx; + + struct ipx_session * + session_from_sid(fds_file_sid_t sid); + void + session_open(struct ipx_session *ts); + void + session_close(struct ipx_session *ts); + + void + send_templates(const struct ipx_session *ts, const fds_tsnapshot_t *tsnap, + uint32_t odid, uint32_t exp_time, uint32_t seq_num); + void + send_ipfix(uint8_t *msg, const struct ipx_session *ts, uint32_t odid); + + int + record_get(const struct fds_drec **rec, const struct fds_file_read_ctx **ctx); +}; + +#endif // FDS_READER_HPP \ No newline at end of file diff --git a/src/plugins/input/fds/config.c b/src/plugins/input/fds/config.c new file mode 100644 index 00000000..559e4d79 --- /dev/null +++ b/src/plugins/input/fds/config.c @@ -0,0 +1,188 @@ +/** + * \file src/plugins/input/fds/config.c + * \author Lukas Hutak + * \brief Configuration parser of FDS input plugin (source file) + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include +#include +#include +#include "config.h" + +/* + * + * ... // required, exactly once + * ... // optional + * + */ + +/** Default message size */ +#define MSG_SIZE_DEF (32768U) +/** Minimal message size */ +#define MSG_SIZE_MIN (512U) + +/** XML nodes */ +enum params_xml_nodes { + NODE_PATH = 1, + NODE_MSIZE, + NODE_ASYNCIO +}; + +/** Definition of the \ node */ +static const struct fds_xml_args args_params[] = { + FDS_OPTS_ROOT("params"), + FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NODE_MSIZE, "msgSize", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT), + FDS_OPTS_ELEM(NODE_ASYNCIO, "asyncIO", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + +/** + * \brief Process \ node + * \param[in] ctx Plugin context + * \param[in] root XML context to process + * \param[in] cfg Parsed configuration + * \return #IPX_OK on success + * \return #IPX_ERR_FORMAT in case of failure + */ +static int +config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct fds_config *cfg) +{ + const struct fds_xml_cont *content; + while (fds_xml_next(root, &content) != FDS_EOC) { + switch (content->id) { + case NODE_PATH: + // File(s) path + assert(content->type == FDS_OPTS_T_STRING); + cfg->path = strdup(content->ptr_string); + break; + case NODE_MSIZE: + assert(content->type == FDS_OPTS_T_UINT); + if (content->val_uint > UINT16_MAX) { + IPX_CTX_ERROR(ctx, "Message size must be at most %u bytes!", + (unsigned int) UINT16_MAX); + return IPX_ERR_FORMAT; + } else if (content->val_uint < MSG_SIZE_MIN) { + IPX_CTX_ERROR(ctx, "Message size must be at least %u bytes!", + (unsigned int) MSG_SIZE_MIN); + return IPX_ERR_FORMAT; + } + cfg->msize = (uint16_t) content->val_uint; + break; + case NODE_ASYNCIO: + assert(content->type == FDS_OPTS_T_BOOL); + cfg->async = content->val_bool; + break; + default: + // Internal error + assert(false); + } + } + + if (!cfg->path) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return IPX_ERR_FORMAT; + } + + return IPX_OK; +} + +/** + * \brief Set default parameters of the configuration + * \param[in] cfg Configuration + */ +static void +config_default_set(struct fds_config *cfg) +{ + cfg->path = NULL; + cfg->msize = MSG_SIZE_DEF; + cfg->async = true; +} + +struct fds_config * +config_parse(ipx_ctx_t *ctx, const char *params) +{ + struct fds_config *cfg = calloc(1, sizeof(*cfg)); + if (!cfg) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return NULL; + } + + // Set default parameters + config_default_set(cfg); + + // Create an XML parser + fds_xml_t *parser = fds_xml_create(); + if (!parser) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + config_destroy(cfg); + return NULL; + } + + if (fds_xml_set_args(parser, args_params) != IPX_OK) { + IPX_CTX_ERROR(ctx, "Failed to parse the description of an XML document!", '\0'); + fds_xml_destroy(parser); + config_destroy(cfg); + return NULL; + } + + fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(parser, params, true); + if (params_ctx == NULL) { + IPX_CTX_ERROR(ctx, "Failed to parse the configuration: %s", fds_xml_last_err(parser)); + fds_xml_destroy(parser); + config_destroy(cfg); + return NULL; + } + + // Parse parameters + int rc = config_parser_root(ctx, params_ctx, cfg); + fds_xml_destroy(parser); + if (rc != IPX_OK) { + config_destroy(cfg); + return NULL; + } + + return cfg; +} + +void +config_destroy(struct fds_config *cfg) +{ + free(cfg->path); + free(cfg); +} diff --git a/src/plugins/input/fds/config.h b/src/plugins/input/fds/config.h new file mode 100644 index 00000000..39ecb961 --- /dev/null +++ b/src/plugins/input/fds/config.h @@ -0,0 +1,83 @@ +/** + * \file src/plugins/input/fds/config.c + * \author Lukas Hutak + * \brief Configuration parser of FDS input plugin (source file) + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef FDS_CONFIG_H +#define FDS_CONFIG_H + +#include +#include "stdint.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** Configuration of a instance of the IPFIX plugin */ +struct fds_config { + /** File pattern */ + char *path; + /** Size of IPFIX Messages to generate */ + uint16_t msize; + /** Enable asynchronous I/O */ + bool async; +}; + +/** + * @brief Parse configuration of the plugin + * @param[in] ctx Instance context + * @param[in] params XML parameters + * @return Pointer to the parse configuration of the instance on success + * @return NULL if arguments are not valid or if a memory allocation error has occurred + */ +struct fds_config * +config_parse(ipx_ctx_t *ctx, const char *params); + +/** + * @brief Destroy parsed configuration + * @param[in] cfg Parsed configuration + */ +void +config_destroy(struct fds_config *cfg); + +#ifdef __cplusplus +} +#endif + +#endif // FDS_CONFIG_H diff --git a/src/plugins/input/fds/doc/ipfixcol2-fds-input.7.rst b/src/plugins/input/fds/doc/ipfixcol2-fds-input.7.rst new file mode 100644 index 00000000..dd6b8a9a --- /dev/null +++ b/src/plugins/input/fds/doc/ipfixcol2-fds-input.7.rst @@ -0,0 +1,20 @@ +===================== + ipfixcol2-fds-input +===================== + +----------------------- +FDS File (input plugin) +----------------------- + +:Author: Lukas Hutak (lukas.hutak@cesnet.cz) +:Date: 2020-05-04 +:Copyright: Copyright © 2020 CESNET, z.s.p.o. +:Version: 2.0 +:Manual section: 7 +:Manual group: IPFIXcol collector + +Description +----------- + +.. include:: ../README.rst + :start-line: 3 diff --git a/src/plugins/input/fds/fds.cpp b/src/plugins/input/fds/fds.cpp new file mode 100644 index 00000000..f1ca0c8f --- /dev/null +++ b/src/plugins/input/fds/fds.cpp @@ -0,0 +1,300 @@ +/** + * \file src/plugins/input/fds/fds.cpp + * \author Lukas Hutak + * \brief IPFIX File input plugin for IPFIXcol + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "Exception.hpp" +#include "Reader.hpp" + +/// Plugin description +IPX_API struct ipx_plugin_info ipx_plugin_info = { + // Plugin identification name + "fds", + // Brief description of plugin + "Input plugin for FDS File format.", + // Plugin type + IPX_PT_INPUT, + // Configuration flags (reserved for future use) + 0, + // Plugin version string (like "1.2.3") + "2.0.0", + // Minimal IPFIXcol version string (like "1.2.3") + "2.2.0" +}; + +/// Plugin instance +struct Instance { + /// Plugin context + ipx_ctx_t *m_ctx; + /// Parsed plugin configuration + std::unique_ptr m_cfg = {nullptr, &config_destroy}; + + /// List of files to read + glob_t m_list; + /// Index of the next file to read + size_t m_next_file = 0; + + // Current file reader + std::unique_ptr m_file = nullptr; +}; + +/** + * @brief Check if path is a directory + * + * @note Since we use GLOB_MARK flag, all directories ends with a slash. + * @param[in] filename Path + * @return True or false + */ +static inline bool +file_is_dir(const char *filename) +{ + size_t len = strlen(filename); + return (filename[len - 1] == '/'); +} + +/** + * @brief Initialize a list of files to read + * + * @param[in] inst Plugin instance + * @param[in] pattern Pattern of files to read + * @throw FDS_exception in case of a failure (or zero file matches) + */ +void +file_list_init(Instance *inst, const char *pattern) +{ + int glob_flags = GLOB_MARK | GLOB_BRACE | GLOB_TILDE_CHECK; + size_t file_cnt; + int ret; + + ret = glob(pattern, glob_flags, NULL, &inst->m_list); + switch (ret) { + case 0: // Success + break; + case GLOB_NOSPACE: + throw FDS_exception("Failed to list files to process due memory allocation error!"); + case GLOB_ABORTED: + throw FDS_exception("Failed to list files to process due read error"); + case GLOB_NOMATCH: + throw FDS_exception("No file matches the given file pattern!"); + default: + throw FDS_exception("glob() failed and returned unexpected value!"); + } + + file_cnt = 0; + for (size_t i = 0; i < inst->m_list.gl_pathc; ++i) { + const char *filename = inst->m_list.gl_pathv[i]; + if (file_is_dir(filename)) { + continue; + } + file_cnt++; + } + + if (!file_cnt) { + globfree(&inst->m_list); + throw FDS_exception("No FDS Files matches the given file pattern!"); + } + + inst->m_next_file = 0; +} + +/** + * @brief Destroy list of files to read + * @param[in] inst Plugin instance + */ +void +file_list_clean(Instance *inst) +{ + globfree(&inst->m_list); +} + +/** + * Open the next file for reading + * + * If any file is already opened, it will be closed and several Session Messages + * (close notification) will be send too. The function will try to open the next file + * in the list and makes sure that it is valid FDS File. Otherwise, it will be skipped + * and another file will be used. + * + * @warning + * As the function sends notification to other plugins further in the pipeline, it + * must have permission to pass messages. Therefore, this function cannot be called + * within ipx_plugin_init(). + * @param[in] inst Plugin instance + * @return #IPX_OK on success + * @return #IPX_ERR_EOF if no more files are available + */ +static int +file_next(Instance *inst) +{ + std::unique_ptr reader_new = nullptr; + const char *file_name = nullptr; + size_t idx_next; + size_t idx_max = inst->m_list.gl_pathc; + + // Close the previous file (if any) + inst->m_file.reset(); + + // Open new file + for (idx_next = inst->m_next_file; idx_next < idx_max; ++idx_next) { + file_name = inst->m_list.gl_pathv[idx_next]; + if (file_is_dir(file_name)) { + continue; + } + + try { + reader_new.reset(new Reader(inst->m_ctx, inst->m_cfg.get(), file_name)); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(inst->m_ctx, "%s", ex.what()); + continue; + } + + // Success + break; + } + + inst->m_next_file = idx_next + 1; + if (!reader_new) { + return IPX_ERR_EOF; + } + + IPX_CTX_INFO(inst->m_ctx, "Reading from file '%s'...", file_name); + inst->m_file = std::move(reader_new); + return IPX_OK; +} + +// ------------------------------------------------------------------------- + +int +ipx_plugin_init(ipx_ctx_t *ctx, const char *params) +{ + try { + std::unique_ptr inst(new Instance); + inst->m_ctx = ctx; + inst->m_cfg.reset(config_parse(ctx, params)); + if (!inst->m_cfg) { + throw FDS_exception("Failed to parse the instance configuration!"); + } + file_list_init(inst.get(), inst->m_cfg->path); + // Everything seems OK + ipx_ctx_private_set(ctx, inst.release()); + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "Initialization failed: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (const std::exception &ex) { + IPX_CTX_ERROR(ctx, "Unexpected error has occurred: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!", '\0'); + return IPX_ERR_DENIED; + } + + return IPX_OK; +} + +void +ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg) +{ + try { + auto *inst = reinterpret_cast(cfg); + file_list_clean(inst); + delete inst; + } catch (...) { + IPX_CTX_ERROR(ctx, "Something bad happened during plugin destruction"); + } +} + +int +ipx_plugin_get(ipx_ctx_t *ctx, void *cfg) +{ + // The plugin MUST NOT throw any exception! + try { + auto inst = reinterpret_cast(cfg); + + while (true) { + // Try to send an IPFIX Message with batch of Data Records + int ret = IPX_ERR_EOF; + if (inst->m_file) { + ret = inst->m_file->send_batch(); + } + + switch (ret) { + case IPX_OK: + return IPX_OK; + case IPX_ERR_EOF: + break; + default: + throw FDS_exception("[internal] send_batch() returned unexpected value!"); + } + + // Try to open the next file + ret = file_next(inst); + switch (ret) { + case IPX_OK: + continue; + case IPX_ERR_EOF: + return IPX_ERR_EOF; + default: + throw FDS_exception("[internal] file_next() returned unexpected value!"); + } + } + } catch (const FDS_exception &ex) { + IPX_CTX_ERROR(ctx, "Unable to extract data from a FDS file: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (const std::exception &ex) { + IPX_CTX_ERROR(ctx, "Unexpected error has occurred: %s", ex.what()); + return IPX_ERR_DENIED; + } catch (...) { + IPX_CTX_ERROR(ctx, "Unknown error has occurred!", '\0'); + return IPX_ERR_DENIED; + } + + return IPX_OK; +}