diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 5d6a428992a..b0773a40991 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -647,7 +647,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, { int i, j, k; int ret; - int check = FLB_FALSE; + int check = 0; int root_type; char *out_buf; size_t off = 0; @@ -671,17 +671,20 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (ret == -1) { /* Is this an incomplete HTTP Request ? */ if (c->resp.payload_size <= 0) { - return FLB_TRUE; + check |= FLB_ES_STATUS_IMCOMPLETE; + return check; } /* Lookup error field */ if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) { - return FLB_FALSE; + check |= FLB_ES_STATUS_SUCCESS; + return check; } flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_ES_STATUS_BAD_RESPONSE; + return check; } /* Lookup error field */ @@ -690,14 +693,15 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (ret != MSGPACK_UNPACK_SUCCESS) { flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", c->resp.payload); - return FLB_TRUE; + check |= FLB_ES_STATUS_ERROR_UNPACK; + return check; } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected payload type=%i", root.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -706,7 +710,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", key.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_INVAILD_ARGUMENT; goto done; } @@ -715,14 +719,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (val.type != MSGPACK_OBJECT_BOOLEAN) { flb_plg_error(ctx->ins, "unexpected 'error' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } /* If error == false, we are OK (no errors = FLB_FALSE) */ if (!val.via.boolean) { /* no errors */ - check = FLB_FALSE; + check |= FLB_ES_STATUS_SUCCESS; goto done; } } @@ -731,7 +735,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (val.type != MSGPACK_OBJECT_ARRAY) { flb_plg_error(ctx->ins, "unexpected 'items' value type=%i", val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -740,14 +744,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } if (item.via.map.size != 1) { flb_plg_error(ctx->ins, "unexpected 'item' size=%i", item.via.map.size); - check = FLB_TRUE; + check |= FLB_ES_STATUS_INVAILD_ARGUMENT; goto done; } @@ -755,7 +759,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item.type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i", item.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -764,7 +768,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item_key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", item_key.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } @@ -774,13 +778,16 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { flb_plg_error(ctx->ins, "unexpected 'status' value type=%i", item_val.type); - check = FLB_TRUE; + check |= FLB_ES_STATUS_BAD_TYPE; goto done; } + /* Check for success responses */ + if (item_val.via.i64 == 200 || item_val.via.i64 == 201) { + check |= FLB_ES_STATUS_SUCCESS; + } /* Check for errors other than version conflict (document already exists) */ if (item_val.via.i64 != 409) { - check = FLB_TRUE; - goto done; + check |= FLB_ES_STATUS_ERROR; } } } @@ -920,7 +927,11 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, * and lookup the 'error' field. */ ret = elasticsearch_error_check(ctx, c); - if (ret == FLB_TRUE) { + if (ret & FLB_ES_STATUS_SUCCESS) { + flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", + c->resp.payload); + } + else { /* we got an error */ if (ctx->trace_error) { /* @@ -946,10 +957,6 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } goto retry; } - else { - flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", - c->resp.payload); - } } else { goto retry; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index a3f1ecadf98..b6512ebc2a0 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -36,6 +36,15 @@ #define FLB_ES_WRITE_OP_UPDATE "update" #define FLB_ES_WRITE_OP_UPSERT "upsert" +#define FLB_ES_STATUS_SUCCESS (1 << 0) +#define FLB_ES_STATUS_IMCOMPLETE (1 << 1) +#define FLB_ES_STATUS_ERROR_UNPACK (1 << 2) +#define FLB_ES_STATUS_BAD_TYPE (1 << 3) +#define FLB_ES_STATUS_INVAILD_ARGUMENT (1 << 4) +#define FLB_ES_STATUS_BAD_RESPONSE (1 << 5) +#define FLB_ES_STATUS_DUPLICATES (1 << 6) +#define FLB_ES_STATUS_ERROR (1 << 7) + struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index;