From da3e191ecc8ab13476a56f491c145630c405debf Mon Sep 17 00:00:00 2001 From: Victor Cabezas Date: Mon, 17 Jul 2023 16:55:03 +0200 Subject: [PATCH 1/3] out_es: Add target_index variable using record accessor syntax. Add target_index variable for rendering ES index name using record accessor syntax without extra bytes (i.e.: no time component on index name). Signed-off-by: Victor Cabezas --- plugins/out_es/es.c | 52 ++++++++++++- plugins/out_es/es.h | 4 + plugins/out_es/es_conf.c | 15 ++++ tests/runtime/out_elasticsearch.c | 120 ++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 3 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index db2bcee5bd7..552bdb34ac7 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config, size_t off = 0; size_t off_prev = 0; char *es_index; + char target_index_value[256]; char logstash_index[256]; char time_formatted[256]; char index_formatted[256]; @@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config, } /* - * If logstash format and id generation are disabled, pre-generate - * the index line for all records. + * If logstash format, target_index record accessor and id generation are disabled, + * pre-generate the index line for all records. * * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ctx->logstash_format == FLB_FALSE && + ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) { + flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, @@ -453,6 +456,42 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str_body(&tmp_pck, time_formatted, s); es_index = ctx->index; + if (ctx->ra_target_index) { + flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index, + (char *) tag, tag_len, + map, NULL, FLB_TRUE); + if (v) { + len = flb_sds_len(v); + if (len > 128) { + len = 128; + } + memcpy(target_index_value, v, len); + target_index_value[len] = '\0'; + es_index_custom_len = len; + flb_sds_destroy(v); + es_index = target_index_value; + } + else { + flb_plg_warn(ctx->ins, + "the value of %s is missing for target_index_key", + ctx->target_index); + } + if (ctx->generate_id == FLB_FALSE) { + if (ctx->suppress_type_name) { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + ctx->es_action, es_index); + } + else { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT, + ctx->es_action, + es_index, ctx->type); + } + } + } if (ctx->logstash_format == FLB_TRUE) { ret = compose_index_header(ctx, es_index_custom_len, &logstash_index[0], sizeof(logstash_index), @@ -1180,6 +1219,13 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key), "If set, _id will be the value of the key from incoming record." }, + { + FLB_CONFIG_MAP_STR, "target_index", NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index), + "Compose index name using record accessor syntax. If any record accessor " + "variable is invalid (i.e.: The key is not present in event) the value of 'index' " + "will be used." + }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 5d187049f26..250a6a43a6d 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -118,6 +118,10 @@ struct flb_elasticsearch { flb_sds_t id_key; struct flb_record_accessor *ra_id_key; + /* target_index */ + flb_sds_t target_index; + struct flb_record_accessor *ra_target_index; + /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 48c8c3e2516..d8e2630dd57 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -290,6 +290,17 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); } + /* Set index record accessor */ + if (ctx->target_index) { + ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE); + fprintf(stderr, "record accessor at %p", ctx->ra_target_index); + if (!ctx->ra_target_index) { + flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index); + flb_es_conf_destroy(ctx); + return NULL; + } + } + if (ctx->id_key) { ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); if (ctx->ra_id_key == NULL) { @@ -498,6 +509,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_ra_destroy(ctx->ra_id_key); ctx->ra_id_key = NULL; } + if (ctx->ra_target_index) { + flb_ra_destroy(ctx->ra_target_index); + ctx->ra_target_index = NULL; + } if (ctx->es_action) { flb_free(ctx->es_action); } diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..327ccac10d4 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -167,6 +167,35 @@ static void cb_check_id_key(void *ctx, int ffd, flb_free(res_data); } +static void cb_check_target_index(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}"; + + p = strstr(out_js, index_line); + TEST_CHECK(p != NULL); + flb_free(res_data); +} + +static void cb_check_target_index_default(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}"; + + p = strstr(out_js, index_line); + TEST_CHECK(p != NULL); + if(!TEST_CHECK(p != NULL)) { + TEST_MSG("Got: %s", out_js); + } + flb_free(res_data); +} + void flb_test_write_operation_index() { int ret; @@ -799,6 +828,95 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +void flb_test_target_index() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "target_index", "aaa-$END_KEY", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_target_index, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_target_index_default() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "target_index", "aaa-$not_found_key", + "index", "default", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_target_index_default, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -814,5 +932,7 @@ TEST_LIST = { {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, + {"target_index" , flb_test_target_index }, + {"target_index_default" , flb_test_target_index_default }, {NULL, NULL} }; From bdb3e2825eb1c7c27650176cfa8e88704292542b Mon Sep 17 00:00:00 2001 From: Victor Cabezas Date: Tue, 18 Jul 2023 01:14:53 +0200 Subject: [PATCH 2/3] out_es: Remove debug sprintf left in es_conf.c Signed-off-by: Victor Cabezas --- plugins/out_es/es_conf.c | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index d8e2630dd57..88697100270 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -293,7 +293,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Set index record accessor */ if (ctx->target_index) { ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE); - fprintf(stderr, "record accessor at %p", ctx->ra_target_index); if (!ctx->ra_target_index) { flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index); flb_es_conf_destroy(ctx); From 55be71f8a1c0043295a73da17f5812b7a9cd039a Mon Sep 17 00:00:00 2001 From: Victor Cabezas Date: Thu, 20 Jul 2023 10:17:39 +0200 Subject: [PATCH 3/3] out_es: Detect if index uses RA syntax as in out_opensearch plugin Signed-off-by: Victor Cabezas --- plugins/out_es/es.c | 30 ++++--------- plugins/out_es/es.h | 6 +-- plugins/out_es/es_conf.c | 29 +++++++------ run_code_analysis.sh | 1 + tests/runtime/out_elasticsearch.c | 72 +++---------------------------- 5 files changed, 34 insertions(+), 104 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 552bdb34ac7..42a4b9ebd1a 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -295,7 +295,7 @@ static int elasticsearch_format(struct flb_config *config, size_t off = 0; size_t off_prev = 0; char *es_index; - char target_index_value[256]; + char index_value[256]; char logstash_index[256]; char time_formatted[256]; char index_formatted[256]; @@ -351,14 +351,14 @@ static int elasticsearch_format(struct flb_config *config, } /* - * If logstash format, target_index record accessor and id generation are disabled, + * If logstash format, index record accessor and id generation are disabled, * pre-generate the index line for all records. * * The header stored in 'j_index' will be used for the all records on * this payload. */ if (ctx->logstash_format == FLB_FALSE && - ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) { + ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); @@ -456,25 +456,20 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str_body(&tmp_pck, time_formatted, s); es_index = ctx->index; - if (ctx->ra_target_index) { - flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index, + if (ctx->ra_index) { + flb_sds_t v = flb_ra_translate(ctx->ra_index, (char *) tag, tag_len, - map, NULL, FLB_TRUE); + map, NULL); if (v) { len = flb_sds_len(v); if (len > 128) { len = 128; } - memcpy(target_index_value, v, len); - target_index_value[len] = '\0'; + memcpy(index_value, v, len); + index_value[len] = '\0'; es_index_custom_len = len; flb_sds_destroy(v); - es_index = target_index_value; - } - else { - flb_plg_warn(ctx->ins, - "the value of %s is missing for target_index_key", - ctx->target_index); + es_index = index_value; } if (ctx->generate_id == FLB_FALSE) { if (ctx->suppress_type_name) { @@ -1219,13 +1214,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key), "If set, _id will be the value of the key from incoming record." }, - { - FLB_CONFIG_MAP_STR, "target_index", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index), - "Compose index name using record accessor syntax. If any record accessor " - "variable is invalid (i.e.: The key is not present in event) the value of 'index' " - "will be used." - }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 250a6a43a6d..1de0dff68a3 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -39,6 +39,8 @@ struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index; + struct flb_record_accessor *ra_index; + char *type; char suppress_type_name; @@ -118,10 +120,6 @@ struct flb_elasticsearch { flb_sds_t id_key; struct flb_record_accessor *ra_id_key; - /* target_index */ - flb_sds_t target_index; - struct flb_record_accessor *ra_target_index; - /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 88697100270..02f4242efb4 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -269,6 +269,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, if (f_type) { ctx->type = flb_strdup(f_type->value); /* FIXME */ } + else { + /* Check if the index has been set in the configuration */ + if (ctx->index) { + /* do we have a record accessor pattern ? */ + if (strchr(ctx->index, '$')) { + ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE); + if (!ctx->ra_index) { + flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property"); + flb_es_conf_destroy(ctx); + return NULL; + } + } + } + } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ if (ctx->buffer_size == -1) { @@ -290,15 +304,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); } - /* Set index record accessor */ - if (ctx->target_index) { - ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE); - if (!ctx->ra_target_index) { - flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index); - flb_es_conf_destroy(ctx); - return NULL; - } - } if (ctx->id_key) { ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); @@ -508,9 +513,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_ra_destroy(ctx->ra_id_key); ctx->ra_id_key = NULL; } - if (ctx->ra_target_index) { - flb_ra_destroy(ctx->ra_target_index); - ctx->ra_target_index = NULL; + if (ctx->ra_index) { + flb_ra_destroy(ctx->ra_index); + ctx->ra_index = NULL; } if (ctx->es_action) { flb_free(ctx->es_action); diff --git a/run_code_analysis.sh b/run_code_analysis.sh index 22adc47f7ef..9d59d9fd1e3 100755 --- a/run_code_analysis.sh +++ b/run_code_analysis.sh @@ -37,5 +37,6 @@ fi -e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \ -e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \ -e INPUT_PRE_COMMAND="cp -R /source /tmp" \ + -e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \ -e INPUT_WORKING-DIRECTORY="/tmp/source" \ lpenz/ghaction-cmake:0.19 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 327ccac10d4..b0cde947551 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -167,7 +167,7 @@ static void cb_check_id_key(void *ctx, int ffd, flb_free(res_data); } -static void cb_check_target_index(void *ctx, int ffd, +static void cb_check_index_ra(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) { @@ -180,22 +180,6 @@ static void cb_check_target_index(void *ctx, int ffd, flb_free(res_data); } -static void cb_check_target_index_default(void *ctx, int ffd, - int res_ret, void *res_data, size_t res_size, - void *data) -{ - char *p; - char *out_js = res_data; - char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}"; - - p = strstr(out_js, index_line); - TEST_CHECK(p != NULL); - if(!TEST_CHECK(p != NULL)) { - TEST_MSG("Got: %s", out_js); - } - flb_free(res_data); -} - void flb_test_write_operation_index() { int ret; @@ -828,51 +812,7 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } -void flb_test_target_index() -{ - int ret; - int size = sizeof(JSON_ES) - 1; - flb_ctx_t *ctx; - int in_ffd; - int out_ffd; - - /* Create context, flush every second (some checks omitted here) */ - ctx = flb_create(); - flb_service_set(ctx, "flush", "1", "grace", "1", NULL); - - /* Lib input mode */ - in_ffd = flb_input(ctx, (char *) "lib", NULL); - flb_input_set(ctx, in_ffd, "tag", "test", NULL); - - /* Elasticsearch output */ - out_ffd = flb_output(ctx, (char *) "es", NULL); - flb_output_set(ctx, out_ffd, - "match", "test", - NULL); - - /* Override defaults of index and type */ - flb_output_set(ctx, out_ffd, - "target_index", "aaa-$END_KEY", - NULL); - - /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_target_index, - NULL, NULL); - - /* Start */ - ret = flb_start(ctx); - TEST_CHECK(ret == 0); - - /* Ingest data sample */ - flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); - - sleep(2); - flb_stop(ctx); - flb_destroy(ctx); -} - -void flb_test_target_index_default() +void flb_test_index_ra() { int ret; int size = sizeof(JSON_ES) - 1; @@ -896,13 +836,12 @@ void flb_test_target_index_default() /* Override defaults of index and type */ flb_output_set(ctx, out_ffd, - "target_index", "aaa-$not_found_key", - "index", "default", + "index", "aaa-$END_KEY", NULL); /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_target_index_default, + cb_check_index_ra, NULL, NULL); /* Start */ @@ -926,13 +865,12 @@ TEST_LIST = { {"write_operation_update", flb_test_write_operation_update }, {"write_operation_upsert", flb_test_write_operation_upsert }, {"index_type" , flb_test_index_type }, + {"index_ra" , flb_test_index_ra}, {"logstash_format" , flb_test_logstash_format }, {"logstash_format_nanos" , flb_test_logstash_format_nanos }, {"tag_key" , flb_test_tag_key }, {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, - {"target_index" , flb_test_target_index }, - {"target_index_default" , flb_test_target_index_default }, {NULL, NULL} };