From a143dff4f71115bedcdebe2911920fbb68a478da Mon Sep 17 00:00:00 2001
From: avilevy <avilevy@google.com>
Date: Tue, 3 Oct 2023 15:45:27 +0000
Subject: [PATCH 1/2] Add `partialSuccess: true` to all logs sent to Google
 Cloud Logging API

Signed-off-by: avilevy <avilevy@google.com>
---
 plugins/out_stackdriver/stackdriver.c | 96 ++++++++++++++++++++++++++-
 1 file changed, 95 insertions(+), 1 deletion(-)

diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c
index 5c48a338b81..a4afab94a3f 100644
--- a/plugins/out_stackdriver/stackdriver.c
+++ b/plugins/out_stackdriver/stackdriver.c
@@ -1769,7 +1769,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
      * {"resource": {"type": "...", "labels": {...},
      *  "entries": []
      */
-    msgpack_pack_map(&mp_pck, 2);
+    msgpack_pack_map(&mp_pck, 3);
+    
+    /* Set partialSuccess to true */
+    msgpack_pack_str(&mp_pck, 14);
+    msgpack_pack_str_body(&mp_pck, "partialSuccess", 14);
+    msgpack_pack_true(&mp_pck);
 
     msgpack_pack_str(&mp_pck, 8);
     msgpack_pack_str_body(&mp_pck, "resource", 8);
@@ -2510,6 +2515,91 @@ static void update_retry_metric(struct flb_stackdriver *ctx,
 }
 #endif
 
+static int extract_map_from_msgpack(msgpack_object *root, char *name, int size, msgpack_object_type object_type ,msgpack_object *val){
+    int i;
+    msgpack_object key;
+    for (i = 0; i < root->via.map.size; i++) {
+        key = root->via.map.ptr[i].key;
+        if (key.type != MSGPACK_OBJECT_STR) {
+                continue;
+        }
+        if(key.via.str.size == size && strncmp(key.via.str.ptr, name, size) == 0){
+            *val = root->via.map.ptr[i].val;
+            if (val->type != object_type) {
+                return -1;
+            }
+            return 0;
+        }
+    }
+    return -1;
+}
+
+static int parse_partial_success_response(struct flb_http_client *c, struct flb_stackdriver *ctx, int *partial_failure){
+    int ret;
+    int root_type;
+    int i;
+    char *buffer;
+    size_t size;
+    size_t off = 0;
+    msgpack_unpacked result;
+    msgpack_object root;
+    msgpack_object error_map;
+    msgpack_object details_arr;
+    msgpack_object logEntryErrors_map;
+    ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+        &buffer, &size, &root_type, NULL);
+    msgpack_unpacked_init(&result);
+    ret = msgpack_unpack_next(&result, buffer, size, &off);
+    if (ret != MSGPACK_UNPACK_SUCCESS) {
+        flb_plg_error(ctx->ins, "Cannot unpack %s response\n%s", c->resp.payload);
+        flb_free(buffer);
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+
+    root = result.data;
+    if (root.type != MSGPACK_OBJECT_MAP) {
+        flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", root.type);
+        flb_free(buffer);
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+
+    ret = extract_map_from_msgpack(&root, "error", 5, MSGPACK_OBJECT_MAP ,&error_map);
+    if (ret == -1) {
+        flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"error\"");
+        flb_free(buffer);
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+    ret = extract_map_from_msgpack(&error_map, "details", 7, MSGPACK_OBJECT_ARRAY ,&details_arr);
+    if (ret == -1) {
+        flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"details\"");
+        flb_free(buffer);
+        msgpack_unpacked_destroy(&result);
+        return -1;
+    }
+    for (i = 0; i < details_arr.via.array.size; i++) {
+        logEntryErrors_map = details_arr.via.array.ptr[i];
+        if (logEntryErrors_map.type != MSGPACK_OBJECT_MAP) {
+            continue;
+        }
+        ret = extract_map_from_msgpack(&logEntryErrors_map, "logEntryErrors", 14, MSGPACK_OBJECT_MAP ,&logEntryErrors_map);
+        if (ret == -1){
+            flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"logEntryErrors\"");
+            flb_free(buffer);
+            msgpack_unpacked_destroy(&result);
+            return -1;
+        }
+        break;
+    }
+
+    *partial_failure = logEntryErrors_map.via.map.size;
+    flb_free(buffer);
+    msgpack_unpacked_destroy(&result);
+    return 0;
+}
+
 static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
                                  struct flb_output_flush *out_flush,
                                  struct flb_input_instance *i_ins,
@@ -2520,6 +2610,7 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
     (void) config;
     int ret;
     int ret_code = FLB_RETRY;
+    int partial_failure;
     size_t b_sent;
     flb_sds_t token;
     flb_sds_t payload_buf;
@@ -2637,6 +2728,9 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
         }
         else if (c->resp.status >= 400 && c->resp.status < 500) {
             ret_code = FLB_ERROR;
+            ret = parse_partial_success_response(c, ctx, &partial_failure);
+            cmt_counter_add(ctx->cmt_failed_requests, ts, (double)partial_failure, 1, (char *[]) {name});
+
             flb_plg_warn(ctx->ins, "error\n%s",
                 c->resp.payload);
         }

From fea1d2cb377224b59b49a1f11f9dbdb38b67f651 Mon Sep 17 00:00:00 2001
From: avilevy <avilevy@google.com>
Date: Tue, 3 Oct 2023 18:31:32 +0000
Subject: [PATCH 2/2] Small bug fix

Signed-off-by: avilevy <avilevy@google.com>
---
 plugins/out_stackdriver/stackdriver.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c
index a4afab94a3f..a7571a80261 100644
--- a/plugins/out_stackdriver/stackdriver.c
+++ b/plugins/out_stackdriver/stackdriver.c
@@ -2728,8 +2728,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
         }
         else if (c->resp.status >= 400 && c->resp.status < 500) {
             ret_code = FLB_ERROR;
-            ret = parse_partial_success_response(c, ctx, &partial_failure);
-            cmt_counter_add(ctx->cmt_failed_requests, ts, (double)partial_failure, 1, (char *[]) {name});
+            if(c->resp.status == 400){
+              ret = parse_partial_success_response(c, ctx, &partial_failure);
+              cmt_counter_add(ctx->cmt_failed_requests, ts, (double)partial_failure, 1, (char *[]) {name});
+            }
 
             flb_plg_warn(ctx->ins, "error\n%s",
                 c->resp.payload);