Skip to content

Commit

Permalink
in_kubernetes_events: consolidate record timestamp logic (fluent#8323)
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
Signed-off-by: ahspw <[email protected]>
  • Loading branch information
ryanohnemus authored and ahspw committed Jan 16, 2024
1 parent 92c8127 commit d55f53e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 73 deletions.
80 changes: 27 additions & 53 deletions plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,6 @@ static int refresh_token_if_needed(struct k8s_events *ctx)

return 0;
}
static int timestamp_lookup(struct k8s_events *ctx, char *ts, struct flb_time *time)
{
struct flb_tm tm = { 0 };

if (flb_strptime(ts, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) {
return -1;
}

time->tm.tv_sec = flb_parser_tm2time(&tm);
time->tm.tv_nsec = 0;

return 0;
}

static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname)
{
Expand Down Expand Up @@ -221,7 +208,7 @@ static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_
return 0;
}

static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val)
static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val)
{
msgpack_object *v;
struct flb_tm tm = { 0 };
Expand All @@ -238,7 +225,9 @@ static int record_get_field_time(msgpack_object *obj, const char *fieldname, tim
return -2;
}

*val = mktime(&tm.tm);
val->tm.tv_sec = flb_parser_tm2time(&tm);
val->tm.tv_nsec = 0;

return 0;
}

Expand Down Expand Up @@ -271,7 +260,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u
return -1;
}

static int item_get_timestamp(msgpack_object *obj, time_t *event_time)
static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
{
int ret;
msgpack_object *metadata;
Expand Down Expand Up @@ -301,25 +290,19 @@ static int item_get_timestamp(msgpack_object *obj, time_t *event_time)
return FLB_FALSE;
}

static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj)
static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
time_t* event_time)
{
int ret;
time_t event_time;
time_t now;
msgpack_object *metadata;
flb_sds_t uid;
uint64_t resource_version;

ret = item_get_timestamp(obj, &event_time);
if (ret == -FLB_FALSE) {
flb_plg_error(ctx->ins, "Cannot get timestamp for item in response");
return FLB_FALSE;
}

now = (time_t)(cfl_time_now() / 1000000000);
if (event_time < (now - ctx->retention_time)) {
if (*event_time < (now - ctx->retention_time)) {
flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld",
event_time, (now - ctx->retention_time));
*event_time, (now - ctx->retention_time));
return FLB_TRUE;
}

Expand Down Expand Up @@ -373,7 +356,7 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj)

// check if this is an old event.
if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) {
flb_plg_debug(ctx->ins, "skipping old object: %lu (< %lu)", resource_version,
flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)", resource_version,
ctx->last_resource_version);
flb_sds_destroy(uid);
return FLB_TRUE;
Expand All @@ -393,7 +376,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size,
size_t buf_size;
size_t off = 0;
struct flb_time ts;
struct flb_ra_value *rval;
uint64_t resource_version;
msgpack_unpacked result;
msgpack_object root;
Expand Down Expand Up @@ -498,7 +480,14 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size,
goto msg_error;
}

if (check_event_is_filtered(ctx, item) == FLB_TRUE) {
/* get event timestamp */
ret = item_get_timestamp(item, &ts);
if (ret == FLB_FALSE) {
flb_plg_error(ctx->ins, "cannot retrieve event timestamp");
goto msg_error;
}

if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) {
continue;
}

Expand All @@ -508,21 +497,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size,
}
#endif

/* get event timestamp */
rval = flb_ra_get_value_object(ctx->ra_timestamp, *item);
if (!rval || rval->type != FLB_RA_STRING) {
flb_plg_error(ctx->ins, "cannot retrieve event timestamp");
goto msg_error;
}

/* convert timestamp */
ret = timestamp_lookup(ctx, rval->val.string, &ts);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot lookup event timestamp");
flb_ra_key_value_destroy(rval);
goto msg_error;
}

/* encode content as a log event */
flb_log_event_encoder_begin_record(ctx->encoder);
flb_log_event_encoder_set_timestamp(ctx->encoder, &ts);
Expand All @@ -531,9 +505,8 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size,
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_commit_record(ctx->encoder);
} else {
flb_plg_warn(ctx->ins, "unable to encode: %lu", resource_version);
flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version);
}
flb_ra_key_value_destroy(rval);
}

