Skip to content

Commit

Permalink
filter: modified flb_filter_do to operate in memory instead of a chunk
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored and edsiper committed Nov 4, 2023
1 parent 8207c71 commit b763ad5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 34 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void flb_filter_instance_exit(struct flb_filter_instance *ins,
void flb_filter_exit(struct flb_config *config);
void flb_filter_do(struct flb_input_chunk *ic,
const void *data, size_t bytes,
void **out_data, size_t *out_bytes,
const char *tag, int tag_len,
struct flb_config *config);
const char *flb_filter_name(struct flb_filter_instance *ins);
Expand Down
58 changes: 24 additions & 34 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ static inline int prop_key_check(const char *key, const char *kv, int k_len)

void flb_filter_do(struct flb_input_chunk *ic,
const void *data, size_t bytes,
void **out_data, size_t *out_bytes,
const char *tag, int tag_len,
struct flb_config *config)
{
Expand All @@ -90,13 +91,10 @@ void flb_filter_do(struct flb_input_chunk *ic,
char *name;
#endif
char *ntag;
const char *work_data;
char *work_data;
size_t work_size;
void *out_buf;
size_t cur_size;
size_t out_size;
ssize_t content_size;
ssize_t write_at;
struct mk_list *head;
struct flb_filter_instance *f_ins;
struct flb_input_instance *i_ins = ic->in;
Expand All @@ -106,6 +104,9 @@ void flb_filter_do(struct flb_input_chunk *ic,
struct flb_time tm_finish;
#endif /* FLB_HAVE_CHUNK_TRACE */

*out_data = NULL;
*out_bytes = 0;

/* For the incoming Tag make sure to create a NULL terminated reference */
ntag = flb_malloc(tag_len + 1);
if (!ntag) {
Expand All @@ -116,7 +117,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
memcpy(ntag, tag, tag_len);
ntag[tag_len] = '\0';

work_data = (const char *) data;
work_data = (char *) data;
work_size = bytes;

#ifdef FLB_HAVE_METRICS
Expand All @@ -131,9 +132,11 @@ void flb_filter_do(struct flb_input_chunk *ic,
/* Iterate filters */
mk_list_foreach(head, &config->filters) {
f_ins = mk_list_entry(head, struct flb_filter_instance, _head);

if (is_active(&f_ins->properties) == FLB_FALSE) {
continue;
}

if (flb_router_match(ntag, tag_len, f_ins->match
#ifdef FLB_HAVE_REGEX
, f_ins->match_regex
Expand All @@ -145,16 +148,12 @@ void flb_filter_do(struct flb_input_chunk *ic,
out_buf = NULL;
out_size = 0;

content_size = cio_chunk_get_content_size(ic->chunk);

/* where to position the new content if modified ? */
write_at = (content_size - work_size);

#ifdef FLB_HAVE_CHUNK_TRACE
if (ic->trace) {
flb_time_get(&tm_start);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

/* Invoke the filter callback */
ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */
work_size, /* msgpack size */
Expand All @@ -165,6 +164,7 @@ void flb_filter_do(struct flb_input_chunk *ic,
i_ins, /* input instance */
f_ins->context, /* filter priv data */
config);

#ifdef FLB_HAVE_CHUNK_TRACE
if (ic->trace) {
flb_time_get(&tm_finish);
Expand All @@ -176,19 +176,25 @@ void flb_filter_do(struct flb_input_chunk *ic,

cmt_counter_add(f_ins->cmt_records, ts, in_records,
1, (char *[]) {name});
cmt_counter_add(f_ins->cmt_bytes, ts, content_size,
cmt_counter_add(f_ins->cmt_bytes, ts, out_size,
1, (char *[]) {name});

flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, content_size, f_ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, out_size, f_ins->metrics);
#endif

/* Override buffer just if it was modified */
if (ret == FLB_FILTER_MODIFIED) {
/* release intermediate buffer */
if (work_data != data) {
flb_free(work_data);
}

work_data = (char *) out_buf;
work_size = out_size;

/* all records removed, no data to continue processing */
if (out_size == 0) {
/* reset data content length */
flb_input_chunk_write_at(ic, write_at, "", 0);
#ifdef FLB_HAVE_CHUNK_TRACE
if (ic->trace) {
flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, "", 0);
Expand Down Expand Up @@ -240,36 +246,19 @@ void flb_filter_do(struct flb_input_chunk *ic,
ic->total_records = pre_records + in_records;
#endif
}
ret = flb_input_chunk_write_at(ic, write_at,
out_buf, out_size);
if (ret == -1) {
flb_error("[filter] could not write data to storage. "
"Skipping filtering.");
flb_free(out_buf);
continue;
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (ic->trace) {
flb_chunk_trace_filter(ic->trace, (void *)f_ins, &tm_start, &tm_finish, out_buf, out_size);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

/* Point back the 'data' pointer to the new address */
ret = cio_chunk_get_content(ic->chunk,
(char **) &work_data, &cur_size);
if (ret != CIO_OK) {
flb_error("[filter] error retrieving data chunk");
}
else {
work_data += (cur_size - out_size);
work_size = out_size;
}
flb_free(out_buf);
}
}
}

*out_data = work_data;
*out_bytes = work_size;

flb_free(ntag);
}

Expand Down Expand Up @@ -660,6 +649,7 @@ void flb_filter_instance_destroy(struct flb_filter_instance *ins)
}

mk_list_del(&ins->_head);

flb_free(ins);
}

Expand Down

0 comments on commit b763ad5

Please sign in to comment.