Skip to content

Commit

Permalink
in_kubernetes_events: consolidate record timestamp logic
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
  • Loading branch information
ryanohnemus committed Jan 16, 2024
1 parent 05c0beb commit 589a7cb
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 60 deletions.
1 change: 0 additions & 1 deletion include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
#define FLB_HTTP_OK 1
#define FLB_HTTP_NOT_FOUND 2 /* header not found */
#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */
#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */

/* Useful headers */
#define FLB_HTTP_HEADER_AUTH "Authorization"
Expand Down
113 changes: 60 additions & 53 deletions plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static int file_to_buffer(const char *path,

fclose(fp);

// trim new lines
/* trim new lines */
for (len = st.st_size; len > 0; len--) {
if (buf[len-1] != '\n' && buf[len-1] != '\r') {
break;
Expand Down Expand Up @@ -243,7 +243,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u
return -1;
}

// attempt to parse string as number...
/* attempt to parse string as number... */
if (v->type == MSGPACK_OBJECT_STR) {
*val = strtoul(v->via.str.ptr, &end, 10);
if (end == NULL || (end < v->via.str.ptr + v->via.str.size)) {
Expand All @@ -267,8 +267,9 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
int ret;
msgpack_object *metadata;

// some events can have lastTimestamp and firstTimestamp set to
// NULL while having metadata.creationTimestamp set.
/* some events can have lastTimestamp and firstTimestamp set to
* NULL while having metadata.creationTimestamp set.
*/
ret = record_get_field_time(obj, "lastTimestamp", event_time);
if (ret != -1) {
return FLB_TRUE;
Expand All @@ -293,18 +294,18 @@ static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
}

static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
time_t* event_time)
struct flb_time* event_time)
{
int ret;
time_t now;
uint64_t outdated;
msgpack_object *metadata;
flb_sds_t uid;
uint64_t resource_version;

now = (time_t)(cfl_time_now() / 1000000000);
if (*event_time < (now - ctx->retention_time)) {
outdated = cfl_time_now() - (ctx->retention_time * 1000000000L);
if (flb_time_to_nanosec(event_time) < outdated) {
flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld",
*event_time, (now - ctx->retention_time));
flb_time_to_nanosec(event_time), outdated);
return FLB_TRUE;
}

Expand Down Expand Up @@ -356,7 +357,7 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
}
#endif

// check if this is an old event.
/* check if this is an old event. */
if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) {
flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)", resource_version,
ctx->last_resource_version);
Expand All @@ -378,7 +379,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action,
msgpack_object* item_metadata;

if(strncmp(action, "ADDED", 5) != 0 && strncmp(action, "MODIFIED", 8) != 0 ) {
//We don't process DELETED nor BOOKMARK
/* We don't process DELETED nor BOOKMARK */
return 0;
}

Expand All @@ -387,8 +388,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action,
flb_plg_warn(ctx->ins, "Event without metadata");
return -1;
}
ret = record_get_field_uint64(item_metadata,
"resourceVersion", &resource_version);
ret = record_get_field_uint64(item_metadata, "resourceVersion", &resource_version);
if (ret == -1) {
return ret;
}
Expand All @@ -409,7 +409,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action,
return -1;
}

if (check_event_is_filtered(ctx, item, (time_t)&ts) == FLB_TRUE) {
if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) {
return 0;
}

Expand All @@ -426,7 +426,8 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action,
ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_commit_record(ctx->encoder);
} else {
}
else {
flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version);
}

Expand Down Expand Up @@ -491,15 +492,13 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
char *buf_data;
size_t buf_size;
size_t off = 0;
struct flb_time ts;
uint64_t resource_version;
msgpack_unpacked result;
msgpack_object root;
msgpack_object k;
msgpack_object *items = NULL;
msgpack_object *item = NULL;
msgpack_object *metadata = NULL;
const flb_sds_t action = "ADDED"; //All items from a k8s list we consider an 'ADDED' type
const flb_sds_t action = "ADDED"; /* All items from a k8s list we consider as 'ADDED' */

ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed);
if (ret == -1) {
Expand All @@ -521,8 +520,9 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
return -1;
}

