From b763ad52923237e9aa8bd5f48fecb8e720ec5205 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 23 Oct 2023 19:54:13 +0200 Subject: [PATCH] filter: modified flb_filter_do to operate in memory instead of a chunk Signed-off-by: Leonardo Alminana --- include/fluent-bit/flb_filter.h | 1 + src/flb_filter.c | 58 ++++++++++++++------------------- 2 files changed, 25 insertions(+), 34 deletions(-) diff --git a/include/fluent-bit/flb_filter.h b/include/fluent-bit/flb_filter.h index 12649073bd5..c176b474325 100644 --- a/include/fluent-bit/flb_filter.h +++ b/include/fluent-bit/flb_filter.h @@ -133,6 +133,7 @@ void flb_filter_instance_exit(struct flb_filter_instance *ins, void flb_filter_exit(struct flb_config *config); void flb_filter_do(struct flb_input_chunk *ic, const void *data, size_t bytes, + void **out_data, size_t *out_bytes, const char *tag, int tag_len, struct flb_config *config); const char *flb_filter_name(struct flb_filter_instance *ins); diff --git a/src/flb_filter.c b/src/flb_filter.c index 389709a9a4d..dc6370c8439 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -77,6 +77,7 @@ static inline int prop_key_check(const char *key, const char *kv, int k_len) void flb_filter_do(struct flb_input_chunk *ic, const void *data, size_t bytes, + void **out_data, size_t *out_bytes, const char *tag, int tag_len, struct flb_config *config) { @@ -90,13 +91,10 @@ void flb_filter_do(struct flb_input_chunk *ic, char *name; #endif char *ntag; - const char *work_data; + char *work_data; size_t work_size; void *out_buf; - size_t cur_size; size_t out_size; - ssize_t content_size; - ssize_t write_at; struct mk_list *head; struct flb_filter_instance *f_ins; struct flb_input_instance *i_ins = ic->in; @@ -106,6 +104,9 @@ void flb_filter_do(struct flb_input_chunk *ic, struct flb_time tm_finish; #endif /* FLB_HAVE_CHUNK_TRACE */ + *out_data = NULL; + *out_bytes = 0; + /* For the incoming Tag make sure to create a NULL terminated reference */ ntag = flb_malloc(tag_len + 1); if (!ntag) { @@ -116,7 +117,7 @@ void flb_filter_do(struct flb_input_chunk *ic, memcpy(ntag, tag, tag_len); ntag[tag_len] = '\0'; - work_data = (const char *) data; + work_data = (char *) data; work_size = bytes; #ifdef FLB_HAVE_METRICS @@ -131,9 +132,11 @@ void flb_filter_do(struct flb_input_chunk *ic, /* Iterate filters */ mk_list_foreach(head, &config->filters) { f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (is_active(&f_ins->properties) == FLB_FALSE) { continue; } + if (flb_router_match(ntag, tag_len, f_ins->match #ifdef FLB_HAVE_REGEX , f_ins->match_regex @@ -145,16 +148,12 @@ void flb_filter_do(struct flb_input_chunk *ic, out_buf = NULL; out_size = 0; - content_size = cio_chunk_get_content_size(ic->chunk); - - /* where to position the new content if modified ? */ - write_at = (content_size - work_size); - #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_time_get(&tm_start); } #endif /* FLB_HAVE_CHUNK_TRACE */ + /* Invoke the filter callback */ ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */ work_size, /* msgpack size */ @@ -165,6 +164,7 @@ void flb_filter_do(struct flb_input_chunk *ic, i_ins, /* input instance */ f_ins->context, /* filter priv data */ config); + #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_time_get(&tm_finish); @@ -176,19 +176,25 @@ void flb_filter_do(struct flb_input_chunk *ic, cmt_counter_add(f_ins->cmt_records, ts, in_records, 1, (char *[]) {name}); - cmt_counter_add(f_ins->cmt_bytes, ts, content_size, + cmt_counter_add(f_ins->cmt_bytes, ts, out_size, 1, (char *[]) {name}); flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics); - flb_metrics_sum(FLB_METRIC_N_BYTES, content_size, f_ins->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, out_size, f_ins->metrics); #endif /* Override buffer just if it was modified */ if (ret == FLB_FILTER_MODIFIED) { + /* release intermediate buffer */ + if (work_data != data) { + flb_free(work_data); + } + + work_data = (char *) out_buf; + work_size = out_size; + /* all records removed, no data to continue processing */ if (out_size == 0) { - /* reset data content length */ - flb_input_chunk_write_at(ic, write_at, "", 0); #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, "", 0); @@ -240,36 +246,19 @@ void flb_filter_do(struct flb_input_chunk *ic, ic->total_records = pre_records + in_records; #endif } - ret = flb_input_chunk_write_at(ic, write_at, - out_buf, out_size); - if (ret == -1) { - flb_error("[filter] could not write data to storage. " - "Skipping filtering."); - flb_free(out_buf); - continue; - } #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, out_buf, out_size); } #endif /* FLB_HAVE_CHUNK_TRACE */ - - /* Point back the 'data' pointer to the new address */ - ret = cio_chunk_get_content(ic->chunk, - (char **) &work_data, &cur_size); - if (ret != CIO_OK) { - flb_error("[filter] error retrieving data chunk"); - } - else { - work_data += (cur_size - out_size); - work_size = out_size; - } - flb_free(out_buf); } } } + *out_data = work_data; + *out_bytes = work_size; + flb_free(ntag); } @@ -660,6 +649,7 @@ void flb_filter_instance_destroy(struct flb_filter_instance *ins) } mk_list_del(&ins->_head); + flb_free(ins); }