diff --git a/src/plugins/input/ipfix/README.rst b/src/plugins/input/ipfix/README.rst
index 64ab54de..fd0e9a43 100644
--- a/src/plugins/input/ipfix/README.rst
+++ b/src/plugins/input/ipfix/README.rst
@@ -27,4 +27,8 @@ Parameters
Path to file(s) in IPFIX File format. It is possible to use asterisk instead of
a filename/directory, tilde character (i.e. "~") instead of the home directory of
the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix").
- Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
\ No newline at end of file
+ Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
+
+:``bufferSize``:
+ Optional size of the internal buffer to which the content of the file is partly
+ preloaded. [default: 1048576, min: 131072]
diff --git a/src/plugins/input/ipfix/config.c b/src/plugins/input/ipfix/config.c
index 1402ee9e..43f4e589 100644
--- a/src/plugins/input/ipfix/config.c
+++ b/src/plugins/input/ipfix/config.c
@@ -47,18 +47,25 @@
/*
*
* ... // required, exactly once
+ * ... // optional
*
*/
+/** Default buffer size */
+#define BSIZE_DEF (1048576U)
+#define BSIZE_MIN (131072U)
+
/** XML nodes */
enum params_xml_nodes {
- NODE_PATH = 1
+ NODE_PATH = 1,
+ NODE_BSIZE
};
/** Definition of the \ node */
static const struct fds_xml_args args_params[] = {
FDS_OPTS_ROOT("params"),
FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0),
+ FDS_OPTS_ELEM(NODE_BSIZE, "bufferSize", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
FDS_OPTS_END
};
@@ -81,6 +88,10 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg
assert(content->type == FDS_OPTS_T_STRING);
cfg->path = strdup(content->ptr_string);
break;
+ case NODE_BSIZE:
+ assert(content->type == FDS_OPTS_T_UINT);
+ cfg->bsize = content->val_uint;
+ break;
default:
// Internal error
assert(false);
@@ -89,6 +100,12 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg
if (!cfg->path) {
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
+ return IPX_ERR_FORMAT;
+ }
+
+ if (cfg->bsize < BSIZE_MIN) {
+ IPX_CTX_ERROR(ctx, "Buffer size must be at least %u bytes!" , (unsigned int) BSIZE_MIN);
+ return IPX_ERR_FORMAT;
}
return IPX_OK;
@@ -102,6 +119,7 @@ static void
config_default_set(struct ipfix_config *cfg)
{
cfg->path = NULL;
+ cfg->bsize = BSIZE_DEF;
}
struct ipfix_config *
diff --git a/src/plugins/input/ipfix/config.h b/src/plugins/input/ipfix/config.h
index a6d5f6c5..8b1afa6c 100644
--- a/src/plugins/input/ipfix/config.h
+++ b/src/plugins/input/ipfix/config.h
@@ -50,6 +50,8 @@
struct ipfix_config {
/** File pattern */
char *path;
+ /** Read buffer size */
+ uint64_t bsize;
};
/**
diff --git a/src/plugins/input/ipfix/ipfix.c b/src/plugins/input/ipfix/ipfix.c
index aaa8d1e9..7ae8c1d9 100644
--- a/src/plugins/input/ipfix/ipfix.c
+++ b/src/plugins/input/ipfix/ipfix.c
@@ -79,8 +79,17 @@ struct plugin_data {
FILE *current_file;
/// Name/path of the current file
const char *current_name;
- /// Transport Session identification of the
- struct ipx_session *ts;
+ /// Transport Session identification
+ struct ipx_session *current_ts;
+
+ /// Buffer of preloaded data
+ uint8_t *buffer_data;
+ /// Size of the buffer
+ size_t buffer_size;
+ /// Valid size of the buffer
+ size_t buffer_valid;
+ /// Position of the reader in the buffer
+ size_t buffer_offset;
};
/**
@@ -175,7 +184,7 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
* @param[in] filename New file which corresponds to the new Transport Session
* @return New transport session
*/
-struct ipx_session *
+static struct ipx_session *
session_open(ipx_ctx_t *ctx, const char *filename)
{
struct ipx_session *res;
@@ -215,7 +224,7 @@ session_open(ipx_ctx_t *ctx, const char *filename)
* @param[in] ctx Plugin context (for sending notification and log)
* @param[in] session Transport Session to close
*/
-void
+static void
session_close(ipx_ctx_t *ctx, struct ipx_session *session)
{
ipx_msg_session_t *msg_session;
@@ -275,7 +284,7 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session)
* @return #IPX_ERR_EOF if no more files are available
* @return #iPX_ERR_NOMEM in case of a memory allocation error
*/
-int
+static int
next_file(struct plugin_data *data)
{
size_t idx_next;
@@ -284,8 +293,8 @@ next_file(struct plugin_data *data)
const char *name_new = NULL;
// Signalize close of the current Transport Session
- session_close(data->ctx, data->ts);
- data->ts = NULL;
+ session_close(data->ctx, data->current_ts);
+ data->current_ts = NULL;
if (data->current_file) {
fclose(data->current_file);
data->current_file = NULL;
@@ -328,8 +337,8 @@ next_file(struct plugin_data *data)
}
// Signalize open of the new Transport Session
- data->ts = session_open(data->ctx, name_new);
- if (!data->ts) {
+ data->current_ts = session_open(data->ctx, name_new);
+ if (!data->current_ts) {
fclose(file_new);
return IPX_ERR_NOMEM;
}
@@ -337,6 +346,72 @@ next_file(struct plugin_data *data)
IPX_CTX_INFO(data->ctx, "Reading from file '%s'...", name_new);
data->current_file = file_new;
data->current_name = name_new;
+
+ data->buffer_valid = 0;
+ data->buffer_offset = 0;
+ return IPX_OK;
+}
+
+/**
+ * @brief Get the next chunk of data
+ *
+ * Reads the chunk from the internal buffer with preloaded content of the file.
+ * If the buffer doesn't contain required amount of data, new content will be
+ * loaded from the file. Nevertheless, if the end-of-file has been reached, return
+ * codes #IPX_ERR_EOF or #IPX_ERR_FORMAT might be returned.
+ *
+ * @param[in] data Plugin data
+ * @param[out] out Output buffer to fill
+ * @param[in] out_size Size of the output buffer (i.e. required amount of data)
+ *
+ * @return #IPX_OK on success
+ * @return #IPX_ERR_EOF if the end-of-file has been reached (no more data)
+ * @return #IPX_ERR_FORMAT if the end-of-file has been reached but the internal
+ * buffer doesn't contain required amount of data
+ */
+static int
+next_chunk(struct plugin_data *data, uint8_t *out, uint16_t out_size)
+{
+ size_t buffer_avail = data->buffer_valid - data->buffer_offset;
+ uint8_t *reader_ptr = &data->buffer_data[data->buffer_offset];
+
+ size_t new_size;
+ uint8_t *new_ptr;
+ size_t ret;
+
+ // Check if the chunk is fully in the buffer
+ if (buffer_avail >= out_size) {
+ memcpy(out, reader_ptr, out_size);
+ data->buffer_offset += out_size;
+ return IPX_OK;
+ }
+
+ // We need to load new data to the buffer
+ if (buffer_avail > 0) {
+ // A fragment of an unprocessed IPFIX Message must be preserved
+ memcpy(data->buffer_data, reader_ptr, buffer_avail);
+ data->buffer_valid = buffer_avail;
+ } else {
+ data->buffer_valid = 0;
+ }
+
+ new_size = data->buffer_size - data->buffer_valid;
+ new_ptr = &data->buffer_data[data->buffer_valid];
+ ret = fread(new_ptr, 1, new_size, data->current_file);
+ data->buffer_valid += ret;
+ data->buffer_offset = 0;
+
+ // Check whether the EOF has been reached
+ if (data->buffer_valid == 0 && feof(data->current_file)) {
+ return IPX_ERR_EOF;
+ }
+
+ if (data->buffer_valid < out_size) {
+ return IPX_ERR_FORMAT;
+ }
+
+ memcpy(out, data->buffer_data, out_size);
+ data->buffer_offset += out_size;
return IPX_OK;
}
@@ -346,10 +421,11 @@ next_file(struct plugin_data *data)
* @param[in] data Plugin data
* @param[out] msg IPFIX Message extracted from the file
* @return #IPX_OK on success
+ * @return #IPX_ERR_EOF if the end-of-file has been reached
* @return #IPX_ERR_FORMAT if the file is malformed
* @return #IPX_ERR_NOMEM in case of a memory allocation error
*/
-int
+static int
next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
{
struct fds_ipfix_msg_hdr ipfix_hdr;
@@ -358,23 +434,21 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
struct ipx_msg_ctx ipfix_ctx;
ipx_msg_ipfix_t *ipfix_msg;
- size_t rc;
+ int ret;
if (!data->current_file) {
return IPX_ERR_EOF;
}
- // Get the IPFIX header
- rc = fread(&ipfix_hdr, 1, FDS_IPFIX_MSG_HDR_LEN, data->current_file);
- if (rc != FDS_IPFIX_MSG_HDR_LEN) {
- // Check if the end of file has been reached
- if (rc == 0 && feof(data->current_file)) {
- return IPX_ERR_EOF;
+ // Get the IPFIX Message header
+ ret = next_chunk(data, (uint8_t *) &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
+ if (ret != IPX_OK) {
+ if (ret == IPX_ERR_FORMAT) {
+ IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
+ data->current_name);
}
- IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
- data->current_name);
- return IPX_ERR_FORMAT;
+ return ret;
}
ipfix_size = ntohs(ipfix_hdr.length);
@@ -389,15 +463,14 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
return IPX_ERR_NOMEM;
}
-
memcpy(ipfix_data, &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
- // Get the rest of the message body
+ // Get the rest of the IPFIX Message body
if (ipfix_size > FDS_IPFIX_MSG_HDR_LEN) {
uint8_t *data_ptr = ipfix_data + FDS_IPFIX_MSG_HDR_LEN;
- size_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
+ uint16_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
- if (fread(data_ptr, 1, size_remain, data->current_file) != size_remain) {
+ if (next_chunk(data, data_ptr, size_remain) != IPX_OK) {
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
data->current_name);
free(ipfix_data);
@@ -407,7 +480,7 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
// Wrap the IPFIX Message
memset(&ipfix_ctx, 0, sizeof(ipfix_ctx));
- ipfix_ctx.session = data->ts;
+ ipfix_ctx.session = data->current_ts;
ipfix_ctx.odid = ntohl(ipfix_hdr.odid);
ipfix_ctx.stream = 0;
@@ -441,8 +514,18 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
return IPX_ERR_DENIED;
}
+ // Initialize reader buffer
+ data->buffer_size = data->cfg->bsize;
+ data->buffer_data = malloc(sizeof(uint8_t) * data->buffer_size);
+ if (!data->buffer_data) {
+ IPX_CTX_ERROR(ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
+ free(data);
+ return IPX_ERR_DENIED;
+ }
+
// Prepare list of all files to read
if (files_list_get(ctx, data->cfg->path, &data->file_list) != IPX_OK) {
+ free(data->buffer_data);
config_destroy(data->cfg);
free(data);
return IPX_ERR_DENIED;
@@ -458,7 +541,7 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg)
struct plugin_data *data = (struct plugin_data *) cfg;
// Close the current session and file
- session_close(ctx, data->ts);
+ session_close(ctx, data->current_ts);
if (data->current_file) {
fclose(data->current_file);
}
@@ -466,6 +549,7 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg)
// Final cleanup
files_list_free(&data->file_list);
config_destroy(data->cfg);
+ free(data->buffer_data);
free(data);
}
@@ -509,18 +593,18 @@ ipx_plugin_session_close(ipx_ctx_t *ctx, void *cfg, const struct ipx_session *se
{
struct plugin_data *data = (struct plugin_data *) cfg;
// Do NOT dereference the session pointer because it can be already freed!
- if (session != data->ts) {
+ if (session != data->current_ts) {
// The session has been already closed
return;
}
// Close the current session and file
- session_close(ctx, data->ts);
+ session_close(ctx, data->current_ts);
if (data->current_file) {
fclose(data->current_file);
}
- data->ts = NULL;
+ data->current_ts = NULL;
data->current_file = NULL;
data->current_name = NULL;
}
\ No newline at end of file