Skip to content

Commit

Permalink
es_out: support multiple output nodes in round-robin by using upstrea…
Browse files Browse the repository at this point in the history
…m servers configuration

Signed-off-by: Clara Pohland <[email protected]>
  • Loading branch information
claralp committed Sep 9, 2019
1 parent 54f1c6f commit b08b568
Show file tree
Hide file tree
Showing 4 changed files with 487 additions and 134 deletions.
164 changes: 113 additions & 51 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_upstream_ha.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_time.h>
#include <msgpack.h>
Expand All @@ -38,7 +40,7 @@ struct flb_output_plugin out_es_plugin;

static inline int es_pack_map_content(msgpack_packer *tmp_pck,
msgpack_object map,
struct flb_elasticsearch *ctx)
struct flb_elasticsearch_config *ec)
{
int i;
char *ptr_key = NULL;
Expand Down Expand Up @@ -87,7 +89,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*
* https://goo.gl/R5NMTr
*/
if (ctx->replace_dots == FLB_TRUE) {
if (ec->replace_dots == FLB_TRUE) {
char *p = ptr_key;
char *end = ptr_key + key_size;
while (p != end) {
Expand All @@ -112,7 +114,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*/
if (v->type == MSGPACK_OBJECT_MAP) {
msgpack_pack_map(tmp_pck, v->via.map.size);
es_pack_map_content(tmp_pck, *v, ctx);
es_pack_map_content(tmp_pck, *v, ec);
}
else {
msgpack_pack_object(tmp_pck, *v);
Expand All @@ -130,7 +132,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*/
static char *elasticsearch_format(const void *data, size_t bytes,
const char *tag, int tag_len, int *out_size,
struct flb_elasticsearch *ctx)
struct flb_elasticsearch_config *ec)
{
int ret;
int len;
Expand Down Expand Up @@ -200,9 +202,9 @@ static char *elasticsearch_format(const void *data, size_t bytes,
msgpack_unpacked_init(&result);

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len);
logstash_index[ctx->logstash_prefix_len] = '\0';
if (ec->logstash_format == FLB_TRUE) {
memcpy(logstash_index, ec->logstash_prefix, ec->logstash_prefix_len);
logstash_index[ec->logstash_prefix_len] = '\0';
}

/*
Expand All @@ -212,17 +214,17 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) {
flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
ec->index, &tm);
es_index = index_formatted;

index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
es_index, ec->type);
}

/*
Expand All @@ -231,7 +233,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* in order to prevent generating millions of indexes
* we can set to always use current time for index generation
*/
if (ctx->current_time_index == FLB_TRUE) {
if (ec->current_time_index == FLB_TRUE) {
flb_time_get(&tms);
}

Expand All @@ -248,7 +250,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
}

/* Only pop time from record if current_time_index is disabled */
if (ctx->current_time_index == FLB_FALSE) {
if (ec->current_time_index == FLB_FALSE) {
flb_time_pop_from_msgpack(&tms, &result, &obj);
}

Expand All @@ -263,16 +265,16 @@ static char *elasticsearch_format(const void *data, size_t bytes,
map_size = map.via.map.size;

es_index_custom_len = 0;
if (ctx->logstash_prefix_key_len != 0) {
if (ec->logstash_prefix_key_len != 0) {
for (i = 0; i < map_size; i++) {
key = map.via.map.ptr[i].key;
if (key.type != MSGPACK_OBJECT_STR) {
continue;
}
if (key.via.str.size != ctx->logstash_prefix_key_len) {
if (key.via.str.size != ec->logstash_prefix_key_len) {
continue;
}
if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, ctx->logstash_prefix_key_len) != 0) {
if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, ec->logstash_prefix_key_len) != 0) {
continue;
}
val = map.via.map.ptr[i].val;
Expand All @@ -294,62 +296,62 @@ static char *elasticsearch_format(const void *data, size_t bytes,
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

if (ctx->include_tag_key == FLB_TRUE) {
if (ec->include_tag_key == FLB_TRUE) {
map_size++;
}

/* Set the new map size */
msgpack_pack_map(&tmp_pck, map_size + 1);

/* Append the time key */
msgpack_pack_str(&tmp_pck, ctx->time_key_len);
msgpack_pack_str_body(&tmp_pck, ctx->time_key, ctx->time_key_len);
msgpack_pack_str(&tmp_pck, ec->time_key_len);
msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len);

/* Format the time */
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
ctx->time_key_format, &tm);
ec->time_key_format, &tm);
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);

s += len;
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->logstash_format == FLB_TRUE) {
es_index = ec->index;
if (ec->logstash_format == FLB_TRUE) {
/* Compose Index header */
if (es_index_custom_len > 0) {
p = logstash_index + es_index_custom_len;
} else {
p = logstash_index + ctx->logstash_prefix_len;
p = logstash_index + ec->logstash_prefix_len;
}
*p++ = '-';

len = p - logstash_index;
s = strftime(p, sizeof(logstash_index) - len - 1,
ctx->logstash_dateformat, &tm);
ec->logstash_dateformat, &tm);
p += s;
*p++ = '\0';
es_index = logstash_index;
if (ctx->generate_id == FLB_FALSE) {
if (ec->generate_id == FLB_FALSE) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
es_index, ec->type);
}
}
else if (ctx->current_time_index == FLB_TRUE) {
else if (ec->current_time_index == FLB_TRUE) {
/* Make sure we handle index time format for index */
strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
ec->index, &tm);
es_index = index_formatted;
}

/* Tag Key */
if (ctx->include_tag_key == FLB_TRUE) {
msgpack_pack_str(&tmp_pck, ctx->tag_key_len);
msgpack_pack_str_body(&tmp_pck, ctx->tag_key, ctx->tag_key_len);
if (ec->include_tag_key == FLB_TRUE) {
msgpack_pack_str(&tmp_pck, ec->tag_key_len);
msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len);
msgpack_pack_str(&tmp_pck, tag_len);
msgpack_pack_str_body(&tmp_pck, tag, tag_len);
}
Expand All @@ -361,15 +363,15 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* Elasticsearch have a restriction that key names cannot contain
* a dot; if some dot is found, it's replaced with an underscore.
*/
ret = es_pack_map_content(&tmp_pck, map, ctx);
ret = es_pack_map_content(&tmp_pck, map, ec);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
es_bulk_destroy(bulk);
return NULL;
}

if (ctx->generate_id == FLB_TRUE) {
if (ec->generate_id == FLB_TRUE) {
MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
snprintf(es_uuid, sizeof(es_uuid),
"%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
Expand All @@ -378,7 +380,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
es_index, ec->type, es_uuid);
}

/* Convert msgpack to JSON */
Expand Down Expand Up @@ -412,7 +414,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* return the bulk->ptr buffer
*/
flb_free(bulk);
if (ctx->trace_output) {
if (ec->trace_output) {
fwrite(buf, 1, *out_size, stdout);
fflush(stdout);
}
Expand All @@ -423,20 +425,30 @@ int cb_es_init(struct flb_output_instance *ins,
struct flb_config *config,
void *data)
{
int ret;
const char *tmp;
struct flb_elasticsearch *ctx;
(void) data;

ctx = flb_es_conf_create(ins, config);
ctx = flb_calloc(1, sizeof(struct flb_elasticsearch));
if (!ctx) {
flb_error("[out_es] cannot initialize plugin");
flb_errno();
return -1;
}

flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s",
ins->host.name, ins->host.port, ctx->uri,
ctx->index, ctx->type);

mk_list_init(&ctx->configs);
flb_output_set_context(ins, ctx);
return 0;

/* Configure HA or simple mode ? */
tmp = flb_output_get_property("upstream", ins);
if (tmp) {
ret = es_config_ha(tmp, ctx, config);
}
else {
ret = es_config_simple(ins, ctx, config);
}

return ret;
}

static int elasticsearch_error_check(struct flb_http_client *c)
Expand Down Expand Up @@ -546,46 +558,72 @@ void cb_es_flush(const void *data, size_t bytes,
char *pack;
size_t b_sent;
struct flb_elasticsearch *ctx = out_context;
struct flb_elasticsearch_config *ec = NULL;
struct flb_upstream_conn *u_conn;
struct flb_http_client *c;
struct flb_upstream_node *node;
(void) i_ins;
(void) tag;
(void) tag_len;

if (ctx->ha_mode == FLB_TRUE) {
node = flb_upstream_ha_node_get(ctx->ha);
if (!node) {
flb_error("[out_es] cannot get an Upstream HA node");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Get forward_config stored in node opaque data */
ec = flb_upstream_node_get_data(node);
}
else {
ec = mk_list_entry_first(&ctx->configs,
struct flb_elasticsearch_config,
_head);
}

flb_debug("[out_es] trying node %s", node->name);

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
if (ctx->ha_mode == FLB_TRUE) {
u_conn = flb_upstream_conn_get(node->u);
}
else {
u_conn = flb_upstream_conn_get(ctx->u);
}
if (!u_conn) {
flb_error("[out_es] no upstream connections available");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Convert format */
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx);
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec);
if (!pack) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri,
pack, bytes_out, NULL, 0, NULL, 0);

flb_http_buffer_size(c, ctx->buffer_size);
flb_http_buffer_size(c, ec->buffer_size);

flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);

if (ctx->http_user && ctx->http_passwd) {
flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
if (ec->http_user && ec->http_passwd) {
flb_http_basic_auth(c, ec->http_user, ec->http_passwd);
}

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_warn("[out_es] http_do=%i URI=%s", ret, ctx->uri);
flb_warn("[out_es] http_do=%i URI=%s", ret, ec->uri);
goto retry;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ec->uri);
if (c->resp.status != 200 && c->resp.status != 201) {
goto retry;
}
Expand All @@ -598,7 +636,7 @@ void cb_es_flush(const void *data, size_t bytes,
ret = elasticsearch_error_check(c);
if (ret == FLB_TRUE) {
/* we got an error */
if (ctx->trace_error) {
if (ec->trace_error) {
/*
* If trace_error is set, trace the actual
* input/output to Elasticsearch that caused the problem.
Expand Down Expand Up @@ -635,8 +673,32 @@ void cb_es_flush(const void *data, size_t bytes,
int cb_es_exit(void *data, struct flb_config *config)
{
struct flb_elasticsearch *ctx = data;
struct flb_elasticsearch_config *ec;
struct mk_list *head;
struct mk_list *tmp;
(void) config;

if (!ctx) {
return 0;
}

/* Destroy elasticsearch_config contexts */
mk_list_foreach_safe(head, tmp, &ctx->configs) {
ec = mk_list_entry(head, struct flb_elasticsearch_config, _head);
mk_list_del(&ec->_head);
flb_es_conf_destroy(ec);
}

if (ctx->ha_mode == FLB_TRUE) {
if (ctx->ha) {
flb_upstream_ha_destroy(ctx->ha);
}
}
else {
flb_upstream_destroy(ctx->u);
}
flb_free(ctx);

flb_es_conf_destroy(ctx);
return 0;
}

Expand Down
Loading

0 comments on commit b08b568

Please sign in to comment.