diff --git a/CMakeLists.txt b/CMakeLists.txt index c0ed00a1e1d..fe5fb98881a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -283,9 +283,8 @@ option(FLB_FILTER_GEOIP2 "Enable geoip2 filter" option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" Yes) option(FLB_FILTER_WASM "Enable WASM filter" Yes) option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes) -option(FLB_PROCESSOR_ATTRIBUTES "Enable atributes manipulation processor" Yes) +option(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" Yes) option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes) -option(FLB_PROCESSOR_SELECTOR "Enable selector processor" Yes) if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") diff --git a/include/fluent-bit/flb_metrics.h b/include/fluent-bit/flb_metrics.h index ccd4694ebb1..25b0443224c 100644 --- a/include/fluent-bit/flb_metrics.h +++ b/include/fluent-bit/flb_metrics.h @@ -39,6 +39,7 @@ #include #include #include +#include /* Metrics IDs for general purpose (used by core and Plugins */ #define FLB_METRIC_N_RECORDS 0 diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 3b2d7ce70c8..0f8090e7e09 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -282,8 +282,8 @@ REGISTER_IN_PLUGIN("in_random") # PROCESSORS # ========== REGISTER_PROCESSOR_PLUGIN("processor_labels") +REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") -REGISTER_PROCESSOR_PLUGIN("processor_selector") # OUTPUTS # ======= diff --git a/plugins/processor_metrics_selector/CMakeLists.txt b/plugins/processor_metrics_selector/CMakeLists.txt new file mode 100644 index 00000000000..d151570ea47 --- /dev/null +++ b/plugins/processor_metrics_selector/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + selector.c) + +FLB_PLUGIN(processor_metrics_selector "${src}" "") diff --git a/plugins/processor_metrics_selector/selector.c b/plugins/processor_metrics_selector/selector.c new file mode 100644 index 00000000000..1e30f905d7e --- /dev/null +++ b/plugins/processor_metrics_selector/selector.c @@ -0,0 +1,394 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "selector.h" + +static void delete_metrics_rules(struct selector_ctx *ctx) +{ + if (ctx->name_regex != NULL) { + flb_regex_destroy(ctx->name_regex); + } +} + +static void destroy_context(struct selector_ctx *context) +{ + if (context != NULL) { + delete_metrics_rules(context); + flb_free(context->selector_pattern); + flb_free(context); + } +} + +static int set_metrics_rules(struct selector_ctx *ctx, struct flb_processor_instance *p_ins) +{ + const char *action; + const char *metric_name; + const char *op_type; + const char *context; + size_t name_len = 0; + + action = flb_processor_instance_get_property("action", p_ins); + if (action == NULL) { + ctx->action_type = SELECTOR_INCLUDE; + } + else if (strncasecmp(action, "include", 7) == 0) { + flb_plg_debug(ctx->ins, "action type INCLUDE"); + ctx->action_type = SELECTOR_INCLUDE; + } + else if (strncasecmp(action, "exclude", 7) == 0) { + flb_plg_debug(ctx->ins, "action type EXCLUDE"); + ctx->action_type = SELECTOR_EXCLUDE; + } + else { + flb_plg_error(ctx->ins, "unknown action type '%s'", action); + return -1; + } + + metric_name = flb_processor_instance_get_property("metric_name", p_ins); + if (metric_name == NULL) { + flb_plg_error(ctx->ins, "metric_name is needed for selector"); + return -1; + } + ctx->selector_pattern = flb_strdup(metric_name); + name_len = strlen(metric_name); + + op_type = flb_processor_instance_get_property("operation_type", p_ins); + if (op_type == NULL) { + ctx->op_type = SELECTOR_OPERATION_PREFIX; + } + else if (strncasecmp(op_type, "prefix", 6) == 0) { + flb_plg_debug(ctx->ins, "operation type PREFIX"); + ctx->op_type = SELECTOR_OPERATION_PREFIX; + } + else if (strncasecmp(op_type, "substring", 9) == 0) { + flb_plg_debug(ctx->ins, "operation type SUBSTRING"); + ctx->op_type = SELECTOR_OPERATION_SUBSTRING; + } + else { + flb_plg_error(ctx->ins, "unknown action type '%s'", op_type); + return -1; + } + + if (ctx->selector_pattern[0] == '/' && ctx->selector_pattern[name_len-1] == '/') { + /* Convert string to regex pattern for metrics */ + ctx->name_regex = flb_regex_create(ctx->selector_pattern); + if (!ctx->name_regex) { + flb_plg_error(ctx->ins, "could not compile regex pattern '%s'", + ctx->selector_pattern); + return -1; + } + ctx->op_type = SELECTOR_OPERATION_REGEX; + } + + context = flb_processor_instance_get_property("context", p_ins); + if (context == NULL) { + ctx->context_type = SELECTOR_CONTEXT_FQNAME; + } + else if (strncasecmp(context, "metric_name", 11) == 0) { + ctx->context_type = SELECTOR_CONTEXT_FQNAME; + } + else { + flb_plg_error(ctx->ins, "unknown context '%s'", context); + delete_metrics_rules(ctx); + return -1; + } + + return 0; +} + +static struct selector_ctx * + create_context(struct flb_processor_instance *processor_instance, + struct flb_config *config) +{ + int result; + struct selector_ctx *ctx; + + ctx = flb_malloc(sizeof(struct selector_ctx)); + if (ctx != NULL) { + ctx->ins = processor_instance; + ctx->config = config; + ctx->name_regex = NULL; + + result = flb_processor_instance_config_map_set(processor_instance, (void *) ctx); + + if (result == 0) { + /* Load rules */ + result= set_metrics_rules(ctx, processor_instance); + if (result == -1) { + destroy_context(ctx); + ctx = NULL; + + return ctx; + } + } + + if (result != 0) { + destroy_context(ctx); + + ctx = NULL; + } + } + else { + flb_errno(); + } + + return ctx; +} + + +static int cb_selector_init(struct flb_processor_instance *processor_instance, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + /* Create context */ + processor_instance->context = (void *) create_context( + processor_instance, config); + + if (processor_instance->context == NULL) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +#ifdef FLB_HAVE_METRICS +static int cmt_regex_match(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_TRUE; + } + else { + ret = CMT_FALSE; + } + + return ret; +} + +static int cmt_regex_exclude(void *ctx, const char *str, size_t slen) +{ + int ret; + struct flb_regex *r = (struct flb_regex *)ctx; + unsigned char *s = (unsigned char *)str; + ret = flb_regex_match(r, s, slen); + + if (ret == 1) { + ret = CMT_FALSE; + } + else { + ret = CMT_TRUE; + } + + return ret; +} + +static inline int selector_metrics_process_fqname(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + int ret; + int found = FLB_FALSE; + struct cmt *filtered = NULL; + int flags = 0; + + /* On processor_selector, we only process one rule in each of contexts */ + + filtered = cmt_create(); + if (filtered == NULL) { + flb_plg_error(ctx->ins, "could not create filtered context"); + + return SELECTOR_FAILURE; + } + + if (ctx->op_type == SELECTOR_OPERATION_REGEX) { + if (ctx->action_type == SELECTOR_INCLUDE) { + ret = cmt_filter(filtered, cmt, NULL, NULL, ctx->name_regex, cmt_regex_match, 0); + } + else if (ctx->action_type == SELECTOR_EXCLUDE) { + ret = cmt_filter(filtered, cmt, NULL, NULL, ctx->name_regex, cmt_regex_exclude, 0); + } + } + else if (ctx->selector_pattern != NULL) { + if (ctx->action_type == SELECTOR_EXCLUDE) { + flags |= CMT_FILTER_EXCLUDE; + } + + if (ctx->op_type == SELECTOR_OPERATION_PREFIX) { + flags |= CMT_FILTER_PREFIX; + } + else if (ctx->op_type == SELECTOR_OPERATION_SUBSTRING) { + flags |= CMT_FILTER_SUBSTRING; + } + + ret = cmt_filter(filtered, cmt, ctx->selector_pattern, NULL, NULL, NULL, flags); + } + + if (ret == 0) { + found = FLB_TRUE; + } + else if (ret != 0) { + flb_plg_debug(ctx->ins, "not matched for rule = \"%s\"", ctx->selector_pattern); + } + + cmt_cat(out_cmt, filtered); + cmt_destroy(filtered); + + if (ctx->action_type == SELECTOR_INCLUDE) { + return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; + } + + /* The last rule is exclude */ + return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; +} + +/* Given a metrics context, do some select action based on the defined rules */ +static inline int selector_metrics(struct cmt *cmt, struct cmt *out_cmt, + struct selector_ctx *ctx) +{ + if (ctx->context_type == SELECTOR_CONTEXT_FQNAME) { + return selector_metrics_process_fqname(cmt, out_cmt, ctx); + } + + return 0; +} + +static int process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int ret; + struct selector_ctx *ctx; + struct cmt *out_cmt; + + ctx = (struct selector_ctx *) processor_instance->context; + + out_cmt = cmt_create(); + if (out_cmt == NULL) { + flb_plg_error(processor_instance, "could not create out_cmt context"); + return SELECTOR_FAILURE; + } + + ret = selector_metrics(metrics_context, out_cmt, ctx); + + if (ret == SELECTOR_RET_KEEP || ret == SELECTOR_RET_EXCLUDE) { + ret = SELECTOR_SUCCESS; + *out_context = out_cmt; + } + else { + /* destroy out_context contexts */ + cmt_destroy(out_cmt); + + ret = SELECTOR_FAILURE; + } + + return ret; +} +#endif + +static int cb_selector_process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *metrics_context, + struct cmt **out_context, + const char *tag, + int tag_len) +{ + int result = SELECTOR_SUCCESS; + +#ifdef FLB_HAVE_METRICS + result = process_metrics(processor_instance, + metrics_context, + out_context, + tag, tag_len); +#endif + + if (result != SELECTOR_SUCCESS) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_selector_exit(struct flb_processor_instance *processor_instance) +{ + if (processor_instance != NULL && + processor_instance->context != NULL) { + destroy_context(processor_instance->context); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "metric_name", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Keep metrics in which the metric of name matches with the actual name or the regular expression." + }, + { + FLB_CONFIG_MAP_STR, "context", NULL, + FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, + "Specify matching context. Currently, metric_name is only supported." + }, + { + FLB_CONFIG_MAP_STR, "action", NULL, + 0, FLB_FALSE, 0, + "Specify the action for specified metrics. INCLUDE and EXCLUDE are allowed." + }, + { + FLB_CONFIG_MAP_STR, "operation_type", NULL, + 0, FLB_FALSE, 0, + "Specify the operation type of action for metrics payloads. PREFIX and SUBSTRING are allowed." + }, /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_metrics_selector_plugin = { + .name = "metrics_selector", + .description = "select metrics by specified name", + .cb_init = cb_selector_init, + .cb_process_logs = NULL, + .cb_process_metrics = cb_selector_process_metrics, + .cb_process_traces = NULL, + .cb_exit = cb_selector_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/processor_selector/selector.h b/plugins/processor_metrics_selector/selector.h similarity index 77% rename from plugins/processor_selector/selector.h rename to plugins/processor_metrics_selector/selector.h index fcb8a4a1894..502c34f9d83 100644 --- a/plugins/processor_selector/selector.h +++ b/plugins/processor_metrics_selector/selector.h @@ -28,7 +28,7 @@ /* rule types */ #define SELECTOR_NO_RULE 0 -#define SELECTOR_REGEX 1 +#define SELECTOR_INCLUDE 1 #define SELECTOR_EXCLUDE 2 /* actions */ @@ -39,25 +39,25 @@ #define SELECTOR_NOTOUCH 1 #define SELECTOR_FAILURE 2 -enum _logical_op{ - SELECTOR_LOGICAL_OP_LEGACY, - SELECTOR_LOGICAL_OP_OR, - SELECTOR_LOGICAL_OP_AND -} logical_op; +#define SELECTOR_OPERATION_REGEX 0 +#define SELECTOR_OPERATION_PREFIX 1 +#define SELECTOR_OPERATION_SUBSTRING 2 + +/* context */ +#define SELECTOR_CONTEXT_FQNAME 0 +#define SELECTOR_CONTEXT_LABELS 1 +#define SELECTOR_CONTEXT_DESC 2 struct selector_ctx { - struct mk_list rules; struct mk_list metrics_rules; - int logical_op; + flb_sds_t action; + int action_type; + int op_type; + int context_type; + char *selector_pattern; + struct flb_regex *name_regex; struct flb_processor_instance *ins; struct flb_config *config; }; -struct metrics_rule { - int type; - char *regex_pattern; - struct flb_regex *regex; - struct mk_list _head; -}; - #endif diff --git a/plugins/processor_selector/CMakeLists.txt b/plugins/processor_selector/CMakeLists.txt deleted file mode 100644 index ced84096d9d..00000000000 --- a/plugins/processor_selector/CMakeLists.txt +++ /dev/null @@ -1,4 +0,0 @@ -set(src - selector.c) - -FLB_PLUGIN(processor_selector "${src}" "") diff --git a/plugins/processor_selector/selector.c b/plugins/processor_selector/selector.c deleted file mode 100644 index 4f9fdab5c3c..00000000000 --- a/plugins/processor_selector/selector.c +++ /dev/null @@ -1,515 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "selector.h" - -static void delete_metrics_rules(struct selector_ctx *ctx) -{ - struct mk_list *tmp; - struct mk_list *head; - struct metrics_rule *metrics_rule; - - mk_list_foreach_safe(head, tmp, &ctx->metrics_rules) { - metrics_rule = mk_list_entry(head, struct metrics_rule, _head); - flb_free(metrics_rule->regex_pattern); - flb_regex_destroy(metrics_rule->regex); - mk_list_del(&metrics_rule->_head); - flb_free(metrics_rule); - } -} - -static void destroy_context(struct selector_ctx *context) -{ - if (context != NULL) { - delete_metrics_rules(context); - flb_free(context); - } -} - -static int set_metrics_rules(struct selector_ctx *ctx, struct flb_processor_instance *p_ins) -{ - int first_rule = SELECTOR_NO_RULE; - struct mk_list *head; - struct flb_kv *kv; - struct metrics_rule *metrics_rule; - - /* Iterate all filter properties for metrics.regex and metrics.exclude */ - mk_list_foreach(head, &p_ins->properties) { - kv = mk_list_entry(head, struct flb_kv, _head); - - /* Create a new rule */ - metrics_rule = flb_malloc(sizeof(struct metrics_rule)); - if (!metrics_rule) { - flb_errno(); - return -1; - } - - /* Get the type */ - if (strncasecmp(kv->key, "metrics.regex", 13) == 0) { - metrics_rule->type = SELECTOR_REGEX; - } - else if (strncasecmp(kv->key, "metrics.exclude", 15) == 0) { - metrics_rule->type = SELECTOR_EXCLUDE; - } - else { - /* Other property. Skip */ - flb_free(metrics_rule); - continue; - } - - if (ctx->logical_op != SELECTOR_LOGICAL_OP_LEGACY && first_rule != SELECTOR_NO_RULE) { - /* 'AND'/'OR' case */ - if (first_rule != metrics_rule->type) { - flb_plg_error(ctx->ins, "Both 'metrics.regex' and 'metrics.exclude' are set."); - delete_metrics_rules(ctx); - flb_free(metrics_rule); - return -1; - } - } - first_rule = metrics_rule->type; - - /* Get name (regular expression) */ - metrics_rule->regex_pattern = flb_strndup(kv->val, strlen(kv->val)); - if (metrics_rule->regex_pattern == NULL) { - flb_errno(); - delete_metrics_rules(ctx); - flb_free(metrics_rule); - return -1; - } - - /* Convert string to regex pattern for metrics */ - metrics_rule->regex = flb_regex_create(metrics_rule->regex_pattern); - if (!metrics_rule->regex) { - flb_plg_error(ctx->ins, "could not compile regex pattern '%s'", - metrics_rule->regex_pattern); - delete_metrics_rules(ctx); - flb_free(metrics_rule); - return -1; - } - - /* Link to parent list */ - mk_list_add(&metrics_rule->_head, &ctx->metrics_rules); - } - - return 0; -} - -static struct selector_ctx * - create_context(struct flb_processor_instance *processor_instance, - struct flb_config *config) -{ - int ret; - int result; - size_t len; - const char *val; - struct selector_ctx *ctx; - - ctx = flb_malloc(sizeof(struct selector_ctx)); - if (ctx != NULL) { - ctx->ins = processor_instance; - ctx->config = config; - - mk_list_init(&ctx->metrics_rules); - - result = flb_processor_instance_config_map_set(processor_instance, (void *) ctx); - - if (result == 0) { - ctx->logical_op = SELECTOR_LOGICAL_OP_LEGACY; - val = flb_processor_instance_get_property("logical_op", processor_instance); - if (val != NULL) { - len = strlen(val); - if (len == 3 && strncasecmp("AND", val, len) == 0) { - flb_plg_info(ctx->ins, "AND mode"); - ctx->logical_op = SELECTOR_LOGICAL_OP_AND; - } - else if (len == 2 && strncasecmp("OR", val, len) == 0) { - flb_plg_info(ctx->ins, "OR mode"); - ctx->logical_op = SELECTOR_LOGICAL_OP_OR; - } - else if (len == 6 && strncasecmp("legacy", val, len) == 0) { - flb_plg_info(ctx->ins, "legacy mode"); - ctx->logical_op = SELECTOR_LOGICAL_OP_LEGACY; - } - } - } - - if (result == 0) { - /* Load rules */ - ret = set_metrics_rules(ctx, processor_instance); - if (ret == -1) { - destroy_context(ctx); - ctx = NULL; - - return ctx; - } - } - - if (result != 0) { - destroy_context(ctx); - - ctx = NULL; - } - } - else { - flb_errno(); - } - - return ctx; -} - - -static int cb_selector_init(struct flb_processor_instance *processor_instance, - void *source_plugin_instance, - int source_plugin_type, - struct flb_config *config) -{ - /* Create context */ - processor_instance->context = (void *) create_context( - processor_instance, config); - - if (processor_instance->context == NULL) { - return FLB_PROCESSOR_FAILURE; - } - - return FLB_PROCESSOR_SUCCESS; -} - -#ifdef FLB_HAVE_METRICS -static int cmt_regex_match(void *ctx, const char *str, size_t slen) -{ - int ret; - struct flb_regex *r = (struct flb_regex *)ctx; - unsigned char *s = (unsigned char *)str; - ret = flb_regex_match(r, s, slen); - - if (ret == 1) { - ret = CMT_TRUE; - } - else { - ret = CMT_FALSE; - } - - return ret; -} - -static int cmt_regex_exclude(void *ctx, const char *str, size_t slen) -{ - int ret; - struct flb_regex *r = (struct flb_regex *)ctx; - unsigned char *s = (unsigned char *)str; - ret = flb_regex_match(r, s, slen); - - if (ret == 1) { - ret = CMT_FALSE; - } - else { - ret = CMT_TRUE; - } - - return ret; -} - -static inline int selector_metrics_or_op(struct cmt *cmt, struct cmt *out_cmt, struct selector_ctx *ctx) -{ - ssize_t ret; - int found = FLB_FALSE; - struct mk_list *head; - struct metrics_rule *metrics_rule; - struct cmt *tmp = NULL; - struct cmt *filtered = NULL; - - /* For each rule, validate against cmt context */ - mk_list_foreach(head, &ctx->metrics_rules) { - found = FLB_FALSE; - metrics_rule = mk_list_entry(head, struct metrics_rule, _head); - - tmp = cmt_create(); - if (tmp == NULL) { - flb_plg_error(ctx->ins, "could not create tmp context"); - return SELECTOR_FAILURE; - } - cmt_cat(tmp, cmt); - filtered = cmt_create(); - if (filtered == NULL) { - flb_plg_error(ctx->ins, "could not create filtered context"); - return SELECTOR_FAILURE; - } - - if (metrics_rule->type == SELECTOR_REGEX) { - ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_match, 0); - } - else if (metrics_rule->type == SELECTOR_EXCLUDE) { - ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_exclude, 0); - } - if (ret == 0) { - found = FLB_TRUE; - } - - if (found == FLB_TRUE) { - cmt_cat(out_cmt, filtered); - } - cmt_destroy(tmp); - cmt_destroy(filtered); - } - - if (metrics_rule->type == SELECTOR_REGEX) { - return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; - } - - /* The last rule is exclude */ - return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; -} - - -static inline int selector_metrics_and_op(struct cmt *cmt, struct cmt *out_cmt, - struct selector_ctx *ctx) -{ - int ret; - int found = FLB_FALSE; - struct mk_list *head; - struct metrics_rule *metrics_rule; - struct cmt *tmp = NULL; - struct cmt *swap = NULL; - struct cmt *filtered = NULL; - size_t rule_size; - int count = 1; - rule_size = mk_list_size(&ctx->metrics_rules); - - /* For each rule, validate against cmt context */ - mk_list_foreach(head, &ctx->metrics_rules) { - found = FLB_FALSE; - metrics_rule = mk_list_entry(head, struct metrics_rule, _head); - if (tmp == NULL) { - tmp = cmt_create(); - if (tmp == NULL) { - flb_plg_error(ctx->ins, "could not create tmp context"); - return SELECTOR_FAILURE; - } - cmt_cat(tmp, cmt); - } - filtered = cmt_create(); - if (filtered == NULL) { - flb_plg_error(ctx->ins, "could not create filtered context"); - cmt_destroy(tmp); - - return SELECTOR_FAILURE; - } - - if (metrics_rule->type == SELECTOR_REGEX) { - ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_match, 0); - } - else if (metrics_rule->type == SELECTOR_EXCLUDE) { - ret = cmt_filter(filtered, tmp, NULL, NULL, metrics_rule->regex, cmt_regex_exclude, 0); - } - - if (ret == 0) { - found = FLB_TRUE; - } - else if (ret != 0) { - flb_plg_debug(ctx->ins, "not matched for rule = \"%s\"", metrics_rule->regex_pattern); - } - - if (count >= rule_size) { - /* When tmp has a reference of swap, we just need to - * destroy swap instance here. */ - if (swap != NULL) { - cmt_destroy(swap); - swap = NULL; - } - else { - cmt_destroy(tmp); - } - cmt_cat(out_cmt, filtered); - cmt_destroy(filtered); - - goto selector_metrics_and_or_end; - } - - if (filtered != NULL && tmp != NULL) { - if (swap != NULL) { - cmt_destroy(swap); - swap = NULL; - } - swap = cmt_create(); - if (swap == NULL) { - flb_plg_error(ctx->ins, "could not create swap context"); - return SELECTOR_FAILURE; - } - cmt_cat(swap, filtered); - cmt_destroy(tmp); - cmt_destroy(filtered); - tmp = NULL; - tmp = swap; - } - count++; - } - - selector_metrics_and_or_end: - - if (metrics_rule->type == SELECTOR_REGEX) { - return found ? SELECTOR_RET_KEEP : SELECTOR_RET_EXCLUDE; - } - - /* The last rule is exclude */ - return found ? SELECTOR_RET_EXCLUDE : SELECTOR_RET_KEEP; -} - -/* Given a metrics context, do some select action based on the defined rules */ -static inline int selector_metrics(struct cmt *cmt, struct cmt *out_cmt, - struct selector_ctx *ctx) -{ - return selector_metrics_and_op(cmt, out_cmt, ctx); -} - -static inline int selector_metrics_and_or(struct cmt *cmt, struct cmt *out_cmt, - struct selector_ctx *ctx) -{ - ssize_t ret; - - if (ctx->logical_op == SELECTOR_LOGICAL_OP_OR) { - ret = selector_metrics_or_op(cmt, out_cmt, ctx); - } - else if (ctx->logical_op == SELECTOR_LOGICAL_OP_AND) { - ret = selector_metrics_and_op(cmt, out_cmt, ctx); - } - - return ret; -} - -static int process_metrics(struct flb_processor_instance *processor_instance, - struct cmt *metrics_context, - struct cmt **out_context, - const char *tag, - int tag_len) -{ - int ret; - struct selector_ctx *ctx; - struct cmt *out_cmt; - - ctx = (struct selector_ctx *) processor_instance->context; - - out_cmt = cmt_create(); - if (out_cmt == NULL) { - flb_plg_error(processor_instance, "could not create out_cmt context"); - return SELECTOR_FAILURE; - } - - if (ctx->logical_op == SELECTOR_LOGICAL_OP_LEGACY) { - ret = selector_metrics(metrics_context, out_cmt, ctx); - } - else { - ret = selector_metrics_and_or(metrics_context, out_cmt, ctx); - } - - if (ret == SELECTOR_RET_KEEP || ret == SELECTOR_RET_EXCLUDE) { - ret = SELECTOR_SUCCESS; - *out_context = out_cmt; - } - else { - /* destroy out_context contexts */ - cmt_destroy(out_cmt); - - ret = SELECTOR_FAILURE; - } - - return ret; -} -#endif - -static int cb_selector_process_metrics(struct flb_processor_instance *processor_instance, - struct cmt *metrics_context, - struct cmt **out_context, - const char *tag, - int tag_len) -{ - int result = SELECTOR_SUCCESS; - -#ifdef FLB_HAVE_METRICS - result = process_metrics(processor_instance, - metrics_context, - out_context, - tag, tag_len); -#endif - - if (result != SELECTOR_SUCCESS) { - return FLB_PROCESSOR_FAILURE; - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int cb_selector_exit(struct flb_processor_instance *processor_instance) -{ - if (processor_instance != NULL && - processor_instance->context != NULL) { - destroy_context(processor_instance->context); - } - - return FLB_PROCESSOR_SUCCESS; -} - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_STR, "metrics.regex", NULL, - FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, - "Keep metrics in which the metric of name matches the regular expression." - }, - { - FLB_CONFIG_MAP_STR, "metrics.exclude", NULL, - FLB_CONFIG_MAP_MULT, FLB_FALSE, 0, - "Exclude metrics in which the metric of name matches the regular expression." - }, - { - FLB_CONFIG_MAP_STR, "logical_op", "legacy", - 0, FLB_FALSE, 0, - "Specify whether to use logical conjuciton or disjunction. legacy, AND and OR are allowed." - }, - /* EOF */ - {0} -}; - -struct flb_processor_plugin processor_selector_plugin = { - .name = "selector", - .description = "select metrics by specified name", - .cb_init = cb_selector_init, - .cb_process_logs = NULL, - .cb_process_metrics = cb_selector_process_metrics, - .cb_process_traces = NULL, - .cb_exit = cb_selector_exit, - .config_map = config_map, - .flags = 0 -}; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f673a5fbe87..b3fd7d18684 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -130,6 +130,9 @@ if (FLB_CUSTOM_CALYPTIA) FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") endif() +if (FLB_PROCESSOR_METRICS_SELECTOR) + FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c") +endif() # HTTP Client Debug (requires -DFLB_HTTP_CLIENT_DEBUG=On) if(FLB_HTTP_CLIENT_DEBUG) diff --git a/tests/runtime/processor_metrics_selector.c b/tests/runtime/processor_metrics_selector.c new file mode 100644 index 00000000000..f25e9751a58 --- /dev/null +++ b/tests/runtime/processor_metrics_selector.c @@ -0,0 +1,565 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include "flb_tests_runtime.h" + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; + +static int cb_count_metrics_msgpack(void *record, size_t size, void *data) +{ + int i; + int ret; + size_t off = 0; + cfl_sds_t text = NULL; + struct cmt *cmt = NULL; + char *p; + + if (!TEST_CHECK(data != NULL)) { + flb_error("data is NULL"); + } + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *) record, size, &off); + if (ret != 0) { + flb_error("could not process metrics payload"); + return -1; + } + + /* convert to text representation */ + text = cmt_encode_text_create(cmt); + /* To inspect the metrics from the callback, just comment out below: */ + /* flb_info("[filter_grep][test] text = %s", text); */ + for (i = 0; i < strlen(text); i++) { + p = (char *)(text + i); + if (*p == '\n') { + num_output++; + } + } + + if (record) { + flb_free(record); + } + + /* destroy cmt context */ + cmt_destroy(cmt); + + cmt_encode_text_destroy(text); + + return 0; +} + + +static void clear_output_num() +{ + pthread_mutex_lock(&result_mutex); + num_output = 0; + pthread_mutex_unlock(&result_mutex); +} + +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +#ifdef FLB_HAVE_METRICS +void flb_test_selector_regex_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/storage/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + int got; + int n_metrics = 12; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_regex_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "/input/", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + int got; + int n_metrics = 19; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_prefix_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "fluentbit_input", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "prefix", + }; + int got; + int n_metrics = 11; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_prefix_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "fluentbit_storage", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "prefix", + }; + int got; + int n_metrics = 25; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_substring_include(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "dropped", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "include", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "substring", + }; + int got; + int n_metrics = 1; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_selector_substring_exclude(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant var = { + .type = CFL_VARIANT_STRING, + .data.as_string = "connections", + }; + struct cfl_variant action = { + .type = CFL_VARIANT_STRING, + .data.as_string = "exclude", + }; + struct cfl_variant op_type = { + .type = CFL_VARIANT_STRING, + .data.as_string = "substring", + }; + int got; + int n_metrics = 28; + int not_used = 0; + struct flb_lib_out_cb cb_data; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_metrics_msgpack; + cb_data.data = ¬_used; + + ctx = flb_create(); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + NULL); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_METRICS, "metrics_selector"); + TEST_CHECK(pu != NULL); + ret = flb_processor_unit_set_property(pu, "metric_name", &var); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "action", &action); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "operation_type", &op_type); + TEST_CHECK(ret == 0); + + + /* Input */ + in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL); + TEST_CHECK(ret == 0); + + /* set up processor */ + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + clear_output_num(); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(1500); /* waiting flush */ + + got = get_output_num(); + if (!TEST_CHECK(got >= n_metrics)) { + TEST_MSG("expect: %d >= %d, got: %d < %d", got, n_metrics, got, n_metrics); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +#endif + +/* Test list */ +TEST_LIST = { +#ifdef FLB_HAVE_METRICS + {"regex_include", flb_test_selector_regex_include}, + {"regex_exclude", flb_test_selector_regex_exclude}, + {"prefix_include", flb_test_selector_prefix_include}, + {"prefix_exclude", flb_test_selector_prefix_exclude}, + {"substring_include", flb_test_selector_substring_include}, + {"substring_exclude", flb_test_selector_substring_exclude}, +#endif + {NULL, NULL} +};