diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c
index 5ac72b0c306..d87ed7800ab 100644
--- a/plugins/out_es/es.c
+++ b/plugins/out_es/es.c
@@ -35,6 +35,7 @@
 
 #include "es.h"
 #include "es_conf.h"
+#include "es_conf_prop.h"
 #include "es_bulk.h"
 #include "murmur3.h"
 
@@ -1117,48 +1118,48 @@ static int cb_es_exit(void *data, struct flb_config *config)
 /* Configuration properties map */
 static struct flb_config_map config_map[] = {
     {
-     FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index),
      "Set an index name"
     },
     {
-     FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type),
      "Set the document type property"
     },
     {
-     FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, suppress_type_name),
      "If true, mapping types is removed. (for v7.0.0 or later)"
     },
 
     /* HTTP Authentication */
     {
-     FLB_CONFIG_MAP_STR, "http_user", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_USER, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user),
      "Optional username credential for Elastic X-Pack access"
     },
     {
-     FLB_CONFIG_MAP_STR, "http_passwd", "",
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, "",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd),
      "Password for user defined in HTTP_User"
     },
 
     /* HTTP Compression */
     {
-     FLB_CONFIG_MAP_STR, "compress", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_COMPRESS, NULL,
      0, FLB_FALSE, 0,
      "Set payload compression mechanism. Option available is 'gzip'"
     },
 
     /* Cloud Authentication */
     {
-     FLB_CONFIG_MAP_STR, "cloud_id", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_ID, NULL,
      0, FLB_FALSE, 0,
      "Elastic cloud ID of the cluster to connect to"
     },
     {
-     FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, NULL,
      0, FLB_FALSE, 0,
      "Elastic cloud authentication credentials"
     },
@@ -1166,37 +1167,37 @@ static struct flb_config_map config_map[] = {
     /* AWS Authentication */
 #ifdef FLB_HAVE_AWS
     {
-     FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_AWS_AUTH, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth),
      "Enable AWS Sigv4 Authentication"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_region", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_REGION, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region),
      "AWS Region of your Amazon OpenSearch Service cluster"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint),
      "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, NULL,
      0, FLB_FALSE, 0,
      "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, NULL,
      0, FLB_FALSE, 0,
      "External ID for the AWS IAM Role specified with `aws_role_arn`"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_service_name", "es",
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, "es",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_service_name),
      "AWS Service Name"
     },
     {
-     FLB_CONFIG_MAP_STR, "aws_profile", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_profile),
      "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
      "$HOME/.aws/ directory."
@@ -1205,12 +1206,12 @@ static struct flb_config_map config_map[] = {
 
     /* Logstash compatibility */
     {
-     FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format),
      "Enable Logstash format compatibility"
     },
     {
-     FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix),
      "When Logstash_Format is enabled, the Index name is composed using a prefix "
      "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
@@ -1218,12 +1219,12 @@ static struct flb_config_map config_map[] = {
      "when the data is being generated"
     },
     {
-     FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator),
      "Set a separator between logstash_prefix and date."
     },
     {
-     FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key),
      "When included: the value in the record that belongs to the key will be looked "
      "up and over-write the Logstash_Prefix for index generation. If the key/value "
@@ -1231,42 +1232,42 @@ static struct flb_config_map config_map[] = {
      "fallback. Nested keys are supported through record accessor pattern"
     },
     {
-     FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat),
      "Time format (based on strftime) to generate the second part of the Index name"
     },
 
     /* Custom Time and Tag keys */
     {
-     FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key),
      "When Logstash_Format is enabled, each record will get a new timestamp field. "
      "The Time_Key property defines the name of that field"
     },
     {
-     FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format),
      "When Logstash_Format is enabled, this property defines the format of the "
      "timestamp"
     },
     {
-     FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos),
      "When Logstash_Format is enabled, enabling this property sends nanosecond "
      "precision timestamps"
     },
     {
-     FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key),
      "When enabled, it append the Tag name to the record"
     },
     {
-     FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key),
      "When Include_Tag_Key is enabled, this property defines the key name for the tag"
     },
     {
-     FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
+     FLB_CONFIG_MAP_SIZE, FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, FLB_ES_DEFAULT_HTTP_MAX,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size),
      "Specify the buffer size used to read the response from the Elasticsearch HTTP "
      "service. This option is useful for debugging purposes where is required to read "