// Traverse the EventList for the metadata (for the continue token) and the items.
// https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList
/* Traverse the EventList for the metadata (for the continue token) and the items.
* https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList
*/
for (i = 0; i < root.via.map.size; i++) {
k = root.via.map.ptr[i].key;
if (k.type != MSGPACK_OBJECT_STR) {
Expand Down Expand Up @@ -587,15 +587,17 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
}

static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx,
struct flb_connection *u_conn,
uint64_t max_resource_version) {
struct flb_connection *u_conn,
uint64_t max_resource_version)
{
flb_sds_t url;
struct flb_http_client *c;

if (ctx->namespace == NULL) {
url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
} else {
url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
}
else {
url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
strlen(ctx->namespace));
flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
}
Expand All @@ -622,8 +624,9 @@ static struct flb_http_client *make_event_list_api_request(struct k8s_events *ct

if (ctx->namespace == NULL) {
url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
} else {
url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
}
else {
url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
strlen(ctx->namespace));
flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
}
Expand Down Expand Up @@ -725,8 +728,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
}

flb_plg_debug(ctx->ins,
"inserted k8s event: uid=%s, resource_version=%llu, last=%ld",
uid, resource_version, last);
"inserted k8s event: uid=%s, resource_version=%llu, last=%llu",
uid, resource_version, flb_time_to_nanosec(&last));
sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
sqlite3_reset(ctx->stmt_insert_kubernetes_event);

Expand All @@ -736,41 +739,43 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i

#endif

static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
size_t *bytes_consumed)
static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
size_t *bytes_consumed)
{
int ret = 0;
char *token = NULL;
int root_type;
size_t consumed = 0;
char *buf_data = NULL;
size_t buf_size;
flb_sds_t payload;

//we copy payload because tokenizer will overwrite when it finds end of string
payload = flb_sds_create_len(c->resp.payload, c->resp.payload_size);

token = strtok(payload, JSON_ARRAY_DELIM);
while ( token != NULL && ret == 0 ) {
ret = flb_pack_json(token, strlen(token), &buf_data, &buf_size, &root_type, &consumed);
size_t token_size = 0;
char *token_start = 0;
char *token_end = NULL;

token_start = c->resp.payload;
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
while ( token_end != NULL && ret == 0 ) {
token_size = token_end - token_start;
ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s", token);
} else {
*bytes_consumed += strlen(token) + 1;
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
c->resp.payload);
}
else {
*bytes_consumed += token_size + 1;
ret = process_watched_event(ctx, buf_data, buf_size);
}

flb_free(buf_data);
if (buf_data) {
buf_data = NULL;
}
token = strtok(NULL, JSON_ARRAY_DELIM);
}
token_start = token_end+1;
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
}

if (buf_data) {
flb_free(buf_data);
}
flb_sds_destroy(payload);
return ret;
}

Expand Down Expand Up @@ -849,22 +854,24 @@ static int k8s_events_collect(struct flb_input_instance *ins,
} while(continue_token != NULL);

if (max_resource_version > ctx->last_resource_version) {
flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version);
flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version);
ctx->last_resource_version = max_resource_version;
}

//Now that we've done a full list, we can use the resource version and do a watch
//to stream updates efficiently
/* Now that we've done a full list, we can use the resource version and do a watch
* to stream updates efficiently
*/
c = make_event_watch_api_request(ctx, u_conn, max_resource_version);
if (!c) {
flb_plg_error(ins, "unable to create http client");
goto exit;
}
initialize_http_client(c, ctx);

// Watch will stream chunked json data, so we only send
// the http request, then use flb_http_get_available_chunks
// to attempt processing on available streamed data
/* Watch will stream chunked json data, so we only send
* the http request, then use flb_http_get_response_data
* to attempt processing on available streamed data
*/
b_sent = 0;
ret = flb_http_do_request(c, &b_sent);
if (ret != 0) {
Expand All @@ -876,13 +883,13 @@ static int k8s_events_collect(struct flb_input_instance *ins,
bytes_consumed = 0;
chunk_proc_ret = 0;
while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
ret = flb_http_get_available_chunks(c, bytes_consumed);
ret = flb_http_get_response_data(c, bytes_consumed);
bytes_consumed = 0;
if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed);
}
}
// NOTE: skipping any processing after streaming socket closes
/* NOTE: skipping any processing after streaming socket closes */

if (c->resp.status != 200) {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s", c->resp.status, c->resp.payload);
Expand Down
6 changes: 0 additions & 6 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1179,12 +1179,6 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token)
return result;
}

/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
* all response data before you process data, use flb_http_do instead.
*/
int flb_http_do_request(struct flb_http_client *c, size_t *bytes)
/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
Expand Down

0 comments on commit 589a7cb

Please sign in to comment.