From b08b568a13d24000f5ca3aaec4b1a35573807a44 Mon Sep 17 00:00:00 2001 From: Clara Pohland <54847419+claralui@users.noreply.github.com> Date: Mon, 9 Sep 2019 11:15:27 +0200 Subject: [PATCH 1/4] es_out: support multiple output nodes in round-robin by using upstream servers configuration Signed-off-by: Clara Pohland <54847419+claralui@users.noreply.github.com> --- plugins/out_es/es.c | 164 ++++++++++----- plugins/out_es/es.h | 15 +- plugins/out_es/es_conf.c | 429 ++++++++++++++++++++++++++++++++------- plugins/out_es/es_conf.h | 13 +- 4 files changed, 487 insertions(+), 134 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 1d45f845366..34347509e54 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -38,7 +40,7 @@ struct flb_output_plugin out_es_plugin; static inline int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -87,7 +89,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -112,7 +114,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -130,7 +132,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ static char *elasticsearch_format(const void *data, size_t bytes, const char *tag, int tag_len, int *out_size, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int ret; int len; @@ -200,9 +202,9 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_unpacked_init(&result); /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len); - logstash_index[ctx->logstash_prefix_len] = '\0'; + if (ec->logstash_format == FLB_TRUE) { + memcpy(logstash_index, ec->logstash_prefix, ec->logstash_prefix_len); + logstash_index[ec->logstash_prefix_len] = '\0'; } /* @@ -212,17 +214,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } /* @@ -231,7 +233,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -248,7 +250,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, } /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_pop_from_msgpack(&tms, &result, &obj); } @@ -263,16 +265,16 @@ static char *elasticsearch_format(const void *data, size_t bytes, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key_len != 0) { + if (ec->logstash_prefix_key_len != 0) { for (i = 0; i < map_size; i++) { key = map.via.map.ptr[i].key; if (key.type != MSGPACK_OBJECT_STR) { continue; } - if (key.via.str.size != ctx->logstash_prefix_key_len) { + if (key.via.str.size != ec->logstash_prefix_key_len) { continue; } - if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, ctx->logstash_prefix_key_len) != 0) { + if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, ec->logstash_prefix_key_len) != 0) { continue; } val = map.via.map.ptr[i].val; @@ -294,7 +296,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -302,13 +304,13 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ - msgpack_pack_str(&tmp_pck, ctx->time_key_len); - msgpack_pack_str_body(&tmp_pck, ctx->time_key, ctx->time_key_len); + msgpack_pack_str(&tmp_pck, ec->time_key_len); + msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len); /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); + ec->time_key_format, &tm); len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); @@ -316,40 +318,40 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { /* Compose Index header */ if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + ctx->logstash_prefix_len; + p = logstash_index + ec->logstash_prefix_len; } *p++ = '-'; len = p - logstash_index; s = strftime(p, sizeof(logstash_index) - len - 1, - ctx->logstash_dateformat, &tm); + ec->logstash_dateformat, &tm); p += s; *p++ = '\0'; es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { + if (ec->generate_id == FLB_FALSE) { index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&tmp_pck, ctx->tag_key_len); - msgpack_pack_str_body(&tmp_pck, ctx->tag_key, ctx->tag_key_len); + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, ec->tag_key_len); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len); msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -361,7 +363,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -369,7 +371,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, return NULL; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", @@ -378,7 +380,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + es_index, ec->type, es_uuid); } /* Convert msgpack to JSON */ @@ -412,7 +414,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(buf, 1, *out_size, stdout); fflush(stdout); } @@ -423,20 +425,30 @@ int cb_es_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; + const char *tmp; struct flb_elasticsearch *ctx; + (void) data; - ctx = flb_es_conf_create(ins, config); + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); if (!ctx) { - flb_error("[out_es] cannot initialize plugin"); + flb_errno(); return -1; } - flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s", - ins->host.name, ins->host.port, ctx->uri, - ctx->index, ctx->type); - + mk_list_init(&ctx->configs); flb_output_set_context(ins, ctx); - return 0; + + /* Configure HA or simple mode ? */ + tmp = flb_output_get_property("upstream", ins); + if (tmp) { + ret = es_config_ha(tmp, ctx, config); + } + else { + ret = es_config_simple(ins, ctx, config); + } + + return ret; } static int elasticsearch_error_check(struct flb_http_client *c) @@ -546,46 +558,72 @@ void cb_es_flush(const void *data, size_t bytes, char *pack; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream_conn *u_conn; struct flb_http_client *c; + struct flb_upstream_node *node; (void) i_ins; (void) tag; (void) tag_len; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (!node) { + flb_error("[out_es] cannot get an Upstream HA node"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get forward_config stored in node opaque data */ + ec = flb_upstream_node_get_data(node); + } + else { + ec = mk_list_entry_first(&ctx->configs, + struct flb_elasticsearch_config, + _head); + } + + flb_debug("[out_es] trying node %s", node->name); + /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + } if (!u_conn) { + flb_error("[out_es] no upstream connections available"); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ - pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx); + pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec); if (!pack) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_ERROR); } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, bytes_out, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_warn("[out_es] http_do=%i URI=%s", ret, ctx->uri); + flb_warn("[out_es] http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { goto retry; } @@ -598,7 +636,7 @@ void cb_es_flush(const void *data, size_t bytes, ret = elasticsearch_error_check(c); if (ret == FLB_TRUE) { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * input/output to Elasticsearch that caused the problem. @@ -635,8 +673,32 @@ void cb_es_flush(const void *data, size_t bytes, int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + (void) config; + + if (!ctx) { + return 0; + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&ec->_head); + flb_es_conf_destroy(ec); + } + + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } + } + else { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); - flb_es_conf_destroy(ctx); return 0; } diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index d1dc66060be..558777ede85 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -31,7 +31,7 @@ #define FLB_ES_DEFAULT_TIME_KEYF "%Y-%m-%dT%H:%M:%S" #define FLB_ES_DEFAULT_TAG_KEY "flb-key" -struct flb_elasticsearch { +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; char *type; @@ -90,8 +90,19 @@ struct flb_elasticsearch { /* Elasticsearch HTTP API */ char uri[256]; - /* Upstream connection to the backend server */ + /* Link to list flb_elasticsearch->configs */ + struct mk_list _head; +}; + +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; + + /* Upstream handler and config context for single mode (no HA) */ struct flb_upstream *u; + struct mk_list configs; }; #endif diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index b1c4d05627d..e3e880d78ec 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -22,12 +22,278 @@ #include #include #include +#include +#include #include "es.h" #include "es_conf.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +/* Configure in HA mode */ +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int io_flags = 0; + ssize_t ret; + const char *tmp; + const char *path; + struct mk_list *head; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec = NULL; + + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + return -1; + } + + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_error("[out_es] cannot load Upstream file"); + return -1; + } + + /* Iterate nodes and create a forward_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + + /* Is TLS enabled ? */ + if (node->tls_enabled == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + if (f_index) { + ec->index = flb_strdup(f_index->value); + } + else { + tmp = flb_upstream_node_get_property("index", node); + if (!tmp) { + ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + } + else { + ec->index = flb_strdup(tmp); + } + } + + if (f_type) { + ec->type = flb_strdup(f_type->value); + } + else { + tmp = flb_upstream_node_get_property("type", node); + if (!tmp) { + ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + } + else { + ec->type = flb_strdup(tmp); + } + } + + /* HTTP Auth */ + tmp = flb_upstream_node_get_property("http_user", node); + if (tmp) { + ec->http_user = flb_strdup(tmp); + + tmp = flb_upstream_node_get_property("http_passwd", node); + if (tmp) { + ec->http_passwd = flb_strdup(tmp); + } + else { + ec->http_passwd = flb_strdup(""); + } + } + + /* + * Logstash compatibility options + * ============================== + */ + + /* Logstash_Format */ + tmp = flb_upstream_node_get_property("logstash_format", node); + if (tmp) { + ec->logstash_format = flb_utils_bool(tmp); + } + else { + ec->logstash_format = FLB_FALSE; + } + + /* Logstash_Prefix */ + tmp = flb_upstream_node_get_property("logstash_prefix", node); + if (tmp) { + ec->logstash_prefix = flb_strdup(tmp); + ec->logstash_prefix_len = strlen(tmp); + } + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); + ec->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; + } + + /* Logstash_Prefix_Key */ + tmp = flb_upstream_node_get_property("logstash_prefix_key", node); + if (tmp) { + ec->logstash_prefix_key = flb_strdup(tmp); + ec->logstash_prefix_key_len = strlen(tmp); + } + + /* Logstash_DateFormat */ + tmp = flb_upstream_node_get_property("logstash_dateformat", node); + if (tmp) { + ec->logstash_dateformat = flb_strdup(tmp); + ec->logstash_dateformat_len = strlen(tmp); + } + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); + ec->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; + } + + /* Time Key */ + tmp = flb_upstream_node_get_property("time_key", node); + if (tmp) { + ec->time_key = flb_strdup(tmp); + ec->time_key_len = strlen(tmp); + } + else { + ec->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); + ec->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; + } + + /* Time Key Format */ + tmp = flb_upstream_node_get_property("time_key_format", node); + if (tmp) { + ec->time_key_format = flb_strdup(tmp); + ec->time_key_format_len = strlen(tmp); + } + else { + ec->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); + ec->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; + } + + /* Include Tag key */ + tmp = flb_upstream_node_get_property("include_tag_key", node); + if (tmp) { + ec->include_tag_key = flb_utils_bool(tmp); + } + else { + ec->include_tag_key = FLB_FALSE; + } + + /* Tag Key */ + if (ec->include_tag_key == FLB_TRUE) { + tmp = flb_upstream_node_get_property("tag_key", node); + if (tmp) { + ec->tag_key = flb_strdup(tmp); + ec->tag_key_len = strlen(tmp); + } + else { + ec->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); + ec->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; + } + } + + ec->buffer_size = FLB_HTTP_DATA_SIZE_MAX; + tmp = flb_upstream_node_get_property("buffer_size", node); + if (tmp) { + if (*tmp == 'f' || *tmp == 'F' || *tmp == 'o' || *tmp == 'O') { + /* unlimited size ? */ + if (flb_utils_bool(tmp) == FLB_FALSE) { + ec->buffer_size = 0; + } + } + else { + ret = flb_utils_size_to_bytes(tmp); + if (ret == -1) { + flb_error("[out_es] invalid buffer_size=%s, using default", tmp); + } + else { + ec->buffer_size = (size_t) ret; + } + } + } + + /* Elasticsearch: Path */ + path = flb_upstream_node_get_property("path", node); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_upstream_node_get_property("pipeline", node); + if (tmp) { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + + /* Generate _id */ + tmp = flb_upstream_node_get_property("generate_id", node); + if (tmp) { + ec->generate_id = flb_utils_bool(tmp); + } else { + ec->generate_id = FLB_FALSE; + } + + /* Replace dots */ + tmp = flb_upstream_node_get_property("replace_dots", node); + if (tmp) { + ec->replace_dots = flb_utils_bool(tmp); + } + else { + ec->replace_dots = FLB_FALSE; + } + + /* Use current time for index generation instead of message record */ + tmp = flb_upstream_node_get_property("current_time_index", node); + if (tmp) { + ec->current_time_index = flb_utils_bool(tmp); + } + else { + ec->current_time_index = FLB_FALSE; + } + + + /* Trace output */ + tmp = flb_upstream_node_get_property("Trace_Output", node); + if (tmp) { + ec->trace_output = flb_utils_bool(tmp); + } + else { + ec->trace_output = FLB_FALSE; + } + tmp = flb_upstream_node_get_property("Trace_Error", node); + if (tmp) { + ec->trace_error = flb_utils_bool(tmp); + } + else { + ec->trace_error = FLB_FALSE; + } + + /* Initialize and validate forward_config context */ + mk_list_add(&ec->_head, &ctx->configs); + + if (ret == -1) { + if (ec) { + flb_es_conf_destroy(ec); + } + return -1; + } + + /* Set our elasticsearch_config context into the node */ + flb_upstream_node_set_data(ec, node); + } + + return 0; +} + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config) { int io_flags = 0; @@ -38,13 +304,12 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; + struct flb_elasticsearch_config *ec = NULL; /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { - flb_errno(); - return NULL; + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + return -1; } if (uri) { @@ -77,49 +342,51 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, &ins->tls); if (!upstream) { flb_error("[out_es] cannot create Upstream context"); - flb_es_conf_destroy(ctx); - return NULL; + + flb_es_conf_destroy(ec); + flb_free(ctx); + return -1; } /* Set manual Index and Type */ ctx->u = upstream; if (f_index) { - ctx->index = flb_strdup(f_index->value); + ec->index = flb_strdup(f_index->value); } else { tmp = flb_output_get_property("index", ins); if (!tmp) { - ctx->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); } else { - ctx->index = flb_strdup(tmp); + ec->index = flb_strdup(tmp); } } if (f_type) { - ctx->type = flb_strdup(f_type->value); + ec->type = flb_strdup(f_type->value); } else { tmp = flb_output_get_property("type", ins); if (!tmp) { - ctx->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); } else { - ctx->type = flb_strdup(tmp); + ec->type = flb_strdup(tmp); } } /* HTTP Auth */ tmp = flb_output_get_property("http_user", ins); if (tmp) { - ctx->http_user = flb_strdup(tmp); + ec->http_user = flb_strdup(tmp); tmp = flb_output_get_property("http_passwd", ins); if (tmp) { - ctx->http_passwd = flb_strdup(tmp); + ec->http_passwd = flb_strdup(tmp); } else { - ctx->http_passwd = flb_strdup(""); + ec->http_passwd = flb_strdup(""); } } @@ -131,92 +398,92 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Logstash_Format */ tmp = flb_output_get_property("logstash_format", ins); if (tmp) { - ctx->logstash_format = flb_utils_bool(tmp); + ec->logstash_format = flb_utils_bool(tmp); } else { - ctx->logstash_format = FLB_FALSE; + ec->logstash_format = FLB_FALSE; } /* Logstash_Prefix */ tmp = flb_output_get_property("logstash_prefix", ins); if (tmp) { - ctx->logstash_prefix = flb_strdup(tmp); - ctx->logstash_prefix_len = strlen(tmp); + ec->logstash_prefix = flb_strdup(tmp); + ec->logstash_prefix_len = strlen(tmp); } - else if (ctx->logstash_format == FLB_TRUE) { - ctx->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); - ctx->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_prefix = flb_strdup(FLB_ES_DEFAULT_PREFIX); + ec->logstash_prefix_len = sizeof(FLB_ES_DEFAULT_PREFIX) - 1; } /* Logstash_Prefix_Key */ tmp = flb_output_get_property("logstash_prefix_key", ins); if (tmp) { - ctx->logstash_prefix_key = flb_strdup(tmp); - ctx->logstash_prefix_key_len = strlen(tmp); + ec->logstash_prefix_key = flb_strdup(tmp); + ec->logstash_prefix_key_len = strlen(tmp); } /* Logstash_DateFormat */ tmp = flb_output_get_property("logstash_dateformat", ins); if (tmp) { - ctx->logstash_dateformat = flb_strdup(tmp); - ctx->logstash_dateformat_len = strlen(tmp); + ec->logstash_dateformat = flb_strdup(tmp); + ec->logstash_dateformat_len = strlen(tmp); } - else if (ctx->logstash_format == FLB_TRUE) { - ctx->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); - ctx->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; + else if (ec->logstash_format == FLB_TRUE) { + ec->logstash_dateformat = flb_strdup(FLB_ES_DEFAULT_TIME_FMT); + ec->logstash_dateformat_len = sizeof(FLB_ES_DEFAULT_TIME_FMT) - 1; } /* Time Key */ tmp = flb_output_get_property("time_key", ins); if (tmp) { - ctx->time_key = flb_strdup(tmp); - ctx->time_key_len = strlen(tmp); + ec->time_key = flb_strdup(tmp); + ec->time_key_len = strlen(tmp); } else { - ctx->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); - ctx->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; + ec->time_key = flb_strdup(FLB_ES_DEFAULT_TIME_KEY); + ec->time_key_len = sizeof(FLB_ES_DEFAULT_TIME_KEY) - 1; } /* Time Key Format */ tmp = flb_output_get_property("time_key_format", ins); if (tmp) { - ctx->time_key_format = flb_strdup(tmp); - ctx->time_key_format_len = strlen(tmp); + ec->time_key_format = flb_strdup(tmp); + ec->time_key_format_len = strlen(tmp); } else { - ctx->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); - ctx->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; + ec->time_key_format = flb_strdup(FLB_ES_DEFAULT_TIME_KEYF); + ec->time_key_format_len = sizeof(FLB_ES_DEFAULT_TIME_KEYF) - 1; } /* Include Tag key */ tmp = flb_output_get_property("include_tag_key", ins); if (tmp) { - ctx->include_tag_key = flb_utils_bool(tmp); + ec->include_tag_key = flb_utils_bool(tmp); } else { - ctx->include_tag_key = FLB_FALSE; + ec->include_tag_key = FLB_FALSE; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { tmp = flb_output_get_property("tag_key", ins); if (tmp) { - ctx->tag_key = flb_strdup(tmp); - ctx->tag_key_len = strlen(tmp); + ec->tag_key = flb_strdup(tmp); + ec->tag_key_len = strlen(tmp); } else { - ctx->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); - ctx->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; + ec->tag_key = flb_strdup(FLB_ES_DEFAULT_TAG_KEY); + ec->tag_key_len = sizeof(FLB_ES_DEFAULT_TAG_KEY) - 1; } } - ctx->buffer_size = FLB_HTTP_DATA_SIZE_MAX; + ec->buffer_size = FLB_HTTP_DATA_SIZE_MAX; tmp = flb_output_get_property("buffer_size", ins); if (tmp) { if (*tmp == 'f' || *tmp == 'F' || *tmp == 'o' || *tmp == 'O') { /* unlimited size ? */ if (flb_utils_bool(tmp) == FLB_FALSE) { - ctx->buffer_size = 0; + ec->buffer_size = 0; } } else { @@ -225,7 +492,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_error("[out_es] invalid buffer_size=%s, using default", tmp); } else { - ctx->buffer_size = (size_t) ret; + ec->buffer_size = (size_t) ret; } } } @@ -239,81 +506,87 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Elasticsearch: Pipeline */ tmp = flb_output_get_property("pipeline", ins); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); } else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } /* Generate _id */ tmp = flb_output_get_property("generate_id", ins); if (tmp) { - ctx->generate_id = flb_utils_bool(tmp); + ec->generate_id = flb_utils_bool(tmp); } else { - ctx->generate_id = FLB_FALSE; + ec->generate_id = FLB_FALSE; } /* Replace dots */ tmp = flb_output_get_property("replace_dots", ins); if (tmp) { - ctx->replace_dots = flb_utils_bool(tmp); + ec->replace_dots = flb_utils_bool(tmp); } else { - ctx->replace_dots = FLB_FALSE; + ec->replace_dots = FLB_FALSE; } /* Use current time for index generation instead of message record */ tmp = flb_output_get_property("current_time_index", ins); if (tmp) { - ctx->current_time_index = flb_utils_bool(tmp); + ec->current_time_index = flb_utils_bool(tmp); } else { - ctx->current_time_index = FLB_FALSE; + ec->current_time_index = FLB_FALSE; } /* Trace output */ tmp = flb_output_get_property("Trace_Output", ins); if (tmp) { - ctx->trace_output = flb_utils_bool(tmp); + ec->trace_output = flb_utils_bool(tmp); } else { - ctx->trace_output = FLB_FALSE; + ec->trace_output = FLB_FALSE; } tmp = flb_output_get_property("Trace_Error", ins); if (tmp) { - ctx->trace_error = flb_utils_bool(tmp); + ec->trace_error = flb_utils_bool(tmp); } else { - ctx->trace_error = FLB_FALSE; + ec->trace_error = FLB_FALSE; } - return ctx; + mk_list_add(&ec->_head, &ctx->configs); + + flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s", + ins->host.name, ins->host.port, ec->uri, + ec->index, ec->type); + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) { - flb_free(ctx->index); - flb_free(ctx->type); + flb_free(ec->index); + flb_free(ec->type); - flb_free(ctx->http_user); - flb_free(ctx->http_passwd); + flb_free(ec->http_user); + flb_free(ec->http_passwd); - flb_free(ctx->logstash_prefix); - flb_free(ctx->logstash_dateformat); - flb_free(ctx->time_key); - flb_free(ctx->time_key_format); + flb_free(ec->logstash_prefix); + flb_free(ec->logstash_dateformat); + flb_free(ec->time_key); + flb_free(ec->time_key_format); - if (ctx->include_tag_key) { - flb_free(ctx->tag_key); + if (ec->include_tag_key) { + flb_free(ec->tag_key); } - if (ctx->logstash_prefix_key) { - flb_free(ctx->logstash_prefix_key); + if (ec->logstash_prefix_key) { + flb_free(ec->logstash_prefix_key); } - flb_upstream_destroy(ctx->u); - flb_free(ctx); + //flb_upstream_destroy(ec->u); + flb_free(ec); return 0; } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index ff9f93ae1f0..380fa2ba19b 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -27,8 +27,15 @@ #include "es.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec); #endif From a25563282aeccf196a0d7dc965493bf82ec8187b Mon Sep 17 00:00:00 2001 From: Clara Pohland <54847419+claralui@users.noreply.github.com> Date: Mon, 9 Sep 2019 11:15:27 +0200 Subject: [PATCH 2/4] es_out: support multiple output nodes in round-robin by using upstream servers configuration Signed-off-by: Clara Pohland <54847419+claralui@users.noreply.github.com> --- plugins/out_es/es.c | 170 +++++++++++++++++++++++++++++---------- plugins/out_es/es.h | 16 +++- plugins/out_es/es_conf.c | 167 +++++++++++++++++++++++++++++++++----- plugins/out_es/es_conf.h | 13 ++- 4 files changed, 298 insertions(+), 68 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 3b34de865e6..fda31bc0e85 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include #include @@ -83,7 +85,7 @@ static flb_sds_t add_aws_auth(struct flb_elasticsearch *ctx, static inline int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -132,7 +134,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -157,7 +159,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -175,7 +177,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ static char *elasticsearch_format(const void *data, size_t bytes, const char *tag, int tag_len, int *out_size, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int ret; int len; @@ -245,9 +247,9 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_unpacked_init(&result); /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - memcpy(logstash_index, ctx->logstash_prefix, flb_sds_len(ctx->logstash_prefix)); - logstash_index[flb_sds_len(ctx->logstash_prefix)] = '\0'; + if (ec->logstash_format == FLB_TRUE) { + memcpy(logstash_index, ec->logstash_prefix, flb_sds_len(ec->logstash_prefix)); + logstash_index[flb_sds_len(ec->logstash_prefix)] = '\0'; } /* @@ -257,17 +259,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } /* @@ -276,7 +278,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -293,7 +295,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, } /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_pop_from_msgpack(&tms, &result, &obj); } @@ -308,17 +310,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key) { + if (ec->logstash_prefix_key) { for (i = 0; i < map_size; i++) { key = map.via.map.ptr[i].key; if (key.type != MSGPACK_OBJECT_STR) { continue; } - if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) { + if (key.via.str.size != flb_sds_len(ec->logstash_prefix_key_len)) { continue; } - if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, - flb_sds_len(ctx->logstash_prefix_key)) != 0) { + if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, + flb_sds_len(ec->logstash_prefix_key)) != 0) { continue; } val = map.via.map.ptr[i].val; @@ -340,7 +342,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -348,13 +350,18 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ +<<<<<<< HEAD msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key)); msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); +======= + msgpack_pack_str(&tmp_pck, ec->time_key_len); + msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len); +>>>>>>> es_out: support multiple output nodes in round-robin by using upstream servers configuration /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); + ec->time_key_format, &tm); len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); @@ -362,40 +369,50 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { /* Compose Index header */ if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { +<<<<<<< HEAD p = logstash_index + flb_sds_len(ctx->logstash_prefix); +======= + p = logstash_index + ec->logstash_prefix_len; +>>>>>>> es_out: support multiple output nodes in round-robin by using upstream servers configuration } *p++ = '-'; len = p - logstash_index; s = strftime(p, sizeof(logstash_index) - len - 1, - ctx->logstash_dateformat, &tm); + ec->logstash_dateformat, &tm); p += s; *p++ = '\0'; es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { + if (ec->generate_id == FLB_FALSE) { index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ +<<<<<<< HEAD if (ctx->include_tag_key == FLB_TRUE) { msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key)); msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); +======= + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, ec->tag_key_len); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len); +>>>>>>> es_out: support multiple output nodes in round-robin by using upstream servers configuration msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -407,7 +424,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -415,7 +432,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, return NULL; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", @@ -424,7 +441,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + es_index, ec->type, es_uuid); } /* Convert msgpack to JSON */ @@ -458,7 +475,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(buf, 1, *out_size, stdout); fflush(stdout); } @@ -469,10 +486,14 @@ static int cb_es_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; + const char *tmp; struct flb_elasticsearch *ctx; + (void) data; - ctx = flb_es_conf_create(ins, config); + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); if (!ctx) { +<<<<<<< HEAD flb_plg_error(ins, "cannot initialize plugin"); return -1; } @@ -481,8 +502,25 @@ static int cb_es_init(struct flb_output_instance *ins, ins->host.name, ins->host.port, ctx->uri, ctx->index, ctx->type); +======= + flb_errno(); + return -1; + } + + mk_list_init(&ctx->configs); +>>>>>>> es_out: support multiple output nodes in round-robin by using upstream servers configuration flb_output_set_context(ins, ctx); - return 0; + + /* Configure HA or simple mode ? */ + tmp = flb_output_get_property("upstream", ins); + if (tmp) { + ret = es_config_ha(tmp, ctx, config); + } + else { + ret = es_config_simple(ins, ctx, config); + } + + return ret; } static int elasticsearch_error_check(struct flb_elasticsearch *ctx, @@ -593,31 +631,57 @@ static void cb_es_flush(const void *data, size_t bytes, char *pack; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream_conn *u_conn; struct flb_http_client *c; flb_sds_t signature = NULL; + struct flb_upstream_node *node; (void) i_ins; (void) tag; (void) tag_len; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (!node) { + flb_plg_error(ctx->ins, "cannot get an Upstream HA node"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get forward_config stored in node opaque data */ + ec = flb_upstream_node_get_data(node); + } + else { + ec = mk_list_entry_first(&ctx->configs, + struct flb_elasticsearch_config, + _head); + } + + flb_plg_debug(ctx->ins, "trying node %s", node->name); + /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + } if (!u_conn) { + flb_plg_error(ctx->ins, ("no upstream connections available"); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ - pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx); + pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec); if (!pack) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_ERROR); } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, bytes_out, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); #ifndef FLB_HAVE_SIGNV4 flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -625,8 +689,8 @@ static void cb_es_flush(const void *data, size_t bytes, flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } #ifdef FLB_HAVE_SIGNV4 @@ -645,12 +709,12 @@ static void cb_es_flush(const void *data, size_t bytes, ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ec->ins, "http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ec->ins, "HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", @@ -671,7 +735,7 @@ static void cb_es_flush(const void *data, size_t bytes, ret = elasticsearch_error_check(ctx, c); if (ret == FLB_TRUE) { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * input/output to Elasticsearch that caused the problem. @@ -711,8 +775,32 @@ static void cb_es_flush(const void *data, size_t bytes, static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + (void) config; + + if (!ctx) { + return 0; + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&ec->_head); + flb_es_conf_destroy(ec); + } + + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } + } + else { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); - flb_es_conf_destroy(ctx); return 0; } diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 4422af3f99a..fdc4f547302 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -32,7 +32,7 @@ #define FLB_ES_DEFAULT_TAG_KEY "flb-key" #define FLB_ES_DEFAULT_HTTP_MAX "4096" -struct flb_elasticsearch { +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; char *type; @@ -91,9 +91,19 @@ struct flb_elasticsearch { /* Elasticsearch HTTP API */ char uri[256]; - /* Upstream connection to the backend server */ - struct flb_upstream *u; + /* Link to list flb_elasticsearch->configs */ + struct mk_list _head; +}; +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; + + /* Upstream handler and config context for single mode (no HA) */ + struct flb_upstream *u; + struct mk_list configs; /* Plugin output instance reference */ struct flb_output_instance *ins; }; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 2b64e3ef354..de3ccad67dc 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -23,12 +23,114 @@ #include #include #include +#include +#include #include "es.h" #include "es_conf.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +/* Configure in HA mode */ +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int io_flags = 0; + ssize_t ret; + const char *tmp; + const char *path; + struct mk_list *head; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec = NULL; + + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed config allocation"); + flb_es_conf_destroy(ec); + flb_free(ctx); + return -1 + } + + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_plg_error(ctx->ins, "cannot load Upstream file"); + return -1; + } + + /* Iterate nodes and create a forward_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + + /* Populate context with config map defaults and incoming properties */ + ret = flb_output_config_map_set(ins, (void *) ec); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + flb_es_conf_destroy(ec); + flb_free(ctx); + return -1; + } + + /* Is TLS enabled ? */ + if (node->tls_enabled == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + /* Set manual Index and Type */ + if (f_index) { + ec->index = flb_strdup(f_index->value); /* FIXME */ + } + + if (f_type) { + ec->type = flb_strdup(f_type->value); /* FIXME */ + } + + /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ + if (ec->buffer_size == -1) { + ec->buffer_size = 0; + } + + /* Elasticsearch: Path */ + path = flb_upstream_node_get_property("path", node); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_upstream_node_get_property("pipeline", node); + if (tmp) { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + + /* Initialize and validate es_config context */ + ret = mk_list_add(&ec->_head, &ctx->configs); + + if (ret == -1) { + if (ec) { + flb_es_conf_destroy(ec); + } + return -1; + } + + /* Set our elasticsearch_config context into the node */ + flb_upstream_node_set_data(ec, node); + } + + return 0; +} + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config) { int io_flags = 0; @@ -39,13 +141,12 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; + struct flb_elasticsearch_config *ec = NULL; /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { - flb_errno(); - return NULL; + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + return -1; } ctx->ins = ins; @@ -63,7 +164,8 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { flb_plg_error(ctx->ins, "configuration error"); - flb_es_conf_destroy(ctx); + flb_es_conf_destroy(ec); + flb_free(ctx); return NULL; } @@ -87,7 +189,8 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, &ins->tls); if (!upstream) { flb_plg_error(ctx->ins, "cannot create Upstream context"); - flb_es_conf_destroy(ctx); + flb_es_conf_destroy(ec); + flb_free(ctx); return NULL; } ctx->u = upstream; @@ -97,16 +200,16 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Set manual Index and Type */ if (f_index) { - ctx->index = flb_strdup(f_index->value); /* FIXME */ + ec->index = flb_strdup(f_index->value); /* FIXME */ } if (f_type) { - ctx->type = flb_strdup(f_type->value); /* FIXME */ + ec->type = flb_strdup(f_type->value); /* FIXME */ } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ - if (ctx->buffer_size == -1) { - ctx->buffer_size = 0; + if (ec->buffer_size == -1) { + ec->buffer_size = 0; } /* Elasticsearch: Path */ @@ -118,10 +221,10 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Elasticsearch: Pipeline */ tmp = flb_output_get_property("pipeline", ins); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); } else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } #ifdef FLB_HAVE_SIGNV4 @@ -147,19 +250,41 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, } #endif - return ctx; + mk_list_add(&ec->_head, &ctx->configs); + + flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s", + ins->host.name, ins->host.port, ec->uri, + ec->index, ec->type); + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) { - if (!ctx) { + if (!ec) { return 0; } - if (ctx->u) { - flb_upstream_destroy(ctx->u); + flb_free(ec->index); + flb_free(ec->type); + + flb_free(ec->http_user); + flb_free(ec->http_passwd); + + flb_free(ec->logstash_prefix); + flb_free(ec->logstash_dateformat); + flb_free(ec->time_key); + flb_free(ec->time_key_format); + + if (ec->include_tag_key) { + flb_free(ec->tag_key); } - flb_free(ctx); + + if (ec->logstash_prefix_key) { + flb_free(ec->logstash_prefix_key); + } + + flb_free(ec); return 0; } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index 52c37c74570..3af6a3b356c 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -27,8 +27,15 @@ #include "es.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec); #endif From d0ba501e6fde08ea73078acff6e9bdf277223934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Sirvi=C3=B6?= Date: Wed, 11 Mar 2020 00:36:44 +0100 Subject: [PATCH 3/4] Various fixes to adopt the out_es_ha_support branch for master --- plugins/out_es/es.c | 8 ++++-- plugins/out_es/es_conf.c | 57 +++++++++++++++------------------------- 2 files changed, 27 insertions(+), 38 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 1a64b5ee23d..72cb152f1d8 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -624,6 +624,7 @@ static void cb_es_flush(const void *data, size_t bytes, /* Get forward_config stored in node opaque data */ ec = flb_upstream_node_get_data(node); + flb_plg_debug(ctx->ins, "trying node %s", node->name); } else { ec = mk_list_entry_first(&ctx->configs, @@ -631,8 +632,6 @@ static void cb_es_flush(const void *data, size_t bytes, _head); } - flb_plg_debug(ctx->ins, "trying node %s", node->name); - /* Get upstream connection */ if (ctx->ha_mode == FLB_TRUE) { u_conn = flb_upstream_conn_get(node->u); @@ -906,6 +905,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error), NULL }, + { + FLB_CONFIG_MAP_STR, "upstream", NULL, + 0, FLB_FALSE, 0, + NULL + }, /* EOF */ {0} diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 078c884c4cf..01b9c9031ae 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "es.h" #include "es_conf.h" @@ -44,14 +45,6 @@ int es_config_ha(const char *upstream_file, struct flb_upstream_node *node; struct flb_elasticsearch_config *ec = NULL; - /* Allocate context */ - ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); - if (!ec) { - flb_errno(); - flb_plg_error(ctx->ins, "failed config allocation"); - return -1; - } - ctx->ha_mode = FLB_TRUE; ctx->ha = flb_upstream_ha_from_file(upstream_file, config); if (!ctx->ha) { @@ -62,9 +55,16 @@ int es_config_ha(const char *upstream_file, /* Iterate nodes and create a forward_config context */ mk_list_foreach(head, &ctx->ha->nodes) { node = mk_list_entry(head, struct flb_upstream_node, _head); + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed config allocation"); + continue; + } /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(node, (void *) ec); + ret = flb_output_config_map_set(ctx->ins, (void *) ec); if (ret == -1) { flb_plg_error(ctx->ins, "configuration error"); flb_es_conf_destroy(ec); @@ -83,15 +83,19 @@ int es_config_ha(const char *upstream_file, if (f_index) { ec->index = flb_strdup(f_index->value); /* FIXME */ } - + else { + ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + } if (f_type) { ec->type = flb_strdup(f_type->value); /* FIXME */ } + else { + ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ if (ec->buffer_size == -1) { ec->buffer_size = 0; - } /* Elasticsearch: Path */ @@ -112,7 +116,7 @@ int es_config_ha(const char *upstream_file, #ifdef FLB_HAVE_SIGNV4 /* AWS Auth */ ec->has_aws_auth = FLB_FALSE; - tmp = flb_output_get_property("aws_auth", node); + tmp = flb_upstream_node_get_property("aws_auth", node); if (tmp) { if (strncasecmp(tmp, "On", 2) == 0) { ec->has_aws_auth = FLB_TRUE; @@ -120,14 +124,14 @@ int es_config_ha(const char *upstream_file, "Enabled AWS Auth. Note: Amazon ElasticSearch " "Service support in Fluent Bit is experimental."); - tmp = flb_output_get_property("aws_region", node); + tmp = flb_upstream_node_get_property("aws_region", node); if (!tmp) { flb_plg_error(ctx->ins, "aws_auth enabled but aws_region not set"); flb_es_conf_destroy(ctx); return NULL; } - ec->aws_region = (char *) tmp; + ec->aws_region = flb_strdup(tmp); } } #endif @@ -174,7 +178,7 @@ int es_config_simple(struct flb_output_instance *ins, flb_output_net_default("127.0.0.1", 9200, ins); /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(ins, (void *) ctx); + ret = flb_output_config_map_set(ins, (void *) ec); if (ret == -1) { flb_plg_error(ctx->ins, "configuration error"); flb_es_conf_destroy(ec); @@ -203,7 +207,7 @@ int es_config_simple(struct flb_output_instance *ins, if (!upstream) { flb_plg_error(ctx->ins, "cannot create Upstream context"); flb_es_conf_destroy(ec); - flb_free(ctx); + flb_free(ctx); return NULL; } ctx->u = upstream; @@ -258,7 +262,7 @@ int es_config_simple(struct flb_output_instance *ins, flb_es_conf_destroy(ctx); return NULL; } - ec->aws_region = (char *) tmp; + ec->aws_region = flb_strdup(tmp); } } #endif @@ -278,25 +282,6 @@ int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) return 0; } - flb_free(ec->index); - flb_free(ec->type); - - flb_free(ec->http_user); - flb_free(ec->http_passwd); - - flb_free(ec->logstash_prefix); - flb_free(ec->logstash_dateformat); - flb_free(ec->time_key); - flb_free(ec->time_key_format); - - if (ec->include_tag_key) { - flb_free(ec->tag_key); - } - - if (ec->logstash_prefix_key) { - flb_free(ec->logstash_prefix_key); - } - flb_free(ec); return 0; From 7a189c6943166bd523c878610b57576d14c089c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20Sirvi=C3=B6?= Date: Thu, 12 Mar 2020 00:09:05 +0100 Subject: [PATCH 4/4] Fix assignment of values so that memory leaks are avoided. Remove unnecessary code to set index and type for the simple configuration. --- plugins/out_es/es_conf.c | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 01b9c9031ae..06188265ac3 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -80,17 +80,20 @@ int es_config_ha(const char *upstream_file, } /* Set manual Index and Type */ - if (f_index) { - ec->index = flb_strdup(f_index->value); /* FIXME */ + tmp = flb_upstream_node_get_property("index", node); + if (tmp) { + ec->index = tmp; } else { - ec->index = flb_strdup(FLB_ES_DEFAULT_INDEX); + ec->index = FLB_ES_DEFAULT_INDEX; } - if (f_type) { - ec->type = flb_strdup(f_type->value); /* FIXME */ + + tmp = flb_upstream_node_get_property("type", node); + if (tmp) { + ec->type = tmp; } else { - ec->type = flb_strdup(FLB_ES_DEFAULT_TYPE); + ec->type = FLB_ES_DEFAULT_TYPE; } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ @@ -131,7 +134,7 @@ int es_config_ha(const char *upstream_file, flb_es_conf_destroy(ctx); return NULL; } - ec->aws_region = flb_strdup(tmp); + ec->aws_region = tmp; } } #endif @@ -215,15 +218,6 @@ int es_config_simple(struct flb_output_instance *ins, /* Set instance flags into upstream */ flb_output_upstream_set(ctx->u, ins); - /* Set manual Index and Type */ - if (f_index) { - ec->index = flb_strdup(f_index->value); /* FIXME */ - } - - if (f_type) { - ec->type = flb_strdup(f_type->value); /* FIXME */ - } - /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ if (ec->buffer_size == -1) { ec->buffer_size = 0; @@ -262,7 +256,7 @@ int es_config_simple(struct flb_output_instance *ins, flb_es_conf_destroy(ctx); return NULL; } - ec->aws_region = flb_strdup(tmp); + ec->aws_region = tmp; } } #endif