diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 3bfee6277a3..d3798ef6342 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -202,7 +202,8 @@ struct flb_config { char *storage_max_chunk_size_str; /* chunk size limit */ char *storage_bl_mem_limit; /* storage backlog memory limit */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ - size_t storage_max_chunk_size; /* chunk size limit */ + size_t storage_max_chunk_size; /* chunk size limit */ + int storage_trim_files; /* enable/disable file trimming */ /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB @@ -307,6 +308,7 @@ enum conf_type { #define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" #define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_MAX_CHUNK_SIZE "storage.max_chunk_size" +#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/lib/chunkio/include/chunkio/chunkio.h b/lib/chunkio/include/chunkio/chunkio.h index e60ee372a17..d348a1ed1fc 100644 --- a/lib/chunkio/include/chunkio/chunkio.h +++ b/lib/chunkio/include/chunkio/chunkio.h @@ -43,6 +43,7 @@ #define CIO_OPEN_RD 2 /* open and read/mmap content if exists */ #define CIO_CHECKSUM 4 /* enable checksum verification (crc32) */ #define CIO_FULL_SYNC 8 /* force sync to fs through MAP_SYNC */ +#define CIO_TRIM_FILES 16 /* trim files to their required size */ /* Return status */ #define CIO_CORRUPTED -3 /* Indicate that a chunk is corrupted */ @@ -100,6 +101,9 @@ void cio_set_log_callback(struct cio_ctx *ctx, void (*log_cb)); int cio_set_log_level(struct cio_ctx *ctx, int level); int cio_set_max_chunks_up(struct cio_ctx *ctx, int n); +void cio_enable_file_trimming(struct cio_ctx *ctx); +void cio_disable_file_trimming(struct cio_ctx *ctx); + int cio_meta_write(struct cio_chunk *ch, char *buf, size_t size); int cio_meta_cmp(struct cio_chunk *ch, char *meta_buf, int meta_len); int cio_meta_read(struct cio_chunk *ch, char **meta_buf, int *meta_len); diff --git a/lib/chunkio/include/chunkio/cio_file_st.h b/lib/chunkio/include/chunkio/cio_file_st.h index 028922f3174..c2a1ac0ea11 100644 --- a/lib/chunkio/include/chunkio/cio_file_st.h +++ b/lib/chunkio/include/chunkio/cio_file_st.h @@ -36,7 +36,10 @@ * +--------------+----------------+ * | 0xC1 | 0x00 +--> Header 2 bytes * +--------------+----------------+ - * | 4 BYTES CRC32 + 16 BYTES +--> CRC32(Content) + Padding + * | 4 BYTES +--> CRC32(Content) + * | 4 BYTES +--> CRC32(Padding) + * | 4 BYTES +--> Content length + * | 8 BYTES +--> Padding * +-------------------------------+ * | Content | * | +-------------------------+ | @@ -55,10 +58,14 @@ * +-------------------------------+ */ -#define CIO_FILE_ID_00 0xc1 /* header: first byte */ -#define CIO_FILE_ID_01 0x00 /* header: second byte */ -#define CIO_FILE_HEADER_MIN 24 /* 24 bytes for the header */ -#define CIO_FILE_CONTENT_OFFSET 22 +#define CIO_FILE_ID_00 0xc1 /* header: first byte */ +#define CIO_FILE_ID_01 0x00 /* header: second byte */ +#define CIO_FILE_HEADER_MIN 24 /* 24 bytes for the header */ +#define CIO_FILE_CONTENT_OFFSET 22 +#define CIO_FILE_CONTENT_LENGTH_OFFSET 10 /* We store the content length + * right after the checksum in + * what used to be padding + */ /* Return pointer to hash position */ static inline char *cio_file_st_get_hash(char *map) @@ -94,22 +101,34 @@ static inline char *cio_file_st_get_content(char *map) return map + CIO_FILE_HEADER_MIN + len; } -static inline ssize_t cio_file_st_get_content_size(char *map, size_t size) +/* Get content length */ +static inline ssize_t cio_file_st_get_content_len(char *map, size_t size) { - int meta_len; - size_t s; + uint8_t *content_length_buffer; if (size < CIO_FILE_HEADER_MIN) { return -1; } - meta_len = cio_file_st_get_meta_len(map); - s = (size - CIO_FILE_HEADER_MIN) - meta_len; - if (s < size) { - return s; - } + content_length_buffer = (uint8_t *) &map[CIO_FILE_CONTENT_LENGTH_OFFSET]; + + return (ssize_t) (((uint32_t) content_length_buffer[0]) << 24) | + (((uint32_t) content_length_buffer[1]) << 16) | + (((uint32_t) content_length_buffer[2]) << 8) | + (((uint32_t) content_length_buffer[3]) << 0); +} + +/* Set content length */ +static inline void cio_file_st_set_content_len(char *map, uint32_t len) +{ + uint8_t *content_length_buffer; + + content_length_buffer = (uint8_t *) &map[CIO_FILE_CONTENT_LENGTH_OFFSET]; - return -1; + content_length_buffer[0] = (uint8_t) ((len & 0xFF000000) >> 24); + content_length_buffer[1] = (uint8_t) ((len & 0x00FF0000) >> 16); + content_length_buffer[2] = (uint8_t) ((len & 0x0000FF00) >> 8); + content_length_buffer[3] = (uint8_t) ((len & 0x000000FF) >> 0); } #endif diff --git a/lib/chunkio/src/chunkio.c b/lib/chunkio/src/chunkio.c index 07804a97a6a..2ac10ea32a4 100644 --- a/lib/chunkio/src/chunkio.c +++ b/lib/chunkio/src/chunkio.c @@ -227,3 +227,13 @@ int cio_set_max_chunks_up(struct cio_ctx *ctx, int n) ctx->max_chunks_up = n; return 0; } + +void cio_enable_file_trimming(struct cio_ctx *ctx) +{ + ctx->flags |= CIO_TRIM_FILES; +} + +void cio_disable_file_trimming(struct cio_ctx *ctx) +{ + ctx->flags &= ~CIO_TRIM_FILES; +} diff --git a/lib/chunkio/src/cio_file.c b/lib/chunkio/src/cio_file.c index 1129dcfd33a..4099734a384 100644 --- a/lib/chunkio/src/cio_file.c +++ b/lib/chunkio/src/cio_file.c @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #define _GNU_SOURCE #include @@ -60,25 +59,17 @@ char cio_file_init_bytes[] = { #define ROUND_UP(N, S) ((((N) + (S) - 1) / (S)) * (S)) -/* Get the number of bytes in the Content section */ -static size_t content_len(struct cio_file *cf) -{ - int meta; - size_t len; - - meta = cio_file_st_get_meta_len(cf->map); - len = 2 + meta + cf->data_size; - return len; -} - -/* Calculate content checksum in a variable */ + /* Calculate content checksum in a variable */ void cio_file_calculate_checksum(struct cio_file *cf, crc_t *out) { crc_t val; size_t len; unsigned char *in_data; - len = content_len(cf); + len = cio_file_st_get_content_len(cf->map, CIO_FILE_HEADER_MIN); + len += cio_file_st_get_meta_len(cf->map); + len += 2; + in_data = (unsigned char *) cf->map + CIO_FILE_CONTENT_OFFSET; val = cio_crc32_update(cf->crc_cur, in_data, len); *out = val; @@ -110,6 +101,7 @@ static void finalize_checksum(struct cio_file *cf) crc = cio_crc32_finalize(cf->crc_cur); crc = htonl(crc); + memcpy(cf->map + 2, &crc, sizeof(crc)); } @@ -147,21 +139,26 @@ static void write_init_header(struct cio_chunk *ch, struct cio_file *cf) cf->map[4] = 0; cf->map[5] = 0; } + + cio_file_st_set_content_len(cf->map, 0); } /* Return the available size in the file map to write data */ static size_t get_available_size(struct cio_file *cf, int *meta_len) { size_t av; - int len; + int metadata_len; /* Get metadata length */ - len = cio_file_st_get_meta_len(cf->map); + metadata_len = cio_file_st_get_meta_len(cf->map); - av = cf->alloc_size - cf->data_size; - av -= (CIO_FILE_HEADER_MIN + len); + av = cf->alloc_size; + av -= CIO_FILE_HEADER_MIN; + av -= metadata_len; + av -= cf->data_size; + + *meta_len = metadata_len; - *meta_len = len; return av; } @@ -172,6 +169,9 @@ static size_t get_available_size(struct cio_file *cf, int *meta_len) static int cio_file_format_check(struct cio_chunk *ch, struct cio_file *cf, int flags) { + size_t metadata_length; + ssize_t content_length; + ssize_t logical_length; unsigned char *p; crc_t crc_check; crc_t crc; @@ -185,6 +185,7 @@ static int cio_file_format_check(struct cio_chunk *ch, cio_log_warn(ch->ctx, "[cio file] cannot initialize chunk (read-only)"); cio_error_set(ch, CIO_ERR_PERMISSION); + return -1; } @@ -192,6 +193,7 @@ static int cio_file_format_check(struct cio_chunk *ch, if (cf->alloc_size < CIO_FILE_HEADER_MIN) { cio_log_warn(ch->ctx, "[cio file] cannot initialize chunk"); cio_error_set(ch, CIO_ERR_BAD_LAYOUT); + return -1; } @@ -209,6 +211,31 @@ static int cio_file_format_check(struct cio_chunk *ch, cio_log_debug(ch->ctx, "[cio file] invalid header at %s", ch->name); cio_error_set(ch, CIO_ERR_BAD_LAYOUT); + + return -1; + } + + /* Expected / logical file size verification */ + content_length = cio_file_st_get_content_len(cf->map, cf->fs_size); + if (content_length == -1) { + cio_log_debug(ch->ctx, "[cio file] truncated header (%zu / %zu) %s", + cf->fs_size, CIO_FILE_HEADER_MIN, ch->name); + cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE); + + return -1; + } + + metadata_length = cio_file_st_get_meta_len(cf->map); + + logical_length = CIO_FILE_HEADER_MIN + + metadata_length + + content_length; + + if (logical_length > cf->fs_size) { + cio_log_debug(ch->ctx, "[cio file] truncated file (%zd / %zd) %s", + cf->fs_size, logical_length, ch->name); + cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE); + return -1; } @@ -226,12 +253,15 @@ static int cio_file_format_check(struct cio_chunk *ch, /* Compare */ crc_check = cio_crc32_finalize(crc); crc_check = htonl(crc_check); + if (memcmp(p, &crc_check, sizeof(crc_check)) != 0) { cio_log_debug(ch->ctx, "[cio file] invalid crc32 at %s/%s", ch->name, cf->path); cio_error_set(ch, CIO_ERR_BAD_CHECKSUM); + return -1; } + cf->crc_cur = crc; } } @@ -360,7 +390,6 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size) } /* Map the file */ - size = ROUND_UP(size, ctx->page_size); cf->map = mmap(0, size, oflags, MAP_SHARED, cf->fd, 0); if (cf->map == MAP_FAILED) { cio_errno(); @@ -372,7 +401,7 @@ static int mmap_file(struct cio_ctx *ctx, struct cio_chunk *ch, size_t size) /* check content data size */ if (fs_size > 0) { - content_size = cio_file_st_get_content_size(cf->map, fs_size); + content_size = cio_file_st_get_content_len(cf->map, fs_size); if (content_size == -1) { cio_error_set(ch, CIO_ERR_BAD_FILE_SIZE); cio_log_error(ctx, "invalid content size %s", cf->path); @@ -937,6 +966,14 @@ int cio_file_write(struct cio_chunk *ch, const void *buf, size_t count) cf->alloc_size = new_size; } + /* If crc_reset was toggled we know that data_size was + * modified by cio_chunk_write_at which means we need + * to update the header before we recalculate the checksum + */ + if (cf->crc_reset) { + cio_file_st_set_content_len(cf->map, cf->data_size); + } + if (ch->ctx->flags & CIO_CHECKSUM) { update_checksum(cf, (unsigned char *) buf, count); } @@ -947,6 +984,8 @@ int cio_file_write(struct cio_chunk *ch, const void *buf, size_t count) cf->data_size += count; cf->synced = CIO_FALSE; + cio_file_st_set_content_len(cf->map, cf->data_size); + return 0; } @@ -1073,30 +1112,35 @@ int cio_file_sync(struct cio_chunk *ch) /* Save current mmap size */ old_size = cf->alloc_size; - /* If there are extra space, truncate the file size */ - av_size = get_available_size(cf, &meta_len); - if (av_size > 0) { - size = cf->alloc_size - av_size; - ret = cio_file_fs_size_change(cf, size); - if (ret == -1) { - cio_errno(); - cio_log_error(ch->ctx, - "[cio file sync] error adjusting size at: " - " %s/%s", ch->st->name, ch->name); + /* File trimming has been made opt-in because it causes + * performance degradation and excessive fragmentation + * in XFS. + */ + if ((ch->ctx->flags & CIO_TRIM_FILES) != 0) { + /* If there are extra space, truncate the file size */ + av_size = get_available_size(cf, &meta_len); + if (av_size > 0) { + size = cf->alloc_size - av_size; + ret = cio_file_fs_size_change(cf, size); + if (ret == -1) { + cio_errno(); + cio_log_error(ch->ctx, + "[cio file sync] error adjusting size at: " + " %s/%s", ch->st->name, ch->name); + } + cf->alloc_size = size; } - cf->alloc_size = size; - } - else if (cf->alloc_size > fst.st_size) { - ret = cio_file_fs_size_change(cf, cf->alloc_size); - if (ret == -1) { - cio_errno(); - cio_log_error(ch->ctx, - "[cio file sync] error adjusting size at: " - " %s/%s", ch->st->name, ch->name); + else if (cf->alloc_size > fst.st_size) { + ret = cio_file_fs_size_change(cf, cf->alloc_size); + if (ret == -1) { + cio_errno(); + cio_log_error(ch->ctx, + "[cio file sync] error adjusting size at: " + " %s/%s", ch->st->name, ch->name); + } } } - /* If the mmap size changed, adjust mapping to the proper size */ if (old_size != cf->alloc_size) { #ifndef MREMAP_MAYMOVE /* OSX */ @@ -1151,6 +1195,7 @@ int cio_file_sync(struct cio_chunk *ch) cio_log_debug(ch->ctx, "[cio file] synced at: %s/%s", ch->st->name, ch->name); + return 0; } diff --git a/src/flb_config.c b/src/flb_config.c index 380b216290f..40fa0dd1593 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -141,6 +141,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_MAX_CHUNK_SIZE, FLB_CONF_TYPE_STR, offsetof(struct flb_config, storage_max_chunk_size_str)}, + {FLB_CONF_STORAGE_TRIM_FILES, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, storage_trim_files)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index bbcbe452b06..dad8656a28f 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1105,17 +1105,6 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, } } - if (ic != NULL) { - new_chunk_size = flb_input_chunk_get_real_size(ic); - new_chunk_size += chunk_size; - - if (in->config->storage_max_chunk_size > 0 && - in->config->storage_max_chunk_size < new_chunk_size) { - ic = NULL; - } - } - - /* No chunk was found, we need to create a new one */ if (!ic) { ic = flb_input_chunk_create(in, (char *) tag, tag_len); diff --git a/src/flb_storage.c b/src/flb_storage.c index 4f4214c99de..ebf4ae8e5c9 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -506,6 +506,11 @@ int flb_storage_create(struct flb_config *ctx) flags |= CIO_CHECKSUM; } + /* file trimming */ + if (ctx->storage_trim_files == FLB_TRUE) { + flags |= CIO_TRIM_FILES; + } + /* Create chunkio context */ cio = cio_create(ctx->storage_path, log_cb, CIO_LOG_DEBUG, flags); if (!cio) {