From e37b56b2a1c75ca50c15848a945b448f7d85ee85 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 15 Aug 2024 19:32:33 +0900 Subject: [PATCH] out_elasticsearch: Process error information properly The current out_elasticsearch implementation is just giving up to process responses when encountering the error information. In this patch, continue to process until the end of the elements of msgpack converted JSON response. If there is a succeeded infrmation in the converted response, Fluent Bit will assume the requesting payloads to be succeeded to send ES clusters. Signed-off-by: Hiroshi Hatake --- plugins/out_es/es.c | 51 ++++++++++++++++++++++++++------------------- plugins/out_es/es.h | 9 ++++++++ 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 5d6a428992a..b0773a40991 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -647,7 +647,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, { int i, j, k; int ret; - int check = FLB_FALSE; + int check = 0; int root_type; char *out_buf; size_t off = 0; @@ -671,17 +671,20 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (ret == -1) { /* Is this an incomplete HTTP Request ? */ if (c->resp.payload_size <= 0) { - return FLB_TRUE; + check |= FLB_ES_STATUS_IMCOMPLETE; + return check; } /* Lookup error field */ if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) { - return FLB_FALSE; + check |= FLB_ES_STATUS_SUCCESS; + return check; } flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_ES_STATUS_BAD_RESPONSE; + return check; } /* Lookup error field */ @@ -690,14 +693,15 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (ret != MSGPACK_UNPACK_SUCCESS) { flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_ES_STATUS_ERROR_UNPACK; + return check; } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected payload type=%i", root.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -706,7 +710,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", key.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_INVAILD_ARGUMENT; goto done; } @@ -715,14 +719,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (val.type != MSGPACK_OBJECT_BOOLEAN) { flb_plg_error(ctx->ins, "unexpected 'error' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } /* If error == false, we are OK (no errors = FLB_FALSE) */ if (!val.via.boolean) { /* no errors */ - check = FLB_FALSE; + check |= FLB_ES_STATUS_SUCCESS; goto done; } } @@ -731,7 +735,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (val.type != MSGPACK_OBJECT_ARRAY) { flb_plg_error(ctx->ins, "unexpected 'items' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -740,14 +744,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } if (item.via.map.size != 1) { flb_plg_error(ctx->ins, "unexpected 'item' size=%i", item.via.map.size); - check = FLB_TRUE; + check |= FLB_ES_STATUS_INVAILD_ARGUMENT; goto done; } @@ -755,7 +759,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -764,7 +768,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item_key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", item_key.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -774,13 +778,16 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { flb_plg_error(ctx->ins, "unexpected 'status' value type=%i", item_val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } + /* Check for success responses */ + if (item_val.via.i64 == 200 || item_val.via.i64 == 201) { + check |= FLB_ES_STATUS_SUCCESS; + } /* Check for errors other than version conflict (document already exists) */ if (item_val.via.i64 != 409) { - check = FLB_TRUE; - goto done; + check |= FLB_ES_STATUS_ERROR; } } } @@ -920,7 +927,11 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, * and lookup the 'error' field. */ ret = elasticsearch_error_check(ctx, c); - if (ret == FLB_TRUE) { + if (ret & FLB_ES_STATUS_SUCCESS) { + flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", + c->resp.payload); + } + else { /* we got an error */ if (ctx->trace_error) { /* @@ -946,10 +957,6 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } goto retry; } - else { - flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", - c->resp.payload); - } } else { goto retry; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index a3f1ecadf98..b6512ebc2a0 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -36,6 +36,15 @@ #define FLB_ES_WRITE_OP_UPDATE "update" #define FLB_ES_WRITE_OP_UPSERT "upsert" +#define FLB_ES_STATUS_SUCCESS (1 << 0) +#define FLB_ES_STATUS_IMCOMPLETE (1 << 1) +#define FLB_ES_STATUS_ERROR_UNPACK (1 << 2) +#define FLB_ES_STATUS_BAD_TYPE (1 << 3) +#define FLB_ES_STATUS_INVAILD_ARGUMENT (1 << 4) +#define FLB_ES_STATUS_BAD_RESPONSE (1 << 5) +#define FLB_ES_STATUS_DUPLICATES (1 << 6) +#define FLB_ES_STATUS_ERROR (1 << 7) + struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index;