Skip to content

Commit

Permalink
filter_log_to_metrics: use common grep APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed May 7, 2023
1 parent f0deb4a commit a3abb18
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 150 deletions.
150 changes: 19 additions & 131 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_grep.h>
#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_gauge.h>
#include <cmetrics/cmt_counter.h>
Expand All @@ -47,23 +48,6 @@ static char kubernetes_label_keys[NUMBER_OF_KUBERNETES_LABELS][16] =
"pod_id"
};

static void delete_rules(struct log_to_metrics_ctx *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct grep_rule *rule;

mk_list_foreach_safe(head, tmp, &ctx->rules) {
rule = mk_list_entry(head, struct grep_rule, _head);
flb_sds_destroy(rule->field);
flb_free(rule->regex_pattern);
flb_ra_destroy(rule->ra);
flb_regex_destroy(rule->regex);
mk_list_del(&rule->_head);
flb_free(rule);
}
}

static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx)
{
int i;
Expand All @@ -78,7 +62,7 @@ static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx)
cmt_destroy(ctx->cmt);
}

delete_rules(ctx);
flb_grep_destroy(ctx->grep);

if (ctx->label_keys != NULL) {
for (i = 0; i < MAX_LABEL_COUNT; i++) {
Expand All @@ -96,133 +80,30 @@ static int log_to_metrics_destroy(struct log_to_metrics_ctx *ctx)
static int set_rules(struct log_to_metrics_ctx *ctx,
struct flb_filter_instance *f_ins)
{
flb_sds_t tmp;
int ret = 0;
struct mk_list *head;
struct mk_list *split;
struct flb_split_entry *sentry;
struct flb_kv *kv;
struct grep_rule *rule;

/* Iterate all filter properties */
mk_list_foreach(head, &f_ins->properties) {
kv = mk_list_entry(head, struct flb_kv, _head);

/* Create a new rule */
rule = flb_malloc(sizeof(struct grep_rule));
if (!rule) {
flb_errno();
return -1;
}

/* Get the type */
if (strcasecmp(kv->key, "regex") == 0) {
rule->type = GREP_REGEX;
ret = flb_grep_set_rule_str(ctx->grep, FLB_GREP_REGEX, kv->val);
}
else if (strcasecmp(kv->key, "exclude") == 0) {
rule->type = GREP_EXCLUDE;
}
else {
flb_free(rule);
continue;
}

/* As a value we expect a pair of field name and a regular expression */
split = flb_utils_split(kv->val, ' ', 1);
if (mk_list_size(split) != 2) {
flb_plg_error(ctx->ins,
"invalid regex, expected field and regular expression");
delete_rules(ctx);
flb_free(rule);
flb_utils_split_free(split);
return -1;
}

/* Get first value (field) */
sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
if (*sentry->value == '$') {
rule->field = flb_sds_create_len(sentry->value, sentry->len);
}
else {
rule->field = flb_sds_create_size(sentry->len + 2);
tmp = flb_sds_cat(rule->field, "$", 1);
rule->field = tmp;

tmp = flb_sds_cat(rule->field, sentry->value, sentry->len);
rule->field = tmp;
}

/* Get remaining content (regular expression) */
sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
rule->regex_pattern = flb_strndup(sentry->value, sentry->len);
if (rule->regex_pattern == NULL) {
flb_errno();
delete_rules(ctx);
flb_free(rule);
flb_utils_split_free(split);
return -1;
ret = flb_grep_set_rule_str(ctx->grep, FLB_GREP_EXCLUDE, kv->val);
}

/* Release split */
flb_utils_split_free(split);

/* Create a record accessor context for this rule */
rule->ra = flb_ra_create(rule->field, FLB_FALSE);
if (!rule->ra) {
flb_plg_error(ctx->ins, "invalid record accessor? '%s'",
rule->field);
delete_rules(ctx);
flb_free(rule);
return -1;
if (ret < 0) {
return ret;
}

/* Convert string to regex pattern */
rule->regex = flb_regex_create(rule->regex_pattern);
if (!rule->regex) {
flb_plg_error(ctx->ins, "could not compile regex pattern '%s'",
rule->regex_pattern);
delete_rules(ctx);
flb_free(rule);
return -1;
}

/* Link to parent list */
mk_list_add(&rule->_head, &ctx->rules);
}

return 0;
}

/* Given a msgpack record, do some filter action based on the defined rules */
static inline int grep_filter_data(msgpack_object map,
struct log_to_metrics_ctx *ctx)
{
ssize_t ret;
struct mk_list *head;
struct grep_rule *rule;

/* For each rule, validate against map fields */
mk_list_foreach(head, &ctx->rules) {
rule = mk_list_entry(head, struct grep_rule, _head);

ret = flb_ra_regex_match(rule->ra, map, rule->regex, NULL);
if (ret <= 0) { /* no match */
if (rule->type == GREP_REGEX) {
return GREP_RET_EXCLUDE;
}
}
else {
if (rule->type == GREP_EXCLUDE) {
return GREP_RET_EXCLUDE;
}
else {
return GREP_RET_KEEP;
}
}
}

return GREP_RET_KEEP;
}

static int set_labels(struct log_to_metrics_ctx *ctx,
char **label_keys,
int *label_counter,
Expand Down Expand Up @@ -422,6 +303,8 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_input_instance *input_ins;
int label_count;
int i;
enum flb_grep_logical_op op = FLB_GREP_LOGICAL_OP_LEGACY;

/* Create context */
ctx = flb_malloc(sizeof(struct log_to_metrics_ctx));
if (!ctx) {
Expand All @@ -434,10 +317,15 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
flb_free(ctx);
return -1;
}
mk_list_init(&ctx->rules);

ctx->ins = f_ins;

ctx->grep =flb_grep_create(op);
if (ctx->grep == NULL) {
flb_plg_error(f_ins, "flb_grep_create failed");
flb_free(ctx);
return -1;
}

/* Load rules */
ret = set_rules(ctx, f_ins);
if (ret == -1) {
Expand Down Expand Up @@ -661,8 +549,8 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
/* get time and map */
map = root.via.array.ptr[1];

ret = grep_filter_data(map, context);
if (ret == GREP_RET_KEEP) {
ret = flb_grep_filter(map, ctx->grep);
if (ret == FLB_GREP_RET_KEEP) {
ts = cfl_time_now();
if(ctx->kubernetes_mode){
for(i = 0; i < NUMBER_OF_KUBERNETES_LABELS; i++){
Expand Down Expand Up @@ -829,7 +717,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
flb_free(label_values);
}
}
else if (ret == GREP_RET_EXCLUDE) {
else if (ret == FLB_GREP_RET_EXCLUDE) {
/* Do nothing */
}
}
Expand Down
21 changes: 2 additions & 19 deletions plugins/filter_log_to_metrics/log_to_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,7 @@
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_record_accessor.h>

/* rule types */
#define GREP_REGEX 1
#define GREP_EXCLUDE 2

/* actions */
#define GREP_RET_KEEP 0
#define GREP_RET_EXCLUDE 1
#include <fluent-bit/flb_grep.h>

/* modes */
#define FLB_LOG_TO_METRICS_COUNTER_STR "counter"
Expand All @@ -51,7 +44,6 @@

struct log_to_metrics_ctx
{
struct mk_list rules;
struct flb_filter_instance *ins;
int mode;
flb_sds_t metric_name;
Expand All @@ -69,16 +61,7 @@ struct log_to_metrics_ctx
flb_sds_t tag;
int *bucket_counter;
double *buckets;
};

struct grep_rule
{
int type;
flb_sds_t field;
char *regex_pattern;
struct flb_regex *regex;
struct flb_record_accessor *ra;
struct mk_list _head;
struct flb_grep *grep;
};

#endif

0 comments on commit a3abb18

Please sign in to comment.