Skip to content

Commit

Permalink
FDS output: improved detection of Template modifications
Browse files Browse the repository at this point in the history
Detection is newly based on detection of snapshot changes. If a template snapshot of a Data Record is different (compared to a previous Data Record from the same source and ODID), all Templates in the new snapshot are compared to Templates already added to the file. New or redefined Templates are added and Templates that not defined in the new snapshot are removed.
  • Loading branch information
Lukas955 committed Jul 3, 2019
1 parent 8002be6 commit 9b23fd8
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 31 deletions.
199 changes: 169 additions & 30 deletions src/plugins/output/fds/src/Storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* SPDX-License-Identifier: BSD-3-Clause
*/

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <sys/stat.h>
Expand Down Expand Up @@ -69,9 +71,9 @@ Storage::window_new(time_t ts)
}

if (fds_file_open(m_file.get(), new_file.c_str(), m_flags) != FDS_OK) {
std::string err_msg = fds_file_error(m_file.get());
m_file.reset();
const char *err_msg = "something"; //fds_file_error(m_file.get());
throw FDS_exception("Failed to create file '" + new_file + "': " + err_msg);
throw FDS_exception("Failed to create/append file '" + new_file + "': " + err_msg);
}
}

Expand Down Expand Up @@ -100,51 +102,188 @@ Storage::process_msg(ipx_msg_ipfix_t *msg)

if (fds_file_write_ctx(m_file.get(), file_ctx.id, msg_ctx->odid, exp_time) != FDS_OK) {
const char *err_msg = fds_file_error(m_file.get());
throw FDS_exception("Failed to configure file writer: " + std::string(err_msg));
throw FDS_exception("Failed to configure the writer: " + std::string(err_msg));
}

// Get info about the last seen Template snapshot
struct snap_info &snap_last = file_ctx.odid2snap[msg_ctx->odid];

// For each Data Record in the file
const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
for (uint32_t i = 0; i < rec_cnt; ++i) {
ipx_ipfix_record *rec_ptr = ipx_msg_ipfix_get_drec(msg, i);

// Insert the record // TODO: improve me!
const struct fds_template *rec_tmplt = rec_ptr->rec.tmplt;
uint16_t tmplt_id = rec_tmplt->id;
enum fds_template_type t_type;
const uint8_t *t_data;
uint16_t t_size;
// Check if the templates has been changed (detected by change of template snapshots)
if (rec_ptr->rec.snap != snap_last.ptr) {
const char *session_name = msg_ctx->session->ident;
uint32_t session_odid = msg_ctx->odid;
IPX_CTX_DEBUG(m_ctx, "Template snapshot of '%s' [ODID %" PRIu32 "] has been changed. "
"Updating template definitions...", session_name, session_odid);

int rc = fds_file_write_tmplt_get(m_file.get(), tmplt_id, &t_type, &t_data, &t_size);
if (rc != FDS_OK && rc != FDS_ERR_NOTFOUND) {
// Something bad happened
tmplts_update(snap_last, rec_ptr->rec.snap);
}

// Write the Data Record
const uint8_t *rec_data = rec_ptr->rec.data;
uint16_t rec_size = rec_ptr->rec.size;
uint16_t tmplt_id = rec_ptr->rec.tmplt->id;

if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) {
const char *err_msg = fds_file_error(m_file.get());
throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg));
}
}
}

/// Auxiliary data structure used in the snapshot iterator
struct tmplt_update_data {
/// Status of template processing
bool is_ok;
/// Plugin context (only for log!)
ipx_ctx_t *ctx;

/// FDS file with specified context
fds_file_t *file;
/// Set of processed Templates in the snapshot
std::set<uint16_t> ids;
};

