From d34a9e7c427f9b170e5efcaceb3b8cacb8b420ef Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 23 Jul 2024 16:43:44 +0900 Subject: [PATCH] out_prometheus_exporter: Handle multiply concatenated metrics type of events Signed-off-by: Hiroshi Hatake --- plugins/out_prometheus_exporter/prom.c | 70 +++++++++++++++++--------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/plugins/out_prometheus_exporter/prom.c b/plugins/out_prometheus_exporter/prom.c index e120f6c1761..b79854078a7 100644 --- a/plugins/out_prometheus_exporter/prom.c +++ b/plugins/out_prometheus_exporter/prom.c @@ -175,44 +175,66 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk, int add_ts; size_t off = 0; flb_sds_t metrics; - cfl_sds_t text; + cfl_sds_t text = NULL; + cfl_sds_t tmp = NULL; struct cmt *cmt; struct prom_exporter *ctx = out_context; + int ok = CMT_DECODE_MSGPACK_SUCCESS; + + text = flb_sds_create_size(128); + if (text == NULL) { + flb_plg_debug(ctx->ins, "failed to allocate buffer for text representation of metrics"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } /* * A new set of metrics has arrived, perform decoding, apply labels, * convert to Prometheus text format and store the output in the * hash table for metrics. + * Note that metrics might be concatenated. So, we need to consume + * until the end of event_chunk. */ - ret = cmt_decode_msgpack_create(&cmt, - (char *) event_chunk->data, - event_chunk->size, &off); - if (ret != 0) { - FLB_OUTPUT_RETURN(FLB_ERROR); - } + while ((ret = cmt_decode_msgpack_create(&cmt, + (char *) event_chunk->data, + event_chunk->size, &off)) == ok) { - /* append labels set by config */ - append_labels(ctx, cmt); + if (ret != 0) { + FLB_OUTPUT_RETURN(FLB_ERROR); + } - /* add timestamp in the output format ? */ - if (ctx->add_timestamp) { - add_ts = CMT_TRUE; - } - else { - add_ts = CMT_FALSE; - } + /* append labels set by config */ + append_labels(ctx, cmt); + + /* add timestamp in the output format ? */ + if (ctx->add_timestamp) { + add_ts = CMT_TRUE; + } + else { + add_ts = CMT_FALSE; + } - /* convert to text representation */ - text = cmt_encode_prometheus_create(cmt, add_ts); - if (!text) { + /* convert to text representation */ + tmp = cmt_encode_prometheus_create(cmt, add_ts); + if (!tmp) { + cmt_destroy(cmt); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + ret = flb_sds_cat_safe(&text, tmp, flb_sds_len(tmp)); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not concatenate text representant coming from: %s", + flb_input_name(ins)); + cmt_encode_prometheus_destroy(tmp); + flb_sds_destroy(text); + cmt_destroy(cmt); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + cmt_encode_prometheus_destroy(tmp); cmt_destroy(cmt); - FLB_OUTPUT_RETURN(FLB_ERROR); } - cmt_destroy(cmt); if (cfl_sds_len(text) == 0) { flb_plg_debug(ctx->ins, "context without metrics (empty)"); - cmt_encode_text_destroy(text); + flb_sds_destroy(text); FLB_OUTPUT_RETURN(FLB_OK); } @@ -221,11 +243,11 @@ static void cb_prom_flush(struct flb_event_chunk *event_chunk, if (ret == -1) { flb_plg_error(ctx->ins, "could not store metrics coming from: %s", flb_input_name(ins)); - cmt_encode_prometheus_destroy(text); + flb_sds_destroy(text); cmt_destroy(cmt); FLB_OUTPUT_RETURN(FLB_ERROR); } - cmt_encode_prometheus_destroy(text); + flb_sds_destroy(text); /* retrieve a full copy of all metrics */ metrics = hash_format_metrics(ctx);