From b08b568a13d24000f5ca3aaec4b1a35573807a44 Mon Sep 17 00:00:00 2001 From: Clara Pohland <54847419+claralui@users.noreply.github.com> Date: Mon, 9 Sep 2019 11:15:27 +0200 Subject: [PATCH] es_out: support multiple output nodes in round-robin by using upstream servers configuration Signed-off-by: Clara Pohland <54847419+claralui@users.noreply.github.com> --- plugins/out_es/es.c | 164 ++++++++++----- plugins/out_es/es.h | 15 +- plugins/out_es/es_conf.c | 429 ++++++++++++++++++++++++++++++++------- plugins/out_es/es_conf.h | 13 +- 4 files changed, 487 insertions(+), 134 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 1d45f845366..34347509e54 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -38,7 +40,7 @@ struct flb_output_plugin out_es_plugin; static inline int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -87,7 +89,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -112,7 +114,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -130,7 +132,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ static char *elasticsearch_format(const void *data, size_t bytes, const char *tag, int tag_len, int *out_size, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int ret; int len; @@ -200,9 +202,9 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_unpacked_init(&result); /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len); - logstash_index[ctx->logstash_prefix_len] = '\0'; + if (ec->logstash_format == FLB_TRUE) { + memcpy(logstash_index, ec->logstash_prefix, ec->logstash_prefix_len); + logstash_index[ec->logstash_prefix_len] = '\0'; } /* @@ -212,17 +214,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } /* @@ -231,7 +233,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -248,7 +250,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, } /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_pop_from_msgpack(&tms, &result, &obj); } @@ -263,16 +265,16 @@ static char *elasticsearch_format(const void *data, size_t bytes, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key_len != 0) { + if (ec->logstash_prefix_key_len != 0) { for (i = 0; i < map_size; i++) { key = map.via.map.ptr[i].key; if (key.type != MSGPACK_OBJECT_STR) { continue; } - if (key.via.str.size != ctx->logstash_prefix_key_len) { + if (key.via.str.size != ec->logstash_prefix_key_len) { continue; } - if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, ctx->logstash_prefix_key_len) != 0) { + if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, ec->logstash_prefix_key_len) != 0) { continue; } val = map.via.map.ptr[i].val; @@ -294,7 +296,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -302,13 +304,13 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ - msgpack_pack_str(&tmp_pck, ctx->time_key_len); - msgpack_pack_str_body(&tmp_pck, ctx->time_key, ctx->time_key_len); + msgpack_pack_str(&tmp_pck, ec->time_key_len); + msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len); /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); + ec->time_key_format, &tm); len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); @@ -316,40 +318,40 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { /* Compose Index header */ if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + ctx->logstash_prefix_len; + p = logstash_index + ec->logstash_prefix_len; } *p++ = '-'; len = p - logstash_index; s = strftime(p, sizeof(logstash_index) - len - 1, - ctx->logstash_dateformat, &tm); + ec->logstash_dateformat, &tm); p += s; *p++ = '\0'; es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { + if (ec->generate_id == FLB_FALSE) { index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&tmp_pck, ctx->tag_key_len); - msgpack_pack_str_body(&tmp_pck, ctx->tag_key, ctx->tag_key_len); + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, ec->tag_key_len); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len); msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -361,7 +363,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -369,7 +371,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, return NULL; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", @@ -378,7 +380,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + es_index, ec->type, es_uuid); } /* Convert msgpack to JSON */ @@ -412,7 +414,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(buf, 1, *out_size, stdout); fflush(stdout); } @@ -423,20 +425,30 @@ int cb_es_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; + const char *tmp; struct flb_elasticsearch *ctx; + (void) data; - ctx = flb_es_conf_create(ins, config); + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); if (!ctx) { - flb_error("[out_es] cannot initialize plugin"); + flb_errno(); return -1; } - flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s", - ins->host.name, ins->host.port, ctx->uri, - ctx->index, ctx->type); - + mk_list_init(&ctx->configs); flb_output_set_context(ins, ctx); - return 0; + + /* Configure HA or simple mode ? */ + tmp = flb_output_get_property("upstream", ins); + if (tmp) { + ret = es_config_ha(tmp, ctx, config); + } + else { + ret = es_config_simple(ins, ctx, config); + } + + return ret; } static int elasticsearch_error_check(struct flb_http_client *c) @@ -546,46 +558,72 @@ void cb_es_flush(const void *data, size_t bytes, char *pack; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream_conn *u_conn; struct flb_http_client *c; + struct flb_upstream_node *node; (void) i_ins; (void) tag; (void) tag_len; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (!node) { + flb_error("[out_es] cannot get an Upstream HA node"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get forward_config stored in node opaque data */ + ec = flb_upstream_node_get_data(node); + } + else { + ec = mk_list_entry_first(&ctx->configs, + struct flb_elasticsearch_config, + _head); + } + + flb_debug("[out_es] trying node %s", node->name); + /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + } if (!u_conn) { + flb_error("[out_es] no upstream connections available"); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ - pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx); + pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec); if (!pack) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_ERROR); } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, bytes_out, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_warn("[out_es] http_do=%i URI=%s", ret, ctx->uri); + flb_warn("[out_es] http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { goto retry; } @@ -598,7 +636,7 @@ void cb_es_flush(const void *data, size_t bytes, ret = elasticsearch_error_check(c); if (ret == FLB_TRUE) { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * input/output to Elasticsearch that caused the problem. @@ -635,8 +673,32 @@ void cb_es_flush(const void *data, size_t bytes, int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + (void) config; + + if (!ctx) { + return 0; + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&ec->_head); + flb_es_conf_destroy(ec); + } + + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } + } + else { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); - flb_es_conf_destroy(ctx); return 0; } diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index d1dc66060be..558777ede85 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -31,7 +31,7 @@ #define FLB_ES_DEFAULT_TIME_KEYF "%Y-%m-%dT%H:%M:%S" #define FLB_ES_DEFAULT_TAG_KEY "flb-key" -struct flb_elasticsearch { +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; char *type; @@ -90,8 +90,19 @@ struct flb_elasticsearch { /* Elasticsearch HTTP API */ char uri[256]; - /* Upstream connection to the backend server */ + /* Link to list flb_elasticsearch->configs */ + struct mk_list _head; +}; + +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; + + /* Upstream handler and config context for single mode (no HA) */ struct flb_upstream *u; + struct mk_list configs; }; #endif diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index b1c4d05627d..e3e880d78ec 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -22,12 +22,278 @@ #include #include #include +#include +#include #include "es.h" #include "es_conf.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +/* Configure in HA mode */ +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int io_flags = 0; + ssize_t ret; + const char *tmp; + const char *path; + struct mk_list *head; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec = NULL; + + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + return -1; + } + + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_error("[out_es] cannot load Upstream file"); + return -1; + } + + /* Iterate nodes and create a forward_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + + /* Is TLS enabled ? */ + if (node->tls_enabled == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + if (f_index) { + ec->index = flb_strdup(f_index->value); + } + else { + tmp = flb_upstream_node_get_property("index", node); + if (!tmp) { + ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + } + else { + ec->index = flb_strdup(tmp); + } + } + + if (f_type) { + ec->type = flb_strdup(f_type->value); + } + else { + tmp = flb_upstream_node_get_property("type", node); + if (!tmp) { + ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + } + else { + ec->type = flb_strdup(tmp); + } + } + + /* HTTP Auth */ + tmp = flb_upstream_node_get_property("http_user", node); + if (tmp) { + ec->http_user = flb_strdup(tmp); + + tmp = flb_upstream_node_get_property("http_passwd", node); + if (tmp) { + ec->http_passwd = flb_strdup(tmp); + } + else { + ec->http_passwd = flb_strdup(""); + } + } + + /* + * Logstash compatibility options + * ============================== + */ + + /* Logstash_Format */ + tmp = flb_upstream_node_get_property("logstash_format", node); + if (tmp) { + ec->logstash_format = flb_utils_bool(tmp); + } + else { + ec->logstash_format = FLB_FALSE; + } + + /* Logstash_Prefix */ + tmp = flb_upstream_node_get_property("logstash_prefix", node); + if (tmp) { + ec->logstash_prefix = flb_strdup(tmp); + ec->logstash_prefix_len = strlen(tmp); + } + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); + ec->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; + } + + /* Logstash_Prefix_Key */ + tmp = flb_upstream_node_get_property("logstash_prefix_key", node); + if (tmp) { + ec->logstash_prefix_key = flb_strdup(tmp); + ec->logstash_prefix_key_len = strlen(tmp); + } + + /* Logstash_DateFormat */ + tmp = flb_upstream_node_get_property("logstash_dateformat", node); + if (tmp) { + ec->logstash_dateformat = flb_strdup(tmp); + ec->logstash_dateformat_len = strlen(tmp); + } + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); + ec->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; + } + + /* Time Key */ + tmp = flb_upstream_node_get_property("time_key", node); + if (tmp) { + ec->time_key = flb_strdup(tmp); + ec->time_key_len = strlen(tmp); + } + else { + ec->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); + ec->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; + } + + /* Time Key Format */ + tmp = flb_upstream_node_get_property("time_key_format", node); + if (tmp) { + ec->time_key_format = flb_strdup(tmp); + ec->time_key_format_len = strlen(tmp); + } + else { + ec->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); + ec->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; + } + + /* Include Tag key */ + tmp = flb_upstream_node_get_property("include_tag_key", node); + if (tmp) { + ec->include_tag_key = flb_utils_bool(tmp); + } + else { + ec->include_tag_key = FLB_FALSE; + } + + /* Tag Key */ + if (ec->include_tag_key == FLB_TRUE) { + tmp = flb_upstream_node_get_property("tag_key", node); + if (tmp) { + ec->tag_key = flb_strdup(tmp); + ec->tag_key_len = strlen(tmp); + } + else { + ec->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); + ec->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; + } + } + + ec->buffer_size = FLB_HTTP_DATA_SIZE_MAX; + tmp = flb_upstream_node_get_property("buffer_size", node); + if (tmp) { + if (*tmp == 'f' || *tmp == 'F' || *tmp == 'o' || *tmp == 'O') { + /* unlimited size ? */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ec->buffer_size = 0; + } + } + else { + ret = flb_utils_size_to_bytes(tmp); + if (ret == -1) { + flb_error("[out_es] invalid buffer_size=%s, using default", tmp); + } + else { + ec->buffer_size = (size_t) ret; + } + } + } + + /* Elasticsearch: Path */ + path = flb_upstream_node_get_property("path", node); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_upstream_node_get_property("pipeline", node); + if (tmp) { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + + /* Generate _id */ + tmp = flb_upstream_node_get_property("generate_id", node); + if (tmp) { + ec->generate_id = flb_utils_bool(tmp); + } else { + ec->generate_id = FLB_FALSE; + } + + /* Replace dots */ + tmp = flb_upstream_node_get_property("replace_dots", node); + if (tmp) { + ec->replace_dots = flb_utils_bool(tmp); + } + else { + ec->replace_dots = FLB_FALSE; + } + + /* Use current time for index generation instead of message record */ + tmp = flb_upstream_node_get_property("current_time_index", node); + if (tmp) { + ec->current_time_index = flb_utils_bool(tmp); + } + else { + ec->current_time_index = FLB_FALSE; + } + + + /* Trace output */ + tmp = flb_upstream_node_get_property("Trace_Output", node); + if (tmp) { + ec->trace_output = flb_utils_bool(tmp); + } + else { + ec->trace_output = FLB_FALSE; + } + tmp = flb_upstream_node_get_property("Trace_Error", node); + if (tmp) { + ec->trace_error = flb_utils_bool(tmp); + } + else { + ec->trace_error = FLB_FALSE; + } + + /* Initialize and validate forward_config context */ + mk_list_add(&ec->_head, &ctx->configs); + + if (ret == -1) { + if (ec) { + flb_es_conf_destroy(ec); + } + return -1; + } + + /* Set our elasticsearch_config context into the node */ + flb_upstream_node_set_data(ec, node); + } + + return 0; +} + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config) { int io_flags = 0; @@ -38,13 +304,12 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; + struct flb_elasticsearch_config *ec = NULL; /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { - flb_errno(); - return NULL; + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + return -1; } if (uri) { @@ -77,49 +342,51 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, &ins->tls); if (!upstream) { flb_error("[out_es] cannot create Upstream context"); - flb_es_conf_destroy(ctx); - return NULL; + + flb_es_conf_destroy(ec); + flb_free(ctx); + return -1; } /* Set manual Index and Type */ ctx->u = upstream; if (f_index) { - ctx->index = flb_strdup(f_index->value); + ec->index = flb_strdup(f_index->value); } else { tmp = flb_output_get_property("index", ins); if (!tmp) { - ctx->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); } else { - ctx->index = flb_strdup(tmp); + ec->index = flb_strdup(tmp); } } if (f_type) { - ctx->type = flb_strdup(f_type->value); + ec->type = flb_strdup(f_type->value); } else { tmp = flb_output_get_property("type", ins); if (!tmp) { - ctx->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); } else { - ctx->type = flb_strdup(tmp); + ec->type = flb_strdup(tmp); } } /* HTTP Auth */ tmp = flb_output_get_property("http_user", ins); if (tmp) { - ctx->http_user = flb_strdup(tmp); + ec->http_user = flb_strdup(tmp); tmp = flb_output_get_property("http_passwd", ins); if (tmp) { - ctx->http_passwd = flb_strdup(tmp); + ec->http_passwd = flb_strdup(tmp); } else { - ctx->http_passwd = flb_strdup(""); + ec->http_passwd = flb_strdup(""); } } @@ -131,92 +398,92 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Logstash_Format */ tmp = flb_output_get_property("logstash_format", ins); if (tmp) { - ctx->logstash_format = flb_utils_bool(tmp); + ec->logstash_format = flb_utils_bool(tmp); } else { - ctx->logstash_format = FLB_FALSE; + ec->logstash_format = FLB_FALSE; } /* Logstash_Prefix */ tmp = flb_output_get_property("logstash_prefix", ins); if (tmp) { - ctx->logstash_prefix = flb_strdup(tmp); - ctx->logstash_prefix_len = strlen(tmp); + ec->logstash_prefix = flb_strdup(tmp); + ec->logstash_prefix_len = strlen(tmp); } - else if (ctx->logstash_format == FLB_TRUE) { - ctx->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); - ctx->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); + ec->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; } /* Logstash_Prefix_Key */ tmp = flb_output_get_property("logstash_prefix_key", ins); if (tmp) { - ctx->logstash_prefix_key = flb_strdup(tmp); - ctx->logstash_prefix_key_len = strlen(tmp); + ec->logstash_prefix_key = flb_strdup(tmp); + ec->logstash_prefix_key_len = strlen(tmp); } /* Logstash_DateFormat */ tmp = flb_output_get_property("logstash_dateformat", ins); if (tmp) { - ctx->logstash_dateformat = flb_strdup(tmp); - ctx->logstash_dateformat_len = strlen(tmp); + ec->logstash_dateformat = flb_strdup(tmp); + ec->logstash_dateformat_len = strlen(tmp); } - else if (ctx->logstash_format == FLB_TRUE) { - ctx->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); - ctx->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); + ec->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; } /* Time Key */ tmp = flb_output_get_property("time_key", ins); if (tmp) { - ctx->time_key = flb_strdup(tmp); - ctx->time_key_len = strlen(tmp); + ec->time_key = flb_strdup(tmp); + ec->time_key_len = strlen(tmp); } else { - ctx->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); - ctx->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; + ec->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); + ec->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; } /* Time Key Format */ tmp = flb_output_get_property("time_key_format", ins); if (tmp) { - ctx->time_key_format = flb_strdup(tmp); - ctx->time_key_format_len = strlen(tmp); + ec->time_key_format = flb_strdup(tmp); + ec->time_key_format_len = strlen(tmp); } else { - ctx->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); - ctx->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; + ec->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); + ec->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; } /* Include Tag key */ tmp = flb_output_get_property("include_tag_key", ins); if (tmp) { - ctx->include_tag_key = flb_utils_bool(tmp); + ec->include_tag_key = flb_utils_bool(tmp); } else { - ctx->include_tag_key = FLB_FALSE; + ec->include_tag_key = FLB_FALSE; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { tmp = flb_output_get_property("tag_key", ins); if (tmp) { - ctx->tag_key = flb_strdup(tmp); - ctx->tag_key_len = strlen(tmp); + ec->tag_key = flb_strdup(tmp); + ec->tag_key_len = strlen(tmp); } else { - ctx->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); - ctx->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; + ec->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); + ec->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; } } - ctx->buffer_size = FLB_HTTP_DATA_SIZE_MAX; + ec->buffer_size = FLB_HTTP_DATA_SIZE_MAX; tmp = flb_output_get_property("buffer_size", ins); if (tmp) { if (*tmp == 'f' || *tmp == 'F' || *tmp == 'o' || *tmp == 'O') { /* unlimited size ? */ if (flb_utils_bool(tmp) == FLB_FALSE) { - ctx->buffer_size = 0; + ec->buffer_size = 0; } } else { @@ -225,7 +492,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_error("[out_es] invalid buffer_size=%s, using default", tmp); } else { - ctx->buffer_size = (size_t) ret; + ec->buffer_size = (size_t) ret; } } } @@ -239,81 +506,87 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Elasticsearch: Pipeline */ tmp = flb_output_get_property("pipeline", ins); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); } else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } /* Generate _id */ tmp = flb_output_get_property("generate_id", ins); if (tmp) { - ctx->generate_id = flb_utils_bool(tmp); + ec->generate_id = flb_utils_bool(tmp); } else { - ctx->generate_id = FLB_FALSE; + ec->generate_id = FLB_FALSE; } /* Replace dots */ tmp = flb_output_get_property("replace_dots", ins); if (tmp) { - ctx->replace_dots = flb_utils_bool(tmp); + ec->replace_dots = flb_utils_bool(tmp); } else { - ctx->replace_dots = FLB_FALSE; + ec->replace_dots = FLB_FALSE; } /* Use current time for index generation instead of message record */ tmp = flb_output_get_property("current_time_index", ins); if (tmp) { - ctx->current_time_index = flb_utils_bool(tmp); + ec->current_time_index = flb_utils_bool(tmp); } else { - ctx->current_time_index = FLB_FALSE; + ec->current_time_index = FLB_FALSE; } /* Trace output */ tmp = flb_output_get_property("Trace_Output", ins); if (tmp) { - ctx->trace_output = flb_utils_bool(tmp); + ec->trace_output = flb_utils_bool(tmp); } else { - ctx->trace_output = FLB_FALSE; + ec->trace_output = FLB_FALSE; } tmp = flb_output_get_property("Trace_Error", ins); if (tmp) { - ctx->trace_error = flb_utils_bool(tmp); + ec->trace_error = flb_utils_bool(tmp); } else { - ctx->trace_error = FLB_FALSE; + ec->trace_error = FLB_FALSE; } - return ctx; + mk_list_add(&ec->_head, &ctx->configs); + + flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s", + ins->host.name, ins->host.port, ec->uri, + ec->index, ec->type); + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) { - flb_free(ctx->index); - flb_free(ctx->type); + flb_free(ec->index); + flb_free(ec->type); - flb_free(ctx->http_user); - flb_free(ctx->http_passwd); + flb_free(ec->http_user); + flb_free(ec->http_passwd); - flb_free(ctx->logstash_prefix); - flb_free(ctx->logstash_dateformat); - flb_free(ctx->time_key); - flb_free(ctx->time_key_format); + flb_free(ec->logstash_prefix); + flb_free(ec->logstash_dateformat); + flb_free(ec->time_key); + flb_free(ec->time_key_format); - if (ctx->include_tag_key) { - flb_free(ctx->tag_key); + if (ec->include_tag_key) { + flb_free(ec->tag_key); } - if (ctx->logstash_prefix_key) { - flb_free(ctx->logstash_prefix_key); + if (ec->logstash_prefix_key) { + flb_free(ec->logstash_prefix_key); } - flb_upstream_destroy(ctx->u); - flb_free(ctx); + //flb_upstream_destroy(ec->u); + flb_free(ec); return 0; } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index ff9f93ae1f0..380fa2ba19b 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -27,8 +27,15 @@ #include "es.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec); #endif