From 589a7cb63cce043874ba3f2eba9e67b44c034635 Mon Sep 17 00:00:00 2001 From: ryanohnemus Date: Fri, 22 Dec 2023 08:34:21 -0600 Subject: [PATCH] in_kubernetes_events: consolidate record timestamp logic Signed-off-by: ryanohnemus --- include/fluent-bit/flb_http_client.h | 1 - .../in_kubernetes_events/kubernetes_events.c | 113 ++++++++++-------- src/flb_http_client.c | 6 - 3 files changed, 60 insertions(+), 60 deletions(-) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 372e9cc25c0..5c0bf642223 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -53,7 +53,6 @@ #define FLB_HTTP_OK 1 #define FLB_HTTP_NOT_FOUND 2 /* header not found */ #define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */ -#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */ /* Useful headers */ #define FLB_HTTP_HEADER_AUTH "Authorization" diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index 5b546697a45..c15a1be96ca 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -81,7 +81,7 @@ static int file_to_buffer(const char *path, fclose(fp); - // trim new lines + /* trim new lines */ for (len = st.st_size; len > 0; len--) { if (buf[len-1] != '\n' && buf[len-1] != '\r') { break; @@ -243,7 +243,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u return -1; } - // attempt to parse string as number... + /* attempt to parse string as number... */ if (v->type == MSGPACK_OBJECT_STR) { *val = strtoul(v->via.str.ptr, &end, 10); if (end == NULL || (end < v->via.str.ptr + v->via.str.size)) { @@ -267,8 +267,9 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) int ret; msgpack_object *metadata; - // some events can have lastTimestamp and firstTimestamp set to - // NULL while having metadata.creationTimestamp set. + /* some events can have lastTimestamp and firstTimestamp set to + * NULL while having metadata.creationTimestamp set. + */ ret = record_get_field_time(obj, "lastTimestamp", event_time); if (ret != -1) { return FLB_TRUE; @@ -293,18 +294,18 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) } static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, - time_t* event_time) + struct flb_time* event_time) { int ret; - time_t now; + uint64_t outdated; msgpack_object *metadata; flb_sds_t uid; uint64_t resource_version; - now = (time_t)(cfl_time_now() / 1000000000); - if (*event_time < (now - ctx->retention_time)) { + outdated = cfl_time_now() - (ctx->retention_time * 1000000000L); + if (flb_time_to_nanosec(event_time) < outdated) { flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - *event_time, (now - ctx->retention_time)); + flb_time_to_nanosec(event_time), outdated); return FLB_TRUE; } @@ -356,7 +357,7 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, } #endif - // check if this is an old event. + /* check if this is an old event. */ if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) { flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)", resource_version, ctx->last_resource_version); @@ -378,7 +379,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action, msgpack_object* item_metadata; if(strncmp(action, "ADDED", 5) != 0 && strncmp(action, "MODIFIED", 8) != 0 ) { - //We don't process DELETED nor BOOKMARK + /* We don't process DELETED nor BOOKMARK */ return 0; } @@ -387,8 +388,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action, flb_plg_warn(ctx->ins, "Event without metadata"); return -1; } - ret = record_get_field_uint64(item_metadata, - "resourceVersion", &resource_version); + ret = record_get_field_uint64(item_metadata, "resourceVersion", &resource_version); if (ret == -1) { return ret; } @@ -409,7 +409,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action, return -1; } - if (check_event_is_filtered(ctx, item, (time_t)&ts) == FLB_TRUE) { + if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) { return 0; } @@ -426,7 +426,8 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action, ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item); if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_commit_record(ctx->encoder); - } else { + } + else { flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version); } @@ -491,15 +492,13 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s char *buf_data; size_t buf_size; size_t off = 0; - struct flb_time ts; - uint64_t resource_version; msgpack_unpacked result; msgpack_object root; msgpack_object k; msgpack_object *items = NULL; msgpack_object *item = NULL; msgpack_object *metadata = NULL; - const flb_sds_t action = "ADDED"; //All items from a k8s list we consider an 'ADDED' type + const flb_sds_t action = "ADDED"; /* All items from a k8s list we consider as 'ADDED' */ ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed); if (ret == -1) { @@ -521,8 +520,9 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s return -1; } - // Traverse the EventList for the metadata (for the continue token) and the items. - // https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList + /* Traverse the EventList for the metadata (for the continue token) and the items. + * https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList + */ for (i = 0; i < root.via.map.size; i++) { k = root.via.map.ptr[i].key; if (k.type != MSGPACK_OBJECT_STR) { @@ -587,15 +587,17 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s } static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx, - struct flb_connection *u_conn, - uint64_t max_resource_version) { + struct flb_connection *u_conn, + uint64_t max_resource_version) +{ flb_sds_t url; struct flb_http_client *c; if (ctx->namespace == NULL) { url = flb_sds_create(K8S_EVENTS_KUBE_API_URI); - } else { - url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + + } + else { + url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + strlen(ctx->namespace)); flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace); } @@ -622,8 +624,9 @@ static struct flb_http_client *make_event_list_api_request(struct k8s_events *ct if (ctx->namespace == NULL) { url = flb_sds_create(K8S_EVENTS_KUBE_API_URI); - } else { - url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + + } + else { + url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + strlen(ctx->namespace)); flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace); } @@ -725,8 +728,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i } flb_plg_debug(ctx->ins, - "inserted k8s event: uid=%s, resource_version=%llu, last=%ld", - uid, resource_version, last); + "inserted k8s event: uid=%s, resource_version=%llu, last=%llu", + uid, resource_version, flb_time_to_nanosec(&last)); sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); sqlite3_reset(ctx->stmt_insert_kubernetes_event); @@ -736,27 +739,29 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i #endif -static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c, - size_t *bytes_consumed) +static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c, + size_t *bytes_consumed) { int ret = 0; - char *token = NULL; int root_type; size_t consumed = 0; char *buf_data = NULL; size_t buf_size; - flb_sds_t payload; - - //we copy payload because tokenizer will overwrite when it finds end of string - payload = flb_sds_create_len(c->resp.payload, c->resp.payload_size); - - token = strtok(payload, JSON_ARRAY_DELIM); - while ( token != NULL && ret == 0 ) { - ret = flb_pack_json(token, strlen(token), &buf_data, &buf_size, &root_type, &consumed); + size_t token_size = 0; + char *token_start = 0; + char *token_end = NULL; + + token_start = c->resp.payload; + token_end = strpbrk(token_start, JSON_ARRAY_DELIM); + while ( token_end != NULL && ret == 0 ) { + token_size = token_end - token_start; + ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed); if (ret == -1) { - flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s", token); - } else { - *bytes_consumed += strlen(token) + 1; + flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s", + c->resp.payload); + } + else { + *bytes_consumed += token_size + 1; ret = process_watched_event(ctx, buf_data, buf_size); } @@ -764,13 +769,13 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c, if (buf_data) { buf_data = NULL; } - token = strtok(NULL, JSON_ARRAY_DELIM); - } + token_start = token_end+1; + token_end = strpbrk(token_start, JSON_ARRAY_DELIM); + } if (buf_data) { flb_free(buf_data); } - flb_sds_destroy(payload); return ret; } @@ -849,12 +854,13 @@ static int k8s_events_collect(struct flb_input_instance *ins, } while(continue_token != NULL); if (max_resource_version > ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version); + flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version); ctx->last_resource_version = max_resource_version; } - //Now that we've done a full list, we can use the resource version and do a watch - //to stream updates efficiently + /* Now that we've done a full list, we can use the resource version and do a watch + * to stream updates efficiently + */ c = make_event_watch_api_request(ctx, u_conn, max_resource_version); if (!c) { flb_plg_error(ins, "unable to create http client"); @@ -862,9 +868,10 @@ static int k8s_events_collect(struct flb_input_instance *ins, } initialize_http_client(c, ctx); - // Watch will stream chunked json data, so we only send - // the http request, then use flb_http_get_available_chunks - // to attempt processing on available streamed data + /* Watch will stream chunked json data, so we only send + * the http request, then use flb_http_get_response_data + * to attempt processing on available streamed data + */ b_sent = 0; ret = flb_http_do_request(c, &b_sent); if (ret != 0) { @@ -876,13 +883,13 @@ static int k8s_events_collect(struct flb_input_instance *ins, bytes_consumed = 0; chunk_proc_ret = 0; while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) { - ret = flb_http_get_available_chunks(c, bytes_consumed); + ret = flb_http_get_response_data(c, bytes_consumed); bytes_consumed = 0; if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) { chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed); } } - // NOTE: skipping any processing after streaming socket closes + /* NOTE: skipping any processing after streaming socket closes */ if (c->resp.status != 200) { flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s", c->resp.status, c->resp.payload); diff --git a/src/flb_http_client.c b/src/flb_http_client.c index a989cec92d1..ae287c8c029 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -1179,12 +1179,6 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token) return result; } -/* flb_http_do_request only sends the http request the data. -* This is useful for processing the chunked responses on your own. -* If you do not want to process the response on your own or expect -* all response data before you process data, use flb_http_do instead. -*/ -int flb_http_do_request(struct flb_http_client *c, size_t *bytes) /* flb_http_do_request only sends the http request the data. * This is useful for processing the chunked responses on your own. * If you do not want to process the response on your own or expect