From 350c9b5106af7f45e5e99d594109318b904c07a2 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 22 Feb 2024 20:06:35 +0900 Subject: [PATCH] # This is a combination of 4 commits. # This is the 1st commit message: processor: input_metric: Prevent dangling pointer if cmt context is recreated Signed-off-by: Hiroshi Hatake # This is the commit message #2: processor_labels: Follow change of the signature of metrics callback Signed-off-by: Hiroshi Hatake # This is the commit message #3: processor_selector: Implement selector processor for metrics For future extensibility, we use "selector" as a name for this processor. Signed-off-by: Hiroshi Hatake # This is the commit message #4: lib: Support setter for processor Signed-off-by: Hiroshi Hatake --- CMakeLists.txt | 2 + include/fluent-bit/flb_lib.h | 4 + include/fluent-bit/flb_processor.h | 3 +- plugins/CMakeLists.txt | 1 + plugins/processor_labels/labels.c | 19 + plugins/processor_selector/CMakeLists.txt | 4 + plugins/processor_selector/selector.c | 515 ++++++++++++++++++++++ plugins/processor_selector/selector.h | 63 +++ src/flb_input_metric.c | 30 +- src/flb_lib.c | 36 ++ src/flb_processor.c | 13 +- 11 files changed, 680 insertions(+), 10 deletions(-) create mode 100644 plugins/processor_selector/CMakeLists.txt create mode 100644 plugins/processor_selector/selector.c create mode 100644 plugins/processor_selector/selector.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 420ef82803c..c0ed00a1e1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -285,6 +285,8 @@ option(FLB_FILTER_WASM "Enable WASM filter" option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes) option(FLB_PROCESSOR_ATTRIBUTES "Enable atributes manipulation processor" Yes) option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes) +option(FLB_PROCESSOR_SELECTOR "Enable selector processor" Yes) + if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") FLB_DEFINITION_VAL(FLB_NIGHTLY_BUILD ${FLB_NIGHTLY_BUILD}) diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 5e213c406f1..f96b06eff86 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -46,11 +46,15 @@ struct flb_lib_out_cb { /* For Fluent Bit library callers, we only export the following symbols */ typedef struct flb_lib_ctx flb_ctx_t; +struct flb_processor; + FLB_EXPORT void flb_init_env(); FLB_EXPORT flb_ctx_t *flb_create(); FLB_EXPORT void flb_destroy(flb_ctx_t *ctx); FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data); +FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val); diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 1ad0d0ae813..84949664334 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -143,7 +143,8 @@ struct flb_processor_plugin { int); int (*cb_process_metrics) (struct flb_processor_instance *, - struct cmt *, + struct cmt *, /* in */ + struct cmt **, /* out */ const char *, int); diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 46295f14e5a..3b2d7ce70c8 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -283,6 +283,7 @@ REGISTER_IN_PLUGIN("in_random") # ========== REGISTER_PROCESSOR_PLUGIN("processor_labels") REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") +REGISTER_PROCESSOR_PLUGIN("processor_selector") # OUTPUTS # ======= diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index c3fc929d027..f6e3b63f81c 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1696,15 +1696,23 @@ static int hash_labels(struct cmt *metrics_context, static int cb_process_metrics(struct flb_processor_instance *processor_instance, struct cmt *metrics_context, + struct cmt **out_context, const char *tag, int tag_len) { + struct cmt *out_cmt; struct internal_processor_context *processor_context; int result; processor_context = (struct internal_processor_context *) processor_instance->context; + out_cmt = cmt_create(); + if (out_cmt == NULL) { + flb_plg_error(processor_instance, "could not create out_cmt context"); + return FLB_PROCESSOR_FAILURE; + } + result = delete_labels(metrics_context, &processor_context->delete_labels); @@ -1728,6 +1736,17 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, &processor_context->hash_labels); } + if (result == FLB_PROCESSOR_SUCCESS) { + result = cmt_cat(out_cmt, metrics_context); + if (result != 0) { + cmt_destroy(out_cmt); + + return FLB_PROCESSOR_FAILURE; + } + + *out_context = out_cmt; + } + if (result != FLB_PROCESSOR_SUCCESS) { return FLB_PROCESSOR_FAILURE; } diff --git a/plugins/processor_selector/CMakeLists.txt b/plugins/processor_selector/CMakeLists.txt new file mode 100644 index 00000000000..ced84096d9d --- /dev/null +++ b/plugins/processor_selector/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + selector.c) + +FLB_PLUGIN(processor_selector "${src}" "") diff --git a/plugins/processor_selector/selector.c b/plugins/processor_selector/selector.c new file mode 100644 index 00000000000..4f9fdab5c3c --- /dev/null +++ b/plugins/processor_selector/selector.c @@ -0,0 +1,515 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "selector.h" + +static void delete_metrics_rules(struct selector_ctx *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct metrics_rule *metrics_rule; + + mk_list_foreach_safe(head, tmp, &ctx->metrics_rules) { + metrics_rule = mk_list_entry(head, struct metrics_rule, _head); + flb_free(metrics_rule->regex_pattern); + flb_regex_destroy(metrics_rule->regex); + mk_list_del(&metrics_rule->_head); + flb_free(metrics_rule); + } +} + +static void destroy_context(struct selector_ctx *context) +{ + if (context != NULL) { + delete_metrics_rules(context); + flb_free(context); + } +} + +static int set_metrics_rules(struct selector_ctx *ctx, struct flb_processor_instance *p_ins) +{ + int first_rule = SELECTOR_NO_RULE; + struct mk_list *head; + struct flb_kv *kv; + struct metrics_rule *metrics_rule; + + /* Iterate all filter properties for metrics.regex and metrics.exclude */ + mk_list_foreach(head, &p_ins->properties) { + kv = mk_list_entry(head, struct flb_kv, _head); + + /* Create a new rule */ + metrics_rule = flb_malloc(sizeof(struct metrics_rule)); + if (!metrics_rule) { + flb_errno(); + return -1; + } + + /* Get the type */ + if (strncasecmp(kv->key, "metrics.regex", 13) == 0) { + metrics_rule->type = SELECTOR_REGEX; + } + else if (strncasecmp(kv->key, "metrics.exclude", 15) == 0) { + metrics_rule->type = SELECTOR_EXCLUDE; + } + else { + /* Other property. Skip */ + flb_free(metrics_rule); + continue; + } + + if (ctx->logical_op != SELECTOR_LOGICAL_OP_LEGACY && first_rule != SELECTOR_NO_RULE) { + /* 'AND'/'OR' case */ + if (first_rule != metrics_rule->type) { + flb_plg_error(ctx->ins, "Both 'metrics.regex' and 'metrics.exclude' are set."); + delete_metrics_rules(ctx); + flb_free(metrics_rule); + return -1; + } + } + first_rule = metrics_rule->type; + + /* Get name (regular expression) */ + metrics_rule->regex_pattern = flb_strndup(kv->val, strlen(kv->val)); + if (metrics_rule->regex_pattern == NULL) { + flb_errno(); + delete_metrics_rules(ctx); + flb_free(metrics_rule); + return -1; + } + + /* Convert string to regex pattern for metrics */ + metrics_rule->regex = flb_regex_create(metrics_rule->regex_pattern); + if (!metrics_rule->regex) { + flb_plg_error(ctx->ins, "could not compile regex pattern '%s'", + metrics_rule->regex_pattern); + delete_metrics_rules(ctx); + flb_free(metrics_rule); + return -1; + } + + /* Link to parent list */ + mk_list_add(&metrics_rule->_head, &ctx->metrics_rules); + } + + return 0; +} + +static struct selector_ctx * + create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + int ret; + int result; + size_t len; + const char *val; + struct selector_ctx *ctx; + + ctx = flb_malloc(sizeof(struct selector_ctx)); + if (ctx != NULL) { + ctx->ins = processor_instance; + ctx->config = config; + + mk_list_init(&ctx->metrics_rules); + + result = flb_processor_instance_config_map_set(processor_instance, (void *) ctx); + + if (result == 0) { + ctx->logical_op = SELECTOR_LOGICAL_OP_LEGACY; + val = flb_processor_instance_get_property("logical_op", processor_instance); + if (val != NULL) { + len = strlen(val); + if (len == 3 && strncasecmp("AND", val, len) == 0) { + flb_plg_info(ctx->ins, "AND mode"); + ctx->logical_op = SELECTOR_LOGICAL_OP_AND; + } + else if (len == 2 && strncasecmp("OR", val, len) == 0) { + flb_plg_info(ctx->ins, "OR mode"); + ctx->logical_op = SELECTOR_LOGICAL_OP_OR; + } + else if (len == 6 && strncasecmp("legacy", val, len) == 0) { + flb_plg_info(ctx->ins, "legacy mode"); + ctx->logical_op = SELECTOR_LOGICAL_OP_LEGACY; + } + } + } + + if (result == 0) { + /* Load rules */ + ret = set_metrics_rules(ctx, processor_instance); + if (ret == -1) { + destroy_context(ctx); + ctx = NULL; + + return ctx; + } + } + + if (result != 0) { + destroy_context(ctx); + + ctx = NULL; + } + } + else { + flb_errno(); + } + + return ctx; +} + + +static int cb_selector_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + /* Create context */ + processor_instance->context = (void *) create_context( + processor_instance, config); + + if (processor_instance->context == NULL) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +#ifdef FLB_HAVE_METRICS +static int cmt_regex_match(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_TRUE; + } + else { + ret = CMT_FALSE; + } + + return ret; +} + +static int cmt_regex_exclude(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_FALSE; + } + else { + ret = CMT_TRUE; + } + + return ret; +} + +static inline int selector_metrics_or_op(struct cmt *cmt, struct cmt *out_cmt, struct selector_ctx *ctx) +{ + ssize_t ret; + int found = FLB_FALSE; + struct mk_list *head; + struct metrics_rule *metrics_rule; + struct cmt *tmp = NULL; + struct cmt *filtered = NULL; + + /* For each rule, validate against cmt context */ + mk_list_foreach(head, &ctx->metrics_rules) { + found = FLB_FALSE; + metrics_rule = mk_list_entry(head, struct metrics_rule, _head); + + tmp = cmt_create(); + if (tmp == NULL) { + flb_plg_error(ctx->ins, "could not create tmp context"); + return SELECTOR_FAILURE; + } + cmt_cat(tmp, cmt); + filtered = cmt_create(); + if (filtered == NULL) { + flb_plg_error(ctx->ins, "could not create filtered context"); + return SELECTOR_FAILURE; + } + + if (metrics_rule->type == SELECTOR_REGEX) { + ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_match, 0); + } + else if (metrics_rule->type == SELECTOR_EXCLUDE) { + ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_exclude, 0); + } + if (ret == 0) { + found = FLB_TRUE; + } + + if (found == FLB_TRUE) { + cmt_cat(out_cmt, filtered); + } + cmt_destroy(tmp); + cmt_destroy(filtered); + } + + if (metrics_rule->type == SELECTOR_REGEX) { + return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; + } + + /* The last rule is exclude */ + return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; +} + + +static inline int selector_metrics_and_op(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + int ret; + int found = FLB_FALSE; + struct mk_list *head; + struct metrics_rule *metrics_rule; + struct cmt *tmp = NULL; + struct cmt *swap = NULL; + struct cmt *filtered = NULL; + size_t rule_size; + int count = 1; + rule_size = mk_list_size(&ctx->metrics_rules); + + /* For each rule, validate against cmt context */ + mk_list_foreach(head, &ctx->metrics_rules) { + found = FLB_FALSE; + metrics_rule = mk_list_entry(head, struct metrics_rule, _head); + if (tmp == NULL) { + tmp = cmt_create(); + if (tmp == NULL) { + flb_plg_error(ctx->ins, "could not create tmp context"); + return SELECTOR_FAILURE; + } + cmt_cat(tmp, cmt); + } + filtered = cmt_create(); + if (filtered == NULL) { + flb_plg_error(ctx->ins, "could not create filtered context"); + cmt_destroy(tmp); + + return SELECTOR_FAILURE; + } + + if (metrics_rule->type == SELECTOR_REGEX) { + ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_match, 0); + } + else if (metrics_rule->type == SELECTOR_EXCLUDE) { + ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_exclude, 0); + } + + if (ret == 0) { + found = FLB_TRUE; + } + else if (ret != 0) { + flb_plg_debug(ctx->ins, "not matched for rule = \"%s\"", metrics_rule->regex_pattern); + } + + if (count >= rule_size) { + /* When tmp has a reference of swap, we just need to + * destroy swap instance here. */ + if (swap != NULL) { + cmt_destroy(swap); + swap = NULL; + } + else { + cmt_destroy(tmp); + } + cmt_cat(out_cmt, filtered); + cmt_destroy(filtered); + + goto selector_metrics_and_or_end; + } + + if (filtered != NULL && tmp != NULL) { + if (swap != NULL) { + cmt_destroy(swap); + swap = NULL; + } + swap = cmt_create(); + if (swap == NULL) { + flb_plg_error(ctx->ins, "could not create swap context"); + return SELECTOR_FAILURE; + } + cmt_cat(swap, filtered); + cmt_destroy(tmp); + cmt_destroy(filtered); + tmp = NULL; + tmp = swap; + } + count++; + } + + selector_metrics_and_or_end: + + if (metrics_rule->type == SELECTOR_REGEX) { + return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; + } + + /* The last rule is exclude */ + return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; +} + +/* Given a metrics context, do some select action based on the defined rules */ +static inline int selector_metrics(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + return selector_metrics_and_op(cmt, out_cmt, ctx); +} + +static inline int selector_metrics_and_or(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + ssize_t ret; + + if (ctx->logical_op == SELECTOR_LOGICAL_OP_OR) { + ret = selector_metrics_or_op(cmt, out_cmt, ctx); + } + else if (ctx->logical_op == SELECTOR_LOGICAL_OP_AND) { + ret = selector_metrics_and_op(cmt, out_cmt, ctx); + } + + return ret; +} + +static int process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int ret; + struct selector_ctx *ctx; + struct cmt *out_cmt; + + ctx = (struct selector_ctx *) processor_instance->context; + + out_cmt = cmt_create(); + if (out_cmt == NULL) { + flb_plg_error(processor_instance, "could not create out_cmt context"); + return SELECTOR_FAILURE; + } + + if (ctx->logical_op == SELECTOR_LOGICAL_OP_LEGACY) { + ret = selector_metrics(metrics_context, out_cmt, ctx); + } + else { + ret = selector_metrics_and_or(metrics_context, out_cmt, ctx); + } + + if (ret == SELECTOR_RET_KEEP || ret == SELECTOR_RET_EXCLUDE) { + ret = SELECTOR_SUCCESS; + *out_context = out_cmt; + } + else { + /* destroy out_context contexts */ + cmt_destroy(out_cmt); + + ret = SELECTOR_FAILURE; + } + + return ret; +} +#endif + +static int cb_selector_process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int result = SELECTOR_SUCCESS; + +#ifdef FLB_HAVE_METRICS + result = process_metrics(processor_instance, + metrics_context, + out_context, + tag, tag_len); +#endif + + if (result != SELECTOR_SUCCESS) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_selector_exit(struct flb_processor_instance *processor_instance) +{ + if (processor_instance != NULL && + processor_instance->context != NULL) { + destroy_context(processor_instance->context); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "metrics.regex", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Keep metrics in which the metric of name matches the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "metrics.exclude", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Exclude metrics in which the metric of name matches the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "logical_op", "legacy", + 0, FLB_FALSE, 0, + "Specify whether to use logical conjuciton or disjunction. legacy, AND and OR are allowed." + }, + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_selector_plugin = { + .name = "selector", + .description = "select metrics by specified name", + .cb_init = cb_selector_init, + .cb_process_logs = NULL, + .cb_process_metrics = cb_selector_process_metrics, + .cb_process_traces = NULL, + .cb_exit = cb_selector_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/processor_selector/selector.h b/plugins/processor_selector/selector.h new file mode 100644 index 00000000000..fcb8a4a1894 --- /dev/null +++ b/plugins/processor_selector/selector.h @@ -0,0 +1,63 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_SELECTOR_H +#define FLB_PROCESSOR_SELECTOR_H + +#include +#include +#include +#include +#include + +/* rule types */ +#define SELECTOR_NO_RULE 0 +#define SELECTOR_REGEX 1 +#define SELECTOR_EXCLUDE 2 + +/* actions */ +#define SELECTOR_RET_KEEP 0 +#define SELECTOR_RET_EXCLUDE 1 + +#define SELECTOR_SUCCESS 0 +#define SELECTOR_NOTOUCH 1 +#define SELECTOR_FAILURE 2 + +enum _logical_op{ + SELECTOR_LOGICAL_OP_LEGACY, + SELECTOR_LOGICAL_OP_OR, + SELECTOR_LOGICAL_OP_AND +} logical_op; + +struct selector_ctx { + struct mk_list rules; + struct mk_list metrics_rules; + int logical_op; + struct flb_processor_instance *ins; + struct flb_config *config; +}; + +struct metrics_rule { + int type; + char *regex_pattern; + struct flb_regex *regex; + struct mk_list _head; +}; + +#endif diff --git a/src/flb_input_metric.c b/src/flb_input_metric.c index 9c4fa478d6d..8b3652a85f9 100644 --- a/src/flb_input_metric.c +++ b/src/flb_input_metric.c @@ -32,6 +32,7 @@ static int input_metrics_append(struct flb_input_instance *ins, char *mt_buf; size_t mt_size; int processor_is_active; + struct cmt *out_context = NULL; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -51,20 +52,35 @@ static int input_metrics_append(struct flb_input_instance *ins, FLB_PROCESSOR_METRICS, tag, tag_len, - (char *) cmt, - 0, NULL, NULL); + (char *) cmt, 0, + (void **)&out_context, NULL); if (ret == -1) { return -1; } } - /* Convert metrics to msgpack */ - ret = cmt_encode_msgpack_create(cmt, &mt_buf, &mt_size); - if (ret != 0) { - flb_plg_error(ins, "could not encode metrics"); - return -1; + if (out_context != NULL) { + /* Convert metrics to msgpack */ + ret = cmt_encode_msgpack_create(out_context, &mt_buf, &mt_size); + if (ret != 0) { + flb_plg_error(ins, "could not encode metrics"); + cmt_destroy(out_context); + + return -1; + } + + cmt_destroy(out_context); + } + else { + /* Convert metrics to msgpack */ + ret = cmt_encode_msgpack_create(cmt, &mt_buf, &mt_size); + if (ret != 0) { + flb_plg_error(ins, "could not encode metrics"); + return -1; + + } } /* Append packed metrics */ diff --git a/src/flb_lib.c b/src/flb_lib.c index b77f29b7a1d..1821a2e61ad 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -329,6 +329,24 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) +{ + struct flb_input_instance *i_ins; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + + if (i_ins->processor) { + flb_processor_destroy(i_ins->processor); + } + + i_ins->processor = proc; + + return 0; +} + static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) { struct flb_kv *kv; @@ -465,6 +483,24 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + if (o_ins->processor) { + flb_processor_destroy(o_ins->processor); + } + + o_ins->processor = proc; + + return 0; +} + int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)) { diff --git a/src/flb_processor.c b/src/flb_processor.c index a7a6e2b20d9..8b9369eb5ea 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -430,9 +430,9 @@ int flb_processor_run(struct flb_processor *proc, { int ret; int finalize; - void *cur_buf; + void *cur_buf = NULL; size_t cur_size; - void *tmp_buf; + void *tmp_buf = NULL; size_t tmp_size; struct mk_list *head; struct mk_list *list = NULL; @@ -640,6 +640,7 @@ int flb_processor_run(struct flb_processor *proc, if (p_ins->p->cb_process_metrics != NULL) { ret = p_ins->p->cb_process_metrics(p_ins, (struct cmt *) cur_buf, + (struct cmt **) &tmp_buf, tag, tag_len); @@ -648,8 +649,16 @@ int flb_processor_run(struct flb_processor *proc, FLB_PROCESSOR_LOCK_RETRY_LIMIT, FLB_PROCESSOR_LOCK_RETRY_DELAY); + out_buf = NULL; + return -1; } + + if (cur_buf != data) { + cmt_destroy(cur_buf); + } + + cur_buf = (void *)tmp_buf; } } else if (type == FLB_PROCESSOR_TRACES) {