From 8c4036b67762bc11033c0cd9c244fbe6004e879b Mon Sep 17 00:00:00 2001 From: Thiago Padilha Date: Sun, 26 May 2024 11:45:39 -0300 Subject: [PATCH] config_format: yaml: Support passing arbitrary objects to processors This was initially based on https://github.com/fluent/fluent-bit/pull/8656 and https://github.com/fluent/fluent-bit/pull/8661 but ended up making few different choices: - Instead of adding FLB_CONFIG_MAP_KVLIST and FLB_CONFIG_MAP_ARRAY as config map types, it adds a single FLB_CONFIG_MAP_VARIANT which accepts an array or map. The passed array/map can have nested values and accept any JSON/YAML types, which are converted to cfl types. The processor will simply receive a `cfl_variant` pointer but doesn't own it, the config section is responsible for releasing its memory. - The existing `flb_config_map_set` function is reused by passing the `cfl_variant` pointer (from the yaml parsing phase) as the value of a `flb_kv`. Then pointer is eventually passed to the processor context. Signed-off-by: Thiago Padilha --- include/fluent-bit/flb_config_map.h | 4 + include/fluent-bit/flb_processor.h | 2 +- src/config_format/flb_cf_yaml.c | 333 +++++++++++++++++++++++----- src/flb_config_map.c | 10 + src/flb_processor.c | 31 ++- 5 files changed, 311 insertions(+), 69 deletions(-) diff --git a/include/fluent-bit/flb_config_map.h b/include/fluent-bit/flb_config_map.h index c644df5330b..8ca3c1eb42e 100644 --- a/include/fluent-bit/flb_config_map.h +++ b/include/fluent-bit/flb_config_map.h @@ -24,6 +24,7 @@ #include #include #include +#include /* Configuration types */ #define FLB_CONFIG_MAP_STR 0 /* string */ @@ -47,6 +48,8 @@ #define FLB_CONFIG_MAP_SLIST_3 43 /* split up to 3 nodes + remaining data */ #define FLB_CONFIG_MAP_SLIST_4 44 /* split up to 4 nodes + remaining data */ +#define FLB_CONFIG_MAP_VARIANT 50 /* variant that wraps a kvlist or array */ + #define FLB_CONFIG_MAP_MULT 1 struct flb_config_map_val { @@ -57,6 +60,7 @@ struct flb_config_map_val { size_t s_num; /* FLB_CONFIG_MAP_SIZE */ flb_sds_t str; /* FLB_CONFIG_MAP_STR */ struct mk_list *list; /* FLB_CONFIG_MAP_CLIST and FLB_CONFIG_MAP_SLIST */ + struct cfl_variant *variant; /* FLB_CONFIG_MAP_VARIANT */ } val; struct mk_list *mult; struct mk_list _head; /* Link to list if this entry is a 'multiple' entry */ diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 116e23e570d..c4abd7b43a0 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -243,7 +243,7 @@ int flb_processor_instance_check_properties( int flb_processor_instance_set_property( struct flb_processor_instance *ins, - const char *k, const char *v); + const char *k, struct cfl_variant *v); const char *flb_processor_instance_get_property( const char *key, diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 6581692a422..eff4be4096f 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -122,6 +122,7 @@ enum state { STATE_PLUGIN_KEY, STATE_PLUGIN_VAL, STATE_PLUGIN_VAL_LIST, + STATE_PLUGIN_VARIANT, STATE_GROUP_KEY, STATE_GROUP_VAL, @@ -160,6 +161,10 @@ struct parser_state { struct cfl_kvlist *keyvals; /* pointer to current values in a list. */ struct cfl_array *values; + /* pointer to current variant */ + struct cfl_variant *variant; + /* if the current variant is reading the key of a kvlist */ + cfl_sds_t variant_kvlist_key; /* are we the owner of the values? */ int allocation_flags; @@ -169,6 +174,9 @@ struct parser_state { }; static struct parser_state *state_push(struct local_ctx *, enum state); +static struct parser_state *state_push_variant(struct local_ctx *, + struct parser_state *, + int); static struct parser_state *state_push_withvals(struct local_ctx *, struct parser_state *, enum state); @@ -181,6 +189,10 @@ static struct parser_state *state_push_key(struct local_ctx *, enum state, const char *key); static int state_create_section(struct flb_cf *, struct parser_state *, char *); static int state_create_group(struct flb_cf *, struct parser_state *, char *); +static struct cfl_variant * state_variant_parse_scalar(yaml_event_t *event); +static int state_variant_set_child(struct local_ctx *, + struct parser_state *, + struct cfl_variant *); static struct parser_state *state_pop(struct local_ctx *ctx); static struct parser_state *state_create(struct file_state *parent, struct file_state *file); static void state_destroy(struct parser_state *s); @@ -228,6 +240,8 @@ static char *state_str(enum state val) return "plugin-value"; case STATE_PLUGIN_VAL_LIST: return "plugin-values"; + case STATE_PLUGIN_VARIANT: + return "plugin-variant"; case STATE_GROUP_KEY: return "group-key"; case STATE_GROUP_VAL: @@ -588,16 +602,14 @@ static struct parser_state *get_current_state(struct local_ctx *ctx) return state; } -static enum status state_copy_into_config_group(struct parser_state *state, struct flb_cf_group *cf_group) +static enum status state_move_into_config_group(struct parser_state *state, struct flb_cf_group *cf_group) { struct cfl_list *head; + struct cfl_list *tmp; struct cfl_kvpair *kvp; - struct cfl_variant *var; struct cfl_variant *varr; struct cfl_array *arr; - struct cfl_array *carr; struct cfl_kvlist *copy; - int idx; if (cf_group == NULL) { flb_error("no group for processor properties"); @@ -634,64 +646,18 @@ static enum status state_copy_into_config_group(struct parser_state *state, stru return YAML_FAILURE; } - cfl_list_foreach(head, &state->keyvals->list) { + cfl_list_foreach_safe(head, tmp, &state->keyvals->list) { kvp = cfl_list_entry(head, struct cfl_kvpair, _head); - switch (kvp->val->type) { - case CFL_VARIANT_STRING: - - if (cfl_kvlist_insert_string(copy, kvp->key, kvp->val->data.as_string) < 0) { - flb_error("unable to allocate kvlist"); - cfl_kvlist_destroy(copy); - return YAML_FAILURE; - } - break; - case CFL_VARIANT_ARRAY: - carr = cfl_array_create(kvp->val->data.as_array->entry_count); - - if (carr == NULL) { - flb_error("unable to allocate array"); - cfl_kvlist_destroy(copy); - return YAML_FAILURE; - } - - for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { - var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); - - if (var == NULL) { - cfl_array_destroy(arr); - flb_error("unable to fetch from array by index"); - return YAML_FAILURE; - } - switch (var->type) { - case CFL_VARIANT_STRING: - if (cfl_array_append_string(carr, var->data.as_string) < 0) { - flb_error("unable to append string"); - cfl_kvlist_destroy(copy); - cfl_array_destroy(carr); - return YAML_FAILURE; - } - break; - default: - cfl_array_destroy(arr); - flb_error("unable to copy value for property"); - cfl_kvlist_destroy(copy); - cfl_array_destroy(carr); - return YAML_FAILURE; - } - } - - if (cfl_kvlist_insert_array(copy, kvp->key, carr) < 0) { - cfl_array_destroy(arr); - flb_error("unabelt to insert into array"); - flb_error("unable to insert array into kvlist"); - } - break; - default: - flb_error("unknown value type for properties: %d", kvp->val->type); + if (cfl_kvlist_insert(copy, kvp->key, kvp->val) < 0) { + flb_error("unable to insert to kvlist"); cfl_kvlist_destroy(copy); return YAML_FAILURE; } + + /* ownership moved to the config group */ + kvp->val = NULL; + cfl_kvpair_destroy(kvp); } if (cfl_array_append_kvlist(arr, copy) < 0) { @@ -766,6 +732,7 @@ static enum status state_copy_into_properties(struct parser_state *state, struct static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, yaml_event_t *event) { + struct cfl_variant *variant; struct parser_state *state; enum status status; int ret; @@ -1263,7 +1230,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, print_current_properties(state); if (state->section == SECTION_PROCESSOR) { - status = state_copy_into_config_group(state, state->cf_group); + status = state_move_into_config_group(state, state->cf_group); if (status != YAML_SUCCESS) { return status; @@ -1388,7 +1355,12 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, } break; case YAML_SEQUENCE_START_EVENT: /* start a new group */ - state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST); + if (state->section == SECTION_PROCESSOR) { + state = state_push_variant(ctx, state, 0); + } + else { + state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST); + } if (state == NULL) { flb_error("unable to allocate state"); @@ -1397,6 +1369,18 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; case YAML_MAPPING_START_EVENT: + if (state->section == SECTION_PROCESSOR) { + /* when in a processor section, we allow plugins to have nested + * properties which are returned as a cfl_variant */ + state = state_push_variant(ctx, state, 1); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + } + if (strcmp(state->key, "processors") == 0) { state = state_push(ctx, STATE_INPUT_PROCESSORS); @@ -1509,6 +1493,78 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + + case STATE_PLUGIN_VARIANT: + switch(event->type) { + case YAML_MAPPING_START_EVENT: + case YAML_SEQUENCE_START_EVENT: /* nested map or array */ + state = state_push_variant(ctx, state, event->type == YAML_MAPPING_START_EVENT); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + + case YAML_SCALAR_EVENT: + if (state->variant->type == CFL_VARIANT_KVLIST && state->variant_kvlist_key == NULL) { + /* save the key for later */ + state->variant_kvlist_key = cfl_sds_create((const char *)event->data.scalar.value); + break; + } + + /* set the value */ + variant = state_variant_parse_scalar(event); + + if (variant == NULL) { + flb_error("unable to allocate memory for variant"); + return YAML_FAILURE; + } + + if (state_variant_set_child(ctx, state, variant)) { + flb_error("unable to add key to list map"); + return YAML_FAILURE; + } + + break; + + case YAML_MAPPING_END_EVENT: + case YAML_SEQUENCE_END_EVENT: + variant = state->variant; + + state = state_pop(ctx); + + if (state->state == STATE_PLUGIN_VAL) { + /* set variant to the parent state keyvals */ + if (cfl_kvlist_insert(state->keyvals, state->key, variant) < 0) { + flb_error("unable to insert variant"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + + break; + } + + if (state->variant->type == CFL_VARIANT_KVLIST && state->variant_kvlist_key == NULL) { + flb_error("invalid state, should have a variant key"); + return YAML_FAILURE; + } + + if (state_variant_set_child(ctx, state, variant)) { + flb_error("unable to add key to list map"); + return YAML_FAILURE; + } + + break; + default: yaml_error_event(ctx, state, event); return YAML_FAILURE; @@ -1767,6 +1823,165 @@ static struct parser_state *state_push_key(struct local_ctx *ctx, return state; } +static int parse_uint64(const char *in, uint64_t *out) +{ + char *end; + uint64_t val; + + errno = 0; + val = strtoull(in, &end, 10); + if (end == in || *end != 0 || errno) { + return -1; + } + + *out = val; + return 0; +} + +static int parse_int64(const char *in, int64_t *out) +{ + char *end; + int64_t val; + + errno = 0; + val = strtoll(in, &end, 10); + if (end == in || *end != 0 || errno) { + return -1; + } + + *out = val; + return 0; +} + +static int parse_double(const char *in, double *out) +{ + char *end; + double val; + errno = 0; + val = strtod(in, &end); + if (end == in || *end != 0 || errno) { + return -1; + } + *out = val; + return 0; +} + +static struct cfl_variant * state_variant_parse_scalar(yaml_event_t *event) +{ + int64_t i64; + uint64_t u64; + double d; + char *value = (char *)event->data.scalar.value; + + if (event->data.scalar.style != YAML_PLAIN_SCALAR_STYLE) { + /* return a string */ + return cfl_variant_create_from_string(value); + } + + if (!strcmp(value, "null")) { + return cfl_variant_create_from_null(); + } + else if (!strcmp(value, "false")) { + return cfl_variant_create_from_bool(0); + } + else if (!strcmp(value, "true")) { + return cfl_variant_create_from_bool(1); + } + + if (value[0] != '-' && parse_uint64(value, &u64) == 0) { + return cfl_variant_create_from_uint64(u64); + } + else if (parse_int64(value, &i64) == 0) { + return cfl_variant_create_from_int64(i64); + } + else if (parse_double(value, &d) == 0) { + return cfl_variant_create_from_double(d); + } + + /* treat as a string */ + return cfl_variant_create_from_string(value); +} + +static int state_variant_set_child(struct local_ctx *ctx, + struct parser_state *state, + struct cfl_variant *variant) +{ + if (state->variant->type == CFL_VARIANT_ARRAY) { + return cfl_array_append(state->variant->data.as_array, variant); + } + + if (state->variant_kvlist_key == NULL) { + return -1; + } + else { + + if (cfl_kvlist_insert(state->variant->data.as_kvlist, + state->variant_kvlist_key, + variant) < 0) { + return -1; + } + cfl_sds_destroy(state->variant_kvlist_key); + state->variant_kvlist_key = NULL; + + } + + return 0; +} + +static struct parser_state *state_push_variant(struct local_ctx *ctx, + struct parser_state *parent, + int is_kvlist) +{ + struct parser_state *state; + struct cfl_variant *variant; + struct cfl_kvlist *kvlist; + struct cfl_array *array; + + if (is_kvlist) { + + kvlist = cfl_kvlist_create(); + + if (kvlist == NULL) { + return NULL; + } + + variant = cfl_variant_create_from_kvlist(kvlist); + + if (variant == NULL) { + cfl_kvlist_destroy(kvlist); + return NULL; + } + + } + else { + + array = cfl_array_create(10); + + if (array == NULL) { + return NULL; + } + + variant = cfl_variant_create_from_array(array); + + if (variant == NULL) { + cfl_array_destroy(array); + return NULL; + } + } + + state = state_push(ctx, STATE_PLUGIN_VARIANT); + + if (state == NULL) { + cfl_variant_destroy(variant); + return NULL; + } + + state->variant = variant; + state->variant_kvlist_key = NULL; + + return state; +} + static struct parser_state *state_push_withvals(struct local_ctx *ctx, struct parser_state *parent, enum state state_num) diff --git a/src/flb_config_map.c b/src/flb_config_map.c index 463f542889c..4db55411cb0 100644 --- a/src/flb_config_map.c +++ b/src/flb_config_map.c @@ -26,6 +26,7 @@ #include #include #include +#include static int check_list_size(struct mk_list *list, int type) { @@ -587,6 +588,7 @@ int flb_config_map_set(struct mk_list *properties, struct mk_list *map, void *co double *m_d_num; size_t *m_s_num; flb_sds_t *m_str; + struct cfl_variant **m_variant; struct flb_kv *kv; struct mk_list *head; struct mk_list *m_head; @@ -668,6 +670,7 @@ int flb_config_map_set(struct mk_list *properties, struct mk_list *map, void *co */ mk_list_foreach(head, properties) { kv = mk_list_entry(head, struct flb_kv, _head); + if (kv->val == NULL) { continue; } @@ -793,6 +796,13 @@ int flb_config_map_set(struct mk_list *properties, struct mk_list *map, void *co m_i_num = (int *) (base + m->offset); *m_i_num = flb_utils_time_to_seconds(kv->val); } + else if (m->type == FLB_CONFIG_MAP_VARIANT) { + m_variant = (struct cfl_variant **) (base + m->offset); + *m_variant = (struct cfl_variant *)kv->val; + /* Ownership of the object belongs to the config section, set it + * to NULL to prevent flb_kv_item_destroy to attempt freeing it */ + kv->val = NULL; + } else if (m->type >= FLB_CONFIG_MAP_CLIST || m->type <= FLB_CONFIG_MAP_SLIST_4) { list = parse_string_map_to_list(m, kv->val); diff --git a/src/flb_processor.c b/src/flb_processor.c index 14832fdfa82..a756940861a 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -29,6 +29,7 @@ #include #include #include +#include static int acquire_lock(pthread_mutex_t *lock, size_t retry_limit, @@ -300,7 +301,7 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k return flb_processor_instance_set_property( (struct flb_processor_instance *) pu->ctx, - k, v->data.as_string); + k, v); } void flb_processor_unit_destroy(struct flb_processor_unit *pu) @@ -733,6 +734,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s struct cfl_kvlist *kvlist; struct cfl_kvpair *pair = NULL; struct cfl_list *head; + struct cfl_list *tmp2; struct flb_processor_unit *pu; struct flb_filter_instance *f_ins; @@ -769,7 +771,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s } /* iterate list of properties and set each one (skip name) */ - cfl_list_foreach(head, &kvlist->list) { + cfl_list_foreach_safe(head, tmp2, &kvlist->list) { pair = cfl_list_entry(head, struct cfl_kvpair, _head); if (strcmp(pair->key, "name") == 0) { @@ -869,18 +871,20 @@ static inline int prop_key_check(const char *key, const char *kv, int k_len) } int flb_processor_instance_set_property(struct flb_processor_instance *ins, - const char *k, const char *v) + const char *k, struct cfl_variant *v) { int len; int ret; - flb_sds_t tmp; struct flb_kv *kv; + cfl_sds_t tmp = NULL; len = strlen(k); - tmp = flb_env_var_translate(ins->config->env, v); + if (v->type == CFL_VARIANT_STRING) { + tmp = flb_env_var_translate(ins->config->env, v->data.as_string); - if (!tmp) { - return -1; + if (!tmp) { + return -1; + } } if (prop_key_check("alias", k, len) == 0 && tmp) { @@ -903,13 +907,22 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); if (!kv) { - if (tmp) { flb_sds_destroy(tmp); } return -1; } - kv->val = tmp; + + + if (v->type == CFL_VARIANT_STRING) { + kv->val = tmp; + } + else { + /* Hacky workaround: We store the variant address in a char * just to pass + * the variant reference to the plugin. After this happens, + * kv->val must be set to NULL (done in flb_config_map.c) */ + kv->val = (void *)v; + } } return 0;