From 8a6a4094e715459801d72bf079e47ae977f8e560 Mon Sep 17 00:00:00 2001 From: Victor Cabezas Date: Thu, 20 Jul 2023 10:17:39 +0200 Subject: [PATCH] out_es: Detect if index uses RA syntax as in out_opensearch plugin Signed-off-by: Victor Cabezas --- plugins/out_es/es.c | 30 ++++--------- plugins/out_es/es.h | 6 +-- plugins/out_es/es_conf.c | 29 +++++++------ run_code_analysis.sh | 1 + tests/runtime/out_elasticsearch.c | 72 +++---------------------------- 5 files changed, 34 insertions(+), 104 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 552bdb34ac7..42a4b9ebd1a 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -295,7 +295,7 @@ static int elasticsearch_format(struct flb_config *config, size_t off = 0; size_t off_prev = 0; char *es_index; - char target_index_value[256]; + char index_value[256]; char logstash_index[256]; char time_formatted[256]; char index_formatted[256]; @@ -351,14 +351,14 @@ static int elasticsearch_format(struct flb_config *config, } /* - * If logstash format, target_index record accessor and id generation are disabled, + * If logstash format, index record accessor and id generation are disabled, * pre-generate the index line for all records. * * The header stored in 'j_index' will be used for the all records on * this payload. */ if (ctx->logstash_format == FLB_FALSE && - ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) { + ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); @@ -456,25 +456,20 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str_body(&tmp_pck, time_formatted, s); es_index = ctx->index; - if (ctx->ra_target_index) { - flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index, + if (ctx->ra_index) { + flb_sds_t v = flb_ra_translate(ctx->ra_index, (char *) tag, tag_len, - map, NULL, FLB_TRUE); + map, NULL); if (v) { len = flb_sds_len(v); if (len > 128) { len = 128; } - memcpy(target_index_value, v, len); - target_index_value[len] = '\0'; + memcpy(index_value, v, len); + index_value[len] = '\0'; es_index_custom_len = len; flb_sds_destroy(v); - es_index = target_index_value; - } - else { - flb_plg_warn(ctx->ins, - "the value of %s is missing for target_index_key", - ctx->target_index); + es_index = index_value; } if (ctx->generate_id == FLB_FALSE) { if (ctx->suppress_type_name) { @@ -1219,13 +1214,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key), "If set, _id will be the value of the key from incoming record." }, - { - FLB_CONFIG_MAP_STR, "target_index", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index), - "Compose index name using record accessor syntax. If any record accessor " - "variable is invalid (i.e.: The key is not present in event) the value of 'index' " - "will be used." - }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 250a6a43a6d..1de0dff68a3 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -39,6 +39,8 @@ struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index; + struct flb_record_accessor *ra_index; + char *type; char suppress_type_name; @@ -118,10 +120,6 @@ struct flb_elasticsearch { flb_sds_t id_key; struct flb_record_accessor *ra_id_key; - /* target_index */ - flb_sds_t target_index; - struct flb_record_accessor *ra_target_index; - /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 7096ac03c55..7143a2f138b 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -250,6 +250,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, if (f_type) { ctx->type = flb_strdup(f_type->value); /* FIXME */ } + else { + /* Check if the index has been set in the configuration */ + if (ctx->index) { + /* do we have a record accessor pattern ? */ + if (strchr(ctx->index, '$')) { + ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE); + if (!ctx->ra_index) { + flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property"); + flb_es_conf_destroy(ctx); + return NULL; + } + } + } + } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ if (ctx->buffer_size == -1) { @@ -271,15 +285,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); } - /* Set index record accessor */ - if (ctx->target_index) { - ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE); - if (!ctx->ra_target_index) { - flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index); - flb_es_conf_destroy(ctx); - return NULL; - } - } if (ctx->id_key) { ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); @@ -489,9 +494,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_ra_destroy(ctx->ra_id_key); ctx->ra_id_key = NULL; } - if (ctx->ra_target_index) { - flb_ra_destroy(ctx->ra_target_index); - ctx->ra_target_index = NULL; + if (ctx->ra_index) { + flb_ra_destroy(ctx->ra_index); + ctx->ra_index = NULL; } if (ctx->es_action) { flb_free(ctx->es_action); diff --git a/run_code_analysis.sh b/run_code_analysis.sh index 22adc47f7ef..9d59d9fd1e3 100755 --- a/run_code_analysis.sh +++ b/run_code_analysis.sh @@ -37,5 +37,6 @@ fi -e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \ -e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \ -e INPUT_PRE_COMMAND="cp -R /source /tmp" \ + -e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \ -e INPUT_WORKING-DIRECTORY="/tmp/source" \ lpenz/ghaction-cmake:0.19 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 327ccac10d4..b0cde947551 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -167,7 +167,7 @@ static void cb_check_id_key(void *ctx, int ffd, flb_free(res_data); } -static void cb_check_target_index(void *ctx, int ffd, +static void cb_check_index_ra(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { @@ -180,22 +180,6 @@ static void cb_check_target_index(void *ctx, int ffd, flb_free(res_data); } -static void cb_check_target_index_default(void *ctx, int ffd, - int res_ret, void *res_data, size_t res_size, - void *data) -{ - char *p; - char *out_js = res_data; - char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}"; - - p = strstr(out_js, index_line); - TEST_CHECK(p != NULL); - if(!TEST_CHECK(p != NULL)) { - TEST_MSG("Got: %s", out_js); - } - flb_free(res_data); -} - void flb_test_write_operation_index() { int ret; @@ -828,51 +812,7 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } -void flb_test_target_index() -{ - int ret; - int size = sizeof(JSON_ES) - 1; - flb_ctx_t *ctx; - int in_ffd; - int out_ffd; - - /* Create context, flush every second (some checks omitted here) */ - ctx = flb_create(); - flb_service_set(ctx, "flush", "1", "grace", "1", NULL); - - /* Lib input mode */ - in_ffd = flb_input(ctx, (char *) "lib", NULL); - flb_input_set(ctx, in_ffd, "tag", "test", NULL); - - /* Elasticsearch output */ - out_ffd = flb_output(ctx, (char *) "es", NULL); - flb_output_set(ctx, out_ffd, - "match", "test", - NULL); - - /* Override defaults of index and type */ - flb_output_set(ctx, out_ffd, - "target_index", "aaa-$END_KEY", - NULL); - - /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_target_index, - NULL, NULL); - - /* Start */ - ret = flb_start(ctx); - TEST_CHECK(ret == 0); - - /* Ingest data sample */ - flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); - - sleep(2); - flb_stop(ctx); - flb_destroy(ctx); -} - -void flb_test_target_index_default() +void flb_test_index_ra() { int ret; int size = sizeof(JSON_ES) - 1; @@ -896,13 +836,12 @@ void flb_test_target_index_default() /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, - "target_index", "aaa-$not_found_key", - "index", "default", + "index", "aaa-$END_KEY", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_target_index_default, + cb_check_index_ra, NULL, NULL); /* Start */ @@ -926,13 +865,12 @@ TEST_LIST = { {"write_operation_update", flb_test_write_operation_update }, {"write_operation_upsert", flb_test_write_operation_upsert }, {"index_type" , flb_test_index_type }, + {"index_ra" , flb_test_index_ra}, {"logstash_format" , flb_test_logstash_format }, {"logstash_format_nanos" , flb_test_logstash_format_nanos }, {"tag_key" , flb_test_tag_key }, {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, - {"target_index" , flb_test_target_index }, - {"target_index_default" , flb_test_target_index_default }, {NULL, NULL} };