@@ -1277,7 +1278,7 @@ static struct flb_config_map config_map[] = {
 
     /* Elasticsearch specifics */
     {
-     FLB_CONFIG_MAP_STR, "path", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PATH, NULL,
      0, FLB_FALSE, 0,
      "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
      "possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
@@ -1285,7 +1286,7 @@ static struct flb_config_map config_map[] = {
      "prefix in the indexing HTTP POST URI"
     },
     {
-     FLB_CONFIG_MAP_STR, "pipeline", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PIPELINE, NULL,
      0, FLB_FALSE, 0,
      "Newer versions of Elasticsearch allows to setup filters called pipelines. "
      "This option allows to define which pipeline the database should use. For "
@@ -1293,48 +1294,48 @@ static struct flb_config_map config_map[] = {
      "Fluent Bit side, avoid pipelines"
     },
     {
-     FLB_CONFIG_MAP_BOOL, "generate_id", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_GENERATE_ID, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id),
      "When enabled, generate _id for outgoing records. This prevents duplicate "
      "records when retrying ES"
     },
     {
-     FLB_CONFIG_MAP_STR, "write_operation", "create",
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation),
      "Operation to use to write in bulk requests"
     },
     {
-     FLB_CONFIG_MAP_STR, "id_key", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL,
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key),
      "If set, _id will be the value of the key from incoming record."
     },
     {
-     FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots),
      "When enabled, replace field name dots with underscore, required by Elasticsearch "
      "2.0-2.3."
     },
 
     {
-     FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index),
      "Use current time for index generation instead of message record"
     },
 
     /* Trace */
     {
-     FLB_CONFIG_MAP_BOOL, "trace_output", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output),
      "When enabled print the Elasticsearch API calls to stdout (for diag only)"
     },
     {
-     FLB_CONFIG_MAP_BOOL, "trace_error", "false",
+     FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, "false",
      0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error),
      "When enabled print the Elasticsearch exception to stderr (for diag only)"
     },
 
     {
-     FLB_CONFIG_MAP_STR, "upstream", NULL,
+     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_UPSTREAM, NULL,
      0, FLB_FALSE, 0,
      "Path to 'upstream' configuration file (define multiple nodes)"
     },
diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h
index 45473d927b8..34546212cf2 100644
--- a/plugins/out_es/es.h
+++ b/plugins/out_es/es.h
@@ -34,10 +34,6 @@
 #define FLB_ES_DEFAULT_TAG_KEY    "flb-key"
 #define FLB_ES_DEFAULT_HTTP_MAX   "512k"
 #define FLB_ES_DEFAULT_HTTPS_PORT 443
-#define FLB_ES_WRITE_OP_INDEX     "index"
-#define FLB_ES_WRITE_OP_CREATE    "create"
-#define FLB_ES_WRITE_OP_UPDATE    "update"
-#define FLB_ES_WRITE_OP_UPSERT    "upsert"
 
 #define FLB_ES_STATUS_SUCCESS          (1 << 0)
 #define FLB_ES_STATUS_IMCOMPLETE       (1 << 1)
