Skip to content

Commit

Permalink
processor: add support for filter metrics inside input & output proce…
Browse files Browse the repository at this point in the history
…ssors (#9341)

Signed-off-by: juicer <[email protected]>
  • Loading branch information
cw-Guo authored Nov 9, 2024
1 parent 91e16e3 commit 22e7667
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
31 changes: 29 additions & 2 deletions src/flb_metrics_exporter.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ int flb_me_destroy(struct flb_me *me)
struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
{
int ret;
struct mk_list *head;
struct mk_list *head, *processor_head;
struct flb_input_instance *i; /* inputs */
struct flb_filter_instance *f; /* filter */
struct flb_processor_unit *pu; /* processors */
struct flb_filter_instance *f, *pf; /* filter */
struct flb_output_instance *o; /* output */
struct cmt *cmt;

Expand Down Expand Up @@ -308,6 +309,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &i->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

mk_list_foreach(head, &ctx->filters) {
Expand All @@ -330,6 +344,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &o->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

return cmt;
Expand Down
59 changes: 58 additions & 1 deletion src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,13 @@ int flb_processor_run(struct flb_processor *proc,
struct flb_filter_instance *f_ins;
struct flb_processor_instance *p_ins;
struct flb_mp_chunk_cobj *chunk_cobj = NULL;
#ifdef FLB_HAVE_METRICS
int in_records = 0;
int out_records = 0;
int diff = 0;
uint64_t ts;
char *name;
#endif

if (type == FLB_PROCESSOR_LOGS) {
list = &proc->logs;
Expand All @@ -459,6 +466,11 @@ int flb_processor_run(struct flb_processor *proc,
list = &proc->traces;
}

#ifdef FLB_HAVE_METRICS
/* timestamp */
ts = cfl_time_now();
#endif

/* set current data buffer */
cur_buf = data;
cur_size = data_size;
Expand Down Expand Up @@ -499,7 +511,17 @@ int flb_processor_run(struct flb_processor *proc,
proc->data, /* (input/output) instance context */
f_ins->context, /* filter context */
proc->config);

#ifdef FLB_HAVE_METRICS
name = (char *) (flb_filter_name(f_ins));
in_records = flb_mp_count(cur_buf, cur_size);
cmt_counter_add(f_ins->cmt_records, ts, in_records,
1, (char *[]) {name});
cmt_counter_add(f_ins->cmt_bytes, ts, tmp_size,
1, (char *[]) {name});

flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, tmp_size, f_ins->metrics);
#endif
/*
* The cb_filter() function return status tells us if something changed
* during it process. The possible values are:
Expand All @@ -526,6 +548,15 @@ int flb_processor_run(struct flb_processor *proc,
*out_buf = NULL;
*out_size = 0;

#ifdef FLB_HAVE_METRICS
/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, in_records,
1, (char *[]) {name});

/* [OLD] Summarize all records removed */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
in_records, f_ins->metrics);
#endif
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);
Expand All @@ -536,6 +567,32 @@ int flb_processor_run(struct flb_processor *proc,
/* set new buffer */
cur_buf = tmp_buf;
cur_size = tmp_size;
out_records = flb_mp_count(tmp_buf, tmp_size);
#ifdef FLB_HAVE_METRICS
if (out_records > in_records) {
diff = (out_records - in_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_add_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize new records */
flb_metrics_sum(FLB_METRIC_N_ADDED,
diff, f_ins->metrics);
}
else if (out_records < in_records) {
diff = (in_records - out_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize dropped records */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
diff, f_ins->metrics);
}
#endif

}
else if (ret == FLB_FILTER_NOTOUCH) {
/* keep original data, do nothing */
Expand Down

0 comments on commit 22e7667

Please sign in to comment.