Skip to content

Commit

Permalink
out_es: Add target_index variable using record accessor syntax.
Browse files Browse the repository at this point in the history
Add target_index variable for rendering ES index name using record
accessor syntax without extra bytes (i.e.: no time component on index
name).

Signed-off-by: Victor Cabezas <[email protected]>
  • Loading branch information
Victor Cabezas committed Aug 16, 2023
1 parent 6fd4a75 commit b3d491d
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 3 deletions.
52 changes: 49 additions & 3 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config,
size_t off = 0;
size_t off_prev = 0;
char *es_index;
char target_index_value[256];
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
Expand Down Expand Up @@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config,
}

/*
* If logstash format and id generation are disabled, pre-generate
* the index line for all records.
* If logstash format, target_index record accessor and id generation are disabled,
* pre-generate the index line for all records.
*
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
if (ctx->logstash_format == FLB_FALSE &&
ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) {

flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
strftime(index_formatted, sizeof(index_formatted) - 1,
Expand Down Expand Up @@ -453,6 +456,42 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->ra_target_index) {
flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index,
(char *) tag, tag_len,
map, NULL, FLB_TRUE);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
}
memcpy(target_index_value, v, len);
target_index_value[len] = '\0';
es_index_custom_len = len;
flb_sds_destroy(v);
es_index = target_index_value;
}
else {
flb_plg_warn(ctx->ins,
"the value of %s is missing for target_index_key",
ctx->target_index);
}
if (ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf (&j_index,
flb_sds_alloc (j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action, es_index);
}
else {
index_len = flb_sds_snprintf (&j_index,
flb_sds_alloc (j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
}
if (ctx->logstash_format == FLB_TRUE) {
ret = compose_index_header(ctx, es_index_custom_len,
&logstash_index[0], sizeof(logstash_index),
Expand Down Expand Up @@ -1180,6 +1219,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
"If set, _id will be the value of the key from incoming record."
},
{
FLB_CONFIG_MAP_STR, "target_index", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index),
"Compose index name using record accessor syntax. If any record accessor "
"variable is invalid (i.e.: The key is not present in event) the value of 'index' "
"will be used."
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ struct flb_elasticsearch {
flb_sds_t id_key;
struct flb_record_accessor *ra_id_key;

/* target_index */
flb_sds_t target_index;
struct flb_record_accessor *ra_target_index;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
Expand Down
15 changes: 15 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
}

/* Set index record accessor */
if (ctx->target_index) {
ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE);
fprintf(stderr, "record accessor at %p", ctx->ra_target_index);
if (!ctx->ra_target_index) {
flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index);
flb_es_conf_destroy(ctx);
return NULL;
}
}

if (ctx->id_key) {
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
if (ctx->ra_id_key == NULL) {
Expand Down Expand Up @@ -479,6 +490,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
flb_ra_destroy(ctx->ra_id_key);
ctx->ra_id_key = NULL;
}
if (ctx->ra_target_index) {
flb_ra_destroy(ctx->ra_target_index);
ctx->ra_target_index = NULL;
}
if (ctx->es_action) {
flb_free(ctx->es_action);
}
Expand Down
120 changes: 120 additions & 0 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,35 @@ static void cb_check_id_key(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_target_index(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}";

p = strstr(out_js, index_line);
TEST_CHECK(p != NULL);
flb_free(res_data);
}

static void cb_check_target_index_default(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}";

p = strstr(out_js, index_line);
TEST_CHECK(p != NULL);
if(!TEST_CHECK(p != NULL)) {
TEST_MSG("Got: %s", out_js);
}
flb_free(res_data);
}

void flb_test_write_operation_index()
{
int ret;
Expand Down Expand Up @@ -799,6 +828,95 @@ void flb_test_logstash_prefix_separator()
flb_destroy(ctx);
}

void flb_test_target_index()
{
int ret;
int size = sizeof(JSON_ES) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"target_index", "aaa-$END_KEY",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_target_index,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_target_index_default()
{
int ret;
int size = sizeof(JSON_ES) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"target_index", "aaa-$not_found_key",
"index", "default",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_target_index_default,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"long_index" , flb_test_long_index },
Expand All @@ -814,5 +932,7 @@ TEST_LIST = {
{"replace_dots" , flb_test_replace_dots },
{"id_key" , flb_test_id_key },
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
{"target_index" , flb_test_target_index },
{"target_index_default" , flb_test_target_index_default },
{NULL, NULL}
};

0 comments on commit b3d491d

Please sign in to comment.