@@ -62,7 +58,9 @@ struct flb_elasticsearch_config {
 
     /* Elastic Cloud Auth */
     char *cloud_user;
+    int own_cloud_user;
     char *cloud_passwd;
+    int own_cloud_passwd;
 
     /* AWS Auth */
 #ifdef FLB_HAVE_AWS
@@ -71,12 +69,17 @@ struct flb_elasticsearch_config {
     char *aws_sts_endpoint;
     char *aws_profile;
     struct flb_aws_provider *aws_provider;
+    int own_aws_provider;
     struct flb_aws_provider *base_aws_provider;
+    int own_base_aws_provider;
     /* tls instances can't be re-used; aws provider requires a separate one */
     struct flb_tls *aws_tls;
+    int own_aws_tls;
     struct flb_tls *aws_sts_tls;
+    int own_aws_sts_tls;
     char *aws_service_name;
     struct mk_list *aws_unsigned_headers;
+    int own_aws_unsigned_headers;
 #endif
 
     /* HTTP Client Setup */
@@ -103,40 +106,51 @@ struct flb_elasticsearch_config {
 
     /* prefix */
     flb_sds_t logstash_prefix;
+    int own_logstash_prefix;
     flb_sds_t logstash_prefix_separator;
+    int own_logstash_prefix_separator;
 
     /* prefix key */
     flb_sds_t logstash_prefix_key;
+    int own_logstash_prefix_key;
 
     /* date format */
     flb_sds_t logstash_dateformat;
+    int own_logstash_dateformat;
 
     /* time key */
     flb_sds_t time_key;
+    int own_time_key;
 
     /* time key format */
     flb_sds_t time_key_format;
+    int own_time_key_format;
 
     /* time key nanoseconds */
     int time_key_nanos;
 
     /* write operation */
     flb_sds_t write_operation;
+    int own_write_operation;
     /* write operation elasticsearch operation */
-    flb_sds_t es_action;
+    const char *es_action;
 
     /* id_key */
     flb_sds_t id_key;
+    int own_id_key;
     struct flb_record_accessor *ra_id_key;
+    int own_ra_id_key;
 
     /* include_tag_key */
     int include_tag_key;
     flb_sds_t tag_key;
+    int own_tag_key;
 
     /* Elasticsearch HTTP API */
     char uri[256];
 
     struct flb_record_accessor *ra_prefix_key;
+    int own_ra_prefix_key;
 
     /* Compression mode (gzip) */
     int compress_gzip;
diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c
index 4ff27a5cfbd..293df5e1e9a 100644
--- a/plugins/out_es/es_conf.c
+++ b/plugins/out_es/es_conf.c
@@ -21,23 +21,167 @@
 #include <fluent-bit/flb_config.h>
 #include <fluent-bit/flb_output_plugin.h>
 #include <fluent-bit/flb_mem.h>
+#include <fluent-bit/flb_utils.h>
 #include <fluent-bit/flb_http_client.h>
 #include <fluent-bit/flb_record_accessor.h>
 #include <fluent-bit/flb_signv4.h>
 
-#include "es_conf.h"
-#include "es_conf_parse.h"
 #include "es.h"
+#include "es_conf_parse.h"
+#include "es_conf_prop.h"
+#include "es_conf.h"
+
+static const char * const es_default_path    = "";
+static const char * const es_write_op_index  = FLB_ES_WRITE_OP_INDEX;
+static const char * const es_write_op_create = FLB_ES_WRITE_OP_CREATE;
+static const char * const es_write_op_update = FLB_ES_WRITE_OP_UPDATE;
+static const char * const es_write_op_upsert = FLB_ES_WRITE_OP_UPSERT;
+
+static int config_set_ra_id_key(flb_sds_t id_key, struct flb_elasticsearch_config *ec,
+                                struct flb_elasticsearch *ctx)
+{
+    if (!id_key) {
+        return 0;
+    }
+
+    ec->ra_id_key = flb_ra_create(id_key, FLB_FALSE);
+    if (ec->ra_id_key == NULL) {
+        flb_plg_error(ctx->ins, "could not create record accessor for Id Key");
+        return -1;
+    }
+    ec->own_ra_id_key = FLB_TRUE;
+
+    if (ec->generate_id == FLB_TRUE) {
+        flb_plg_warn(ctx->ins, "Generate_ID is ignored when ID_key is set");
+        ec->generate_id = FLB_FALSE;
+    }
+
+    return 0;
+}
+
+static int config_set_es_action(const char *write_operation,
+                                const struct flb_record_accessor *ra_id_key,
+                                int generate_id,
+                                struct flb_elasticsearch_config *ec,
+                                struct flb_elasticsearch *ctx)
+{
+    if (!write_operation) {
+        return 0;
+    }
+
+    if (strcasecmp(write_operation, es_write_op_index) == 0) {
+        ec->es_action = es_write_op_index;
+    }
+    else if (strcasecmp(write_operation, es_write_op_create) == 0) {
+        ec->es_action = es_write_op_create;
+    }
+    else if (strcasecmp(write_operation, es_write_op_update) == 0
+             || strcasecmp(write_operation, es_write_op_upsert) == 0) {
+        ec->es_action = es_write_op_update;
+    }
+    else {
+        flb_plg_error(ctx->ins,
+                      "wrong Write_Operation (should be one of index, create, update, upsert)");
+        return -1;
+    }
+
+    if (strcasecmp(ec->es_action, es_write_op_update) == 0
+        && !ra_id_key
+        && generate_id == FLB_FALSE) {
+        flb_plg_error(ctx->ins,
+                      "Id_Key or Generate_Id must be set when Write_Operation update or upsert");
+        return -1;
+    }
+
+    return 0;
+}
+
+static size_t config_adjust_buffer_size(size_t buffer_size)
+{
+    /* HTTP Payload (response) maximum buffer size (0 == unlimited) */
+    if (buffer_size == -1) {
+        return 0;
+    }
+    return buffer_size;
+}
+
+static int config_is_compressed_gzip(const char *compress)
+{
+    if (strcasecmp(compress, "gzip") == 0) {
+        return FLB_TRUE;
+    }
+    return FLB_FALSE;
+}
+
+static int config_set_pipeline(const char *path, const char *pipeline,
+                               struct flb_elasticsearch_config *ec)
+{
+    int ret;
+
+    if (!path) {
+        path = es_default_path;
+    }
+
+    if (pipeline && flb_str_emptyval(pipeline) != FLB_TRUE) {
+        ret = snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path,
+                       pipeline);
+    }
+    else {
+        ret = snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path);
+    }
+
+    if (ret < 0 || ret >= sizeof(ec->uri)) {
+        return -1;
+    }
+    return 0;
+}
+
+static int config_set_ra_prefix_key(flb_sds_t logstash_prefix_key,
+                                    struct flb_elasticsearch_config *ec,
+                                    struct flb_elasticsearch *ctx)
+{
+    size_t len;
+    char *buf;
+
+    if (!logstash_prefix_key) {
+        return 0;
+    }
+
+    if (logstash_prefix_key[0] != '$') {
+        len = flb_sds_len(logstash_prefix_key);
+        buf = flb_malloc(len + 2);
+        if (!buf) {
+            flb_errno();
+            return -1;
+        }
+        buf[0] = '$';
+        memcpy(buf + 1, logstash_prefix_key, len);
+        buf[len + 1] = '\0';
+
+        ec->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
+        ec->own_ra_prefix_key = FLB_TRUE;
+        flb_free(buf);
+    }
+    else {
+        ec->ra_prefix_key = flb_ra_create(logstash_prefix_key, FLB_TRUE);
+        ec->own_ra_prefix_key = FLB_TRUE;
+    }
+
+    if (!ec->ra_prefix_key) {
+        flb_plg_error(ctx->ins, "invalid logstash_prefix_key pattern '%s'",
+                      logstash_prefix_key);
+        return -1;
+    }
+
+    return 0;
+}
 
 static int config_set_properties(struct flb_elasticsearch_config *ec,
                                  struct flb_elasticsearch *ctx,
                                  struct flb_config *config)
 {
-    size_t len;
-    ssize_t ret;
-    char *buf;
+    int ret;
     const char *tmp;
-    const char *path;
     struct flb_uri *uri = ctx->ins->host.uri;
     struct flb_uri_field *f_index = NULL;
     struct flb_uri_field *f_type = NULL;
@@ -45,14 +189,13 @@ static int config_set_properties(struct flb_elasticsearch_config *ec,
     if (uri) {
         if (uri->count >= 2) {
             f_index = flb_uri_get(uri, 0);
-            f_type  = flb_uri_get(uri, 1);
+            f_type = flb_uri_get(uri, 1);
         }
     }
 
     /* handle cloud_id */
-    ret = flb_es_conf_set_cloud_auth(flb_output_get_property("cloud_id",
-                                                             ctx->ins),
-                                     ctx);
+    ret = flb_es_conf_set_cloud_auth(
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_ID, ctx->ins), ctx);
     if (ret != 0) {
         flb_plg_error(ctx->ins, "cannot configure cloud_id");
         return -1;
@@ -65,21 +208,21 @@ static int config_set_properties(struct flb_elasticsearch_config *ec,
         return -1;
     }
 
+    ec->buffer_size = config_adjust_buffer_size(ec->buffer_size);
+
     /* handle cloud_auth */
     ret = flb_es_conf_set_cloud_credentials(
-            flb_output_get_property("cloud_auth", ctx->ins), ec);
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, ctx->ins), ec);
     if (ret != 0) {
         flb_plg_error(ctx->ins, "cannot configure cloud_auth");
         return -1;
     }
 
     /* Compress (gzip) */
-    tmp = flb_output_get_property("compress", ctx->ins);
+    tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, ctx->ins);
     ec->compress_gzip = FLB_FALSE;
     if (tmp) {
-        if (strcasecmp(tmp, "gzip") == 0) {
-            ec->compress_gzip = FLB_TRUE;
-        }
+        ec->compress_gzip = config_is_compressed_gzip(tmp);
     }
 
     /* Set manual Index and Type */
@@ -93,98 +236,339 @@ static int config_set_properties(struct flb_elasticsearch_config *ec,
         ec->own_type = FLB_TRUE;
     }
 
-    /* HTTP Payload (response) maximum buffer size (0 == unlimited) */
-    if (ec->buffer_size == -1) {
-        ec->buffer_size = 0;
+    /* Elasticsearch: path and pipeline */
+    ret = config_set_pipeline(
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins),
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins),
+            ec);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "cannot configure path and/or pipeline");
+        return -1;
     }
 
-    /* Elasticsearch: Path */
-    path = flb_output_get_property("path", ctx->ins);
-    if (!path) {
-        path = "";
+    ret = config_set_ra_id_key(ec->id_key, ec, ctx);
+    if (ret != 0) {
+        return -1;
+    }
+
+    ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id, ec,
+                               ctx);
+    if (ret != 0) {
+        return -1;
+    }
+
+    ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx);
+    if (ret != 0) {
+        return -1;
+    }
+
+#ifdef FLB_HAVE_AWS
+    ret = flb_es_set_aws_unsigned_headers(ec);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "cannot configure AWS unsigned headers");
+        return -1;
     }
 
-    /* Elasticsearch: Pipeline */
-    tmp = flb_output_get_property("pipeline", ctx->ins);
+    ret = flb_es_conf_set_aws_provider(
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins),
+            flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, ctx->ins),
+            ec, ctx, config);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins, "cannot configure AWS authentication");
+        return -1;
+    }
+#endif
+
+    return 0;
+}
+
+static int config_set_node_properties(struct flb_upstream_node *node,
+                                      struct flb_elasticsearch_config *ec,
+                                      struct flb_elasticsearch_config *base,
+                                      struct flb_elasticsearch *ctx,
+                                      struct flb_config *config)
+{
+    const char *tmp;
+    int ret;
+    const char *path;
+
+#ifdef FLB_HAVE_AWS
+    const char *aws_external_id = NULL;
+    const char *aws_role_arn = NULL;
+    int aws_provider_node = FLB_FALSE;
+#endif
+
+    /* Copy base configuration */
+    *ec = *base;
+    ec->own_index = FLB_FALSE;
+    ec->own_type = FLB_FALSE;
+    ec->own_cloud_user = FLB_FALSE;
+    ec->own_cloud_passwd = FLB_FALSE;
+
+#ifdef FLB_HAVE_AWS
+    ec->own_base_aws_provider = FLB_FALSE;
+    ec->own_aws_provider = FLB_FALSE;
+    ec->own_aws_tls = FLB_FALSE;
+    ec->own_aws_sts_tls = FLB_FALSE;
+    ec->own_aws_unsigned_headers = FLB_FALSE;
+#endif
+
+    ec->own_logstash_prefix = FLB_FALSE;
+    ec->own_logstash_prefix_separator = FLB_FALSE;
+    ec->own_logstash_prefix_key = FLB_FALSE;
+    ec->own_logstash_dateformat = FLB_FALSE;
+    ec->own_time_key = FLB_FALSE;
+    ec->own_time_key_format = FLB_FALSE;
+    ec->own_write_operation = FLB_FALSE;
+    ec->own_id_key = FLB_FALSE;
+    ec->own_ra_id_key = FLB_FALSE;
+    ec->own_ra_prefix_key = FLB_FALSE;
+    ec->own_tag_key = FLB_FALSE;
+    mk_list_entry_init(&ec->_head);
+
+    /* Overwrite configuration from upstream node properties */
+
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INDEX, node);
     if (tmp) {
-        snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp);
+        ec->index = (char *)tmp;
     }
-    else {
-        snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path);
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TYPE, node);
+    if (tmp) {
+        ec->type = (char *)tmp;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, node);
+    if (tmp) {
+        ec->suppress_type_name = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_USER, node);
+    if (tmp) {
+        ec->http_user = (char *)tmp;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, node);
+    if (tmp) {
+        ec->http_passwd = (char *)tmp;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_GENERATE_ID, node);
+    if (tmp) {
+        ec->generate_id = flb_utils_bool(tmp);
     }
 
