From 63ad8e5766e6f5311ea0072f3bdf2c588f0c7432 Mon Sep 17 00:00:00 2001 From: Marat Abrarov Date: Sun, 18 Jun 2023 00:36:37 +0300 Subject: [PATCH] pipeline: outputs: es: support overriding the most plugin parameters with Upstream node configuration For Elastic cloud authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: cloud_id. For AWS authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: http_proxy, no_proxy, tls*. Signed-off-by: Marat Abrarov --- plugins/out_es/es.c | 73 ++-- plugins/out_es/es.h | 26 +- plugins/out_es/es_conf.c | 657 ++++++++++++++++++++++++++++----- plugins/out_es/es_conf.h | 5 + plugins/out_es/es_conf_parse.c | 12 + plugins/out_es/es_conf_prop.h | 64 ++++ 6 files changed, 701 insertions(+), 136 deletions(-) create mode 100644 plugins/out_es/es_conf_prop.h diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index d14f3adb02c..e869b1c793b 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -35,6 +35,7 @@ #include "es.h" #include "es_conf.h" +#include "es_conf_prop.h" #include "es_bulk.h" #include "murmur3.h" @@ -1030,48 +1031,48 @@ static int cb_es_exit(void *data, struct flb_config *config) /* Configuration properties map */ static struct flb_config_map config_map[] = { { - FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index), "Set an index name" }, { - FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type), "Set the document type property" }, { - FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, suppress_type_name), "If true, mapping types is removed. (for v7.0.0 or later)" }, /* HTTP Authentication */ { - FLB_CONFIG_MAP_STR, "http_user", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_USER, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user), "Optional username credential for Elastic X-Pack access" }, { - FLB_CONFIG_MAP_STR, "http_passwd", "", + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, "", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd), "Password for user defined in HTTP_User" }, /* HTTP Compression */ { - FLB_CONFIG_MAP_STR, "compress", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_COMPRESS, NULL, 0, FLB_FALSE, 0, "Set payload compression mechanism. Option available is 'gzip'" }, /* Cloud Authentication */ { - FLB_CONFIG_MAP_STR, "cloud_id", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_ID, NULL, 0, FLB_FALSE, 0, "Elastic cloud ID of the cluster to connect to" }, { - FLB_CONFIG_MAP_STR, "cloud_auth", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, NULL, 0, FLB_FALSE, 0, "Elastic cloud authentication credentials" }, @@ -1079,37 +1080,37 @@ static struct flb_config_map config_map[] = { /* AWS Authentication */ #ifdef FLB_HAVE_AWS { - FLB_CONFIG_MAP_BOOL, "aws_auth", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_AWS_AUTH, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth), "Enable AWS Sigv4 Authentication" }, { - FLB_CONFIG_MAP_STR, "aws_region", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_REGION, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region), "AWS Region of your Amazon OpenSearch Service cluster" }, { - FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint), "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option" }, { - FLB_CONFIG_MAP_STR, "aws_role_arn", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, NULL, 0, FLB_FALSE, 0, "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster" }, { - FLB_CONFIG_MAP_STR, "aws_external_id", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, NULL, 0, FLB_FALSE, 0, "External ID for the AWS IAM Role specified with `aws_role_arn`" }, { - FLB_CONFIG_MAP_STR, "aws_service_name", "es", + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, "es", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_service_name), "AWS Service Name" }, { - FLB_CONFIG_MAP_STR, "aws_profile", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_profile), "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " "$HOME/.aws/ directory." @@ -1118,12 +1119,12 @@ static struct flb_config_map config_map[] = { /* Logstash compatibility */ { - FLB_CONFIG_MAP_BOOL, "logstash_format", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format), "Enable Logstash format compatibility" }, { - FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix), "When Logstash_Format is enabled, the Index name is composed using a prefix " "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will " @@ -1131,12 +1132,12 @@ static struct flb_config_map config_map[] = { "when the data is being generated" }, { - FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-", + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator), "Set a separator between logstash_prefix and date." }, { - FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key), "When included: the value in the record that belongs to the key will be looked " "up and over-write the Logstash_Prefix for index generation. If the key/value " @@ -1144,42 +1145,42 @@ static struct flb_config_map config_map[] = { "fallback. Nested keys are supported through record accessor pattern" }, { - FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat), "Time format (based on strftime) to generate the second part of the Index name" }, /* Custom Time and Tag keys */ { - FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key), "When Logstash_Format is enabled, each record will get a new timestamp field. " "The Time_Key property defines the name of that field" }, { - FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format), "When Logstash_Format is enabled, this property defines the format of the " "timestamp" }, { - FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos), "When Logstash_Format is enabled, enabling this property sends nanosecond " "precision timestamps" }, { - FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key), "When enabled, it append the Tag name to the record" }, { - FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key), "When Include_Tag_Key is enabled, this property defines the key name for the tag" }, { - FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX, + FLB_CONFIG_MAP_SIZE, FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, FLB_ES_DEFAULT_HTTP_MAX, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size), "Specify the buffer size used to read the response from the Elasticsearch HTTP " "service. This option is useful for debugging purposes where is required to read " @@ -1190,7 +1191,7 @@ static struct flb_config_map config_map[] = { /* Elasticsearch specifics */ { - FLB_CONFIG_MAP_STR, "path", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PATH, NULL, 0, FLB_FALSE, 0, "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also " "possible to serve Elasticsearch behind a reverse proxy on a subpath. This " @@ -1198,7 +1199,7 @@ static struct flb_config_map config_map[] = { "prefix in the indexing HTTP POST URI" }, { - FLB_CONFIG_MAP_STR, "pipeline", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PIPELINE, NULL, 0, FLB_FALSE, 0, "Newer versions of Elasticsearch allows to setup filters called pipelines. " "This option allows to define which pipeline the database should use. For " @@ -1206,48 +1207,48 @@ static struct flb_config_map config_map[] = { "Fluent Bit side, avoid pipelines" }, { - FLB_CONFIG_MAP_BOOL, "generate_id", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_GENERATE_ID, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id), "When enabled, generate _id for outgoing records. This prevents duplicate " "records when retrying ES" }, { - FLB_CONFIG_MAP_STR, "write_operation", "create", + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation), "Operation to use to write in bulk requests" }, { - FLB_CONFIG_MAP_STR, "id_key", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL, 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key), "If set, _id will be the value of the key from incoming record." }, { - FLB_CONFIG_MAP_BOOL, "replace_dots", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots), "When enabled, replace field name dots with underscore, required by Elasticsearch " "2.0-2.3." }, { - FLB_CONFIG_MAP_BOOL, "current_time_index", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index), "Use current time for index generation instead of message record" }, /* Trace */ { - FLB_CONFIG_MAP_BOOL, "trace_output", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output), "When enabled print the Elasticsearch API calls to stdout (for diag only)" }, { - FLB_CONFIG_MAP_BOOL, "trace_error", "false", + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, "false", 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error), "When enabled print the Elasticsearch exception to stderr (for diag only)" }, { - FLB_CONFIG_MAP_STR, "upstream", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_UPSTREAM, NULL, 0, FLB_FALSE, 0, "Path to 'upstream' configuration file (define multiple nodes)" }, diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index c10ac8ff6ba..ccb9ec111f1 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -34,10 +34,6 @@ #define FLB_ES_DEFAULT_TAG_KEY "flb-key" #define FLB_ES_DEFAULT_HTTP_MAX "512k" #define FLB_ES_DEFAULT_HTTPS_PORT 443 -#define FLB_ES_WRITE_OP_INDEX "index" -#define FLB_ES_WRITE_OP_CREATE "create" -#define FLB_ES_WRITE_OP_UPDATE "update" -#define FLB_ES_WRITE_OP_UPSERT "upsert" struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ @@ -45,7 +41,7 @@ struct flb_elasticsearch_config { int own_index; char *type; int own_type; - char suppress_type_name; + int suppress_type_name; /* HTTP Auth */ char *http_user; @@ -53,7 +49,9 @@ struct flb_elasticsearch_config { /* Elastic Cloud Auth */ char *cloud_user; + int own_cloud_user; char *cloud_passwd; + int own_cloud_passwd; /* AWS Auth */ #ifdef FLB_HAVE_AWS @@ -62,12 +60,17 @@ struct flb_elasticsearch_config { char *aws_sts_endpoint; char *aws_profile; struct flb_aws_provider *aws_provider; + int own_aws_provider; struct flb_aws_provider *base_aws_provider; + int own_base_aws_provider; /* tls instances can't be re-used; aws provider requires a separate one */ struct flb_tls *aws_tls; + int own_aws_tls; struct flb_tls *aws_sts_tls; + int own_aws_sts_tls; char *aws_service_name; struct mk_list *aws_unsigned_headers; + int own_aws_unsigned_headers; #endif /* HTTP Client Setup */ @@ -94,40 +97,51 @@ struct flb_elasticsearch_config { /* prefix */ flb_sds_t logstash_prefix; + int own_logstash_prefix; flb_sds_t logstash_prefix_separator; + int own_logstash_prefix_separator; /* prefix key */ flb_sds_t logstash_prefix_key; + int own_logstash_prefix_key; /* date format */ flb_sds_t logstash_dateformat; + int own_logstash_dateformat; /* time key */ flb_sds_t time_key; + int own_time_key; /* time key format */ flb_sds_t time_key_format; + int own_time_key_format; /* time key nanoseconds */ int time_key_nanos; /* write operation */ flb_sds_t write_operation; + int own_write_operation; /* write operation elasticsearch operation */ - flb_sds_t es_action; + const char *es_action; /* id_key */ flb_sds_t id_key; + int own_id_key; struct flb_record_accessor *ra_id_key; + int own_ra_id_key; /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; + int own_tag_key; /* Elasticsearch HTTP API */ char uri[256]; struct flb_record_accessor *ra_prefix_key; + int own_ra_prefix_key; /* Compression mode (gzip) */ int compress_gzip; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 71bc3942b8c..aa1df9acc12 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -21,23 +21,167 @@ #include #include #include +#include #include #include #include -#include "es_conf.h" -#include "es_conf_parse.h" #include "es.h" +#include "es_conf_parse.h" +#include "es_conf_prop.h" +#include "es_conf.h" + +static const char * const es_default_path = ""; +static const char * const es_write_op_index = FLB_ES_WRITE_OP_INDEX; +static const char * const es_write_op_create = FLB_ES_WRITE_OP_CREATE; +static const char * const es_write_op_update = FLB_ES_WRITE_OP_UPDATE; +static const char * const es_write_op_upsert = FLB_ES_WRITE_OP_UPSERT; + +static int config_set_ra_id_key(flb_sds_t id_key, struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + if (!id_key) { + return 0; + } + + ec->ra_id_key = flb_ra_create(id_key, FLB_FALSE); + if (ec->ra_id_key == NULL) { + flb_plg_error(ctx->ins, "could not create record accessor for Id Key"); + return -1; + } + ec->own_ra_id_key = FLB_TRUE; + + if (ec->generate_id == FLB_TRUE) { + flb_plg_warn(ctx->ins, "Generate_ID is ignored when ID_key is set"); + ec->generate_id = FLB_FALSE; + } + + return 0; +} + +static int config_set_es_action(const char *write_operation, + const struct flb_record_accessor *ra_id_key, + int generate_id, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + if (!write_operation) { + return 0; + } + + if (strcasecmp(write_operation, es_write_op_index) == 0) { + ec->es_action = es_write_op_index; + } + else if (strcasecmp(write_operation, es_write_op_create) == 0) { + ec->es_action = es_write_op_create; + } + else if (strcasecmp(write_operation, es_write_op_update) == 0 + || strcasecmp(write_operation, es_write_op_upsert) == 0) { + ec->es_action = es_write_op_update; + } + else { + flb_plg_error(ctx->ins, + "wrong Write_Operation (should be one of index, create, update, upsert)"); + return -1; + } + + if (strcasecmp(ec->es_action, es_write_op_update) == 0 + && !ra_id_key + && generate_id == FLB_FALSE) { + flb_plg_error(ctx->ins, + "Id_Key or Generate_Id must be set when Write_Operation update or upsert"); + return -1; + } + + return 0; +} + +static size_t config_adjust_buffer_size(size_t buffer_size) +{ + /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ + if (buffer_size == -1) { + return 0; + } + return buffer_size; +} + +static int config_is_compressed_gzip(const char *compress) +{ + if (strcasecmp(compress, "gzip") == 0) { + return FLB_TRUE; + } + return FLB_FALSE; +} + +static int config_set_pipeline(const char *path, const char *pipeline, + struct flb_elasticsearch_config *ec) +{ + int ret; + + if (!path) { + path = es_default_path; + } + + if (pipeline && flb_str_emptyval(pipeline) != FLB_TRUE) { + ret = snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, + pipeline); + } + else { + ret = snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + + if (ret < 0 || ret >= sizeof(ec->uri)) { + return -1; + } + return 0; +} + +static int config_set_ra_prefix_key(flb_sds_t logstash_prefix_key, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + size_t len; + char *buf; + + if (!logstash_prefix_key) { + return 0; + } + + if (logstash_prefix_key[0] != '$') { + len = flb_sds_len(logstash_prefix_key); + buf = flb_malloc(len + 2); + if (!buf) { + flb_errno(); + return -1; + } + buf[0] = '$'; + memcpy(buf + 1, logstash_prefix_key, len); + buf[len + 1] = '\0'; + + ec->ra_prefix_key = flb_ra_create(buf, FLB_TRUE); + ec->own_ra_prefix_key = FLB_TRUE; + flb_free(buf); + } + else { + ec->ra_prefix_key = flb_ra_create(logstash_prefix_key, FLB_TRUE); + ec->own_ra_prefix_key = FLB_TRUE; + } + + if (!ec->ra_prefix_key) { + flb_plg_error(ctx->ins, "invalid logstash_prefix_key pattern '%s'", + logstash_prefix_key); + return -1; + } + + return 0; +} static int config_set_properties(struct flb_elasticsearch_config *ec, struct flb_elasticsearch *ctx, struct flb_config *config) { - size_t len; - ssize_t ret; - char *buf; + int ret; const char *tmp; - const char *path; struct flb_uri *uri = ctx->ins->host.uri; struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; @@ -45,14 +189,13 @@ static int config_set_properties(struct flb_elasticsearch_config *ec, if (uri) { if (uri->count >= 2) { f_index = flb_uri_get(uri, 0); - f_type = flb_uri_get(uri, 1); + f_type = flb_uri_get(uri, 1); } } /* handle cloud_id */ - ret = flb_es_conf_set_cloud_auth(flb_output_get_property("cloud_id", - ctx->ins), - ctx); + ret = flb_es_conf_set_cloud_auth( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_ID, ctx->ins), ctx); if (ret != 0) { flb_plg_error(ctx->ins, "cannot configure cloud_id"); return -1; @@ -65,21 +208,21 @@ static int config_set_properties(struct flb_elasticsearch_config *ec, return -1; } + ec->buffer_size = config_adjust_buffer_size(ec->buffer_size); + /* handle cloud_auth */ ret = flb_es_conf_set_cloud_credentials( - flb_output_get_property("cloud_auth", ctx->ins), ec); + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, ctx->ins), ec); if (ret != 0) { flb_plg_error(ctx->ins, "cannot configure cloud_auth"); return -1; } /* Compress (gzip) */ - tmp = flb_output_get_property("compress", ctx->ins); + tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, ctx->ins); ec->compress_gzip = FLB_FALSE; if (tmp) { - if (strcasecmp(tmp, "gzip") == 0) { - ec->compress_gzip = FLB_TRUE; - } + ec->compress_gzip = config_is_compressed_gzip(tmp); } /* Set manual Index and Type */ @@ -93,98 +236,339 @@ static int config_set_properties(struct flb_elasticsearch_config *ec, ec->own_type = FLB_TRUE; } - /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ - if (ec->buffer_size == -1) { - ec->buffer_size = 0; + /* Elasticsearch: path and pipeline */ + ret = config_set_pipeline( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins), + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins), + ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure path and/or pipeline"); + return -1; } - /* Elasticsearch: Path */ - path = flb_output_get_property("path", ctx->ins); - if (!path) { - path = ""; + ret = config_set_ra_id_key(ec->id_key, ec, ctx); + if (ret != 0) { + return -1; + } + + ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id, ec, + ctx); + if (ret != 0) { + return -1; + } + + ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx); + if (ret != 0) { + return -1; + } + +#ifdef FLB_HAVE_AWS + ret = flb_es_set_aws_unsigned_headers(ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS unsigned headers"); + return -1; } - /* Elasticsearch: Pipeline */ - tmp = flb_output_get_property("pipeline", ctx->ins); + ret = flb_es_conf_set_aws_provider( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins), + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, ctx->ins), + ec, ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS authentication"); + return -1; + } +#endif + + return 0; +} + +static int config_set_node_properties(struct flb_upstream_node *node, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch_config *base, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + const char *tmp; + int ret; + const char *path; + +#ifdef FLB_HAVE_AWS + const char *aws_external_id = NULL; + const char *aws_role_arn = NULL; + int aws_provider_node = FLB_FALSE; +#endif + + /* Copy base configuration */ + *ec = *base; + ec->own_index = FLB_FALSE; + ec->own_type = FLB_FALSE; + ec->own_cloud_user = FLB_FALSE; + ec->own_cloud_passwd = FLB_FALSE; + +#ifdef FLB_HAVE_AWS + ec->own_base_aws_provider = FLB_FALSE; + ec->own_aws_provider = FLB_FALSE; + ec->own_aws_tls = FLB_FALSE; + ec->own_aws_sts_tls = FLB_FALSE; + ec->own_aws_unsigned_headers = FLB_FALSE; +#endif + + ec->own_logstash_prefix = FLB_FALSE; + ec->own_logstash_prefix_separator = FLB_FALSE; + ec->own_logstash_prefix_key = FLB_FALSE; + ec->own_logstash_dateformat = FLB_FALSE; + ec->own_time_key = FLB_FALSE; + ec->own_time_key_format = FLB_FALSE; + ec->own_write_operation = FLB_FALSE; + ec->own_id_key = FLB_FALSE; + ec->own_ra_id_key = FLB_FALSE; + ec->own_ra_prefix_key = FLB_FALSE; + ec->own_tag_key = FLB_FALSE; + mk_list_entry_init(&ec->_head); + + /* Overwrite configuration from upstream node properties */ + + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INDEX, node); if (tmp) { - snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + ec->index = (char *)tmp; } - else { - snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TYPE, node); + if (tmp) { + ec->type = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, node); + if (tmp) { + ec->suppress_type_name = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_USER, node); + if (tmp) { + ec->http_user = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, node); + if (tmp) { + ec->http_passwd = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_GENERATE_ID, node); + if (tmp) { + ec->generate_id = flb_utils_bool(tmp); } - if (ec->id_key) { - ec->ra_id_key = flb_ra_create(ec->id_key, FLB_FALSE); - if (ec->ra_id_key == NULL) { - flb_plg_error(ctx->ins, "could not create record accessor for Id Key"); +#ifdef FLB_HAVE_AWS + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_AUTH, node); + if (tmp) { + ec->has_aws_auth = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_REGION, node); + if (tmp) { + ec->aws_region = (char *)tmp; + aws_provider_node = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, node); + if (tmp) { + ec->aws_sts_endpoint = (char *)tmp; + aws_provider_node = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, node); + if (tmp) { + ec->aws_service_name = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, node); + if (tmp) { + ec->aws_profile = (char *)tmp; + aws_provider_node = FLB_TRUE; + } + if (ec->has_aws_auth) { + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, + node); + if (tmp) { + aws_external_id = tmp; + aws_provider_node = FLB_TRUE; } - if (ec->generate_id == FLB_TRUE) { - flb_plg_warn(ctx->ins, "Generate_ID is ignored when ID_key is set"); - ec->generate_id = FLB_FALSE; + else { + aws_external_id = flb_output_get_property( + FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, node); + if (tmp) { + aws_role_arn = tmp; + aws_provider_node = FLB_TRUE; + } + else { + aws_role_arn = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, + ctx->ins); } } +#endif - if (ec->write_operation) { - if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) { - ec->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, node); + if (tmp) { + ec->logstash_format = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, node); + if (tmp) { + ec->logstash_prefix = flb_sds_create(tmp); + if (ec->logstash_prefix == NULL) { + return -1; + } + ec->own_logstash_prefix = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, + node); + if (tmp) { + ec->logstash_prefix_separator = flb_sds_create(tmp); + if (ec->logstash_prefix_separator == NULL) { + return -1; } - else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) { - ec->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE); + ec->own_logstash_prefix_separator = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, + node); + if (tmp) { + ec->logstash_dateformat = flb_sds_create(tmp); + if (ec->logstash_dateformat == NULL) { + return -1; } - else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0 - || strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { - ec->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE); + ec->own_logstash_dateformat = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY, node); + if (tmp) { + ec->time_key = flb_sds_create(tmp); + if (ec->time_key == NULL) { + return -1; } - else { - flb_plg_error(ctx->ins, "wrong Write_Operation (should be one of index, create, update, upsert)"); + ec->own_time_key = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, node); + if (tmp) { + ec->time_key_format = flb_sds_create(tmp); + if (ec->time_key_format == NULL) { return -1; } - if (strcasecmp(ec->es_action, FLB_ES_WRITE_OP_UPDATE) == 0 - && !ec->ra_id_key && ec->generate_id == FLB_FALSE) { - flb_plg_error(ctx->ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert"); + ec->own_time_key_format = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, node); + if (tmp) { + ec->time_key_nanos = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, node); + if (tmp) { + ec->include_tag_key = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TAG_KEY, node); + if (tmp) { + ec->tag_key = flb_sds_create(tmp); + if (ec->tag_key == NULL) { return -1; } + ec->own_tag_key = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, node); + if (tmp) { + ec->buffer_size = config_adjust_buffer_size(flb_utils_size_to_bytes(tmp)); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, node); + if (tmp) { + ec->replace_dots = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, node); + if (tmp) { + ec->current_time_index = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, node); + if (tmp) { + ec->trace_output = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, node); + if (tmp) { + ec->trace_error = flb_utils_bool(tmp); } - if (ec->logstash_prefix_key) { - if (ec->logstash_prefix_key[0] != '$') { - len = flb_sds_len(ec->logstash_prefix_key); - buf = flb_malloc(len + 2); - if (!buf) { - flb_errno(); - return -1; - } - buf[0] = '$'; - memcpy(buf + 1, ec->logstash_prefix_key, len); - buf[len + 1] = '\0'; - - ec->ra_prefix_key = flb_ra_create(buf, FLB_TRUE); - flb_free(buf); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, + node); + if (tmp) { + ec->logstash_prefix_key = flb_sds_create(tmp); + if (ec->logstash_prefix_key == NULL) { + return -1; } - else { - ec->ra_prefix_key = flb_ra_create(ec->logstash_prefix_key, FLB_TRUE); + ec->own_logstash_prefix_key = FLB_TRUE; + ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx); + if (ret != 0) { + return -1; } + } - if (!ec->ra_prefix_key) { - flb_plg_error(ctx->ins, "invalid logstash_prefix_key pattern '%s'", tmp); + /* handle cloud_auth */ + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, node); + if (tmp) { + ret = flb_es_conf_set_cloud_credentials(tmp, ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure cloud_auth"); return -1; } } -#ifdef FLB_HAVE_AWS - ret = flb_es_set_aws_unsigned_headers(ec); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot configure AWS unsigned headers"); - return -1; + /* Compress (gzip) */ + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, node); + if (tmp) { + ec->compress_gzip = config_is_compressed_gzip(tmp); } - ret = flb_es_conf_set_aws_provider( - flb_output_get_property("aws_external_id", ctx->ins), - flb_output_get_property("aws_role_arn", ctx->ins), - ec, ctx, config); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot configure AWS authentication"); - return -1; + /* Elasticsearch: path and pipeline */ + path = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PATH, node); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, node); + if (path || tmp) { + if (!path) { + path = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins); + } + if (!tmp) { + tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins); + } + ret = config_set_pipeline(path, tmp, ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure path and/or pipeline"); + return -1; + } + } + + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_ID_KEY, node); + if (tmp) { + ec->id_key = flb_sds_create(tmp); + if (ec->id_key == NULL) { + return -1; + } + ec->own_id_key = FLB_TRUE; + ret = config_set_ra_id_key(ec->id_key, ec, ctx); + if (ret != 0) { + return -1; + } + } + + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, node); + if (tmp) { + ec->write_operation = flb_sds_create(tmp); + if (ec->write_operation == NULL) { + return -1; + } + ec->own_write_operation = FLB_TRUE; + ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id, + ec, ctx); + + if (ret != 0) { + return -1; + } + } + +#ifdef FLB_HAVE_AWS + if ((base->has_aws_auth != ec->has_aws_auth) + || (base->has_aws_auth == FLB_TRUE + && ec->has_aws_auth == FLB_TRUE + && aws_provider_node == FLB_TRUE)) { + ret = flb_es_conf_set_aws_provider(aws_external_id, aws_role_arn, ec, ctx, + config); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS authentication"); + return -1; + } } #endif @@ -193,43 +577,80 @@ static int config_set_properties(struct flb_elasticsearch_config *ec, static void elasticsearch_config_destroy(struct flb_elasticsearch_config *ec) { - if (ec->ra_id_key) { + if (ec->own_tag_key == FLB_TRUE) { + flb_sds_destroy(ec->tag_key); + } + + if (ec->ra_id_key && ec->own_ra_id_key == FLB_TRUE) { flb_ra_destroy(ec->ra_id_key); ec->ra_id_key = NULL; } - if (ec->es_action) { - flb_free(ec->es_action); + + if (ec->own_write_operation == FLB_TRUE) { + flb_sds_destroy(ec->write_operation); + } + + if (ec->own_id_key == FLB_TRUE) { + flb_sds_destroy(ec->id_key); + } + + if (ec->own_time_key_format == FLB_TRUE) { + flb_sds_destroy(ec->time_key_format); + } + + if (ec->own_time_key == FLB_TRUE) { + flb_sds_destroy(ec->time_key); + } + + if (ec->own_logstash_dateformat == FLB_TRUE) { + flb_sds_destroy(ec->logstash_dateformat); + } + + if (ec->own_logstash_prefix_key == FLB_TRUE) { + flb_sds_destroy(ec->logstash_prefix_key); + } + + if (ec->own_logstash_prefix_separator == FLB_TRUE) { + flb_sds_destroy(ec->logstash_prefix_separator); + } + + if (ec->own_logstash_prefix == FLB_TRUE) { + flb_sds_destroy(ec->logstash_prefix); } #ifdef FLB_HAVE_AWS - if (ec->base_aws_provider) { + if (ec->base_aws_provider && ec->own_base_aws_provider == FLB_TRUE) { flb_aws_provider_destroy(ec->base_aws_provider); } - if (ec->aws_provider) { + if (ec->aws_provider && ec->own_aws_provider == FLB_TRUE) { flb_aws_provider_destroy(ec->aws_provider); } - if (ec->aws_tls) { + if (ec->aws_tls && ec->own_aws_tls == FLB_TRUE) { flb_tls_destroy(ec->aws_tls); } - if (ec->aws_sts_tls) { + if (ec->aws_sts_tls && ec->own_aws_sts_tls == FLB_TRUE) { flb_tls_destroy(ec->aws_sts_tls); } - if (ec->aws_unsigned_headers) { + if (ec->aws_unsigned_headers && ec->own_aws_unsigned_headers == FLB_TRUE) { flb_slist_destroy(ec->aws_unsigned_headers); flb_free(ec->aws_unsigned_headers); } #endif - if (ec->ra_prefix_key) { + if (ec->ra_prefix_key && ec->own_ra_prefix_key == FLB_TRUE) { flb_ra_destroy(ec->ra_prefix_key); } - flb_free(ec->cloud_passwd); - flb_free(ec->cloud_user); + if (ec->own_cloud_passwd == FLB_TRUE) { + flb_free(ec->cloud_passwd); + } + if (ec->own_cloud_user == FLB_TRUE) { + flb_free(ec->cloud_user); + } if (ec->own_type == FLB_TRUE) { flb_free(ec->type); @@ -247,10 +668,12 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx, { int ret; struct mk_list *head; + struct mk_list *tmp; struct flb_upstream_node *node; struct flb_elasticsearch_config *ec; + struct flb_elasticsearch_config *node_ec; - /* Create elasticsearch_config context */ + /* Create main elasticsearch_config context */ ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); if (!ec) { flb_errno(); @@ -258,7 +681,7 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx, return -1; } - /* Read properties into elasticsearch_config context */ + /* Read properties into main elasticsearch_config context */ ret = config_set_properties(ec, ctx, config); if (ret != 0) { elasticsearch_config_destroy(ec); @@ -282,15 +705,61 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx, } /* - * Iterate over upstreams nodes and link shared elasticsearch_config context - * with each node + * Iterate over upstreams nodes and create elasticsearch_config context + * for each node */ mk_list_foreach(head, &ctx->ha->nodes) { node = mk_list_entry(head, struct flb_upstream_node, _head); + /* Create elasticsearch_config context for the upstream node */ + node_ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!node_ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed upstream node config allocation for %s node", + node->name); + ret = -1; + break; + } + + /* + * Fill elasticsearch_config context of the upstream node from: + * 1. main elasticsearch_config context + * 2. upstream node configuration section + */ + ret = config_set_node_properties(node, node_ec, ec, ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed upstream node configuration for %s node", + node->name); + elasticsearch_config_destroy(node_ec); + break; + } + + /* Register allocated elasticsearch_config context for later cleanup */ + mk_list_add(&node_ec->_head, &ctx->configs); + /* Set elasticsearch_config context into the node opaque data */ - flb_upstream_node_set_data(ec, node); + flb_upstream_node_set_data(node_ec, node); + } + + if (ret != 0) { + /* Nullify each upstream node elasticsearch_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + flb_upstream_node_set_data(NULL, node); + } + + /* Cleanup elasticsearch_config contexts which were created */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + node_ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&node_ec->_head); + elasticsearch_config_destroy(node_ec); + } + + flb_upstream_ha_destroy(ctx->ha); + elasticsearch_config_destroy(ec); + return -1; } + /* Register allocated elasticsearch_config context for later cleanup */ mk_list_add(&ec->_head, &ctx->configs); return 0; @@ -380,7 +849,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_output_set_context(ins, ctx); /* Configure HA or simple mode ? */ - upstream_file = flb_output_get_property("upstream", ins); + upstream_file = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_UPSTREAM, ins); if (upstream_file) { ret = es_config_ha(upstream_file, ctx, config); } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index 7524074c49a..a3f584a9811 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -24,6 +24,11 @@ #include "es.h" +#define FLB_ES_WRITE_OP_INDEX "index" +#define FLB_ES_WRITE_OP_CREATE "create" +#define FLB_ES_WRITE_OP_UPDATE "update" +#define FLB_ES_WRITE_OP_UPSERT "upsert" + struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_config *config); diff --git a/plugins/out_es/es_conf_parse.c b/plugins/out_es/es_conf_parse.c index 498c18c0f94..862bce3c52c 100644 --- a/plugins/out_es/es_conf_parse.c +++ b/plugins/out_es/es_conf_parse.c @@ -52,9 +52,11 @@ int flb_es_conf_set_cloud_credentials(const char *cloud_auth, entry = mk_list_entry(head, struct flb_split_entry, _head); if (items == 1) { ec->cloud_user = flb_strdup(entry->value); + ec->own_cloud_user = FLB_TRUE; } if (items == 2) { ec->cloud_passwd = flb_strdup(entry->value); + ec->own_cloud_passwd = FLB_TRUE; } } flb_utils_split_free(toks); @@ -193,6 +195,7 @@ int flb_es_set_aws_unsigned_headers(struct flb_elasticsearch_config *ec) flb_errno(); return -1; } + ec->own_aws_unsigned_headers = FLB_TRUE; flb_slist_create(ec->aws_unsigned_headers); ret = flb_slist_add(ec->aws_unsigned_headers, "Content-Length"); @@ -239,6 +242,7 @@ static int set_aws_sts_provider(const char *aws_external_id, flb_free(aws_session_name); return -1; } + ec->own_aws_sts_tls = FLB_TRUE; ec->aws_provider = flb_sts_provider_create(config, ec->aws_sts_tls, @@ -250,6 +254,9 @@ static int set_aws_sts_provider(const char *aws_external_id, ec->aws_sts_endpoint, NULL, flb_aws_client_generator()); + ec->own_base_aws_provider = FLB_TRUE; + ec->own_aws_provider = FLB_TRUE; + /* Session name can be freed once provider is created */ flb_free(aws_session_name); @@ -270,6 +277,9 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id, int ret; if (ec->has_aws_auth == FLB_FALSE) { + ec->aws_tls = NULL; + ec->aws_provider = NULL; + ec->base_aws_provider = NULL; return 0; } @@ -294,6 +304,7 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id, flb_errno(); return -1; } + ec->own_aws_tls = FLB_TRUE; ec->aws_provider = flb_standard_chain_provider_create(config, ec->aws_tls, @@ -306,6 +317,7 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id, flb_error("[out_es] Failed to create AWS Credential Provider"); return -1; } + ec->own_aws_provider = FLB_TRUE; ret = set_aws_sts_provider(aws_external_id, aws_role_arn, ec, ctx, config); if (ret != 0) { diff --git a/plugins/out_es/es_conf_prop.h b/plugins/out_es/es_conf_prop.h new file mode 100644 index 00000000000..97a05a22c85 --- /dev/null +++ b/plugins/out_es/es_conf_prop.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_ES_CONF_PROP_H +#define FLB_OUT_ES_CONF_PROP_H + +#define FLB_ES_CONFIG_PROPERTY_INDEX "index" +#define FLB_ES_CONFIG_PROPERTY_TYPE "type" +#define FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME "suppress_type_name" +#define FLB_ES_CONFIG_PROPERTY_HTTP_USER "http_user" +#define FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD "http_passwd" +#define FLB_ES_CONFIG_PROPERTY_COMPRESS "compress" +#define FLB_ES_CONFIG_PROPERTY_CLOUD_ID "cloud_id" +#define FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH "cloud_auth" + +#ifdef FLB_HAVE_AWS +#define FLB_ES_CONFIG_PROPERTY_AWS_AUTH "aws_auth" +#define FLB_ES_CONFIG_PROPERTY_AWS_REGION "aws_region" +#define FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT "aws_sts_endpoint" +#define FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN "aws_role_arn" +#define FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID "aws_external_id" +#define FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME "aws_service_name" +#define FLB_ES_CONFIG_PROPERTY_AWS_PROFILE "aws_profile" +#endif + +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT "logstash_format" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX "logstash_prefix" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR "logstash_prefix_separator" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY "logstash_prefix_key" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT "logstash_dateformat" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY "time_key" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT "time_key_format" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS "time_key_nanos" +#define FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY "include_tag_key" +#define FLB_ES_CONFIG_PROPERTY_TAG_KEY "tag_key" +#define FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE "buffer_size" +#define FLB_ES_CONFIG_PROPERTY_PATH "path" +#define FLB_ES_CONFIG_PROPERTY_PIPELINE "pipeline" +#define FLB_ES_CONFIG_PROPERTY_GENERATE_ID "generate_id" +#define FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION "write_operation" +#define FLB_ES_CONFIG_PROPERTY_ID_KEY "id_key" +#define FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS "replace_dots" +#define FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX "current_time_index" +#define FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT "trace_output" +#define FLB_ES_CONFIG_PROPERTY_TRACE_ERROR "trace_error" +#define FLB_ES_CONFIG_PROPERTY_UPSTREAM "upstream" + +#endif