From b2ac300403cc86821710d17520f0fbf391379159 Mon Sep 17 00:00:00 2001 From: Lukas Hutak Date: Wed, 29 Apr 2020 18:19:06 +0200 Subject: [PATCH] IPFIX input: add an input buffer to reduce small I/O operations --- src/plugins/input/ipfix/README.rst | 6 +- src/plugins/input/ipfix/config.c | 20 +++- src/plugins/input/ipfix/config.h | 2 + src/plugins/input/ipfix/ipfix.c | 142 +++++++++++++++++++++++------ 4 files changed, 139 insertions(+), 31 deletions(-) diff --git a/src/plugins/input/ipfix/README.rst b/src/plugins/input/ipfix/README.rst index 64ab54de..fd0e9a43 100644 --- a/src/plugins/input/ipfix/README.rst +++ b/src/plugins/input/ipfix/README.rst @@ -27,4 +27,8 @@ Parameters Path to file(s) in IPFIX File format. It is possible to use asterisk instead of a filename/directory, tilde character (i.e. "~") instead of the home directory of the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix"). - Directories and non-IPFIX Files that match the file pattern are skipped/ignored. \ No newline at end of file + Directories and non-IPFIX Files that match the file pattern are skipped/ignored. + +:``bufferSize``: + Optional size of the internal buffer to which the content of the file is partly + preloaded. [default: 1048576, min: 131072] diff --git a/src/plugins/input/ipfix/config.c b/src/plugins/input/ipfix/config.c index 1402ee9e..43f4e589 100644 --- a/src/plugins/input/ipfix/config.c +++ b/src/plugins/input/ipfix/config.c @@ -47,18 +47,25 @@ /* * * ... // required, exactly once + * ... // optional * */ +/** Default buffer size */ +#define BSIZE_DEF (1048576U) +#define BSIZE_MIN (131072U) + /** XML nodes */ enum params_xml_nodes { - NODE_PATH = 1 + NODE_PATH = 1, + NODE_BSIZE }; /** Definition of the \ node */ static const struct fds_xml_args args_params[] = { FDS_OPTS_ROOT("params"), FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(NODE_BSIZE, "bufferSize", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT), FDS_OPTS_END }; @@ -81,6 +88,10 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg assert(content->type == FDS_OPTS_T_STRING); cfg->path = strdup(content->ptr_string); break; + case NODE_BSIZE: + assert(content->type == FDS_OPTS_T_UINT); + cfg->bsize = content->val_uint; + break; default: // Internal error assert(false); @@ -89,6 +100,12 @@ config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg if (!cfg->path) { IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return IPX_ERR_FORMAT; + } + + if (cfg->bsize < BSIZE_MIN) { + IPX_CTX_ERROR(ctx, "Buffer size must be at least %u bytes!" , (unsigned int) BSIZE_MIN); + return IPX_ERR_FORMAT; } return IPX_OK; @@ -102,6 +119,7 @@ static void config_default_set(struct ipfix_config *cfg) { cfg->path = NULL; + cfg->bsize = BSIZE_DEF; } struct ipfix_config * diff --git a/src/plugins/input/ipfix/config.h b/src/plugins/input/ipfix/config.h index a6d5f6c5..8b1afa6c 100644 --- a/src/plugins/input/ipfix/config.h +++ b/src/plugins/input/ipfix/config.h @@ -50,6 +50,8 @@ struct ipfix_config { /** File pattern */ char *path; + /** Read buffer size */ + uint64_t bsize; }; /** diff --git a/src/plugins/input/ipfix/ipfix.c b/src/plugins/input/ipfix/ipfix.c index aaa8d1e9..7ae8c1d9 100644 --- a/src/plugins/input/ipfix/ipfix.c +++ b/src/plugins/input/ipfix/ipfix.c @@ -79,8 +79,17 @@ struct plugin_data { FILE *current_file; /// Name/path of the current file const char *current_name; - /// Transport Session identification of the - struct ipx_session *ts; + /// Transport Session identification + struct ipx_session *current_ts; + + /// Buffer of preloaded data + uint8_t *buffer_data; + /// Size of the buffer + size_t buffer_size; + /// Valid size of the buffer + size_t buffer_valid; + /// Position of the reader in the buffer + size_t buffer_offset; }; /** @@ -175,7 +184,7 @@ files_list_get(ipx_ctx_t *ctx, const char *pattern, glob_t *list) * @param[in] filename New file which corresponds to the new Transport Session * @return New transport session */ -struct ipx_session * +static struct ipx_session * session_open(ipx_ctx_t *ctx, const char *filename) { struct ipx_session *res; @@ -215,7 +224,7 @@ session_open(ipx_ctx_t *ctx, const char *filename) * @param[in] ctx Plugin context (for sending notification and log) * @param[in] session Transport Session to close */ -void +static void session_close(ipx_ctx_t *ctx, struct ipx_session *session) { ipx_msg_session_t *msg_session; @@ -275,7 +284,7 @@ session_close(ipx_ctx_t *ctx, struct ipx_session *session) * @return #IPX_ERR_EOF if no more files are available * @return #iPX_ERR_NOMEM in case of a memory allocation error */ -int +static int next_file(struct plugin_data *data) { size_t idx_next; @@ -284,8 +293,8 @@ next_file(struct plugin_data *data) const char *name_new = NULL; // Signalize close of the current Transport Session - session_close(data->ctx, data->ts); - data->ts = NULL; + session_close(data->ctx, data->current_ts); + data->current_ts = NULL; if (data->current_file) { fclose(data->current_file); data->current_file = NULL; @@ -328,8 +337,8 @@ next_file(struct plugin_data *data) } // Signalize open of the new Transport Session - data->ts = session_open(data->ctx, name_new); - if (!data->ts) { + data->current_ts = session_open(data->ctx, name_new); + if (!data->current_ts) { fclose(file_new); return IPX_ERR_NOMEM; } @@ -337,6 +346,72 @@ next_file(struct plugin_data *data) IPX_CTX_INFO(data->ctx, "Reading from file '%s'...", name_new); data->current_file = file_new; data->current_name = name_new; + + data->buffer_valid = 0; + data->buffer_offset = 0; + return IPX_OK; +} + +/** + * @brief Get the next chunk of data + * + * Reads the chunk from the internal buffer with preloaded content of the file. + * If the buffer doesn't contain required amount of data, new content will be + * loaded from the file. Nevertheless, if the end-of-file has been reached, return + * codes #IPX_ERR_EOF or #IPX_ERR_FORMAT might be returned. + * + * @param[in] data Plugin data + * @param[out] out Output buffer to fill + * @param[in] out_size Size of the output buffer (i.e. required amount of data) + * + * @return #IPX_OK on success + * @return #IPX_ERR_EOF if the end-of-file has been reached (no more data) + * @return #IPX_ERR_FORMAT if the end-of-file has been reached but the internal + * buffer doesn't contain required amount of data + */ +static int +next_chunk(struct plugin_data *data, uint8_t *out, uint16_t out_size) +{ + size_t buffer_avail = data->buffer_valid - data->buffer_offset; + uint8_t *reader_ptr = &data->buffer_data[data->buffer_offset]; + + size_t new_size; + uint8_t *new_ptr; + size_t ret; + + // Check if the chunk is fully in the buffer + if (buffer_avail >= out_size) { + memcpy(out, reader_ptr, out_size); + data->buffer_offset += out_size; + return IPX_OK; + } + + // We need to load new data to the buffer + if (buffer_avail > 0) { + // A fragment of an unprocessed IPFIX Message must be preserved + memcpy(data->buffer_data, reader_ptr, buffer_avail); + data->buffer_valid = buffer_avail; + } else { + data->buffer_valid = 0; + } + + new_size = data->buffer_size - data->buffer_valid; + new_ptr = &data->buffer_data[data->buffer_valid]; + ret = fread(new_ptr, 1, new_size, data->current_file); + data->buffer_valid += ret; + data->buffer_offset = 0; + + // Check whether the EOF has been reached + if (data->buffer_valid == 0 && feof(data->current_file)) { + return IPX_ERR_EOF; + } + + if (data->buffer_valid < out_size) { + return IPX_ERR_FORMAT; + } + + memcpy(out, data->buffer_data, out_size); + data->buffer_offset += out_size; return IPX_OK; } @@ -346,10 +421,11 @@ next_file(struct plugin_data *data) * @param[in] data Plugin data * @param[out] msg IPFIX Message extracted from the file * @return #IPX_OK on success + * @return #IPX_ERR_EOF if the end-of-file has been reached * @return #IPX_ERR_FORMAT if the file is malformed * @return #IPX_ERR_NOMEM in case of a memory allocation error */ -int +static int next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg) { struct fds_ipfix_msg_hdr ipfix_hdr; @@ -358,23 +434,21 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg) struct ipx_msg_ctx ipfix_ctx; ipx_msg_ipfix_t *ipfix_msg; - size_t rc; + int ret; if (!data->current_file) { return IPX_ERR_EOF; } - // Get the IPFIX header - rc = fread(&ipfix_hdr, 1, FDS_IPFIX_MSG_HDR_LEN, data->current_file); - if (rc != FDS_IPFIX_MSG_HDR_LEN) { - // Check if the end of file has been reached - if (rc == 0 && feof(data->current_file)) { - return IPX_ERR_EOF; + // Get the IPFIX Message header + ret = next_chunk(data, (uint8_t *) &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN); + if (ret != IPX_OK) { + if (ret == IPX_ERR_FORMAT) { + IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!", + data->current_name); } - IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!", - data->current_name); - return IPX_ERR_FORMAT; + return ret; } ipfix_size = ntohs(ipfix_hdr.length); @@ -389,15 +463,14 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg) IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__); return IPX_ERR_NOMEM; } - memcpy(ipfix_data, &ipfix_hdr, FDS_IPFIX_MSG_HDR_LEN); - // Get the rest of the message body + // Get the rest of the IPFIX Message body if (ipfix_size > FDS_IPFIX_MSG_HDR_LEN) { uint8_t *data_ptr = ipfix_data + FDS_IPFIX_MSG_HDR_LEN; - size_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN; + uint16_t size_remain = ipfix_size - FDS_IPFIX_MSG_HDR_LEN; - if (fread(data_ptr, 1, size_remain, data->current_file) != size_remain) { + if (next_chunk(data, data_ptr, size_remain) != IPX_OK) { IPX_CTX_ERROR(data->ctx, "File '%s' is corrupted (unexpected end of file)!", data->current_name); free(ipfix_data); @@ -407,7 +480,7 @@ next_message(struct plugin_data *data, ipx_msg_ipfix_t **msg) // Wrap the IPFIX Message memset(&ipfix_ctx, 0, sizeof(ipfix_ctx)); - ipfix_ctx.session = data->ts; + ipfix_ctx.session = data->current_ts; ipfix_ctx.odid = ntohl(ipfix_hdr.odid); ipfix_ctx.stream = 0; @@ -441,8 +514,18 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params) return IPX_ERR_DENIED; } + // Initialize reader buffer + data->buffer_size = data->cfg->bsize; + data->buffer_data = malloc(sizeof(uint8_t) * data->buffer_size); + if (!data->buffer_data) { + IPX_CTX_ERROR(ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__); + free(data); + return IPX_ERR_DENIED; + } + // Prepare list of all files to read if (files_list_get(ctx, data->cfg->path, &data->file_list) != IPX_OK) { + free(data->buffer_data); config_destroy(data->cfg); free(data); return IPX_ERR_DENIED; @@ -458,7 +541,7 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg) struct plugin_data *data = (struct plugin_data *) cfg; // Close the current session and file - session_close(ctx, data->ts); + session_close(ctx, data->current_ts); if (data->current_file) { fclose(data->current_file); } @@ -466,6 +549,7 @@ ipx_plugin_destroy(ipx_ctx_t *ctx, void *cfg) // Final cleanup files_list_free(&data->file_list); config_destroy(data->cfg); + free(data->buffer_data); free(data); } @@ -509,18 +593,18 @@ ipx_plugin_session_close(ipx_ctx_t *ctx, void *cfg, const struct ipx_session *se { struct plugin_data *data = (struct plugin_data *) cfg; // Do NOT dereference the session pointer because it can be already freed! - if (session != data->ts) { + if (session != data->current_ts) { // The session has been already closed return; } // Close the current session and file - session_close(ctx, data->ts); + session_close(ctx, data->current_ts); if (data->current_file) { fclose(data->current_file); } - data->ts = NULL; + data->current_ts = NULL; data->current_file = NULL; data->current_name = NULL; } \ No newline at end of file