Skip to content

Commit

Permalink
in_kubernetes_events: consolidate record timestamp logic
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
  • Loading branch information
ryanohnemus committed Jan 3, 2024
1 parent 3a70346 commit 14b804c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 24 deletions.
36 changes: 18 additions & 18 deletions plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_
return 0;
}

static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val)
static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val)
{
msgpack_object *v;
struct flb_tm tm = { 0 };
Expand All @@ -227,7 +227,9 @@ static int record_get_field_time(msgpack_object *obj, const char *fieldname, tim
return -2;
}

*val = mktime(&tm.tm);
val->tm.tv_sec = flb_parser_tm2time(&tm);
val->tm.tv_nsec = 0;

return 0;
}

Expand Down Expand Up @@ -260,7 +262,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u
return -1;
}

static int item_get_timestamp(msgpack_object *obj, time_t *event_time)
static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
{
int ret;
msgpack_object *metadata;
Expand Down Expand Up @@ -291,18 +293,18 @@ static int item_get_timestamp(msgpack_object *obj, time_t *event_time)
}

static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
time_t* event_time)
struct flb_time* event_time)
{
int ret;
time_t now;
uint64_t outdated;
msgpack_object *metadata;
flb_sds_t uid;
uint64_t resource_version;

now = (time_t)(cfl_time_now() / 1000000000);
if (*event_time < (now - ctx->retention_time)) {
outdated = cfl_time_now() - (ctx->retention_time * 1000000000);
if (flb_time_to_nanosec(event_time) < outdated) {
flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld",
*event_time, (now - ctx->retention_time));
flb_time_to_nanosec(event_time), outdated);
return FLB_TRUE;
}

Expand Down Expand Up @@ -407,7 +409,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action,
return -1;
}

if (check_event_is_filtered(ctx, item, (time_t)&ts) == FLB_TRUE) {
if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) {
return 0;
}

Expand Down Expand Up @@ -489,8 +491,6 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
char *buf_data;
size_t buf_size;
size_t off = 0;
struct flb_time ts;
uint64_t resource_version;
msgpack_unpacked result;
msgpack_object root;
msgpack_object k;
Expand Down Expand Up @@ -671,7 +671,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
{
int ret;
uint64_t resource_version;
time_t last;
struct flb_time last;
msgpack_object *meta;
flb_sds_t uid;

Expand Down Expand Up @@ -709,7 +709,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
/* Bind parameters */
sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last));

/* Run the insert */
ret = sqlite3_step(ctx->stmt_insert_kubernetes_event);
Expand All @@ -723,8 +723,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
}

flb_plg_debug(ctx->ins,
"inserted k8s event: uid=%s, resource_version=%llu, last=%ld",
uid, resource_version, last);
"inserted k8s event: uid=%s, resource_version=%llu, last=%llu",
uid, resource_version, flb_time_to_nanosec(&last));
sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
sqlite3_reset(ctx->stmt_insert_kubernetes_event);

Expand Down Expand Up @@ -847,7 +847,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
} while(continue_token != NULL);

if (max_resource_version > ctx->last_resource_version) {
flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version);
flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version);
ctx->last_resource_version = max_resource_version;
}

Expand All @@ -861,7 +861,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
initialize_http_client(c, ctx);

// Watch will stream chunked json data, so we only send
// the http request, then use flb_http_get_available_chunks
// the http request, then use flb_http_get_response_data
// to attempt processing on available streamed data
b_sent = 0;
ret = flb_http_do_request(c, &b_sent);
Expand All @@ -874,7 +874,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
bytes_consumed = 0;
chunk_proc_ret = 0;
while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
ret = flb_http_get_available_chunks(c, bytes_consumed);
ret = flb_http_get_response_data(c, bytes_consumed);
bytes_consumed = 0;
if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed);
Expand Down
6 changes: 0 additions & 6 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1179,12 +1179,6 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token)
return result;
}

/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
* all response data before you process data, use flb_http_do instead.
*/
int flb_http_do_request(struct flb_http_client *c, size_t *bytes)
/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
Expand Down

0 comments on commit 14b804c

Please sign in to comment.