Skip to content

Commit

Permalink
out_elasticsearch: Process error information properly
Browse files Browse the repository at this point in the history
The current out_elasticsearch implementation is just giving up to
process responses when encountering the error information.
In this patch, continue to process until the end of the elements of
msgpack converted JSON response.
If there is a succeeded infrmation in the converted response,
Fluent Bit will assume the requesting payloads to be succeeded to send
ES clusters.

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored and edsiper committed Aug 27, 2024
1 parent 36c46b7 commit e37b56b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
51 changes: 29 additions & 22 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
{
int i, j, k;
int ret;
int check = FLB_FALSE;
int check = 0;
int root_type;
char *out_buf;
size_t off = 0;
Expand All @@ -671,17 +671,20 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (ret == -1) {
/* Is this an incomplete HTTP Request ? */
if (c->resp.payload_size <= 0) {
return FLB_TRUE;
check |= FLB_ES_STATUS_IMCOMPLETE;
return check;
}

/* Lookup error field */
if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
return FLB_FALSE;
check |= FLB_ES_STATUS_SUCCESS;
return check;
}

flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
c->resp.payload);
return FLB_TRUE;
check |= FLB_ES_STATUS_BAD_RESPONSE;
return check;
}

/* Lookup error field */
Expand All @@ -690,14 +693,15 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (ret != MSGPACK_UNPACK_SUCCESS) {
flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
c->resp.payload);
return FLB_TRUE;
check |= FLB_ES_STATUS_ERROR_UNPACK;
return check;
}

root = result.data;
if (root.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected payload type=%i",
root.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -706,7 +710,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (key.type != MSGPACK_OBJECT_STR) {
flb_plg_error(ctx->ins, "unexpected key type=%i",
key.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_INVAILD_ARGUMENT;
goto done;
}

Expand All @@ -715,14 +719,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (val.type != MSGPACK_OBJECT_BOOLEAN) {
flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
val.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

/* If error == false, we are OK (no errors = FLB_FALSE) */
if (!val.via.boolean) {
/* no errors */
check = FLB_FALSE;
check |= FLB_ES_STATUS_SUCCESS;
goto done;
}
}
Expand All @@ -731,7 +735,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (val.type != MSGPACK_OBJECT_ARRAY) {
flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
val.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -740,22 +744,22 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
item.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

if (item.via.map.size != 1) {
flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
item.via.map.size);
check = FLB_TRUE;
check |= FLB_ES_STATUS_INVAILD_ARGUMENT;
goto done;
}

item = item.via.map.ptr[0].val;
if (item.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
item.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -764,7 +768,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (item_key.type != MSGPACK_OBJECT_STR) {
flb_plg_error(ctx->ins, "unexpected key type=%i",
item_key.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}

Expand All @@ -774,13 +778,16 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
item_val.type);
check = FLB_TRUE;
check |= FLB_ES_STATUS_BAD_TYPE;
goto done;
}
/* Check for success responses */
if (item_val.via.i64 == 200 || item_val.via.i64 == 201) {
check |= FLB_ES_STATUS_SUCCESS;
}
/* Check for errors other than version conflict (document already exists) */
if (item_val.via.i64 != 409) {
check = FLB_TRUE;
goto done;
check |= FLB_ES_STATUS_ERROR;
}
}
}
Expand Down Expand Up @@ -920,7 +927,11 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
* and lookup the 'error' field.
*/
ret = elasticsearch_error_check(ctx, c);
if (ret == FLB_TRUE) {
if (ret & FLB_ES_STATUS_SUCCESS) {
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
c->resp.payload);
}
else {
/* we got an error */
if (ctx->trace_error) {
/*
Expand All @@ -946,10 +957,6 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
}
goto retry;
}
else {
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
c->resp.payload);
}
}
else {
goto retry;
Expand Down
9 changes: 9 additions & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
#define FLB_ES_WRITE_OP_UPDATE "update"
#define FLB_ES_WRITE_OP_UPSERT "upsert"

#define FLB_ES_STATUS_SUCCESS (1 << 0)
#define FLB_ES_STATUS_IMCOMPLETE (1 << 1)
#define FLB_ES_STATUS_ERROR_UNPACK (1 << 2)
#define FLB_ES_STATUS_BAD_TYPE (1 << 3)
#define FLB_ES_STATUS_INVAILD_ARGUMENT (1 << 4)
#define FLB_ES_STATUS_BAD_RESPONSE (1 << 5)
#define FLB_ES_STATUS_DUPLICATES (1 << 6)
#define FLB_ES_STATUS_ERROR (1 << 7)

struct flb_elasticsearch {
/* Elasticsearch index (database) and type (table) */
char *index;
Expand Down

1 comment on commit e37b56b

@sirwio
Copy link

@sirwio sirwio commented on e37b56b Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this issue is not addressed/fixed on out_opensearch?

Please sign in to comment.