From aa61aed30b76a82e2117aa5655cff23d0c969b4a Mon Sep 17 00:00:00 2001 From: ryanohnemus Date: Thu, 1 Aug 2024 13:52:55 -0500 Subject: [PATCH] in_kubernetes_events: add chunked streaming test Signed-off-by: ryanohnemus --- .../watch_v1_with_lastTimestamp.json | 1 + .../watch_v1_with_lastTimestamp.out | 1 + tests/runtime/in_kubernetes_events.c | 141 ++++++++++++++---- 3 files changed, 116 insertions(+), 27 deletions(-) create mode 100644 tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.json create mode 100644 tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.out diff --git a/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.json b/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.json new file mode 100644 index 00000000000..5203d11b13f --- /dev/null +++ b/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.json @@ -0,0 +1 @@ +{"type":"MODIFIED","object":{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}} \ No newline at end of file diff --git a/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.out b/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.out new file mode 100644 index 00000000000..1c33f24ef21 --- /dev/null +++ b/tests/runtime/data/in_kubernetes_events/watch_v1_with_lastTimestamp.out @@ -0,0 +1 @@ +[1722451635.000000,{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}] \ No newline at end of file diff --git a/tests/runtime/in_kubernetes_events.c b/tests/runtime/in_kubernetes_events.c index de054fac2c7..6fd0e21a413 100644 --- a/tests/runtime/in_kubernetes_events.c +++ b/tests/runtime/in_kubernetes_events.c @@ -52,6 +52,8 @@ struct test_k8s_server_ctx { int mq_id; /* Message Queue ID */ struct mk_event_loop *evl; char json_input_file[1024]; + char json_input_file_to_stream[1024]; + int chunk_size; /* send messages in http chunks of this size, if 0 send all data */ }; @@ -112,26 +114,33 @@ static flb_sds_t read_file(const char *filename) /* Callback to check expected results */ static int cb_check_result_json(void *record, size_t size, void *data) { - char *p; + char *p = NULL; flb_sds_t expected; char *result; int num = get_output_num(); - const char *filename; - char full_filename[1024]; + flb_sds_t filename = NULL; set_output_num(num+1); - filename = (const char *) data; - result = (char *) record; + /* Note this is probably confusing, but we expected 1 record from the event list, and 1 record from the stream + * these should be output num 0 and num 1 respectively, each json file should have a corresponding .out + */ + struct test_k8s_server_ctx *k8s_server = data; + if (num == 0) { + filename = flb_sds_create_len(k8s_server->json_input_file, strlen(k8s_server->json_input_file)-5); //remove .json + } else { + filename = flb_sds_create_len(k8s_server->json_input_file_to_stream, strlen(k8s_server->json_input_file_to_stream)-5); + } + flb_sds_cat(filename, ".out", 4); - sprintf(full_filename, "%s/%s.out", IN_KUBERNETES_EVENTS_DATA_PATH, filename); - expected = read_file(full_filename); + result = (char *) record; + expected = read_file(filename); p = strstr(result, expected); TEST_CHECK(p != NULL); if (p == NULL) { - flb_error("Expected to find: '%s' in result '%s'", + flb_error("Expected to find: '%s' \nin result '%s'", expected, result); } @@ -139,6 +148,9 @@ static int cb_check_result_json(void *record, size_t size, void *data) if (expected) { flb_sds_destroy(expected); } + if (filename) { + flb_sds_destroy(filename); + } return 0; } @@ -146,27 +158,52 @@ static void cb_root(mk_request_t *request, void *data) { flb_sds_t payload; struct test_k8s_server_ctx *server = data; - payload = read_file(server->json_input_file); if (request->query_string.data && strstr(request->query_string.data, "watch=1") != NULL) { - // NOTE/TODO: stream via watch not currently supported, this should become 200 status - // and chunked response when we do support it - mk_http_status(request, 500); - mk_http_done(request); + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16); + + if(strlen(server->json_input_file_to_stream) > 0) { + payload = read_file(server->json_input_file_to_stream); + char* start = payload; + int maxSize = server->chunk_size; + int totalSent = 0; + while(totalSent < strlen(payload)) { + if(strlen(start) < server->chunk_size) { + maxSize = strlen(start); + } + mk_http_send(request, start, maxSize, NULL); + + start += maxSize; + totalSent += maxSize; + + flb_time_msleep(500); + } + + // ensure we send an end of json delimeter so kubernetes_events plugin knows there's a json message + if(payload[strlen(payload)-1] != '\n') { + mk_http_send(request, "\n", 1, NULL); + } + flb_sds_destroy(payload); + } + /* mk_http_done(request); not used for our chunked send test */ } else { + payload = read_file(server->json_input_file); + + /* we don't use chunk_size in the non-streamed requests, but we could */ mk_http_status(request, 200); mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16); mk_http_send(request, payload, strlen(payload), NULL); mk_http_done(request); - } - if (payload) { flb_sds_destroy(payload); } + } -struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename) +struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename, + const char* stream_filename, int chunk_size) { int vid; char tmp[32]; @@ -181,6 +218,15 @@ struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename) sprintf(server->json_input_file, "%s/%s.json", IN_KUBERNETES_EVENTS_DATA_PATH, filename); + /* setup info for streamed events, if any */ + server->chunk_size = chunk_size; + if (strlen(stream_filename) > 0) { + sprintf(server->json_input_file_to_stream, "%s/%s.json", + IN_KUBERNETES_EVENTS_DATA_PATH, stream_filename); + } else { + memset(server->json_input_file_to_stream, 0, sizeof(server->json_input_file_to_stream)); + } + /* Create HTTP server context */ server->ctx = mk_create(); if (!server->ctx) { @@ -226,7 +272,7 @@ static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) flb_service_set(ctx->flb, "Flush", "0.200000000", "Grace", "3", - "Log_Level", "debug", + "Log_Level", "trace", NULL); /* Input */ @@ -284,8 +330,12 @@ void flb_test_events_v1_with_lastTimestamp() clear_output_num(); + struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api( + filename, "", 0 + ); + cb_data.cb = cb_check_result_json; - cb_data.data = (void *)filename; + cb_data.data = (void *)k8s_server; ctx = test_ctx_create(&cb_data); if (!TEST_CHECK(ctx != NULL)) { @@ -293,10 +343,6 @@ void flb_test_events_v1_with_lastTimestamp() exit(EXIT_FAILURE); } - struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api( - filename - ); - ret = flb_start(ctx->flb); TEST_CHECK(ret == 0); @@ -321,8 +367,12 @@ void flb_test_events_v1_with_creationTimestamp() clear_output_num(); + struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api( + filename, "", 0 + ); + cb_data.cb = cb_check_result_json; - cb_data.data = (void *)filename; + cb_data.data = (void *)k8s_server; ctx = test_ctx_create(&cb_data); if (!TEST_CHECK(ctx != NULL)) { @@ -330,10 +380,6 @@ void flb_test_events_v1_with_creationTimestamp() exit(EXIT_FAILURE); } - struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api( - filename - ); - ret = flb_start(ctx->flb); TEST_CHECK(ret == 0); @@ -348,9 +394,50 @@ void flb_test_events_v1_with_creationTimestamp() test_ctx_destroy(ctx); } +void flb_test_events_with_chunkedrecv() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + + int ret; + int num; + const char *filename = "eventlist_v1_with_lastTimestamp"; + const char *stream_filename = "watch_v1_with_lastTimestamp"; + + clear_output_num(); + + struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api( + filename, stream_filename, 1000 + ); + + cb_data.cb = cb_check_result_json; + cb_data.data = (void *)k8s_server; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + // waiting to flush + flb_time_msleep(5000); + + num = get_output_num(); + if (!TEST_CHECK(num == 2)) { + TEST_MSG("2 output records are expected"); + } + + mock_k8s_api_destroy(k8s_server); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp}, {"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp}, + {"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv}, {NULL, NULL} };