-    if (ec->id_key) {
-        ec->ra_id_key = flb_ra_create(ec->id_key, FLB_FALSE);
-        if (ec->ra_id_key == NULL) {
-            flb_plg_error(ctx->ins, "could not create record accessor for Id Key");
+#ifdef FLB_HAVE_AWS
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_AUTH, node);
+    if (tmp) {
+        ec->has_aws_auth = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_REGION, node);
+    if (tmp) {
+        ec->aws_region = (char *)tmp;
+        aws_provider_node = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, node);
+    if (tmp) {
+        ec->aws_sts_endpoint = (char *)tmp;
+        aws_provider_node = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, node);
+    if (tmp) {
+        ec->aws_service_name = (char *)tmp;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, node);
+    if (tmp) {
+        ec->aws_profile = (char *)tmp;
+        aws_provider_node = FLB_TRUE;
+    }
+    if (ec->has_aws_auth) {
+        tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID,
+                                             node);
+        if (tmp) {
+            aws_external_id = tmp;
+            aws_provider_node = FLB_TRUE;
         }
-        if (ec->generate_id == FLB_TRUE) {
-            flb_plg_warn(ctx->ins, "Generate_ID is ignored when ID_key is set");
-            ec->generate_id = FLB_FALSE;
+        else {
+            aws_external_id = flb_output_get_property(
+                    FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins);
+        }
+        tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, node);
+        if (tmp) {
+            aws_role_arn = tmp;
+            aws_provider_node = FLB_TRUE;
+        }
+        else {
+            aws_role_arn = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN,
+                                                   ctx->ins);
         }
     }
+#endif
 
-    if (ec->write_operation) {
-        if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) {
-            ec->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX);
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, node);
+    if (tmp) {
+        ec->logstash_format = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, node);
+    if (tmp) {
+        ec->logstash_prefix = flb_sds_create(tmp);
+        if (ec->logstash_prefix == NULL) {
+            return -1;
+        }
+        ec->own_logstash_prefix = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR,
+                                         node);
+    if (tmp) {
+        ec->logstash_prefix_separator = flb_sds_create(tmp);
+        if (ec->logstash_prefix_separator == NULL) {
+            return -1;
         }
-        else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) {
-            ec->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE);
+        ec->own_logstash_prefix_separator = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT,
+                                         node);
+    if (tmp) {
+        ec->logstash_dateformat = flb_sds_create(tmp);
+        if (ec->logstash_dateformat == NULL) {
+            return -1;
         }
-        else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0
-            || strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
-            ec->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE);
+        ec->own_logstash_dateformat = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY, node);
+    if (tmp) {
+        ec->time_key = flb_sds_create(tmp);
+        if (ec->time_key == NULL) {
+            return -1;
         }
-        else {
-            flb_plg_error(ctx->ins, "wrong Write_Operation (should be one of index, create, update, upsert)");
+        ec->own_time_key = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, node);
+    if (tmp) {
+        ec->time_key_format = flb_sds_create(tmp);
+        if (ec->time_key_format == NULL) {
             return -1;
         }
-        if (strcasecmp(ec->es_action, FLB_ES_WRITE_OP_UPDATE) == 0
-            && !ec->ra_id_key && ec->generate_id == FLB_FALSE) {
-            flb_plg_error(ctx->ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert");
+        ec->own_time_key_format = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, node);
+    if (tmp) {
+        ec->time_key_nanos = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, node);
+    if (tmp) {
+        ec->include_tag_key = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TAG_KEY, node);
+    if (tmp) {
+        ec->tag_key = flb_sds_create(tmp);
+        if (ec->tag_key == NULL) {
             return -1;
         }
+        ec->own_tag_key = FLB_TRUE;
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, node);
+    if (tmp) {
+        ec->buffer_size = config_adjust_buffer_size(flb_utils_size_to_bytes(tmp));
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, node);
+    if (tmp) {
+        ec->replace_dots = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, node);
+    if (tmp) {
+        ec->current_time_index = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, node);
+    if (tmp) {
+        ec->trace_output = flb_utils_bool(tmp);
+    }
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, node);
+    if (tmp) {
+        ec->trace_error = flb_utils_bool(tmp);
     }
 
-    if (ec->logstash_prefix_key) {
-        if (ec->logstash_prefix_key[0] != '$') {
-            len = flb_sds_len(ec->logstash_prefix_key);
-            buf = flb_malloc(len + 2);
-            if (!buf) {
-                flb_errno();
-                return -1;
-            }
-            buf[0] = '$';
-            memcpy(buf + 1, ec->logstash_prefix_key, len);
-            buf[len + 1] = '\0';
-
-            ec->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
-            flb_free(buf);
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY,
+                                         node);
+    if (tmp) {
+        ec->logstash_prefix_key = flb_sds_create(tmp);
+        if (ec->logstash_prefix_key == NULL) {
+            return -1;
         }
-        else {
-            ec->ra_prefix_key = flb_ra_create(ec->logstash_prefix_key, FLB_TRUE);
+        ec->own_logstash_prefix_key = FLB_TRUE;
+        ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx);
+        if (ret != 0) {
+            return -1;
         }
+    }
 
-        if (!ec->ra_prefix_key) {
-            flb_plg_error(ctx->ins, "invalid logstash_prefix_key pattern '%s'", tmp);
+    /* handle cloud_auth */
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, node);
+    if (tmp) {
+        ret = flb_es_conf_set_cloud_credentials(tmp, ec);
+        if (ret != 0) {
+            flb_plg_error(ctx->ins, "cannot configure cloud_auth");
             return -1;
         }
     }
 
-#ifdef FLB_HAVE_AWS
-    ret = flb_es_set_aws_unsigned_headers(ec);
-    if (ret != 0) {
-        flb_plg_error(ctx->ins, "cannot configure AWS unsigned headers");
-        return -1;
+    /* Compress (gzip) */
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, node);
+    if (tmp) {
+        ec->compress_gzip = config_is_compressed_gzip(tmp);
     }
 
-    ret = flb_es_conf_set_aws_provider(
-            flb_output_get_property("aws_external_id", ctx->ins),
-            flb_output_get_property("aws_role_arn", ctx->ins),
-            ec, ctx, config);
-    if (ret != 0) {
-        flb_plg_error(ctx->ins, "cannot configure AWS authentication");
-        return -1;
+    /* Elasticsearch: path and pipeline */
+    path = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PATH, node);
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, node);
+    if (path || tmp) {
+        if (!path) {
+            path = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins);
+        }
+        if (!tmp) {
+            tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins);
+        }
+        ret = config_set_pipeline(path, tmp, ec);
+        if (ret != 0) {
+            flb_plg_error(ctx->ins, "cannot configure path and/or pipeline");
+            return -1;
+        }
+    }
+
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_ID_KEY, node);
+    if (tmp) {
+        ec->id_key = flb_sds_create(tmp);
+        if (ec->id_key == NULL) {
+            return -1;
+        }
+        ec->own_id_key = FLB_TRUE;
+        ret = config_set_ra_id_key(ec->id_key, ec, ctx);
+        if (ret != 0) {
+            return -1;
+        }
+    }
+
+    tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, node);
+    if (tmp) {
+        ec->write_operation = flb_sds_create(tmp);
+        if (ec->write_operation == NULL) {
+            return -1;
+        }
+        ec->own_write_operation = FLB_TRUE;
+        ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id,
+                                   ec, ctx);
+
+        if (ret != 0) {
+            return -1;
+        }
+    }
+
+#ifdef FLB_HAVE_AWS
+    if ((base->has_aws_auth != ec->has_aws_auth)
+        || (base->has_aws_auth == FLB_TRUE
+            && ec->has_aws_auth == FLB_TRUE
+            && aws_provider_node == FLB_TRUE)) {
+        ret = flb_es_conf_set_aws_provider(aws_external_id, aws_role_arn, ec, ctx,
+                                           config);
+        if (ret != 0) {
+            flb_plg_error(ctx->ins, "cannot configure AWS authentication");
+            return -1;
+        }
     }
 #endif
 
