From b3d491ddf38c8d4e41fa0b4744c15093f7230c0e Mon Sep 17 00:00:00 2001 From: Victor Cabezas Date: Mon, 17 Jul 2023 16:55:03 +0200 Subject: [PATCH] out_es: Add target_index variable using record accessor syntax. Add target_index variable for rendering ES index name using record accessor syntax without extra bytes (i.e.: no time component on index name). Signed-off-by: Victor Cabezas --- plugins/out_es/es.c | 52 ++++++++++++- plugins/out_es/es.h | 4 + plugins/out_es/es_conf.c | 15 ++++ tests/runtime/out_elasticsearch.c | 120 ++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 3 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index db2bcee5bd7..552bdb34ac7 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config, size_t off = 0; size_t off_prev = 0; char *es_index; + char target_index_value[256]; char logstash_index[256]; char time_formatted[256]; char index_formatted[256]; @@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config, } /* - * If logstash format and id generation are disabled, pre-generate - * the index line for all records. + * If logstash format, target_index record accessor and id generation are disabled, + * pre-generate the index line for all records. * * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ctx->logstash_format == FLB_FALSE && + ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) { + flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, @@ -453,6 +456,42 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str_body(&tmp_pck, time_formatted, s); es_index = ctx->index; + if (ctx->ra_target_index) { + flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index, + (char *) tag, tag_len, + map, NULL, FLB_TRUE); + if (v) { + len = flb_sds_len(v); + if (len > 128) { + len = 128; + } + memcpy(target_index_value, v, len); + target_index_value[len] = '\0'; + es_index_custom_len = len; + flb_sds_destroy(v); + es_index = target_index_value; + } + else { + flb_plg_warn(ctx->ins, + "the value of %s is missing for target_index_key", + ctx->target_index); + } + if (ctx->generate_id == FLB_FALSE) { + if (ctx->suppress_type_name) { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + ctx->es_action, es_index); + } + else { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT, + ctx->es_action, + es_index, ctx->type); + } + } + } if (ctx->logstash_format == FLB_TRUE) { ret = compose_index_header(ctx, es_index_custom_len, &logstash_index[0], sizeof(logstash_index), @@ -1180,6 +1219,13 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key), "If set, _id will be the value of the key from incoming record." }, + { + FLB_CONFIG_MAP_STR, "target_index", NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index), + "Compose index name using record accessor syntax. If any record accessor " + "variable is invalid (i.e.: The key is not present in event) the value of 'index' " + "will be used." + }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 5d187049f26..250a6a43a6d 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -118,6 +118,10 @@ struct flb_elasticsearch { flb_sds_t id_key; struct flb_record_accessor *ra_id_key; + /* target_index */ + flb_sds_t target_index; + struct flb_record_accessor *ra_target_index; + /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 450a3482bdc..8a79c5c7a01 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -271,6 +271,17 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); } + /* Set index record accessor */ + if (ctx->target_index) { + ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE); + fprintf(stderr, "record accessor at %p", ctx->ra_target_index); + if (!ctx->ra_target_index) { + flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index); + flb_es_conf_destroy(ctx); + return NULL; + } + } + if (ctx->id_key) { ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); if (ctx->ra_id_key == NULL) { @@ -479,6 +490,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_ra_destroy(ctx->ra_id_key); ctx->ra_id_key = NULL; } + if (ctx->ra_target_index) { + flb_ra_destroy(ctx->ra_target_index); + ctx->ra_target_index = NULL; + } if (ctx->es_action) { flb_free(ctx->es_action); } diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..327ccac10d4 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -167,6 +167,35 @@ static void cb_check_id_key(void *ctx, int ffd, flb_free(res_data); } +static void cb_check_target_index(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}"; + + p = strstr(out_js, index_line); + TEST_CHECK(p != NULL); + flb_free(res_data); +} + +static void cb_check_target_index_default(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}"; + + p = strstr(out_js, index_line); + TEST_CHECK(p != NULL); + if(!TEST_CHECK(p != NULL)) { + TEST_MSG("Got: %s", out_js); + } + flb_free(res_data); +} + void flb_test_write_operation_index() { int ret; @@ -799,6 +828,95 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +void flb_test_target_index() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "target_index", "aaa-$END_KEY", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_target_index, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_target_index_default() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "target_index", "aaa-$not_found_key", + "index", "default", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_target_index_default, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -814,5 +932,7 @@ TEST_LIST = { {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, + {"target_index" , flb_test_target_index }, + {"target_index_default" , flb_test_target_index_default }, {NULL, NULL} };