diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index c71ae3ef089..c5b894aa45d 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1390,6 +1390,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, size_t pre_real_size; struct flb_input_chunk *ic; struct flb_storage_input *si; + void *filtered_data_buffer; + size_t filtered_data_size; + void *final_data_buffer; + size_t final_data_size; /* memory ring-buffer checker */ if (in->storage_type == FLB_STORAGE_MEMRB) { @@ -1491,15 +1495,6 @@ static int input_chunk_append_raw(struct flb_input_instance *in, pre_real_size = flb_input_chunk_get_real_size(ic); } - /* Write the new data */ - ret = flb_input_chunk_write(ic, buf, buf_size); - if (ret == -1) { - flb_error("[input chunk] error writing data from %s instance", - in->name); - cio_chunk_tx_rollback(ic->chunk); - return -1; - } - #ifdef FLB_HAVE_CHUNK_TRACE flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ @@ -1529,11 +1524,45 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } #endif + filtered_data_buffer = NULL; + final_data_buffer = (char *) buf; + final_data_size = buf_size; + /* Apply filters */ if (event_type == FLB_INPUT_LOGS) { flb_filter_do(ic, buf, buf_size, - tag, tag_len, in->config); + &filtered_data_buffer, + &filtered_data_size, + tag, tag_len, + in->config); + + if (filtered_data_buffer != NULL) { + final_data_buffer = filtered_data_buffer; + final_data_size = filtered_data_size; + } + } + + if (final_data_size > 0){ + ret = flb_input_chunk_write(ic, + final_data_buffer, + final_data_size); + } + else { + ret = 0; + } + + if (filtered_data_buffer != NULL && + filtered_data_buffer != buf) { + flb_free(filtered_data_buffer); + } + + if (ret == -1) { + flb_error("[input chunk] error writing data from %s instance", + in->name); + cio_chunk_tx_rollback(ic->chunk); + + return -1; } /* get the chunks content size */