diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 5c48a338b81..23d31ad4f0a 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -435,31 +435,50 @@ void replace_prefix_dot(flb_sds_t s, int tag_prefix_len) } } } +static int extract_msgpack_obj_from_msgpack_map(msgpack_object_map *root, + char *name, int size, + msgpack_object_type object_type, + msgpack_object *val) +{ + int i; + msgpack_object key; + + if (root == NULL) { + return -1; + } + for (i = 0; i < root->size; i++) { + key = root->ptr[i].key; + if (key.type != MSGPACK_OBJECT_STR) { + continue; + } + if (key.via.str.size == size + && strncmp(key.via.str.ptr, name, size) == 0) { + *val = root->ptr[i].val; + if (val->type != object_type) { + return -1; + } + return 0; + } + } + return -1; +} + static flb_sds_t get_str_value_from_msgpack_map(msgpack_object_map map, const char *key, int key_size) { int i; + int ret; msgpack_object k; msgpack_object v; flb_sds_t ptr = NULL; - for (i = 0; i < map.size; i++) { - k = map.ptr[i].key; - v = map.ptr[i].val; - - if (k.type != MSGPACK_OBJECT_STR) { - continue; - } - - if (k.via.str.size == key_size && - strncmp(key, (char *) k.via.str.ptr, k.via.str.size) == 0) { - /* make sure to free it after use */ - ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size); - break; - } +// convert msgpack_object_map to msgpack_object + ret = extract_msgpack_obj_from_msgpack_map(&map, (char*) key, key_size, + MSGPACK_OBJECT_STR, &v); + if (ret == 0) { + ptr = flb_sds_create_len(v.via.str.ptr, v.via.str.size); } - return ptr; } @@ -1769,7 +1788,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, * {"resource": {"type": "...", "labels": {...}, * "entries": [] */ - msgpack_pack_map(&mp_pck, 2); + msgpack_pack_map(&mp_pck, 3); + + /* Set partialSuccess to true */ + msgpack_pack_str(&mp_pck, 14); + msgpack_pack_str_body(&mp_pck, "partialSuccess", 14); + msgpack_pack_true(&mp_pck); msgpack_pack_str(&mp_pck, 8); msgpack_pack_str_body(&mp_pck, "resource", 8); @@ -2467,41 +2491,51 @@ static int stackdriver_format_test(struct flb_config *config, return 0; } - #ifdef FLB_HAVE_METRICS -static void update_http_metrics(struct flb_stackdriver *ctx, - struct flb_event_chunk *event_chunk, +static void add_record_metrics(struct flb_stackdriver* ctx, + uint64_t ts, + int val, + int response_code, + int grpc_code) +{ + char grpc_code_label[32]; + char response_code_label[32]; + char* name = (char*) flb_output_name(ctx->ins); + /* convert status to string format */ + snprintf(response_code_label, sizeof(response_code_label) - 1, "%i", + response_code); + /* convert grpc_code to string format */ + snprintf(grpc_code_label, sizeof(grpc_code_label) - 1, "%i", grpc_code); + + /* processed records total */ + cmt_counter_add(ctx->cmt_proc_records_total, ts, val, 3, + (char* []) {grpc_code_label, response_code_label, name}); +} + +static void update_http_metrics(struct flb_stackdriver* ctx, + struct flb_event_chunk* event_chunk, uint64_t ts, int http_status) { - char tmp[32]; + char response_code_label[32]; /* convert status to string format */ - snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); - char *name = (char *) flb_output_name(ctx->ins); - - /* processed records total */ - cmt_counter_add(ctx->cmt_proc_records_total, ts, event_chunk->total_events, - 2, (char *[]) {tmp, name}); + snprintf(response_code_label, sizeof(response_code_label) - 1, "%i", + http_status); + char* name = (char*) flb_output_name(ctx->ins); - /* HTTP status */ - if (http_status != STACKDRIVER_NET_ERROR) { - cmt_counter_inc(ctx->cmt_requests_total, ts, 2, (char *[]) {tmp, name}); - } + cmt_counter_inc(ctx->cmt_requests_total, ts, 2, + (char* []) {response_code_label, name}); } static void update_retry_metric(struct flb_stackdriver *ctx, struct flb_event_chunk *event_chunk, uint64_t ts, - int http_status, int ret_code) + int http_status) { char tmp[32]; char *name = (char *) flb_output_name(ctx->ins); - if (ret_code != FLB_RETRY) { - return; - } - /* convert status to string format */ snprintf(tmp, sizeof(tmp) - 1, "%i", http_status); cmt_counter_add(ctx->cmt_retried_records_total, @@ -2510,6 +2544,178 @@ static void update_retry_metric(struct flb_stackdriver *ctx, } #endif +static int parse_partial_success_response(struct flb_http_client* c, + struct flb_stackdriver* ctx, + uint64_t ts, + int total_events, + int* grpc_status_codes) +{ + int ret; + int root_type; + int i; + int log_entry_ret; + int code_ret; + char* buffer; + char at_type_str[PARTIAL_SUCCESS_GRPC_TYPE_SIZE]; + size_t size; + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object error_map; + msgpack_object details_arr; + msgpack_object details_map; + msgpack_object logEntryErrors_map; + msgpack_object logEntryError_key; + msgpack_object logEntryError_map; + msgpack_object logEntryCode; + msgpack_object at_type; + + if (c->resp.status != 400 && c->resp.status != 403) { + return -1; + } + + ret = flb_pack_json(c->resp.payload, c->resp.payload_size, + &buffer, &size, &root_type, NULL); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed to parse json into msgpack: %s", + c->resp.payload); + return -1; + } + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buffer, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack %s response: %s", + c->resp.payload); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", + root.type); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } +/* Sample error response +{ + "error": { + "code": 400, + "message": "Log entry with size 293.1K exceeds maximum size of 256.0K", + "status": "INVALID_ARGUMENT", + "details": [ + { + "@type": "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors", + "logEntryErrors": { + "2": { + "code": 3, + "message": "Log entry with size 293.1K exceeds maximum size of 256.0K" + }, + "4": { + "code": 3, + "message": "Log entry with size 293.1K exceeds maximum size of 256.0K" + } + } + } + ] + } +} +*/ + ret = extract_msgpack_obj_from_msgpack_map(&root.via.map, "error", 5, + MSGPACK_OBJECT_MAP, &error_map); + if (ret == -1) { + flb_plg_debug(ctx->ins, + "%s response does not have key: \"error\""); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = extract_msgpack_obj_from_msgpack_map(&error_map.via.map, "details", 7, + MSGPACK_OBJECT_ARRAY, + &details_arr); + if (ret == -1) { + flb_plg_debug(ctx->ins, + "%s response does not have key: \"details\""); + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return -1; + } + for (i = 0; i < details_arr.via.array.size; i++) { + details_map = details_arr.via.array.ptr[i]; + if (details_map.type != MSGPACK_OBJECT_MAP) { + continue; + } + + + ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map, + "@type", 5, + MSGPACK_OBJECT_STR, + &at_type); + strncpy(at_type_str, at_type.via.str.ptr, + PARTIAL_SUCCESS_GRPC_TYPE_SIZE); + if (ret != 0 || + at_type.via.str.size != PARTIAL_SUCCESS_GRPC_TYPE_SIZE || + strncmp(at_type_str, PARTIAL_SUCCESS_GRPC_TYPE, + PARTIAL_SUCCESS_GRPC_TYPE_SIZE) != 0) { + continue; + } + + ret = extract_msgpack_obj_from_msgpack_map(&details_map.via.map, + "logEntryErrors", 14, + MSGPACK_OBJECT_MAP, + &logEntryErrors_map); + if (ret != 0) { + continue; + } + + for (i = 0; i < logEntryErrors_map.via.map.size; i++) { + logEntryError_key = logEntryErrors_map.via.map.ptr[i].key; + if (logEntryError_key.type != MSGPACK_OBJECT_STR) { + continue; + } + log_entry_ret = extract_msgpack_obj_from_msgpack_map( + &logEntryErrors_map.via.map, + logEntryError_key.via.str.ptr, + logEntryError_key.via.str.size, + MSGPACK_OBJECT_MAP, + &logEntryError_map); + + if (log_entry_ret != 0) { + continue; + } + + code_ret = extract_msgpack_obj_from_msgpack_map( + &logEntryError_map.via.map, + "code", + 4, + MSGPACK_OBJECT_POSITIVE_INTEGER, + &logEntryCode); + + if (code_ret == 0) { + if (logEntryCode.via.i64 < 0 + || logEntryCode.via.i64 >= GRPC_STATUS_CODES_SIZE) { + // TODO: fallback on a different data structure + flb_plg_error(ctx->ins, + "internal error unexpected status code: %i", + (int) logEntryCode.via.i64); + return -1; + } + grpc_status_codes[(int) logEntryCode.via.i64]++; +#ifdef FLB_HAVE_METRICS + add_record_metrics(ctx, ts, 1, c->resp.status, + (int) logEntryCode.via.i64); +#endif + } + } + } + flb_free(buffer); + msgpack_unpacked_destroy(&result); + return 0; +} static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -2519,7 +2725,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; int ret; + int code; + int ret_partial_success; int ret_code = FLB_RETRY; + int grpc_status_counts[GRPC_STATUS_CODES_SIZE] = {0}; size_t b_sent; flb_sds_t token; flb_sds_t payload_buf; @@ -2544,8 +2753,7 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); - update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); - update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR, FLB_RETRY); + update_retry_metric(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); #endif FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -2625,9 +2833,6 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, if (ret != 0) { flb_plg_warn(ctx->ins, "http_do=%i", ret); ret_code = FLB_RETRY; -#ifdef FLB_HAVE_METRICS - update_http_metrics(ctx, event_chunk, ts, STACKDRIVER_NET_ERROR); -#endif } else { /* The request was issued successfully, validate the 'error' field */ @@ -2635,52 +2840,80 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk, if (c->resp.status == 200) { ret_code = FLB_OK; } - else if (c->resp.status >= 400 && c->resp.status < 500) { - ret_code = FLB_ERROR; - flb_plg_warn(ctx->ins, "error\n%s", - c->resp.payload); - } else { +#ifdef FLB_HAVE_METRICS + /* check partial success */ + ret_partial_success = + parse_partial_success_response(c, + ctx, + ts, + (int) event_chunk->total_events, + grpc_status_counts); + + int failed_records = 0; + if (ret_partial_success == 0) { + for (code = 0; code < GRPC_STATUS_CODES_SIZE; code++) { + if (grpc_status_counts[code] != 0) { + failed_records += grpc_status_counts[code]; + } + } + cmt_counter_add(ctx->ins->cmt_dropped_records, ts, + failed_records, 1, (char* []) {name}); + int successful_records = + (int) event_chunk->total_events - failed_records; + if (successful_records != 0) { + add_record_metrics(ctx, ts, successful_records, 200, 0); + } + } + else { + add_record_metrics(ctx, ts, (int) event_chunk->total_events, + c->resp.status, -1); + cmt_counter_add(ctx->ins->cmt_dropped_records, ts, + (int) event_chunk->total_events, 1, + (char* []) {name}); + } +#endif + if (c->resp.status >= 400 && c->resp.status < 500) { + ret_code = FLB_ERROR; + flb_plg_warn(ctx->ins, "error: %s", c->resp.payload); + } + else { if (c->resp.payload_size > 0) { - /* we got an error */ - flb_plg_warn(ctx->ins, "error\n%s", - c->resp.payload); + /* we got an error */ + flb_plg_warn(ctx->ins, "error: %s", c->resp.payload); } else { - flb_plg_debug(ctx->ins, "response\n%s", - c->resp.payload); + flb_plg_debug(ctx->ins, "response: %s", c->resp.payload); } ret_code = FLB_RETRY; + } } } /* Update specific stackdriver metrics */ #ifdef FLB_HAVE_METRICS if (ret_code == FLB_OK) { - cmt_counter_inc(ctx->cmt_successful_requests, - ts, 1, (char *[]) {name}); + cmt_counter_inc(ctx->cmt_successful_requests, ts, 1, (char *[]) {name}); + add_record_metrics(ctx, ts, (int) event_chunk->total_events, 200, 0); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_SUCCESSFUL_REQUESTS, 1, ctx->ins->metrics); } - else { - cmt_counter_inc(ctx->cmt_failed_requests, - ts, 1, (char *[]) {name}); + else if (ret_code == FLB_ERROR) { + cmt_counter_inc(ctx->cmt_failed_requests, ts, 1, (char* []) {name}); /* OLD api */ flb_metrics_sum(FLB_STACKDRIVER_FAILED_REQUESTS, 1, ctx->ins->metrics); } - /* Update metrics counter by using labels/http status code */ - if (ret == 0) { - update_http_metrics(ctx, event_chunk, ts, c->resp.status); + if (ret_code == FLB_RETRY) { + update_retry_metric(ctx, event_chunk, ts, c->resp.status); } - /* Update retry count if necessary */ - update_retry_metric(ctx, event_chunk, ts, c->resp.status, ret_code); + /* Update metrics counter by using labels/http status code */ + update_http_metrics(ctx, event_chunk, ts, c->resp.status); #endif - /* Cleanup */ if (compressed == FLB_TRUE) { flb_free(compressed_payload_buffer); diff --git a/plugins/out_stackdriver/stackdriver.h b/plugins/out_stackdriver/stackdriver.h index 239a3ee318a..41ec385b507 100644 --- a/plugins/out_stackdriver/stackdriver.h +++ b/plugins/out_stackdriver/stackdriver.h @@ -86,6 +86,11 @@ #define FLB_STACKDRIVER_FAILED_REQUESTS 1001 /* failed requests */ #endif +// https://grpc.github.io/grpc/core/md_doc_statuscodes.html +#define GRPC_STATUS_CODES_SIZE 17 +#define PARTIAL_SUCCESS_GRPC_TYPE "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors" +#define PARTIAL_SUCCESS_GRPC_TYPE_SIZE 66 + struct flb_stackdriver_oauth_credentials { /* parsed credentials file */ flb_sds_t type; diff --git a/plugins/out_stackdriver/stackdriver_conf.c b/plugins/out_stackdriver/stackdriver_conf.c index 9f3f28a354c..2be649a417b 100644 --- a/plugins/out_stackdriver/stackdriver_conf.c +++ b/plugins/out_stackdriver/stackdriver_conf.c @@ -551,7 +551,7 @@ struct flb_stackdriver *flb_stackdriver_conf_create(struct flb_output_instance * "stackdriver", "proc_records_total", "Total number of processed records.", - 2, (char *[]) {"status", "name"}); + 3, (char *[]) {"grpc_code" ,"status", "name"}); ctx->cmt_retried_records_total = cmt_counter_create(ins->cmt, "fluentbit",