Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics_exporter: add support for filter metrics inside processors #9341

Merged
merged 1 commit into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/flb_metrics_exporter.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ int flb_me_destroy(struct flb_me *me)
struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
{
int ret;
struct mk_list *head;
struct mk_list *head, *processor_head;
struct flb_input_instance *i; /* inputs */
struct flb_filter_instance *f; /* filter */
struct flb_processor_unit *pu; /* processors */
struct flb_filter_instance *f, *pf; /* filter */
struct flb_output_instance *o; /* output */
struct cmt *cmt;

Expand Down Expand Up @@ -308,6 +309,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &i->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

mk_list_foreach(head, &ctx->filters) {
Expand All @@ -330,6 +344,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &o->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

return cmt;
Expand Down
59 changes: 58 additions & 1 deletion src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,13 @@ int flb_processor_run(struct flb_processor *proc,
struct flb_filter_instance *f_ins;
struct flb_processor_instance *p_ins;
struct flb_mp_chunk_cobj *chunk_cobj = NULL;
#ifdef FLB_HAVE_METRICS
int in_records = 0;
int out_records = 0;
int diff = 0;
uint64_t ts;
char *name;
#endif

if (type == FLB_PROCESSOR_LOGS) {
list = &proc->logs;
Expand All @@ -453,6 +460,11 @@ int flb_processor_run(struct flb_processor *proc,
list = &proc->traces;
}

#ifdef FLB_HAVE_METRICS
/* timestamp */
ts = cfl_time_now();
#endif

/* set current data buffer */
cur_buf = data;
cur_size = data_size;
Expand Down Expand Up @@ -493,7 +505,17 @@ int flb_processor_run(struct flb_processor *proc,
proc->data, /* (input/output) instance context */
f_ins->context, /* filter context */
proc->config);

#ifdef FLB_HAVE_METRICS
name = (char *) (flb_filter_name(f_ins));
in_records = flb_mp_count(cur_buf, cur_size);
cmt_counter_add(f_ins->cmt_records, ts, in_records,
1, (char *[]) {name});
cmt_counter_add(f_ins->cmt_bytes, ts, tmp_size,
1, (char *[]) {name});

flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, tmp_size, f_ins->metrics);
#endif
/*
* The cb_filter() function return status tells us if something changed
* during it process. The possible values are:
Expand All @@ -520,6 +542,15 @@ int flb_processor_run(struct flb_processor *proc,
*out_buf = NULL;
*out_size = 0;

#ifdef FLB_HAVE_METRICS
/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, in_records,
1, (char *[]) {name});

/* [OLD] Summarize all records removed */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
in_records, f_ins->metrics);
#endif
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);
Expand All @@ -530,6 +561,32 @@ int flb_processor_run(struct flb_processor *proc,
/* set new buffer */
cur_buf = tmp_buf;
cur_size = tmp_size;
out_records = flb_mp_count(tmp_buf, tmp_size);
#ifdef FLB_HAVE_METRICS
if (out_records > in_records) {
diff = (out_records - in_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_add_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize new records */
flb_metrics_sum(FLB_METRIC_N_ADDED,
diff, f_ins->metrics);
}
else if (out_records < in_records) {
diff = (in_records - out_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize dropped records */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
diff, f_ins->metrics);
}
#endif

}
else if (ret == FLB_FILTER_NOTOUCH) {
/* keep original data, do nothing */
Expand Down
Loading