Skip to content

Commit

Permalink
out_es: Detect if index uses RA syntax as in out_opensearch plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Cabezas <[email protected]>
  • Loading branch information
Victor Cabezas committed Dec 4, 2023
1 parent bdb3e28 commit 55be71f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 104 deletions.
30 changes: 9 additions & 21 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ static int elasticsearch_format(struct flb_config *config,
size_t off = 0;
size_t off_prev = 0;
char *es_index;
char target_index_value[256];
char index_value[256];
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
Expand Down Expand Up @@ -351,14 +351,14 @@ static int elasticsearch_format(struct flb_config *config,
}

/*
* If logstash format, target_index record accessor and id generation are disabled,
* If logstash format, index record accessor and id generation are disabled,
* pre-generate the index line for all records.
*
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE &&
ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) {
ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) {

flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
Expand Down Expand Up @@ -456,25 +456,20 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->ra_target_index) {
flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index,
if (ctx->ra_index) {
flb_sds_t v = flb_ra_translate(ctx->ra_index,
(char *) tag, tag_len,
map, NULL, FLB_TRUE);
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
}
memcpy(target_index_value, v, len);
target_index_value[len] = '\0';
memcpy(index_value, v, len);
index_value[len] = '\0';
es_index_custom_len = len;
flb_sds_destroy(v);
es_index = target_index_value;
}
else {
flb_plg_warn(ctx->ins,
"the value of %s is missing for target_index_key",
ctx->target_index);
es_index = index_value;
}
if (ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
Expand Down Expand Up @@ -1219,13 +1214,6 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
"If set, _id will be the value of the key from incoming record."
},
{
FLB_CONFIG_MAP_STR, "target_index", NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index),
"Compose index name using record accessor syntax. If any record accessor "
"variable is invalid (i.e.: The key is not present in event) the value of 'index' "
"will be used."
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),
Expand Down
6 changes: 2 additions & 4 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
struct flb_elasticsearch {
/* Elasticsearch index (database) and type (table) */
char *index;
struct flb_record_accessor *ra_index;

char *type;
char suppress_type_name;

Expand Down Expand Up @@ -118,10 +120,6 @@ struct flb_elasticsearch {
flb_sds_t id_key;
struct flb_record_accessor *ra_id_key;

/* target_index */
flb_sds_t target_index;
struct flb_record_accessor *ra_target_index;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
Expand Down
29 changes: 17 additions & 12 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
if (f_type) {
ctx->type = flb_strdup(f_type->value); /* FIXME */
}
else {
/* Check if the index has been set in the configuration */
if (ctx->index) {
/* do we have a record accessor pattern ? */
if (strchr(ctx->index, '$')) {
ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE);
if (!ctx->ra_index) {
flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property");
flb_es_conf_destroy(ctx);
return NULL;
}
}
}
}

/* HTTP Payload (response) maximum buffer size (0 == unlimited) */
if (ctx->buffer_size == -1) {
Expand All @@ -290,15 +304,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
}

/* Set index record accessor */
if (ctx->target_index) {
ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE);
if (!ctx->ra_target_index) {
flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index);
flb_es_conf_destroy(ctx);
return NULL;
}
}

if (ctx->id_key) {
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
Expand Down Expand Up @@ -508,9 +513,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
flb_ra_destroy(ctx->ra_id_key);
ctx->ra_id_key = NULL;
}
if (ctx->ra_target_index) {
flb_ra_destroy(ctx->ra_target_index);
ctx->ra_target_index = NULL;
if (ctx->ra_index) {
flb_ra_destroy(ctx->ra_index);
ctx->ra_index = NULL;
}
if (ctx->es_action) {
flb_free(ctx->es_action);
Expand Down
1 change: 1 addition & 0 deletions run_code_analysis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ fi
-e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \
-e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \
-e INPUT_PRE_COMMAND="cp -R /source /tmp" \
-e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \
-e INPUT_WORKING-DIRECTORY="/tmp/source" \
lpenz/ghaction-cmake:0.19
72 changes: 5 additions & 67 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ static void cb_check_id_key(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_target_index(void *ctx, int ffd,
static void cb_check_index_ra(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
Expand All @@ -180,22 +180,6 @@ static void cb_check_target_index(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_target_index_default(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}";

p = strstr(out_js, index_line);
TEST_CHECK(p != NULL);
if(!TEST_CHECK(p != NULL)) {
TEST_MSG("Got: %s", out_js);
}
flb_free(res_data);
}

void flb_test_write_operation_index()
{
int ret;
Expand Down Expand Up @@ -828,51 +812,7 @@ void flb_test_logstash_prefix_separator()
flb_destroy(ctx);
}

void flb_test_target_index()
{
int ret;
int size = sizeof(JSON_ES) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"target_index", "aaa-$END_KEY",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_target_index,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_target_index_default()
void flb_test_index_ra()
{
int ret;
int size = sizeof(JSON_ES) - 1;
Expand All @@ -896,13 +836,12 @@ void flb_test_target_index_default()

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"target_index", "aaa-$not_found_key",
"index", "default",
"index", "aaa-$END_KEY",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_target_index_default,
cb_check_index_ra,
NULL, NULL);

/* Start */
Expand All @@ -926,13 +865,12 @@ TEST_LIST = {
{"write_operation_update", flb_test_write_operation_update },
{"write_operation_upsert", flb_test_write_operation_upsert },
{"index_type" , flb_test_index_type },
{"index_ra" , flb_test_index_ra},
{"logstash_format" , flb_test_logstash_format },
{"logstash_format_nanos" , flb_test_logstash_format_nanos },
{"tag_key" , flb_test_tag_key },
{"replace_dots" , flb_test_replace_dots },
{"id_key" , flb_test_id_key },
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
{"target_index" , flb_test_target_index },
{"target_index_default" , flb_test_target_index_default },
{NULL, NULL}
};

0 comments on commit 55be71f

Please sign in to comment.