diff --git a/src/plugins/input/ipfix/README.rst b/src/plugins/input/ipfix/README.rst
index b077ce13..64ab54de 100644
--- a/src/plugins/input/ipfix/README.rst
+++ b/src/plugins/input/ipfix/README.rst
@@ -1,7 +1,11 @@
-IPFIX file (input plugin)
+IPFIX File (input plugin)
=========================
-TODO
+The plugin reads flow data from one or more files in IPFIX File format. It is possible to
+use it to load flow records previously stored using IPFIX output plugin.
+
+Unlike UDP and TCP input plugins which infinitely waits for data from NetFlow/IPFIX
+exporters, the plugin will terminate the collector after all files are processed.
Example configuration
---------------------
@@ -9,15 +13,18 @@ Example configuration
.. code-block:: xml
- IPFIX input
+ IPFIX Fileipfix
- 4739
-
+ /tmp/flow/file.ipfix
Parameters
----------
-TODO
\ No newline at end of file
+:``path``:
+ Path to file(s) in IPFIX File format. It is possible to use asterisk instead of
+ a filename/directory, tilde character (i.e. "~") instead of the home directory of
+ the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix").
+ Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
\ No newline at end of file
diff --git a/src/plugins/input/ipfix/config.h b/src/plugins/input/ipfix/config.h
index 1498353b..a6d5f6c5 100644
--- a/src/plugins/input/ipfix/config.h
+++ b/src/plugins/input/ipfix/config.h
@@ -48,8 +48,8 @@
/** Configuration of a instance of the IPFIX plugin */
struct ipfix_config {
- /** Local port */
- const char *path;
+ /** File pattern */
+ char *path;
};
/**
diff --git a/src/plugins/input/ipfix/ipfix.c b/src/plugins/input/ipfix/ipfix.c
index 888ee4d7..aaa8d1e9 100644
--- a/src/plugins/input/ipfix/ipfix.c
+++ b/src/plugins/input/ipfix/ipfix.c
@@ -39,6 +39,7 @@
*
*/
+#include
#include
#include
#include
@@ -53,7 +54,7 @@ IPX_API struct ipx_plugin_info ipx_plugin_info = {
// Plugin identification name
.name = "ipfix",
// Brief description of plugin
- .dsc = "Input plugin for IPFIX File.",
+ .dsc = "Input plugin for IPFIX File format",
// Configuration flags (reserved for future use)
.flags = 0,
// Plugin version string (like "1.2.3")
@@ -64,6 +65,8 @@ IPX_API struct ipx_plugin_info ipx_plugin_info = {
/// Plugin instance data
struct plugin_data {
+ /// Plugin context (log only!)
+ ipx_ctx_t *ctx;
/// Parsed plugin configuration
struct ipfix_config *cfg;
@@ -72,12 +75,21 @@ struct plugin_data {
/// Index of the next file to read (see file_list->gl_pathv)
size_t file_next_idx;
- /// Currently opened file
+ /// Handler of the currently file
FILE *current_file;
+ /// Name/path of the current file
+ const char *current_name;
/// Transport Session identification of the
struct ipx_session *ts;
};
+/**
+ * @brief Check if path is a directory
+ *
+ * @note Since we use GLOB_MARK flag, all directories ends with a slash.
+ * @param[in] filename Path
+ * @return True or false
+ */
static inline bool
filename_is_dir(const char *filename)
{
@@ -85,12 +97,28 @@ filename_is_dir(const char *filename)
return (filename[len - 1] == '/');
}
-static inline int
+/**
+ * @brief Free list of files to read
+ *
+ * @param[in] list List to free
+ */
+static inline void
files_list_free(glob_t *list)
{
globfree(list);
}
+/**
+ * @brief Get list of files to read
+ *
+ * @param[in] ctx Plugin context (log only)
+ * @param[in] pattern File pattern
+ * @param[out] list List of files
+ * @return #IPX_OK on success
+ * @return #IPX_ERR_NOTFOUND if no files matches the given pattern
+ * @return #IPX_ERR_DENIED if the list cannot be obtained due to a read error
+ * @return #IPX_ERR_NOMEM in case of a memory allocation error
+ */
static inline int
files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
{
@@ -108,7 +136,7 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
IPX_CTX_ERROR(ctx, "Failed to list files to process due read error", '\0');
return IPX_ERR_DENIED;
case GLOB_NOMATCH:
- IPX_CTX_ERROR(ctx, "No files matches the given file pattern!", '\0');
+ IPX_CTX_ERROR(ctx, "No file matches the given file pattern!", '\0');
return IPX_ERR_NOTFOUND;
default:
IPX_CTX_ERROR(ctx, "glob() failed and returned unexpected value!", '\0');
@@ -136,6 +164,17 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
return IPX_OK;
}
+/**
+ * @brief Create a new transport session and send "open" notification
+ *
+ * @warning
+ * As the function sends notification to other plugins further in the pipeline, it must have
+ * permission to pass messages. Therefore, this function cannot be called within
+ * ipx_plugin_init().
+ * @param[in] ctx Plugin context (for sending notification and log)
+ * @param[in] filename New file which corresponds to the new Transport Session
+ * @return New transport session
+ */
struct ipx_session *
session_open(ipx_ctx_t *ctx, const char *filename)
{
@@ -164,6 +203,18 @@ session_open(ipx_ctx_t *ctx, const char *filename)
return res;
}
+/**
+ * @brief Close a transport session and send "close" notification
+ *
+ * User MUST stop using the session as it is send in a garbage message to the pipeline and
+ * it will be automatically freed.
+ * @warning
+ * As the function sends notification to other plugins further in the pipeline, it must have
+ * permission to pass messages. Therefore, this function cannot be called within
+ * ipx_plugin_init().
+ * @param[in] ctx Plugin context (for sending notification and log)
+ * @param[in] session Transport Session to close
+ */
void
session_close(ipx_ctx_t *ctx, struct ipx_session *session)
{
@@ -190,7 +241,7 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session)
msg_garbage = ipx_msg_garbage_create(session, garbage_cb);
if (!msg_garbage) {
- /* Memory leak... We cannot destroy the session as it can used by
+ /* Memory leak... We cannot destroy the session as it can be used
* by other plugins further in the pipeline.
*/
IPX_CTX_ERROR(ctx, "Failed to create a garbage message with a Transport Session", '\0');
@@ -206,8 +257,172 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session)
}
}
-// -------------------------------------------------------------------------------------------------
+/**
+ * @brief Open the next file for reading
+ *
+ * If any file is already opened, it will be closed and a session message (close notification)
+ * will be send too. The function will try to open the next file in the list and makes sure
+ * that it contains at least one IPFIX Message. Otherwise, it will be skipped and another file
+ * will be used. When a suitable file is found, a new Transport Session will created and
+ * particular session message (open notification) will be sent.
+ *
+ * @warning
+ * As the function sends notification to other plugins further in the pipeline, it must have
+ * permission to pass messages. Therefore, this function cannot be called within
+ * ipx_plugin_init().
+ * @param[in] data Plugin data
+ * @return #IPX_OK on success
+ * @return #IPX_ERR_EOF if no more files are available
+ * @return #iPX_ERR_NOMEM in case of a memory allocation error
+ */
+int
+next_file(struct plugin_data *data)
+{
+ size_t idx_next;
+ size_t idx_max = data->file_list.gl_pathc;
+ FILE *file_new = NULL;
+ const char *name_new = NULL;
+
+ // Signalize close of the current Transport Session
+ session_close(data->ctx, data->ts);
+ data->ts = NULL;
+ if (data->current_file) {
+ fclose(data->current_file);
+ data->current_file = NULL;
+ data->current_name = NULL;
+ }
+
+ // Open new file
+ for (idx_next = data->file_next_idx; idx_next < idx_max; ++idx_next) {
+ name_new = data->file_list.gl_pathv[idx_next];
+ if (filename_is_dir(name_new)) {
+ continue;
+ }
+
+ file_new = fopen(name_new, "rb");
+ if (!file_new) {
+ const char *err_str;
+ ipx_strerror(errno, err_str);
+ IPX_CTX_ERROR(data->ctx, "Failed to open '%s': %s", name_new, err_str);
+ continue;
+ }
+ struct fds_ipfix_msg_hdr ipfix_hdr;
+ if (fread(&ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN, 1, file_new) != 1
+ || ntohs(ipfix_hdr.version) != FDS_IPFIX_VERSION
+ || ntohs(ipfix_hdr.length) < FDS_IPFIX_MSG_HDR_LEN) {
+ IPX_CTX_ERROR(data->ctx, "Skipping non-IPFIX File '%s'", name_new);
+ fclose(file_new);
+ file_new = NULL;
+ continue;
+ }
+
+ // Success
+ rewind(file_new);
+ break;
+ }
+
+ data->file_next_idx = idx_next + 1;
+ if (!file_new) {
+ return IPX_ERR_EOF;
+ }
+
+ // Signalize open of the new Transport Session
+ data->ts = session_open(data->ctx, name_new);
+ if (!data->ts) {
+ fclose(file_new);
+ return IPX_ERR_NOMEM;
+ }
+
+ IPX_CTX_INFO(data->ctx, "Reading from file '%s'...", name_new);
+ data->current_file = file_new;
+ data->current_name = name_new;
+ return IPX_OK;
+}
+
+/**
+ * @brief Get the next IPFIX Message from currently opened file
+ *
+ * @param[in] data Plugin data
+ * @param[out] msg IPFIX Message extracted from the file
+ * @return #IPX_OK on success
+ * @return #IPX_ERR_FORMAT if the file is malformed
+ * @return #IPX_ERR_NOMEM in case of a memory allocation error
+ */
+int
+next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
+{
+ struct fds_ipfix_msg_hdr ipfix_hdr;
+ uint16_t ipfix_size;
+ uint8_t *ipfix_data = NULL;
+
+ struct ipx_msg_ctx ipfix_ctx;
+ ipx_msg_ipfix_t *ipfix_msg;
+ size_t rc;
+
+ if (!data->current_file) {
+ return IPX_ERR_EOF;
+ }
+
+ // Get the IPFIX header
+ rc = fread(&ipfix_hdr, 1, FDS_IPFIX_MSG_HDR_LEN, data->current_file);
+ if (rc != FDS_IPFIX_MSG_HDR_LEN) {
+ // Check if the end of file has been reached
+ if (rc == 0 && feof(data->current_file)) {
+ return IPX_ERR_EOF;
+ }
+
+ IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
+ data->current_name);
+ return IPX_ERR_FORMAT;
+ }
+
+ ipfix_size = ntohs(ipfix_hdr.length);
+ if (ntohs(ipfix_hdr.version) != FDS_IPFIX_VERSION
+ || ipfix_size < FDS_IPFIX_MSG_HDR_LEN) {
+ IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected data)!", data->current_name);
+ return IPX_ERR_FORMAT;
+ }
+
+ ipfix_data = malloc(ipfix_size);
+ if (!ipfix_data) {
+ IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
+ return IPX_ERR_NOMEM;
+ }
+
+ memcpy(ipfix_data, &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
+
+ // Get the rest of the message body
+ if (ipfix_size > FDS_IPFIX_MSG_HDR_LEN) {
+ uint8_t *data_ptr = ipfix_data + FDS_IPFIX_MSG_HDR_LEN;
+ size_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
+
+ if (fread(data_ptr, 1, size_remain, data->current_file) != size_remain) {
+ IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
+ data->current_name);
+ free(ipfix_data);
+ return IPX_ERR_FORMAT;
+ }
+ }
+
+ // Wrap the IPFIX Message
+ memset(&ipfix_ctx, 0, sizeof(ipfix_ctx));
+ ipfix_ctx.session = data->ts;
+ ipfix_ctx.odid = ntohl(ipfix_hdr.odid);
+ ipfix_ctx.stream = 0;
+
+ ipfix_msg = ipx_msg_ipfix_create(data->ctx, &ipfix_ctx, ipfix_data, ipfix_size);
+ if (!ipfix_msg) {
+ IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
+ free(ipfix_data);
+ return IPX_ERR_NOMEM;
+ }
+
+ *msg = ipfix_msg;
+ return IPX_OK;
+}
+
+// -------------------------------------------------------------------------------------------------
int
ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
@@ -219,6 +434,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
}
// Parse configuration
+ data->ctx = ctx;
data->cfg = config_parse(ctx, params);
if (!data->cfg) {
free(data);
@@ -229,6 +445,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
if (files_list_get(ctx, data->cfg->path, &data->file_list) != IPX_OK) {
config_destroy(data->cfg);
free(data);
+ return IPX_ERR_DENIED;
}
ipx_ctx_private_set(ctx, data);
@@ -256,33 +473,54 @@ int
ipx_plugin_get(ipx_ctx_t *ctx, void *cfg)
{
struct plugin_data *data = (struct plugin_data *) cfg;
+ ipx_msg_ipfix_t *msg2send;
+
+ while (true) {
+ // Get a new message from the currently opened file
+ switch (next_message(data, &msg2send)) {
+ case IPX_OK:
+ ipx_ctx_msg_pass(ctx, ipx_msg_ipfix2base(msg2send));
+ return IPX_OK;
+ case IPX_ERR_EOF:
+ case IPX_ERR_FORMAT:
+ // Open the next file
+ break;
+ default:
+ IPX_CTX_ERROR(ctx, "Fatal error!", '\0');
+ return IPX_ERR_DENIED;
+ }
- // dummy code
- size_t file_idx = data->file_next_idx;
- if (file_idx == data->file_list.gl_pathc) {
- return IPX_ERR_EOF;
- }
-
- const char *file_path = data->file_list.gl_pathv[file_idx];
- IPX_CTX_INFO(ctx, "New session: %s", file_path);
- data->ts = session_open(ctx, file_path);
- if (!data->ts) {
- return IPX_ERR_DENIED;
+ // Open the next file
+ switch (next_file(data)) {
+ case IPX_OK:
+ continue;
+ case IPX_ERR_EOF:
+ // No more data:
+ return IPX_ERR_EOF;
+ default:
+ IPX_CTX_ERROR(ctx, "Fatal error!", '\0');
+ return IPX_ERR_DENIED;
+ }
}
- session_close(ctx, data->ts);
- data->ts = NULL;
- data->file_next_idx++;
-
- return IPX_OK;
}
-/*
void
ipx_plugin_session_close(ipx_ctx_t *ctx, void *cfg, const struct ipx_session *session)
{
- struct tcp_data *data = (struct tcp_data *) cfg;
+ struct plugin_data *data = (struct plugin_data *) cfg;
// Do NOT dereference the session pointer because it can be already freed!
+ if (session != data->ts) {
+ // The session has been already closed
+ return;
+ }
- // TODO: if matches the current file, skip it and open the next one...
-}
-*/
\ No newline at end of file
+ // Close the current session and file
+ session_close(ctx, data->ts);
+ if (data->current_file) {
+ fclose(data->current_file);
+ }
+
+ data->ts = NULL;
+ data->current_file = NULL;
+ data->current_name = NULL;
+}
\ No newline at end of file