if (ctx->encoder->output_length > 0) {
Expand Down Expand Up @@ -616,7 +589,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
{
int ret;
uint64_t resource_version;
time_t last;
struct flb_time last;
msgpack_object *meta;
flb_sds_t uid;

Expand Down Expand Up @@ -654,21 +627,21 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i
/* Bind parameters */
sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last);
sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last));

/* Run the insert */
ret = sqlite3_step(ctx->stmt_insert_kubernetes_event);
if (ret != SQLITE_DONE) {
sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
sqlite3_reset(ctx->stmt_insert_kubernetes_event);
flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%lu",
flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%llu",
uid, resource_version);
flb_sds_destroy(uid);
return -1;
}

flb_plg_debug(ctx->ins,
"inserted k8s event: uid=%s, resource_version=%lu, last=%ld",
"inserted k8s event: uid=%s, resource_version=%llu, last=%ld",
uid, resource_version, last);
sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
sqlite3_reset(ctx->stmt_insert_kubernetes_event);
Expand Down Expand Up @@ -745,7 +718,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
} while(continue_token != NULL);

if (max_resource_version > ctx->last_resource_version) {
flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version);
flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version);
ctx->last_resource_version = max_resource_version;
}

Expand Down Expand Up @@ -889,10 +862,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct k8s_events, namespace),
"kubernetes namespace to get events from, gets event from all namespaces by default."
},

{
FLB_CONFIG_MAP_STR, "timestamp_key", K8S_EVENTS_RA_TIMESTAMP,
FLB_CONFIG_MAP_STR, "timestamp_key", NULL,
0, FLB_TRUE, offsetof(struct k8s_events, timestamp_key),
"Record accessor for the timestamp from the event. Default is $lastTimestamp."
"Deprecated. To be removed in v3.0"
},

#ifdef FLB_HAVE_SQLDB
Expand Down
3 changes: 1 addition & 2 deletions plugins/in_kubernetes_events/kubernetes_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ struct k8s_events {

struct flb_log_event_encoder *encoder;

/* timestamp key */
/* timestamp key - deprecated, to be removed in v3.0 */
flb_sds_t timestamp_key;

/* record accessor */
struct flb_record_accessor *ra_timestamp;
struct flb_record_accessor *ra_resource_version;

/* others */
Expand Down
17 changes: 0 additions & 17 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
int ret;
const char *p;
const char *url;
const char *timestampKey;
const char *tmp;
struct k8s_events *ctx = NULL;
pthread_mutexattr_t attr;
Expand Down Expand Up @@ -165,19 +164,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
return NULL;
}

/* Record accessor pattern */
timestampKey = flb_input_get_property("timestamp_key", ins);
if (!timestampKey ) {
timestampKey = K8S_EVENTS_RA_TIMESTAMP;
}
ctx->ra_timestamp = flb_ra_create(timestampKey, FLB_TRUE);
if (!ctx->ra_timestamp) {
flb_plg_error(ctx->ins,
"could not create record accessor for record timestamp");
k8s_events_conf_destroy(ctx);
return NULL;
}

ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE);
if (!ctx->ra_resource_version) {
flb_plg_error(ctx->ins, "could not create record accessor for resource version");
Expand Down Expand Up @@ -289,9 +275,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)

void k8s_events_conf_destroy(struct k8s_events *ctx)
{
if (ctx->ra_timestamp) {
flb_ra_destroy(ctx->ra_timestamp);
}

if (ctx->ra_resource_version) {
flb_ra_destroy(ctx->ra_resource_version);
Expand Down
1 change: 0 additions & 1 deletion plugins/in_kubernetes_events/kubernetes_events_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#define K8S_EVENTS_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token"
#define K8S_EVENTS_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

#define K8S_EVENTS_RA_TIMESTAMP "$lastTimestamp"
#define K8S_EVENTS_RA_RESOURCE_VERSION "$metadata['resourceVersion']"

struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins);
Expand Down

0 comments on commit d55f53e

Please sign in to comment.