/**
* @brief Callback function for updating definition of an IPFIX (Options) Template
*
* The function checks if the same Template is already defined in the current context of the file.
* If the Template is not present or it's different, the new Template definition is added to the
* file.
* @param[in] tmplt Template to process
* @param[in] data Auxiliary data structure \ref tmplt_update_data
* @return On success returns true. Otherwise returns false.
*/
static bool
tmplt_update_cb(const struct fds_template *tmplt, void *data)
{
// Template type, raw data and size
enum fds_template_type t_type;
const uint8_t *t_data;
uint16_t t_size;

auto info = reinterpret_cast<tmplt_update_data *>(data);

// No exceptions can be thrown in the C callback!
try {
uint16_t t_id = tmplt->id;
info->ids.emplace(t_id);

// Get definition of the Template specified in the file
int res = fds_file_write_tmplt_get(info->file, t_id, &t_type, &t_data, &t_size);

if (res != FDS_OK && res != FDS_ERR_NOTFOUND) {
// Something bad happened
const char *err_msg = fds_file_error(info->file);
throw FDS_exception("fds_file_write_tmplt_get() failed: " + std::string(err_msg));
}

if (rc == FDS_ERR_NOTFOUND || t_type != rec_tmplt->type || t_size != rec_tmplt->raw.length
|| memcmp(t_data, rec_tmplt->raw.data, rec_tmplt->raw.length) != 0) {
// Template not defined or the template are different
t_type = rec_tmplt->type;
t_data = rec_tmplt->raw.data;
t_size = rec_tmplt->raw.length;

if (fds_file_write_tmplt_add(m_file.get(), t_type, t_data, t_size) != FDS_OK) {
const char *err_msg = fds_file_error(m_file.get());
throw FDS_exception("Failed to add a template: " + std::string(err_msg));
}
// Should we add/redefine the definition of the Template
if (res == FDS_OK
&& tmplt->type == t_type
&& tmplt->raw.length == t_size
&& memcmp(tmplt->raw.data, t_data, t_size) == 0) {
// The same -> nothing to do
return info->is_ok;
}

// FIXME: check subTemplateList & subTemplateMultiList templates
// Add the definition (i.e. templates are different or the template hasn't been defined)
IPX_CTX_DEBUG(info->ctx, "Adding/updating definition of Template ID %" PRIu16, t_id);

// Write the Data record
const uint8_t *rec_data = rec_ptr->rec.data;
const uint16_t rec_size = rec_ptr->rec.size;
if (fds_file_write_rec(m_file.get(), tmplt_id, rec_data, rec_size) != FDS_OK) {
const char *err_msg = fds_file_error(m_file.get());
throw FDS_exception("Failed to add a Data Record: " + std::string(err_msg));
t_type = tmplt->type;
t_data = tmplt->raw.data;
t_size = tmplt->raw.length;

if (fds_file_write_tmplt_add(info->file, t_type, t_data, t_size) != FDS_OK) {
const char *err_msg = fds_file_error(info->file);
throw FDS_exception("fds_file_write_tmplt_add() failed: " + std::string(err_msg));
}

} catch (std::exception &ex) {
// Exceptions
IPX_CTX_ERROR(info->ctx, "Failure during update of Template ID %" PRIu16 ": %s", tmplt->id,
ex.what());
info->is_ok = false;
} catch (...) {
// Other exceptions
IPX_CTX_ERROR(info->ctx, "Unknown exception thrown during template definition update", '\0');
info->is_ok = false;
}

return info->is_ok;
}

/**
* @brief Update Template definitions for the current Transport Session and ODID
*
* The function compares Templates in the \p snap with Template definitions previously defined
* for the currently selected combination of Transport Session and ODID. For each different or
* previously undefined Template, its definition is added or updated. Definitions of Templates
* that were available in the previous snapshot but not available in the new one are removed.
*
* Finally, information (pointer, IDs) in \p info are updated to reflect the performed update.
* @warning
* Template definitions are always unique for a combination of Transport Session and ODID,
* therefore, appropriate file writer context MUST already set using fds_file_writer_ctx().
* Parameters \p info and \p snap MUST also belong the same unique combination.
* @param[in] info Information about the last update of Templates (old snapshot ref. + list of IDs)
* @param[in] snap New Template snapshot with all valid Template definitions
*/
void
Storage::tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap)
{
assert(info.ptr != snap && "Snapshots should be different");

// Prepare data for the callback function
struct tmplt_update_data data;
data.is_ok = true;
data.ctx = m_ctx;
data.file = m_file.get();
data.ids.clear();

// Update templates
fds_tsnapshot_for(snap, &tmplt_update_cb, &data);

// Check if the update failed
if (!data.is_ok) {
throw FDS_exception("Failed to update Template definitions");
}

// Check if there are any Template IDs that have been removed
std::set<uint16_t> &ids_old = info.tmplt_ids;
std::set<uint16_t> &ids_new = data.ids;
std::set<uint16_t> ids2remove;
// Old Template IDs - New Templates IDs = Template IDs to remove
std::set_difference(ids_old.begin(), ids_old.end(), ids_new.begin(), ids_new.end(),
std::inserter(ids2remove, ids2remove.begin()));

// Remove old templates that are not available in the new snapshot
for (uint16_t tid : ids2remove) {
IPX_CTX_DEBUG(m_ctx, "Removing definition of Template ID %" PRIu16, tid);

int rc = fds_file_write_tmplt_remove(m_file.get(), tid);
if (rc == FDS_OK) {
continue;
}

// Something bad happened
if (rc != FDS_ERR_NOTFOUND) {
std::string err_msg = fds_file_error(m_file.get());
throw FDS_exception("fds_file_write_tmplt_remove() failed: " + err_msg);
}

// Weird, but not critical
IPX_CTX_WARNING(m_ctx, "Failed to remove undefined Template ID %" PRIu16 ". "
"Weird, this should not happen.", tid);
}

// Update information about the last update of Templates
info.ptr = snap;
std::swap(info.tmplt_ids, data.ids);
}

/**
Expand Down
20 changes: 19 additions & 1 deletion src/plugins/output/fds/src/Storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ipfixcol2.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <libfds.h>

Expand Down Expand Up @@ -74,10 +75,25 @@ class Storage {
process_msg(ipx_msg_ipfix_t *msg);

private:
/// Information about Templates in a snapshot
struct snap_info {
/// Last seen snapshot (might be already freed, do NOT dereference!)
const fds_tsnapshot_t *ptr;
/// Set of Template IDs in the snapshot
std::set<uint16_t> tmplt_ids;

snap_info() {
ptr = nullptr;
tmplt_ids.clear();
}
};

/// Description parameters of a Transport Session
struct session_ctx {
/// FDS file ID
/// Session ID used in the FDS file
fds_file_sid_t id;
/// Last seen snapshot for a specific ODID of the Transport Session
std::map<uint32_t, struct snap_info> odid2snap;
};

/// Plugin context only for logging!
Expand All @@ -100,6 +116,8 @@ class Storage {
session_get(const struct ipx_session *sptr);
void
session_ipx2fds(const struct ipx_session *ipx_desc, struct fds_file_session *fds_desc);
void
tmplts_update(struct snap_info &info, const fds_tsnapshot_t *snap);
};


Expand Down

0 comments on commit 9b23fd8

Please sign in to comment.