diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 26c3af8ae27..04f59eba4b2 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include #include @@ -39,21 +41,22 @@ struct flb_output_plugin out_es_plugin; static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx); + struct flb_elasticsearch_config *ec); #ifdef FLB_HAVE_AWS static flb_sds_t add_aws_auth(struct flb_http_client *c, - struct flb_elasticsearch *ctx) + struct flb_output_instance *ins, + struct flb_elasticsearch_config *ec) { flb_sds_t signature = NULL; int ret; - flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4"); + flb_plg_debug(ins, "Signing request with AWS Sigv4"); /* Amazon ES Sigv4 does not allow the host header to include the port */ ret = flb_http_strip_port_from_host(c); if (ret < 0) { - flb_plg_error(ctx->ins, "could not strip port from host for sigv4"); + flb_plg_error(ins, "could not strip port from host for sigv4"); return NULL; } @@ -61,11 +64,11 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21); signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL), - ctx->aws_region, "es", + ec->aws_region, "es", 0, - ctx->aws_provider); + ec->aws_provider); if (!signature) { - flb_plg_error(ctx->ins, "could not sign request with sigv4"); + flb_plg_error(ins, "could not sign request with sigv4"); return NULL; } return signature; @@ -74,7 +77,7 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, static int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -123,7 +126,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -148,7 +151,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } /* * The value can be any data type, if it's an array we need to @@ -156,7 +159,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ else if (v->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, v->via.array.size); - es_pack_array_content(tmp_pck, *v, ctx); + es_pack_array_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -171,7 +174,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; msgpack_object *e; @@ -181,12 +184,12 @@ static int es_pack_array_content(msgpack_packer *tmp_pck, if (e->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, e->via.map.size); - es_pack_map_content(tmp_pck, *e, ctx); + es_pack_map_content(tmp_pck, *e, ec); } else if (e->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, e->via.array.size); - es_pack_array_content(tmp_pck, *e, ctx); + es_pack_array_content(tmp_pck, *e, ec); } else { @@ -239,7 +242,7 @@ static int elasticsearch_format(struct flb_config *config, int i; msgpack_object key; msgpack_object val; - struct flb_elasticsearch *ctx = plugin_context; + struct flb_elasticsearch_config *ec = plugin_context; /* Iterate the original buffer and perform adjustments */ msgpack_unpacked_init(&result); @@ -278,9 +281,9 @@ static int elasticsearch_format(struct flb_config *config, msgpack_unpacked_init(&result); /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - memcpy(logstash_index, ctx->logstash_prefix, flb_sds_len(ctx->logstash_prefix)); - logstash_index[flb_sds_len(ctx->logstash_prefix)] = '\0'; + if (ec->logstash_format == FLB_TRUE) { + memcpy(logstash_index, ec->logstash_prefix, flb_sds_len(ec->logstash_prefix)); + logstash_index[flb_sds_len(ec->logstash_prefix)] = '\0'; } /* @@ -290,17 +293,17 @@ static int elasticsearch_format(struct flb_config *config, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } /* @@ -309,7 +312,7 @@ static int elasticsearch_format(struct flb_config *config, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -326,7 +329,7 @@ static int elasticsearch_format(struct flb_config *config, } /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_pop_from_msgpack(&tms, &result, &obj); } @@ -334,17 +337,17 @@ static int elasticsearch_format(struct flb_config *config, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key) { + if (ec->logstash_prefix_key) { for (i = 0; i < map_size; i++) { key = map.via.map.ptr[i].key; if (key.type != MSGPACK_OBJECT_STR) { continue; } - if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) { + if (key.via.str.size != flb_sds_len(ec->logstash_prefix_key)) { continue; } - if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, - flb_sds_len(ctx->logstash_prefix_key)) != 0) { + if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, + flb_sds_len(ec->logstash_prefix_key)) != 0) { continue; } val = map.via.map.ptr[i].val; @@ -366,7 +369,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -374,14 +377,14 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key)); - msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->time_key)); + msgpack_pack_str_body(&tmp_pck, ec->time_key, flb_sds_len(ec->time_key)); /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); - if (ctx->time_key_nanos) { + ec->time_key_format, &tm); + if (ec->time_key_nanos) { len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); } else { @@ -394,40 +397,40 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { /* Compose Index header */ if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + flb_sds_len(ctx->logstash_prefix); + p = logstash_index + flb_sds_len(ec->logstash_prefix); } *p++ = '-'; len = p - logstash_index; s = strftime(p, sizeof(logstash_index) - len - 1, - ctx->logstash_dateformat, &tm); + ec->logstash_dateformat, &tm); p += s; *p++ = '\0'; es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { + if (ec->generate_id == FLB_FALSE) { index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key)); - msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->tag_key)); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, flb_sds_len(ec->tag_key)); msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -439,7 +442,7 @@ static int elasticsearch_format(struct flb_config *config, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -447,7 +450,7 @@ static int elasticsearch_format(struct flb_config *config, return -1; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", @@ -456,7 +459,7 @@ static int elasticsearch_format(struct flb_config *config, index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + es_index, ec->type, es_uuid); } /* Convert msgpack to JSON */ @@ -491,7 +494,7 @@ static int elasticsearch_format(struct flb_config *config, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(*out_data, 1, *out_size, stdout); fflush(stdout); } @@ -503,19 +506,30 @@ static int cb_es_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret = 0; + const char *tmp; struct flb_elasticsearch *ctx; + (void) data; - ctx = flb_es_conf_create(ins, config); + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); if (!ctx) { - flb_plg_error(ins, "cannot initialize plugin"); + flb_errno(); return -1; } + ctx->ins = ins; - flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s", - ins->host.name, ins->host.port, ctx->uri, - ctx->index, ctx->type); - + mk_list_init(&ctx->configs); flb_output_set_context(ins, ctx); + + /* Configure HA or simple mode ? */ + tmp = flb_output_get_property("upstream", ins); + if (tmp) { + ret = es_config_ha(tmp, ctx, config); + } + else { + ret = es_config_simple(ctx, ins, config); + } + /* * This plugin instance uses the HTTP client interface, let's register @@ -523,7 +537,7 @@ static int cb_es_init(struct flb_output_instance *ins, */ flb_output_set_http_debug_callbacks(ins); - return 0; + return ret; } static int elasticsearch_error_check(struct flb_elasticsearch *ctx, @@ -636,19 +650,44 @@ static void cb_es_flush(const void *data, size_t bytes, size_t out_size; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream_conn *u_conn; + struct flb_upstream_node *node; struct flb_http_client *c; flb_sds_t signature = NULL; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (!node) { + flb_plg_error(ctx->ins, "cannot get an Upstream HA node"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get forward_config stored in node opaque data */ + ec = flb_upstream_node_get_data(node); + flb_plg_debug(ctx->ins, "trying node %s", node->name); + } + else { + ec = mk_list_entry_first(&ctx->configs, + struct flb_elasticsearch_config, + _head); + } + /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + } if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available"); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ ret = elasticsearch_format(config, ins, - ctx, NULL, + ec, NULL, tag, tag_len, data, bytes, &out_buf, &out_size); @@ -661,10 +700,10 @@ static void cb_es_flush(const void *data, size_t bytes, pack_size = out_size; /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, pack_size, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); #ifndef FLB_HAVE_AWS flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -672,13 +711,13 @@ static void cb_es_flush(const void *data, size_t bytes, flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } #ifdef FLB_HAVE_AWS - if (ctx->has_aws_auth == FLB_TRUE) { - signature = add_aws_auth(c, ctx); + if (ec->has_aws_auth == FLB_TRUE) { + signature = add_aws_auth(c, ctx->ins, ec); if (!signature) { goto retry; } @@ -690,20 +729,20 @@ static void cb_es_flush(const void *data, size_t bytes, ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", - c->resp.status, ctx->uri, c->resp.payload); + c->resp.status, ec->uri, c->resp.payload); } else { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", - c->resp.status, ctx->uri); + c->resp.status, ec->uri); } goto retry; } @@ -716,7 +755,7 @@ static void cb_es_flush(const void *data, size_t bytes, ret = elasticsearch_error_check(ctx, c); if (ret == FLB_TRUE) { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * input/output to Elasticsearch that caused the problem. @@ -758,8 +797,32 @@ static void cb_es_flush(const void *data, size_t bytes, static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + (void) config; + + if (!ctx) { + return 0; + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&ec->_head); + flb_es_conf_destroy(ec); + } + + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } + } + else { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); - flb_es_conf_destroy(ctx); return 0; } @@ -767,24 +830,24 @@ static int cb_es_exit(void *data, struct flb_config *config) static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, index), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index), NULL }, { FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, type), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type), NULL }, /* HTTP Authentication */ { FLB_CONFIG_MAP_STR, "http_user", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user), NULL }, { FLB_CONFIG_MAP_STR, "http_passwd", "", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd), NULL }, @@ -792,17 +855,17 @@ static struct flb_config_map config_map[] = { #ifdef FLB_HAVE_AWS { FLB_CONFIG_MAP_BOOL, "aws_auth", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth), "Enable AWS Sigv4 Authentication" }, { FLB_CONFIG_MAP_STR, "aws_region", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region), "AWS Region of your Amazon ElasticSearch Service cluster" }, { FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_sts_endpoint), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint), "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option" }, { @@ -820,54 +883,54 @@ static struct flb_config_map config_map[] = { /* Logstash compatibility */ { FLB_CONFIG_MAP_BOOL, "logstash_format", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format), NULL }, { FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix), NULL }, { FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key), NULL }, { FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat), NULL }, /* Custom Time and Tag keys */ { FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key), NULL }, { FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format), NULL }, { FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos), NULL }, { FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key), NULL }, { FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key), NULL }, { FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size), NULL }, @@ -884,30 +947,35 @@ static struct flb_config_map config_map[] = { }, { FLB_CONFIG_MAP_BOOL, "generate_id", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id), NULL }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots), NULL }, { FLB_CONFIG_MAP_BOOL, "current_time_index", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index), NULL }, /* Trace */ { FLB_CONFIG_MAP_BOOL, "trace_output", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output), NULL }, { FLB_CONFIG_MAP_BOOL, "trace_error", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error), + NULL + }, + { + FLB_CONFIG_MAP_BOOL, "upstream", NULL, + 0, FLB_FALSE, 0, NULL }, diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 68b3a1da510..c3096c1e870 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -32,7 +32,16 @@ #define FLB_ES_DEFAULT_TAG_KEY "flb-key" #define FLB_ES_DEFAULT_HTTP_MAX "512k" -struct flb_elasticsearch { +/* + * Configuration: we put this separate from the main + * context so every Upstream Node can have it own configuration + * reference and pass it smoothly to the required caller. + * + * On simple mode (no HA), the structure is referenced + * by flb_es->config. In HA mode the structure is referenced + * by the Upstream node context as an opaque data type. + */ +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; char *type; @@ -102,10 +111,20 @@ struct flb_elasticsearch { /* Elasticsearch HTTP API */ char uri[256]; - /* Upstream connection to the backend server */ - struct flb_upstream *u; + /* Link to list flb_elasticsearch->configs */ + struct mk_list _head; +}; + +/* Plugin Context */ +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; - /* Plugin output instance reference */ + /* Upstream handler and config context for single mode (no HA) */ + struct flb_upstream *u; + struct mk_list configs; struct flb_output_instance *ins; }; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 4968229db65..7b29f235765 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -22,14 +22,17 @@ #include #include #include +#include +#include #include #include #include "es.h" #include "es_conf.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +int es_config_simple(struct flb_elasticsearch *ctx, + struct flb_output_instance *ins, + struct flb_config *config) { int io_flags = 0; @@ -44,16 +47,15 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_uri *uri = ins->host.uri; struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { flb_errno(); - return NULL; + return -1; } - ctx->ins = ins; if (uri) { if (uri->count >= 2) { @@ -66,11 +68,11 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_output_net_default("127.0.0.1", 9200, ins); /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(ins, (void *) ctx); + ret = flb_output_config_map_set(ins, (void *) ec); if (ret == -1) { flb_plg_error(ctx->ins, "configuration error"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } /* use TLS ? */ @@ -93,8 +95,8 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, &ins->tls); if (!upstream) { flb_plg_error(ctx->ins, "cannot create Upstream context"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } ctx->u = upstream; @@ -103,16 +105,16 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Set manual Index and Type */ if (f_index) { - ctx->index = flb_strdup(f_index->value); /* FIXME */ + ec->index = flb_strdup(f_index->value); /* FIXME */ } if (f_type) { - ctx->type = flb_strdup(f_type->value); /* FIXME */ + ec->type = flb_strdup(f_type->value); /* FIXME */ } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ - if (ctx->buffer_size == -1) { - ctx->buffer_size = 0; + if (ec->buffer_size == -1) { + ec->buffer_size = 0; } /* Elasticsearch: Path */ @@ -124,65 +126,65 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Elasticsearch: Pipeline */ tmp = flb_output_get_property("pipeline", ins); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); } else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } #ifdef FLB_HAVE_AWS /* AWS Auth */ - ctx->has_aws_auth = FLB_FALSE; + ec->has_aws_auth = FLB_FALSE; tmp = flb_output_get_property("aws_auth", ins); if (tmp) { if (strncasecmp(tmp, "On", 2) == 0) { - ctx->has_aws_auth = FLB_TRUE; + ec->has_aws_auth = FLB_TRUE; flb_debug("[out_es] Enabled AWS Auth"); /* AWS provider needs a separate TLS instance */ - ctx->aws_tls.context = flb_tls_context_new(FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->aws_tls.context) { + ec->aws_tls.context = flb_tls_context_new(FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if (!ec->aws_tls.context) { flb_errno(); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } tmp = flb_output_get_property("aws_region", ins); if (!tmp) { flb_error("[out_es] aws_auth enabled but aws_region not set"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } - ctx->aws_region = (char *) tmp; + ec->aws_region = (char *) tmp; tmp = flb_output_get_property("aws_sts_endpoint", ins); if (tmp) { - ctx->aws_sts_endpoint = (char *) tmp; + ec->aws_sts_endpoint = (char *) tmp; } - ctx->aws_provider = flb_standard_chain_provider_create(config, - &ctx->aws_tls, - ctx->aws_region, - ctx->aws_sts_endpoint, + ec->aws_provider = flb_standard_chain_provider_create(config, + &ec->aws_tls, + ec->aws_region, + ec->aws_sts_endpoint, NULL, flb_aws_client_generator()); - if (!ctx->aws_provider) { + if (!ec->aws_provider) { flb_error("[out_es] Failed to create AWS Credential Provider"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } tmp = flb_output_get_property("aws_role_arn", ins); if (tmp) { /* Use the STS Provider */ - ctx->base_aws_provider = ctx->aws_provider; + ec->base_aws_provider = ec->aws_provider; aws_role_arn = (char *) tmp; aws_external_id = NULL; tmp = flb_output_get_property("aws_external_id", ins); @@ -194,88 +196,190 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, if (!aws_session_name) { flb_error("[out_es] Failed to create aws iam role " "session name"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } /* STS provider needs yet another separate TLS instance */ - ctx->aws_sts_tls.context = flb_tls_context_new(FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->aws_sts_tls.context) { + ec->aws_sts_tls.context = flb_tls_context_new(FLB_TRUE, + ins->tls_debug, + ins->tls_vhost, + ins->tls_ca_path, + ins->tls_ca_file, + ins->tls_crt_file, + ins->tls_key_file, + ins->tls_key_passwd); + if(ec->aws_sts_tls.context) { flb_errno(); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } - ctx->aws_provider = flb_sts_provider_create(config, - &ctx->aws_sts_tls, - ctx-> + ec->aws_provider = flb_sts_provider_create(config, + &ec->aws_sts_tls, + ec-> base_aws_provider, aws_external_id, aws_role_arn, aws_session_name, - ctx->aws_region, - ctx->aws_sts_endpoint, + ec->aws_region, + ec->aws_sts_endpoint, NULL, flb_aws_client_generator()); /* Session name can be freed once provider is created */ flb_free(aws_session_name); - if (!ctx->aws_provider) { + if (!ec->aws_provider) { flb_error("[out_es] Failed to create AWS STS Credential " "Provider"); - flb_es_conf_destroy(ctx); - return NULL; + flb_es_conf_destroy(ec); + return -1; } } /* initialize credentials in sync mode */ - ctx->aws_provider->provider_vtable->sync(ctx->aws_provider); - ctx->aws_provider->provider_vtable->init(ctx->aws_provider); + ec->aws_provider->provider_vtable->sync(ec->aws_provider); + ec->aws_provider->provider_vtable->init(ec->aws_provider); /* set back to async */ - ctx->aws_provider->provider_vtable->async(ctx->aws_provider); + ec->aws_provider->provider_vtable->async(ec->aws_provider); } } #endif - return ctx; + + /* Initialize and validate es_config context */ + ret = flb_es_conf_init(ec, ctx); + if (ret == -1) { + if (ec) { + flb_es_conf_destroy(ec); + } + return -1; + } + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +/* Configure in HA mode */ +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config) { - if (!ctx) { - return 0; + ssize_t ret = 0; + const char *tmp; + const char *path; + struct mk_list *head; + struct flb_uri *uri = ctx->ins->host.uri; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec = NULL; + + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_error("[out_es] cannot load Upstream file"); + return -1; + } + + if (uri) { + if (uri->count >= 2) { + f_index = flb_uri_get(uri, 0); + f_type = flb_uri_get(uri, 1); + } + } + + /* Iterate nodes and create a forward_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_error("[out_es] failed config allocation"); + continue; + } + + /* Set manual Index and Type */ + if (f_index) { + ec->index = flb_strdup(f_index->value); /* FIXME */ + } + + if (f_type) { + ec->type = flb_strdup(f_type->value); /* FIXME */ + } + + /* Set default values */ + ret = flb_output_config_map_set(ctx->ins, ec); + if (ret == -1) { + flb_free(ec); + return -1; + } + + /* Elasticsearch: Path */ + path = flb_upstream_node_get_property("path", node); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_upstream_node_get_property("pipeline", node); + if (tmp) { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + + /* Initialize and validate es_config context */ + ret = flb_es_conf_init(ec, ctx); + if (ret == -1) { + if (ec) { + flb_es_conf_destroy(ec); + } + return -1; + } + + /* Set our elasticsearch_config context into the node */ + flb_upstream_node_set_data(ec, node); } - if (ctx->u) { - flb_upstream_destroy(ctx->u); + return 0; +} + +int flb_es_conf_init(struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + mk_list_add(&ec->_head, &ctx->configs); + return 0; +} + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) +{ + if (!ec) { + return 0; } #ifdef FLB_HAVE_AWS - if (ctx->base_aws_provider) { - flb_aws_provider_destroy(ctx->base_aws_provider); + if (ec->base_aws_provider) { + flb_aws_provider_destroy(ec->base_aws_provider); } - if (ctx->aws_provider) { - flb_aws_provider_destroy(ctx->aws_provider); + if (ec->aws_provider) { + flb_aws_provider_destroy(ec->aws_provider); } - if (ctx->aws_tls.context) { - flb_tls_context_destroy(ctx->aws_tls.context); + if (ec->aws_tls.context) { + flb_tls_context_destroy(ec->aws_tls.context); } - if (ctx->aws_sts_tls.context) { - flb_tls_context_destroy(ctx->aws_sts_tls.context); + if (ec->aws_sts_tls.context) { + flb_tls_context_destroy(ec->aws_sts_tls.context); } #endif - flb_free(ctx); + flb_free(ec); return 0; } + diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index 52c37c74570..af95ba45b0b 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -27,8 +27,18 @@ #include "es.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int es_config_simple(struct flb_elasticsearch *ctx, + struct flb_output_instance *ins, + struct flb_config *config); + +int flb_es_conf_init(struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx); + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec); #endif