Skip to content

Commit

Permalink
in_kubernetes_events: add tests and move connection stream up to context
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
  • Loading branch information
ryanohnemus committed Apr 30, 2024
1 parent 540f537 commit 99bad45
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 50 deletions.
131 changes: 81 additions & 50 deletions plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,6 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
}

static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx,
struct flb_connection *u_conn,
uint64_t max_resource_version)
{
flb_sds_t url;
Expand All @@ -603,21 +602,20 @@ static struct flb_http_client *make_event_watch_api_request(struct k8s_events *c

flb_sds_printf(&url, "?watch=1&resourceVersion=%llu", max_resource_version);
flb_plg_info(ctx->ins, "Requesting %s", url);
c = flb_http_client(u_conn, FLB_HTTP_GET, url,
c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
flb_sds_destroy(url);
return c;
}

static struct flb_http_client *make_event_list_api_request(struct k8s_events *ctx,
struct flb_connection *u_conn,
flb_sds_t continue_token)
{
flb_sds_t url;
struct flb_http_client *c;

if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) {
return flb_http_client(u_conn, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
return flb_http_client(ctx->current_connection, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
}

Expand All @@ -637,7 +635,7 @@ static struct flb_http_client *make_event_list_api_request(struct k8s_events *ct
}
flb_sds_printf(&url, "limit=%d", ctx->limit_request);
}
c = flb_http_client(u_conn, FLB_HTTP_GET, url,
c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
flb_sds_destroy(url);
return c;
Expand Down Expand Up @@ -788,50 +786,50 @@ static void initialize_http_client(struct flb_http_client* c, struct k8s_events*
}
}

static int k8s_events_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
static int check_and_init_stream(struct k8s_events *ctx)
{
int ret;
size_t b_sent;
struct flb_connection *u_conn = NULL;
struct flb_http_client *c = NULL;
struct k8s_events *ctx = in_context;
/* Returns FLB_TRUE if stream has been initialized */
flb_sds_t continue_token = NULL;
uint64_t max_resource_version = 0;
size_t bytes_consumed;
int chunk_proc_ret;
size_t b_sent;
int ret;
struct flb_http_client *c = NULL;

if (pthread_mutex_trylock(&ctx->lock) != 0) {
FLB_INPUT_RETURN(0);
/* if the streaming client is already active, just return it */
if(ctx->streaming_client) {
return FLB_TRUE;
}

u_conn = flb_upstream_conn_get(ctx->upstream);
if (!u_conn) {
flb_plg_error(ins, "upstream connection initialization error");
goto exit;
}
/* setup connection if one does not exist */
if(!ctx->current_connection) {
ctx->current_connection = flb_upstream_conn_get(ctx->upstream);
if (!ctx->current_connection) {
flb_plg_error(ctx->ins, "upstream connection initialization error");
goto failure;
}

ret = refresh_token_if_needed(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "failed to refresh token");
goto exit;
ret = refresh_token_if_needed(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "failed to refresh token");
goto failure;
}
}

do {
c = make_event_list_api_request(ctx, u_conn, continue_token);
c = make_event_list_api_request(ctx, continue_token);
if (continue_token != NULL) {
flb_sds_destroy(continue_token);
continue_token = NULL;
}
if (!c) {
flb_plg_error(ins, "unable to create http client");
goto exit;
flb_plg_error(ctx->ins, "unable to create http client");
goto failure;
}
initialize_http_client(c, ctx);
ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_plg_error(ins, "http do error");
goto exit;
flb_plg_error(ctx->ins, "http do error");
goto failure;
}

if (c->resp.status == 200 && c->resp.payload_size > 0) {
Expand All @@ -846,7 +844,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
else {
flb_plg_error(ctx->ins, "http_status=%i", c->resp.status);
}
goto exit;
goto failure;
}
flb_http_client_destroy(c);
c = NULL;
Expand All @@ -860,48 +858,81 @@ static int k8s_events_collect(struct flb_input_instance *ins,
/* Now that we've done a full list, we can use the resource version and do a watch
* to stream updates efficiently
*/
c = make_event_watch_api_request(ctx, u_conn, max_resource_version);
if (!c) {
flb_plg_error(ins, "unable to create http client");
goto exit;
ctx->streaming_client = make_event_watch_api_request(ctx, max_resource_version);
if (!ctx->streaming_client) {
flb_plg_error(ctx->ins, "unable to create http client");
goto failure;
}
initialize_http_client(c, ctx);
initialize_http_client(ctx->streaming_client, ctx);

/* Watch will stream chunked json data, so we only send
* the http request, then use flb_http_get_response_data
* to attempt processing on available streamed data
*/
b_sent = 0;
ret = flb_http_do_request(c, &b_sent);
ret = flb_http_do_request(ctx->streaming_client, &b_sent);
if (ret != 0) {
flb_plg_error(ins, "http do request error");
goto exit;
flb_plg_error(ctx->ins, "http do request error");
goto failure;
}

return FLB_TRUE;

failure:
if (c) {
flb_http_client_destroy(c);
}
if (ctx->streaming_client) {
flb_http_client_destroy(ctx->streaming_client);
ctx->streaming_client = NULL;
}
if (ctx->current_connection) {
flb_upstream_conn_release(ctx->current_connection);
ctx->current_connection = NULL;
}
return FLB_FALSE;
}

static int k8s_events_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
int ret;
struct k8s_events *ctx = in_context;
size_t bytes_consumed;
int chunk_proc_ret;

if (pthread_mutex_trylock(&ctx->lock) != 0) {
FLB_INPUT_RETURN(0);
}

if (check_and_init_stream(ctx) == FLB_FALSE) {
FLB_INPUT_RETURN(0);
}

ret = FLB_HTTP_MORE;
bytes_consumed = 0;
chunk_proc_ret = 0;
while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
ret = flb_http_get_response_data(c, bytes_consumed);
ret = flb_http_get_response_data(ctx->streaming_client, bytes_consumed);
bytes_consumed = 0;
if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed);
if(ctx->streaming_client->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
chunk_proc_ret = process_http_chunk(ctx, ctx->streaming_client, &bytes_consumed);
}
}
/* NOTE: skipping any processing after streaming socket closes */

if (c->resp.status != 200) {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s", c->resp.status, c->resp.payload);
if (ctx->streaming_client->resp.status != 200) {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
ctx->streaming_client->resp.status, ctx->streaming_client->resp.payload);

flb_http_client_destroy(ctx->streaming_client);
flb_upstream_conn_release(ctx->current_connection);
ctx->streaming_client = NULL;
ctx->current_connection = NULL;
}

exit:
pthread_mutex_unlock(&ctx->lock);
if (c) {
flb_http_client_destroy(c);
}
if (u_conn) {
flb_upstream_conn_release(u_conn);
}
FLB_INPUT_RETURN(0);
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ struct k8s_events {
struct flb_upstream *upstream;
struct flb_input_instance *ins;

struct flb_connection *current_connection;
struct flb_http_client *streaming_client;

/* limit for event queries */
int limit_request;
/* last highest seen resource_version */
Expand Down
10 changes: 10 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ static int network_init(struct k8s_events *ctx, struct flb_config *config)
int io_type = FLB_IO_TCP;

ctx->upstream = NULL;
ctx->current_connection = NULL;
ctx->streaming_client = NULL;

if (ctx->api_https == FLB_TRUE) {
if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
Expand Down Expand Up @@ -280,6 +282,14 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
flb_ra_destroy(ctx->ra_resource_version);
}

if(ctx->streaming_client) {
flb_http_client_destroy(ctx->streaming_client);
}

if(ctx->current_connection) {
flb_upstream_conn_release(ctx->current_connection);
}

if (ctx->upstream) {
flb_upstream_destroy(ctx->upstream);
}
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_TCP "in_tcp.c")
FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c")
FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c")
FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c")
endif()

# Filter Plugins
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"kind": "EventList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "177157"
},
"items": [
{
"metadata": {
"name": "fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee",
"namespace": "default",
"uid": "6e3013d5-a79b-4dc4-b6c0-6b652302672e",
"resourceVersion": "176761",
"creationTimestamp": "2023-12-24T13:37:16Z",
"managedFields": [
{
"manager": "kube-scheduler",
"operation": "Update",
"apiVersion": "events.k8s.io/v1",
"time": "2023-12-24T13:37:16Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:action": {},
"f:eventTime": {},
"f:note": {},
"f:reason": {},
"f:regarding": {},
"f:reportingController": {},
"f:reportingInstance": {},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Pod",
"namespace": "default",
"name": "fluent-bit-78945dccd8-2g7qg",
"uid": "ed7de8ff-61fb-40bb-9ecb-55a801a4cd89",
"apiVersion": "v1",
"resourceVersion": "176749"
},
"reason": "FailedScheduling",
"message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
"source": {},
"firstTimestamp": null,
"lastTimestamp": null,
"type": "Warning",
"eventTime": "2023-12-24T13:37:16.335172Z",
"action": "Scheduling",
"reportingComponent": "default-scheduler",
"reportingInstance": "default-scheduler-minikube"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1703425036.000000,{"metadata":{"name":"fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee","namespace":"default","uid":"6e3013d5-a79b-4dc4-b6c0-6b652302672e","resourceVersion":"176761","creationTimestamp":"2023-12-24T13:37:16Z","managedFields":[{"manager":"kube-scheduler","operation":"Update","apiVersion":"events.k8s.io/v1","time":"2023-12-24T13:37:16Z","fieldsType":"FieldsV1","fieldsV1":{"f:action":{},"f:eventTime":{},"f:note":{},"f:reason":{},"f:regarding":{},"f:reportingController":{},"f:reportingInstance":{},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-2g7qg","uid":"ed7de8ff-61fb-40bb-9ecb-55a801a4cd89","apiVersion":"v1","resourceVersion":"176749"},"reason":"FailedScheduling","message":"0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..","source":{},"firstTimestamp":null,"lastTimestamp":null,"type":"Warning","eventTime":"2023-12-24T13:37:16.335172Z","action":"Scheduling","reportingComponent":"default-scheduler","reportingInstance":"default-scheduler-minikube"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"kind": "EventList",
"apiVersion": "v1",
"metadata": {
"resourceVersion": "177157"
},
"items": [
{
"metadata": {
"name": ".17a3ba8b4aa36c81",
"namespace": "default",
"uid": "ec5546b7-f1b9-4e61-a90c-a1f3b611edbc",
"resourceVersion": "174688",
"creationTimestamp": "2023-12-24T09:30:07Z",
"managedFields": [
{
"manager": "storage-provisioner",
"operation": "Update",
"apiVersion": "v1",
"time": "2023-12-24T09:30:07Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:count": {},
"f:firstTimestamp": {},
"f:involvedObject": {},
"f:lastTimestamp": {},
"f:message": {},
"f:reason": {},
"f:source": {
"f:component": {}
},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Endpoints",
"apiVersion": "v1"
},
"reason": "LeaderElection",
"message": "minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading",
"source": {
"component": "k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"
},
"firstTimestamp": "2023-12-24T09:29:51Z",
"lastTimestamp": "2023-12-24T09:29:51Z",
"count": 1,
"type": "Normal",
"eventTime": null,
"reportingComponent": "",
"reportingInstance": ""
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1703410191.000000,{"metadata":{"name":".17a3ba8b4aa36c81","namespace":"default","uid":"ec5546b7-f1b9-4e61-a90c-a1f3b611edbc","resourceVersion":"174688","creationTimestamp":"2023-12-24T09:30:07Z","managedFields":[{"manager":"storage-provisioner","operation":"Update","apiVersion":"v1","time":"2023-12-24T09:30:07Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:source":{"f:component":{}},"f:type":{}}}]},"involvedObject":{"kind":"Endpoints","apiVersion":"v1"},"reason":"LeaderElection","message":"minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading","source":{"component":"k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"},"firstTimestamp":"2023-12-24T09:29:51Z","lastTimestamp":"2023-12-24T09:29:51Z","count":1,"type":"Normal","eventTime":null,"reportingComponent":"","reportingInstance":""}]
1 change: 1 addition & 0 deletions tests/runtime/data/in_kubernetes_events/token
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fakeTokenFile
Loading

0 comments on commit 99bad45

Please sign in to comment.