From 175000eaad56e9600e9cf795ba32eff7288abe5b Mon Sep 17 00:00:00 2001 From: avilevy Date: Tue, 3 Oct 2023 15:45:27 +0000 Subject: [PATCH] out_stackdriver: Add `partialSuccess: true` to all logs sent to Google Cloud Logging API Signed-off-by: avilevy --- plugins/out_stackdriver/stackdriver.c | 98 ++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 5c48a338b81..a7571a80261 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -1769,7 +1769,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, * {"resource": {"type": "...", "labels": {...}, * "entries": [] */ - msgpack_pack_map(&mp_pck, 2); + msgpack_pack_map(&mp_pck, 3); + + /* Set partialSuccess to true */ + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "partialSuccess", 14); + msgpack_pack_true(&mp_pck); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "resource", 8); @@ -2510,6 +2515,91 @@ static void update_retry_metric(struct flb_stackdriver *ctx, } #endif +static int extract_map_from_msgpack(msgpack_object *root, char *name, int size, msgpack_object_type object_type ,msgpack_object *val){ + int i; + msgpack_object key; + for (i = 0; i < root->via.map.size; i++) { + key = root->via.map.ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + continue; + } + if(key.via.str.size == size && strncmp(key.via.str.ptr, name, size) == 0){ + *val = root->via.map.ptr[i].val; + if (val->type != object_type) { + return -1; + } + return 0; + } + } + return -1; +} + +static int parse_partial_success_response(struct flb_http_client *c, struct flb_stackdriver *ctx, int *partial_failure){ + int ret; + int root_type; + int i; + char *buffer; + size_t size; + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object error_map; + msgpack_object details_arr; + msgpack_object logEntryErrors_map; + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &buffer, &size, &root_type, NULL); + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buffer, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack %s response\n%s", c->resp.payload); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", root.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = extract_map_from_msgpack(&root, "error", 5, MSGPACK_OBJECT_MAP ,&error_map); + if (ret == -1) { + flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"error\""); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + ret = extract_map_from_msgpack(&error_map, "details", 7, MSGPACK_OBJECT_ARRAY ,&details_arr); + if (ret == -1) { + flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"details\""); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + for (i = 0; i < details_arr.via.array.size; i++) { + logEntryErrors_map = details_arr.via.array.ptr[i]; + if (logEntryErrors_map.type != MSGPACK_OBJECT_MAP) { + continue; + } + ret = extract_map_from_msgpack(&logEntryErrors_map, "logEntryErrors", 14, MSGPACK_OBJECT_MAP ,&logEntryErrors_map); + if (ret == -1){ + flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"logEntryErrors\""); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + break; + } + + *partial_failure = logEntryErrors_map.via.map.size; + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return 0; +} + static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -2520,6 +2610,7 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, (void) config; int ret; int ret_code = FLB_RETRY; + int partial_failure; size_t b_sent; flb_sds_t token; flb_sds_t payload_buf; @@ -2637,6 +2728,11 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, } else if (c->resp.status >= 400 && c->resp.status < 500) { ret_code = FLB_ERROR; + if(c->resp.status == 400){ + ret = parse_partial_success_response(c, ctx, &partial_failure); + cmt_counter_add(ctx->cmt_failed_requests, ts, (double)partial_failure, 1, (char *[]) {name}); + } + flb_plg_warn(ctx->ins, "error\n%s", c->resp.payload); }