Skip to content

Commit

Permalink
out_prometheus_exporter: Handle multiply concatenated metrics type of…
Browse files Browse the repository at this point in the history
… events

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Jul 23, 2024
1 parent 574a69a commit 82a222f
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions plugins/out_prometheus_exporter/prom.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,44 +175,65 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk,
int add_ts;
size_t off = 0;
flb_sds_t metrics;
cfl_sds_t text;
cfl_sds_t text = NULL;
cfl_sds_t tmp = NULL;
struct cmt *cmt;
struct prom_exporter *ctx = out_context;
int ok = CMT_DECODE_MSGPACK_SUCCESS;

text = flb_sds_create_size(128);
if (text == NULL) {
flb_plg_debug(ctx->ins, "failed to allocate buffer for text representation of metrics");
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/*
* A new set of metrics has arrived, perform decoding, apply labels,
* convert to Prometheus text format and store the output in the
* hash table for metrics.
* Note that metrics might be concatenated. So, we need to consume
* until the end of event_chunk.
*/
ret = cmt_decode_msgpack_create(&cmt,
(char *) event_chunk->data,
event_chunk->size, &off);
if (ret != 0) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}
while ((ret = cmt_decode_msgpack_create(&cmt,
(char *) event_chunk->data,
event_chunk->size, &off)) == ok) {

/* append labels set by config */
append_labels(ctx, cmt);
if (ret != 0) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* add timestamp in the output format ? */
if (ctx->add_timestamp) {
add_ts = CMT_TRUE;
}
else {
add_ts = CMT_FALSE;
}
/* append labels set by config */
append_labels(ctx, cmt);

/* add timestamp in the output format ? */
if (ctx->add_timestamp) {
add_ts = CMT_TRUE;
}
else {
add_ts = CMT_FALSE;
}

/* convert to text representation */
text = cmt_encode_prometheus_create(cmt, add_ts);
if (!text) {
/* convert to text representation */
tmp = cmt_encode_prometheus_create(cmt, add_ts);
if (!tmp) {
cmt_destroy(cmt);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
ret = flb_sds_cat_safe(&text, tmp, flb_sds_len(tmp));
if (ret != 0) {
flb_plg_error(ctx->ins, "could not concatenate text representant coming from: %s",
flb_input_name(ins));
cmt_encode_prometheus_destroy(tmp);
cmt_destroy(cmt);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
cmt_encode_prometheus_destroy(tmp);
cmt_destroy(cmt);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
cmt_destroy(cmt);

if (cfl_sds_len(text) == 0) {
flb_plg_debug(ctx->ins, "context without metrics (empty)");
cmt_encode_text_destroy(text);
flb_sds_destroy(text);
FLB_OUTPUT_RETURN(FLB_OK);
}

Expand All @@ -221,11 +242,11 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk,
if (ret == -1) {
flb_plg_error(ctx->ins, "could not store metrics coming from: %s",
flb_input_name(ins));
cmt_encode_prometheus_destroy(text);
flb_sds_destroy(text);
cmt_destroy(cmt);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
cmt_encode_prometheus_destroy(text);
flb_sds_destroy(text);

/* retrieve a full copy of all metrics */
metrics = hash_format_metrics(ctx);
Expand Down

0 comments on commit 82a222f

Please sign in to comment.