diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index 6231becb0ad..0a439ae4fab 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -42,6 +42,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item); #endif +#define JSON_ARRAY_DELIM "\r\n" + static int file_to_buffer(const char *path, char **out_buf, size_t *out_size) { @@ -79,7 +81,7 @@ static int file_to_buffer(const char *path, fclose(fp); - // trim new lines + /* trim new lines */ for (len = st.st_size; len > 0; len--) { if (buf[len-1] != '\n' && buf[len-1] != '\r') { break; @@ -241,7 +243,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u return -1; } - // attempt to parse string as number... + /* attempt to parse string as number... */ if (v->type == MSGPACK_OBJECT_STR) { *val = strtoul(v->via.str.ptr, &end, 10); if (end == NULL || (end < v->via.str.ptr + v->via.str.size)) { @@ -265,8 +267,9 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) int ret; msgpack_object *metadata; - // some events can have lastTimestamp and firstTimestamp set to - // NULL while having metadata.creationTimestamp set. + /* some events can have lastTimestamp and firstTimestamp set to + * NULL while having metadata.creationTimestamp set. + */ ret = record_get_field_time(obj, "lastTimestamp", event_time); if (ret != -1) { return FLB_TRUE; @@ -291,18 +294,18 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) } static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, - time_t* event_time) + struct flb_time* event_time) { int ret; - time_t now; + uint64_t outdated; msgpack_object *metadata; flb_sds_t uid; uint64_t resource_version; - now = (time_t)(cfl_time_now() / 1000000000); - if (*event_time < (now - ctx->retention_time)) { + outdated = cfl_time_now() - (ctx->retention_time * 1000000000L); + if (flb_time_to_nanosec(event_time) < outdated) { flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - *event_time, (now - ctx->retention_time)); + flb_time_to_nanosec(event_time), outdated); return FLB_TRUE; } @@ -354,7 +357,7 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, } #endif - // check if this is an old event. + /* check if this is an old event. */ if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) { flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)", resource_version, ctx->last_resource_version); @@ -366,7 +369,121 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, return FLB_FALSE; } -static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, uint64_t *max_resource_version, flb_sds_t *continue_token) + +static int process_event_object(struct k8s_events* ctx, flb_sds_t action, + msgpack_object* item) +{ + int ret = -1; + struct flb_time ts; + uint64_t resource_version; + msgpack_object* item_metadata; + + if(strncmp(action, "ADDED", 5) != 0 && strncmp(action, "MODIFIED", 8) != 0 ) { + /* We don't process DELETED nor BOOKMARK */ + return 0; + } + + item_metadata = record_get_field_ptr(item, "metadata"); + if (item_metadata == NULL) { + flb_plg_warn(ctx->ins, "Event without metadata"); + return -1; + } + ret = record_get_field_uint64(item_metadata, "resourceVersion", &resource_version); + if (ret == -1) { + return ret; + } + + /* reset the log encoder */ + flb_log_event_encoder_reset(ctx->encoder); + + /* print every item from the items array */ + if (item->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "Cannot unpack item in response"); + return -1; + } + + /* get event timestamp */ + ret = item_get_timestamp(item, &ts); + if (ret == FLB_FALSE) { + flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); + return -1; + } + + if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) { + return 0; + } + +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + k8s_events_sql_insert_event(ctx, item); + } +#endif + + /* encode content as a log event */ + flb_log_event_encoder_begin_record(ctx->encoder); + flb_log_event_encoder_set_timestamp(ctx->encoder, &ts); + + ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(ctx->encoder); + } + else { + flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version); + } + + if (ctx->encoder->output_length > 0) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->encoder->output_buffer, + ctx->encoder->output_length); + } + + return 0; +} + +static int process_watched_event(struct k8s_events *ctx, char *buf_data, size_t buf_size) { + int ret = -1; + size_t off = 0; + msgpack_unpacked result; + msgpack_object root; + msgpack_object *item = NULL; + flb_sds_t event_type = NULL; + + /* unpack */ + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf_data, buf_size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "Cannot unpack response"); + return -1; + } + + root = result.data; + if (root.type != MSGPACK_OBJECT_MAP) { + return -1; + } + + ret = record_get_field_sds(&root, "type", &event_type); + if (ret == -1) { + flb_plg_warn(ctx->ins, "Streamed Event 'type' not found"); + goto msg_error; + } + + item = record_get_field_ptr(&root, "object"); + if (item == NULL || item->type != MSGPACK_OBJECT_MAP) { + flb_plg_warn(ctx->ins, "Streamed Event 'object' not found"); + ret = -1; + goto msg_error; + } + + ret = process_event_object(ctx, event_type, item); + +msg_error: + flb_sds_destroy(event_type); + msgpack_unpacked_destroy(&result); + return ret; +} + +static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_size, + uint64_t *max_resource_version, flb_sds_t *continue_token) { int i; int ret = -1; @@ -375,16 +492,13 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, char *buf_data; size_t buf_size; size_t off = 0; - struct flb_time ts; - uint64_t resource_version; msgpack_unpacked result; msgpack_object root; msgpack_object k; msgpack_object *items = NULL; msgpack_object *item = NULL; - msgpack_object *item_metadata = NULL; msgpack_object *metadata = NULL; - + const flb_sds_t action = "ADDED"; /* All items from a k8s list we consider as 'ADDED' */ ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed); if (ret == -1) { @@ -406,8 +520,9 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, return -1; } - // Traverse the EventList for the metadata (for the continue token) and the items. - // https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList + /* Traverse the EventList for the metadata (for the continue token) and the items. + * https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList + */ for (i = 0; i < root.via.map.size; i++) { k = root.via.map.ptr[i].key; if (k.type != MSGPACK_OBJECT_STR) { @@ -437,82 +552,29 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, } if (metadata == NULL) { - flb_plg_error(ctx->ins, "Cannot find metatada in response"); + flb_plg_error(ctx->ins, "Cannot find metadata in response"); goto msg_error; } - ret = record_get_field_sds(metadata, "continue", continue_token); + ret = record_get_field_uint64(metadata, "resourceVersion", max_resource_version); if (ret == -1) { - if (ret == -1) { - flb_plg_error(ctx->ins, "Cannot process continue token"); + flb_plg_error(ctx->ins, "Cannot find EventList resourceVersion"); goto msg_error; - } } - for (i = 0; i < items->via.array.size; i++) { - if (items->via.array.ptr[i].type != MSGPACK_OBJECT_MAP) { - flb_plg_warn(ctx->ins, "Event that is not a map"); - continue; - } - item_metadata = record_get_field_ptr(&items->via.array.ptr[i], "metadata"); - if (item_metadata == NULL) { - flb_plg_warn(ctx->ins, "Event without metadata"); - continue; - } - ret = record_get_field_uint64(item_metadata, - "resourceVersion", &resource_version); - if (ret == -1) { - continue; - } - if (resource_version > *max_resource_version) { - *max_resource_version = resource_version; - } + ret = record_get_field_sds(metadata, "continue", continue_token); + if (ret == -1) { + flb_plg_error(ctx->ins, "Cannot process continue token"); + goto msg_error; } - /* reset the log encoder */ - flb_log_event_encoder_reset(ctx->encoder); - - /* print every item from the items array */ for (i = 0; i < items->via.array.size; i++) { item = &items->via.array.ptr[i]; if (item->type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "Cannot unpack item in response"); goto msg_error; } - - /* get event timestamp */ - ret = item_get_timestamp(item, &ts); - if (ret == FLB_FALSE) { - flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); - goto msg_error; - } - - if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) { - continue; - } - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - k8s_events_sql_insert_event(ctx, item); - } -#endif - - /* encode content as a log event */ - flb_log_event_encoder_begin_record(ctx->encoder); - flb_log_event_encoder_set_timestamp(ctx->encoder, &ts); - - ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item); - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_commit_record(ctx->encoder); - } else { - flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version); - } - } - - if (ctx->encoder->output_length > 0) { - flb_input_log_append(ctx->ins, NULL, 0, - ctx->encoder->output_buffer, - ctx->encoder->output_length); + process_event_object(ctx, action, item); } msg_error: @@ -523,13 +585,36 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, return ret; } -static struct flb_http_client *make_event_api_request(struct k8s_events *ctx, +static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx, struct flb_connection *u_conn, - flb_sds_t continue_token) + uint64_t max_resource_version) { flb_sds_t url; struct flb_http_client *c; + if (ctx->namespace == NULL) { + url = flb_sds_create(K8S_EVENTS_KUBE_API_URI); + } + else { + url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + + strlen(ctx->namespace)); + flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace); + } + + flb_sds_printf(&url, "?watch=1&resourceVersion=%llu", max_resource_version); + flb_plg_info(ctx->ins, "Requesting %s", url); + c = flb_http_client(u_conn, FLB_HTTP_GET, url, + NULL, 0, ctx->api_host, ctx->api_port, NULL, 0); + flb_sds_destroy(url); + return c; + } + +static struct flb_http_client *make_event_list_api_request(struct k8s_events *ctx, + struct flb_connection *u_conn, + flb_sds_t continue_token) +{ + flb_sds_t url; + struct flb_http_client *c; if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) { return flb_http_client(u_conn, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI, @@ -538,8 +623,9 @@ static struct flb_http_client *make_event_api_request(struct k8s_events *ctx, if (ctx->namespace == NULL) { url = flb_sds_create(K8S_EVENTS_KUBE_API_URI); - } else { - url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + + } + else { + url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) + strlen(ctx->namespace)); flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace); } @@ -641,8 +727,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i } flb_plg_debug(ctx->ins, - "inserted k8s event: uid=%s, resource_version=%llu, last=%ld", - uid, resource_version, last); + "inserted k8s event: uid=%s, resource_version=%llu, last=%llu", + uid, resource_version, flb_time_to_nanosec(&last)); sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); sqlite3_reset(ctx->stmt_insert_kubernetes_event); @@ -652,6 +738,56 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i #endif +static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c, + size_t *bytes_consumed) +{ + int ret = 0; + int root_type; + size_t consumed = 0; + char *buf_data = NULL; + size_t buf_size; + size_t token_size = 0; + char *token_start = 0; + char *token_end = NULL; + + token_start = c->resp.payload; + token_end = strpbrk(token_start, JSON_ARRAY_DELIM); + while ( token_end != NULL && ret == 0 ) { + token_size = token_end - token_start; + ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed); + if (ret == -1) { + flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s", + c->resp.payload); + } + else { + *bytes_consumed += token_size + 1; + ret = process_watched_event(ctx, buf_data, buf_size); + } + + flb_free(buf_data); + if (buf_data) { + buf_data = NULL; + } + token_start = token_end+1; + token_end = strpbrk(token_start, JSON_ARRAY_DELIM); + } + + if (buf_data) { + flb_free(buf_data); + } + return ret; +} + +static void initialize_http_client(struct flb_http_client* c, struct k8s_events* ctx) +{ + flb_http_buffer_size(c, 0); + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + if (ctx->auth_len > 0) { + flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len); + } +} + static int k8s_events_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { @@ -662,6 +798,8 @@ static int k8s_events_collect(struct flb_input_instance *ins, struct k8s_events *ctx = in_context; flb_sds_t continue_token = NULL; uint64_t max_resource_version = 0; + size_t bytes_consumed; + int chunk_proc_ret; if (pthread_mutex_trylock(&ctx->lock) != 0) { FLB_INPUT_RETURN(0); @@ -680,7 +818,7 @@ static int k8s_events_collect(struct flb_input_instance *ins, } do { - c = make_event_api_request(ctx, u_conn, continue_token); + c = make_event_list_api_request(ctx, u_conn, continue_token); if (continue_token != NULL) { flb_sds_destroy(continue_token); continue_token = NULL; @@ -689,29 +827,26 @@ static int k8s_events_collect(struct flb_input_instance *ins, flb_plg_error(ins, "unable to create http client"); goto exit; } - flb_http_buffer_size(c, 0); - - flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); - if (ctx->auth_len > 0) { - flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len); - } - + initialize_http_client(c, ctx); ret = flb_http_do(c, &b_sent); if (ret != 0) { flb_plg_error(ins, "http do error"); goto exit; } - if (c->resp.status == 200) { - ret = process_events(ctx, c->resp.payload, c->resp.payload_size, &max_resource_version, &continue_token); + if (c->resp.status == 200 && c->resp.payload_size > 0) { + ret = process_event_list(ctx, c->resp.payload, c->resp.payload_size, + &max_resource_version, &continue_token); } - else { + else + { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload); } else { flb_plg_error(ctx->ins, "http_status=%i", c->resp.status); } + goto exit; } flb_http_client_destroy(c); c = NULL; @@ -722,6 +857,43 @@ static int k8s_events_collect(struct flb_input_instance *ins, ctx->last_resource_version = max_resource_version; } + /* Now that we've done a full list, we can use the resource version and do a watch + * to stream updates efficiently + */ + c = make_event_watch_api_request(ctx, u_conn, max_resource_version); + if (!c) { + flb_plg_error(ins, "unable to create http client"); + goto exit; + } + initialize_http_client(c, ctx); + + /* Watch will stream chunked json data, so we only send + * the http request, then use flb_http_get_response_data + * to attempt processing on available streamed data + */ + b_sent = 0; + ret = flb_http_do_request(c, &b_sent); + if (ret != 0) { + flb_plg_error(ins, "http do request error"); + goto exit; + } + + ret = FLB_HTTP_MORE; + bytes_consumed = 0; + chunk_proc_ret = 0; + while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) { + ret = flb_http_get_response_data(c, bytes_consumed); + bytes_consumed = 0; + if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) { + chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed); + } + } + /* NOTE: skipping any processing after streaming socket closes */ + + if (c->resp.status != 200) { + flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s", c->resp.status, c->resp.payload); + } + exit: pthread_mutex_unlock(&ctx->lock); if (c) { @@ -863,12 +1035,6 @@ static struct flb_config_map config_map[] = { "kubernetes namespace to get events from, gets event from all namespaces by default." }, - { - FLB_CONFIG_MAP_STR, "timestamp_key", NULL, - 0, FLB_TRUE, offsetof(struct k8s_events, timestamp_key), - "Deprecated. To be removed in v3.0" - }, - #ifdef FLB_HAVE_SQLDB { FLB_CONFIG_MAP_STR, "db", NULL, diff --git a/plugins/in_kubernetes_events/kubernetes_events.h b/plugins/in_kubernetes_events/kubernetes_events.h index a4bcbf19d11..b5ec7db1c9c 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.h +++ b/plugins/in_kubernetes_events/kubernetes_events.h @@ -74,9 +74,6 @@ struct k8s_events { struct flb_log_event_encoder *encoder; - /* timestamp key - deprecated, to be removed in v3.0 */ - flb_sds_t timestamp_key; - /* record accessor */ struct flb_record_accessor *ra_resource_version; @@ -105,4 +102,4 @@ struct k8s_events { pthread_mutex_t lock; }; -#endif \ No newline at end of file +#endif