Skip to content

Commit

Permalink
IPFIX input: add an input buffer to reduce small I/O operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukas955 committed Apr 29, 2020
1 parent e7aca24 commit b2ac300
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 31 deletions.
6 changes: 5 additions & 1 deletion src/plugins/input/ipfix/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ Parameters
Path to file(s) in IPFIX File format. It is possible to use asterisk instead of
a filename/directory, tilde character (i.e. "~") instead of the home directory of
the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix").
Directories and non-IPFIX Files that match the file pattern are skipped/ignored.
Directories and non-IPFIX Files that match the file pattern are skipped/ignored.

:``bufferSize``:
Optional size of the internal buffer to which the content of the file is partly
preloaded. [default: 1048576, min: 131072]
20 changes: 19 additions & 1 deletion src/plugins/input/ipfix/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,25 @@
/*
* <params>
* <path>...</path> // required, exactly once
* <bufferSize>...</bufferSize> // optional
* </params>
*/

/** Default buffer size */
#define BSIZE_DEF (1048576U)
#define BSIZE_MIN (131072U)

/** XML nodes */
enum params_xml_nodes {
NODE_PATH = 1
NODE_PATH = 1,
NODE_BSIZE
};

/** Definition of the \<params\> node */
static const struct fds_xml_args args_params[] = {
FDS_OPTS_ROOT("params"),
FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(NODE_BSIZE, "bufferSize", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
FDS_OPTS_END
};

Expand All @@ -81,6 +88,10 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg
assert(content->type == FDS_OPTS_T_STRING);
cfg->path = strdup(content->ptr_string);
break;
case NODE_BSIZE:
assert(content->type == FDS_OPTS_T_UINT);
cfg->bsize = content->val_uint;
break;
default:
// Internal error
assert(false);
Expand All @@ -89,6 +100,12 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg

if (!cfg->path) {
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
return IPX_ERR_FORMAT;
}

if (cfg->bsize < BSIZE_MIN) {
IPX_CTX_ERROR(ctx, "Buffer size must be at least %u bytes!" , (unsigned int) BSIZE_MIN);
return IPX_ERR_FORMAT;
}

return IPX_OK;
Expand All @@ -102,6 +119,7 @@ static void
config_default_set(struct ipfix_config *cfg)
{
cfg->path = NULL;
cfg->bsize = BSIZE_DEF;
}

struct ipfix_config *
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/input/ipfix/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
struct ipfix_config {
/** File pattern */
char *path;
/** Read buffer size */
uint64_t bsize;
};

/**
Expand Down
142 changes: 113 additions & 29 deletions src/plugins/input/ipfix/ipfix.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,17 @@ struct plugin_data {
FILE *current_file;
/// Name/path of the current file
const char *current_name;
/// Transport Session identification of the
struct ipx_session *ts;
/// Transport Session identification
struct ipx_session *current_ts;

/// Buffer of preloaded data
uint8_t *buffer_data;
/// Size of the buffer
size_t buffer_size;
/// Valid size of the buffer
size_t buffer_valid;
/// Position of the reader in the buffer
size_t buffer_offset;
};

/**
Expand Down Expand Up @@ -175,7 +184,7 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list)
* @param[in] filename New file which corresponds to the new Transport Session
* @return New transport session
*/
struct ipx_session *
static struct ipx_session *
session_open(ipx_ctx_t *ctx, const char *filename)
{
struct ipx_session *res;
Expand Down Expand Up @@ -215,7 +224,7 @@ session_open(ipx_ctx_t *ctx, const char *filename)
* @param[in] ctx Plugin context (for sending notification and log)
* @param[in] session Transport Session to close
*/
void
static void
session_close(ipx_ctx_t *ctx, struct ipx_session *session)
{
ipx_msg_session_t *msg_session;
Expand Down Expand Up @@ -275,7 +284,7 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session)
* @return #IPX_ERR_EOF if no more files are available
* @return #iPX_ERR_NOMEM in case of a memory allocation error
*/
int
static int
next_file(struct plugin_data *data)
{
size_t idx_next;
Expand All @@ -284,8 +293,8 @@ next_file(struct plugin_data *data)
const char *name_new = NULL;

// Signalize close of the current Transport Session
session_close(data->ctx, data->ts);
data->ts = NULL;
session_close(data->ctx, data->current_ts);
data->current_ts = NULL;
if (data->current_file) {
fclose(data->current_file);
data->current_file = NULL;
Expand Down Expand Up @@ -328,15 +337,81 @@ next_file(struct plugin_data *data)
}

// Signalize open of the new Transport Session
data->ts = session_open(data->ctx, name_new);
if (!data->ts) {
data->current_ts = session_open(data->ctx, name_new);
if (!data->current_ts) {
fclose(file_new);
return IPX_ERR_NOMEM;
}

IPX_CTX_INFO(data->ctx, "Reading from file '%s'...", name_new);
data->current_file = file_new;
data->current_name = name_new;

data->buffer_valid = 0;
data->buffer_offset = 0;
return IPX_OK;
}

/**
* @brief Get the next chunk of data
*
* Reads the chunk from the internal buffer with preloaded content of the file.
* If the buffer doesn't contain required amount of data, new content will be
* loaded from the file. Nevertheless, if the end-of-file has been reached, return
* codes #IPX_ERR_EOF or #IPX_ERR_FORMAT might be returned.
*
* @param[in] data Plugin data
* @param[out] out Output buffer to fill
* @param[in] out_size Size of the output buffer (i.e. required amount of data)
*
* @return #IPX_OK on success
* @return #IPX_ERR_EOF if the end-of-file has been reached (no more data)
* @return #IPX_ERR_FORMAT if the end-of-file has been reached but the internal
* buffer doesn't contain required amount of data
*/
static int
next_chunk(struct plugin_data *data, uint8_t *out, uint16_t out_size)
{
size_t buffer_avail = data->buffer_valid - data->buffer_offset;
uint8_t *reader_ptr = &data->buffer_data[data->buffer_offset];

size_t new_size;
uint8_t *new_ptr;
size_t ret;

// Check if the chunk is fully in the buffer
if (buffer_avail >= out_size) {
memcpy(out, reader_ptr, out_size);
data->buffer_offset += out_size;
return IPX_OK;
}

// We need to load new data to the buffer
if (buffer_avail > 0) {
// A fragment of an unprocessed IPFIX Message must be preserved
memcpy(data->buffer_data, reader_ptr, buffer_avail);
data->buffer_valid = buffer_avail;
} else {
data->buffer_valid = 0;
}

new_size = data->buffer_size - data->buffer_valid;
new_ptr = &data->buffer_data[data->buffer_valid];
ret = fread(new_ptr, 1, new_size, data->current_file);
data->buffer_valid += ret;
data->buffer_offset = 0;

// Check whether the EOF has been reached
if (data->buffer_valid == 0 && feof(data->current_file)) {
return IPX_ERR_EOF;
}

if (data->buffer_valid < out_size) {
return IPX_ERR_FORMAT;
}

memcpy(out, data->buffer_data, out_size);
data->buffer_offset += out_size;
return IPX_OK;
}

Expand All @@ -346,10 +421,11 @@ next_file(struct plugin_data *data)
* @param[in] data Plugin data
* @param[out] msg IPFIX Message extracted from the file
* @return #IPX_OK on success
* @return #IPX_ERR_EOF if the end-of-file has been reached
* @return #IPX_ERR_FORMAT if the file is malformed
* @return #IPX_ERR_NOMEM in case of a memory allocation error
*/
int
static int
next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
{
struct fds_ipfix_msg_hdr ipfix_hdr;
Expand All @@ -358,23 +434,21 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)

struct ipx_msg_ctx ipfix_ctx;
ipx_msg_ipfix_t *ipfix_msg;
size_t rc;
int ret;

if (!data->current_file) {
return IPX_ERR_EOF;
}

// Get the IPFIX header
rc = fread(&ipfix_hdr, 1, FDS_IPFIX_MSG_HDR_LEN, data->current_file);
if (rc != FDS_IPFIX_MSG_HDR_LEN) {
// Check if the end of file has been reached
if (rc == 0 && feof(data->current_file)) {
return IPX_ERR_EOF;
// Get the IPFIX Message header
ret = next_chunk(data, (uint8_t *) &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);
if (ret != IPX_OK) {
if (ret == IPX_ERR_FORMAT) {
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
data->current_name);
}

IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
data->current_name);
return IPX_ERR_FORMAT;
return ret;
}

ipfix_size = ntohs(ipfix_hdr.length);
Expand All @@ -389,15 +463,14 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
return IPX_ERR_NOMEM;
}

memcpy(ipfix_data, &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN);

// Get the rest of the message body
// Get the rest of the IPFIX Message body
if (ipfix_size > FDS_IPFIX_MSG_HDR_LEN) {
uint8_t *data_ptr = ipfix_data + FDS_IPFIX_MSG_HDR_LEN;
size_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;
uint16_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN;

if (fread(data_ptr, 1, size_remain, data->current_file) != size_remain) {
if (next_chunk(data, data_ptr, size_remain) != IPX_OK) {
IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!",
data->current_name);
free(ipfix_data);
Expand All @@ -407,7 +480,7 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg)

// Wrap the IPFIX Message
memset(&ipfix_ctx, 0, sizeof(ipfix_ctx));
ipfix_ctx.session = data->ts;
ipfix_ctx.session = data->current_ts;
ipfix_ctx.odid = ntohl(ipfix_hdr.odid);
ipfix_ctx.stream = 0;

Expand Down Expand Up @@ -441,8 +514,18 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
return IPX_ERR_DENIED;
}

// Initialize reader buffer
data->buffer_size = data->cfg->bsize;
data->buffer_data = malloc(sizeof(uint8_t) * data->buffer_size);
if (!data->buffer_data) {
IPX_CTX_ERROR(ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
free(data);
return IPX_ERR_DENIED;
}

// Prepare list of all files to read
if (files_list_get(ctx, data->cfg->path, &data->file_list) != IPX_OK) {
free(data->buffer_data);
config_destroy(data->cfg);
free(data);
return IPX_ERR_DENIED;
Expand All @@ -458,14 +541,15 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg)
struct plugin_data *data = (struct plugin_data *) cfg;

// Close the current session and file
session_close(ctx, data->ts);
session_close(ctx, data->current_ts);
if (data->current_file) {
fclose(data->current_file);
}

// Final cleanup
files_list_free(&data->file_list);
config_destroy(data->cfg);
free(data->buffer_data);
free(data);
}

Expand Down Expand Up @@ -509,18 +593,18 @@ ipx_plugin_session_close(ipx_ctx_t *ctx, void *cfg, const struct ipx_session *se
{
struct plugin_data *data = (struct plugin_data *) cfg;
// Do NOT dereference the session pointer because it can be already freed!
if (session != data->ts) {
if (session != data->current_ts) {
// The session has been already closed
return;
}

// Close the current session and file
session_close(ctx, data->ts);
session_close(ctx, data->current_ts);
if (data->current_file) {
fclose(data->current_file);
}

data->ts = NULL;
data->current_ts = NULL;
data->current_file = NULL;
data->current_name = NULL;
}

0 comments on commit b2ac300

Please sign in to comment.