@@ -193,43 +577,80 @@ static int config_set_properties(struct flb_elasticsearch_config *ec,
 
 static void elasticsearch_config_destroy(struct flb_elasticsearch_config *ec)
 {
-    if (ec->ra_id_key) {
+    if (ec->own_tag_key == FLB_TRUE) {
+        flb_sds_destroy(ec->tag_key);
+    }
+
+    if (ec->ra_id_key && ec->own_ra_id_key == FLB_TRUE) {
         flb_ra_destroy(ec->ra_id_key);
         ec->ra_id_key = NULL;
     }
-    if (ec->es_action) {
-        flb_free(ec->es_action);
+
+    if (ec->own_write_operation == FLB_TRUE) {
+        flb_sds_destroy(ec->write_operation);
+    }
+
+    if (ec->own_id_key == FLB_TRUE) {
+        flb_sds_destroy(ec->id_key);
+    }
+
+    if (ec->own_time_key_format == FLB_TRUE) {
+        flb_sds_destroy(ec->time_key_format);
+    }
+
+    if (ec->own_time_key == FLB_TRUE) {
+        flb_sds_destroy(ec->time_key);
+    }
+
+    if (ec->own_logstash_dateformat == FLB_TRUE) {
+        flb_sds_destroy(ec->logstash_dateformat);
+    }
+
+    if (ec->own_logstash_prefix_key == FLB_TRUE) {
+        flb_sds_destroy(ec->logstash_prefix_key);
+    }
+
+    if (ec->own_logstash_prefix_separator == FLB_TRUE) {
+        flb_sds_destroy(ec->logstash_prefix_separator);
+    }
+
+    if (ec->own_logstash_prefix == FLB_TRUE) {
+        flb_sds_destroy(ec->logstash_prefix);
     }
 
 #ifdef FLB_HAVE_AWS
-    if (ec->base_aws_provider) {
+    if (ec->base_aws_provider && ec->own_base_aws_provider == FLB_TRUE) {
         flb_aws_provider_destroy(ec->base_aws_provider);
     }
 
-    if (ec->aws_provider) {
+    if (ec->aws_provider && ec->own_aws_provider == FLB_TRUE) {
         flb_aws_provider_destroy(ec->aws_provider);
     }
 
-    if (ec->aws_tls) {
+    if (ec->aws_tls && ec->own_aws_tls == FLB_TRUE) {
         flb_tls_destroy(ec->aws_tls);
     }
 
-    if (ec->aws_sts_tls) {
+    if (ec->aws_sts_tls && ec->own_aws_sts_tls == FLB_TRUE) {
         flb_tls_destroy(ec->aws_sts_tls);
     }
 
-    if (ec->aws_unsigned_headers) {
+    if (ec->aws_unsigned_headers && ec->own_aws_unsigned_headers == FLB_TRUE) {
         flb_slist_destroy(ec->aws_unsigned_headers);
         flb_free(ec->aws_unsigned_headers);
     }
 #endif
 
-    if (ec->ra_prefix_key) {
+    if (ec->ra_prefix_key && ec->own_ra_prefix_key == FLB_TRUE) {
         flb_ra_destroy(ec->ra_prefix_key);
     }
 
-    flb_free(ec->cloud_passwd);
-    flb_free(ec->cloud_user);
+    if (ec->own_cloud_passwd == FLB_TRUE) {
+        flb_free(ec->cloud_passwd);
+    }
+    if (ec->own_cloud_user == FLB_TRUE) {
+        flb_free(ec->cloud_user);
+    }
 
     if (ec->own_type == FLB_TRUE) {
         flb_free(ec->type);
@@ -247,10 +668,12 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx,
 {
     int ret;
     struct mk_list *head;
+    struct mk_list *tmp;
     struct flb_upstream_node *node;
     struct flb_elasticsearch_config *ec;
+    struct flb_elasticsearch_config *node_ec;
 
-    /* Create elasticsearch_config context */
+    /* Create main elasticsearch_config context */
     ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config));
     if (!ec) {
         flb_errno();
@@ -258,7 +681,7 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx,
         return -1;
     }
 
-    /* Read properties into elasticsearch_config context */
+    /* Read properties into main elasticsearch_config context */
     ret = config_set_properties(ec, ctx, config);
     if (ret != 0) {
         elasticsearch_config_destroy(ec);
@@ -282,15 +705,61 @@ int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx,
     }
 
     /*
-     * Iterate over upstreams nodes and link shared elasticsearch_config context
-     * with each node
+     * Iterate over upstreams nodes and create elasticsearch_config context
+     * for each node
      */
     mk_list_foreach(head, &ctx->ha->nodes) {
         node = mk_list_entry(head, struct flb_upstream_node, _head);
+        /* Create elasticsearch_config context for the upstream node */
+        node_ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config));
+        if (!node_ec) {
+            flb_errno();
+            flb_plg_error(ctx->ins, "failed upstream node config allocation for %s node",
+                          node->name);
+            ret = -1;
+            break;
+        }
+
+        /*
+         * Fill elasticsearch_config context of the upstream node from:
+         * 1. main elasticsearch_config context
+         * 2. upstream node configuration section
+         */
+        ret = config_set_node_properties(node, node_ec, ec, ctx, config);
+        if (ret != 0) {
+            flb_plg_error(ctx->ins, "failed upstream node configuration for %s node",
+                          node->name);
+            elasticsearch_config_destroy(node_ec);
+            break;
+        }
+
+        /* Register allocated elasticsearch_config context for later cleanup */
+        mk_list_add(&node_ec->_head, &ctx->configs);
+
         /* Set elasticsearch_config context into the node opaque data */
-        flb_upstream_node_set_data(ec, node);
+        flb_upstream_node_set_data(node_ec, node);
+    }
+
+    if (ret != 0) {
+        /* Nullify each upstream node elasticsearch_config context */
+        mk_list_foreach(head, &ctx->ha->nodes) {
+            node = mk_list_entry(head, struct flb_upstream_node, _head);
+            flb_upstream_node_set_data(NULL, node);
+        }
+
+        /* Cleanup elasticsearch_config contexts which were created */
+        mk_list_foreach_safe(head, tmp, &ctx->configs) {
+            node_ec = mk_list_entry(head, struct flb_elasticsearch_config, _head);
+            mk_list_del(&node_ec->_head);
+            elasticsearch_config_destroy(node_ec);
+        }
+
+        flb_upstream_ha_destroy(ctx->ha);
+        elasticsearch_config_destroy(ec);
+        return -1;
     }
 
+    /* Register allocated elasticsearch_config context for later cleanup */
     mk_list_add(&ec->_head, &ctx->configs);
 
     return 0;
@@ -380,7 +849,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
     flb_output_set_context(ins, ctx);
 
     /* Configure HA or simple mode ? */
-    upstream_file = flb_output_get_property("upstream", ins);
+    upstream_file = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_UPSTREAM, ins);
     if (upstream_file) {
         ret = es_config_ha(upstream_file, ctx, config);
     }
diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h
index 7cfcf595730..39d04defb2b 100644
--- a/plugins/out_es/es_conf.h
+++ b/plugins/out_es/es_conf.h
@@ -24,6 +24,11 @@
 
 #include "es.h"
 
+#define FLB_ES_WRITE_OP_INDEX  "index"
+#define FLB_ES_WRITE_OP_CREATE "create"
+#define FLB_ES_WRITE_OP_UPDATE "update"
+#define FLB_ES_WRITE_OP_UPSERT "upsert"
+
 struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
                                              struct flb_config *config);
 
diff --git a/plugins/out_es/es_conf_parse.c b/plugins/out_es/es_conf_parse.c
index 4d79f121cf4..9e81a9691b7 100644
--- a/plugins/out_es/es_conf_parse.c
+++ b/plugins/out_es/es_conf_parse.c
@@ -52,9 +52,11 @@ int flb_es_conf_set_cloud_credentials(const char *cloud_auth,
         entry = mk_list_entry(head, struct flb_split_entry, _head);
         if (items == 1) {
             ec->cloud_user = flb_strdup(entry->value);
+            ec->own_cloud_user = FLB_TRUE;
         }
         if (items == 2) {
             ec->cloud_passwd = flb_strdup(entry->value);
+            ec->own_cloud_passwd = FLB_TRUE;
         }
     }
     flb_utils_split_free(toks);
@@ -193,6 +195,7 @@ int flb_es_set_aws_unsigned_headers(struct flb_elasticsearch_config *ec)
         flb_errno();
         return -1;
     }
+    ec->own_aws_unsigned_headers = FLB_TRUE;
 
     flb_slist_create(ec->aws_unsigned_headers);
     ret = flb_slist_add(ec->aws_unsigned_headers, "Content-Length");
@@ -239,6 +242,7 @@ static int set_aws_sts_provider(const char *aws_external_id,
         flb_free(aws_session_name);
         return -1;
     }
+    ec->own_aws_sts_tls = FLB_TRUE;
 
     ec->aws_provider = flb_sts_provider_create(config,
                                                ec->aws_sts_tls,
@@ -250,6 +254,9 @@ static int set_aws_sts_provider(const char *aws_external_id,
                                                ec->aws_sts_endpoint,
                                                NULL,
                                                flb_aws_client_generator());
+    ec->own_base_aws_provider = FLB_TRUE;
+    ec->own_aws_provider = FLB_TRUE;
+
     /* Session name can be freed once provider is created */
     flb_free(aws_session_name);
 
@@ -270,6 +277,9 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id,
     int ret;
 
     if (ec->has_aws_auth == FLB_FALSE) {
+        ec->aws_tls = NULL;
+        ec->aws_provider = NULL;
+        ec->base_aws_provider = NULL;
         return 0;
     }
 
@@ -294,6 +304,7 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id,
         flb_errno();
         return -1;
     }
+    ec->own_aws_tls = FLB_TRUE;
 
     ec->aws_provider = flb_standard_chain_provider_create(config,
                                                           ec->aws_tls,
@@ -306,6 +317,7 @@ int flb_es_conf_set_aws_provider(const char *aws_external_id,
         flb_error("[out_es] Failed to create AWS Credential Provider");
         return -1;
     }
+    ec->own_aws_provider = FLB_TRUE;
 
     ret = set_aws_sts_provider(aws_external_id, aws_role_arn, ec, ctx, config);
     if (ret != 0) {
diff --git a/plugins/out_es/es_conf_prop.h b/plugins/out_es/es_conf_prop.h
new file mode 100644
index 00000000000..b555f4d074b
--- /dev/null
+++ b/plugins/out_es/es_conf_prop.h
@@ -0,0 +1,64 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/*  Fluent Bit
+ *  ==========
+ *  Copyright (C) 2015-2024 The Fluent Bit Authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#ifndef FLB_OUT_ES_CONF_PROP_H
+#define FLB_OUT_ES_CONF_PROP_H
+
+#define FLB_ES_CONFIG_PROPERTY_INDEX                     "index"
+#define FLB_ES_CONFIG_PROPERTY_TYPE                      "type"
+#define FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME        "suppress_type_name"
+#define FLB_ES_CONFIG_PROPERTY_HTTP_USER                 "http_user"
+#define FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD               "http_passwd"
+#define FLB_ES_CONFIG_PROPERTY_COMPRESS                  "compress"
+#define FLB_ES_CONFIG_PROPERTY_CLOUD_ID                  "cloud_id"
+#define FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH                "cloud_auth"
+
+#ifdef FLB_HAVE_AWS
+#define FLB_ES_CONFIG_PROPERTY_AWS_AUTH                  "aws_auth"
+#define FLB_ES_CONFIG_PROPERTY_AWS_REGION                "aws_region"
+#define FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT          "aws_sts_endpoint"
+#define FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN              "aws_role_arn"
+#define FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID           "aws_external_id"
+#define FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME          "aws_service_name"
+#define FLB_ES_CONFIG_PROPERTY_AWS_PROFILE               "aws_profile"
+#endif
+
+#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT           "logstash_format"
+#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX           "logstash_prefix"
+#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR "logstash_prefix_separator"
+#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY       "logstash_prefix_key"
+#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT       "logstash_dateformat"
+#define FLB_ES_CONFIG_PROPERTY_TIME_KEY                  "time_key"
+#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT           "time_key_format"
+#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS            "time_key_nanos"
+#define FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY           "include_tag_key"
+#define FLB_ES_CONFIG_PROPERTY_TAG_KEY                   "tag_key"
+#define FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE               "buffer_size"
+#define FLB_ES_CONFIG_PROPERTY_PATH                      "path"
+#define FLB_ES_CONFIG_PROPERTY_PIPELINE                  "pipeline"
+#define FLB_ES_CONFIG_PROPERTY_GENERATE_ID               "generate_id"
+#define FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION           "write_operation"
+#define FLB_ES_CONFIG_PROPERTY_ID_KEY                    "id_key"
+#define FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS              "replace_dots"
+#define FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX        "current_time_index"
+#define FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT              "trace_output"
+#define FLB_ES_CONFIG_PROPERTY_TRACE_ERROR               "trace_error"
+#define FLB_ES_CONFIG_PROPERTY_UPSTREAM                  "upstream"
+
+#endif