From 8850ee14e92991150176ebe1f689906985cbab51 Mon Sep 17 00:00:00 2001 From: Braydon Kains <93549768+braydonk@users.noreply.github.com> Date: Sat, 16 Dec 2023 17:49:40 -0500 Subject: [PATCH] input_chunk: handle filter_do edge case (#8229) * input_chunk: handle filter_do edge case flb_filter_do may modify the input chunk's total records, meaning that if there is a filter in the pipeline the total records could be double-counted which breaks anything that relies on and event chunk's `total_records`. Signed-off-by: braydonk * input_chunk: clarify comment Clarify comment to demonstrate that flb_filter_do actually overwrites the value not just changing it. Signed-off-by: braydonk --- src/flb_input_chunk.c | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 7437ca77654..079bfb26d90 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1376,7 +1376,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const char *tag, size_t tag_len, const void *buf, size_t buf_size) { - int ret; + int ret, total_records_start; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1495,6 +1495,15 @@ static int input_chunk_append_raw(struct flb_input_instance *in, pre_real_size = flb_input_chunk_get_real_size(ic); } + /* + * Set the total_records based on the records that n_records + * says we should be writing. These values may be overwritten + * flb_filter_do, where a filter may add/remove records. + */ + total_records_start = ic->total_records; + ic->added_records = n_records; + ic->total_records += n_records; + #ifdef FLB_HAVE_CHUNK_TRACE flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ @@ -1530,9 +1539,13 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_free(filtered_data_buffer); } - if (ret == CIO_OK) { - ic->added_records = n_records; - ic->total_records += n_records; + /* + * If the write failed, then we did not add any records. Reset + * the record counters to reflect this. + */ + if (ret != CIO_OK) { + ic->added_records = 0; + ic->total_records = total_records_start; } /* Update 'input' metrics */