diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index b5f90a15015..00d2c49c84e 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -167,19 +167,6 @@ static int refresh_token_if_needed(struct k8s_events *ctx) return 0; } -static int timestamp_lookup(struct k8s_events *ctx, char *ts, struct flb_time *time) -{ - struct flb_tm tm = { 0 }; - - if (flb_strptime(ts, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) { - return -1; - } - - time->tm.tv_sec = flb_parser_tm2time(&tm); - time->tm.tv_nsec = 0; - - return 0; -} static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname) { @@ -301,25 +288,19 @@ static int item_get_timestamp(msgpack_object *obj, time_t *event_time) return FLB_FALSE; } -static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj) +static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, + time_t* event_time) { int ret; - time_t event_time; time_t now; msgpack_object *metadata; flb_sds_t uid; uint64_t resource_version; - ret = item_get_timestamp(obj, &event_time); - if (ret == -FLB_FALSE) { - flb_plg_error(ctx->ins, "Cannot get timestamp for item in response"); - return FLB_FALSE; - } - now = (time_t)(cfl_time_now() / 1000000000); - if (event_time < (now - ctx->retention_time)) { + if (*event_time < (now - ctx->retention_time)) { flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - event_time, (now - ctx->retention_time)); + *event_time, (now - ctx->retention_time)); return FLB_TRUE; } @@ -393,7 +374,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, size_t buf_size; size_t off = 0; struct flb_time ts; - struct flb_ra_value *rval; uint64_t resource_version; msgpack_unpacked result; msgpack_object root; @@ -498,7 +478,14 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, goto msg_error; } - if (check_event_is_filtered(ctx, item) == FLB_TRUE) { + /* get event timestamp */ + ret = item_get_timestamp(item, &ts); + if (ret == FLB_FALSE) { + flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); + goto msg_error; + } + + if (check_event_is_filtered(ctx, item, (time_t)&ts) == FLB_TRUE) { continue; } @@ -508,21 +495,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, } #endif - /* get event timestamp */ - rval = flb_ra_get_value_object(ctx->ra_timestamp, *item); - if (!rval || rval->type != FLB_RA_STRING) { - flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); - goto msg_error; - } - - /* convert timestamp */ - ret = timestamp_lookup(ctx, rval->val.string, &ts); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot lookup event timestamp"); - flb_ra_key_value_destroy(rval); - goto msg_error; - } - /* encode content as a log event */ flb_log_event_encoder_begin_record(ctx->encoder); flb_log_event_encoder_set_timestamp(ctx->encoder, &ts); @@ -533,7 +505,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, } else { flb_plg_warn(ctx->ins, "unable to encode: %lu", resource_version); } - flb_ra_key_value_destroy(rval); } if (ctx->encoder->output_length > 0) { @@ -889,11 +860,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct k8s_events, namespace), "kubernetes namespace to get events from, gets event from all namespaces by default." }, - { - FLB_CONFIG_MAP_STR, "timestamp_key", K8S_EVENTS_RA_TIMESTAMP, - 0, FLB_TRUE, offsetof(struct k8s_events, timestamp_key), - "Record accessor for the timestamp from the event. Default is $lastTimestamp." - }, #ifdef FLB_HAVE_SQLDB { diff --git a/plugins/in_kubernetes_events/kubernetes_events.h b/plugins/in_kubernetes_events/kubernetes_events.h index cb1e7233775..85b50ac148c 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.h +++ b/plugins/in_kubernetes_events/kubernetes_events.h @@ -74,11 +74,7 @@ struct k8s_events { struct flb_log_event_encoder *encoder; - /* timestamp key */ - flb_sds_t timestamp_key; - /* record accessor */ - struct flb_record_accessor *ra_timestamp; struct flb_record_accessor *ra_resource_version; /* others */ diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.c b/plugins/in_kubernetes_events/kubernetes_events_conf.c index 2d4ed6f17cb..725d2b916de 100644 --- a/plugins/in_kubernetes_events/kubernetes_events_conf.c +++ b/plugins/in_kubernetes_events/kubernetes_events_conf.c @@ -135,7 +135,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) int ret; const char *p; const char *url; - const char *timestampKey; const char *tmp; struct k8s_events *ctx = NULL; pthread_mutexattr_t attr; @@ -165,19 +164,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) return NULL; } - /* Record accessor pattern */ - timestampKey = flb_input_get_property("timestamp_key", ins); - if (!timestampKey ) { - timestampKey = K8S_EVENTS_RA_TIMESTAMP; - } - ctx->ra_timestamp = flb_ra_create(timestampKey, FLB_TRUE); - if (!ctx->ra_timestamp) { - flb_plg_error(ctx->ins, - "could not create record accessor for record timestamp"); - k8s_events_conf_destroy(ctx); - return NULL; - } - ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE); if (!ctx->ra_resource_version) { flb_plg_error(ctx->ins, "could not create record accessor for resource version"); @@ -289,9 +275,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) void k8s_events_conf_destroy(struct k8s_events *ctx) { - if (ctx->ra_timestamp) { - flb_ra_destroy(ctx->ra_timestamp); - } if (ctx->ra_resource_version) { flb_ra_destroy(ctx->ra_resource_version); diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.h b/plugins/in_kubernetes_events/kubernetes_events_conf.h index 73b48c323f3..734765626e3 100644 --- a/plugins/in_kubernetes_events/kubernetes_events_conf.h +++ b/plugins/in_kubernetes_events/kubernetes_events_conf.h @@ -38,7 +38,6 @@ #define K8S_EVENTS_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token" #define K8S_EVENTS_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" -#define K8S_EVENTS_RA_TIMESTAMP "$lastTimestamp" #define K8S_EVENTS_RA_RESOURCE_VERSION "$metadata['resourceVersion']" struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins);