Skip to content

Commit

Permalink
in_kubernetes_events: add chunked streaming test
Browse files Browse the repository at this point in the history
Signed-off-by: ryanohnemus <[email protected]>
  • Loading branch information
ryanohnemus authored and edsiper committed Aug 2, 2024
1 parent 2929a3d commit bc07686
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"MODIFIED","object":{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1722451635.000000,{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}]
140 changes: 114 additions & 26 deletions tests/runtime/in_kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct test_k8s_server_ctx {
int mq_id; /* Message Queue ID */
struct mk_event_loop *evl;
char json_input_file[1024];
char json_input_file_to_stream[1024];
int chunk_size; /* send messages in http chunks of this size, if 0 send all data */
};


Expand Down Expand Up @@ -112,61 +114,97 @@ static flb_sds_t read_file(const char *filename)
/* Callback to check expected results */
static int cb_check_result_json(void *record, size_t size, void *data)
{
char *p;
char *p = NULL;
flb_sds_t expected;
char *result;
int num = get_output_num();
const char *filename;
char full_filename[1024];
flb_sds_t filename = NULL;

set_output_num(num+1);

filename = (const char *) data;
result = (char *) record;
/* Note this is probably confusing, but we expected 1 record from the event list, and 1 record from the stream
* these should be output num 0 and num 1 respectively, each json file should have a corresponding .out
*/
struct test_k8s_server_ctx *k8s_server = data;
if (num == 0) {
filename = flb_sds_create_len(k8s_server->json_input_file, strlen(k8s_server->json_input_file)-5); //remove .json
} else {
filename = flb_sds_create_len(k8s_server->json_input_file_to_stream, strlen(k8s_server->json_input_file_to_stream)-5);
}
filename = flb_sds_cat(filename, ".out", 4);

sprintf(full_filename, "%s/%s.out", IN_KUBERNETES_EVENTS_DATA_PATH, filename);
expected = read_file(full_filename);
result = (char *) record;
expected = read_file(filename);

p = strstr(result, expected);
TEST_CHECK(p != NULL);

if (p == NULL) {
flb_error("Expected to find: '%s' in result '%s'",
flb_error("Expected to find: '%s' \nin result '%s'",
expected, result);
}

flb_free(record);
if (expected) {
flb_sds_destroy(expected);
}
if (filename) {
flb_sds_destroy(filename);
}
return 0;
}

static void cb_root(mk_request_t *request, void *data)
{
flb_sds_t payload;
struct test_k8s_server_ctx *server = data;
payload = read_file(server->json_input_file);

if (request->query_string.data && strstr(request->query_string.data, "watch=1") != NULL) {
// NOTE/TODO: stream via watch not currently supported, this should become 200 status
// and chunked response when we do support it
mk_http_status(request, 500);
mk_http_done(request);
mk_http_status(request, 200);
mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16);

if(strlen(server->json_input_file_to_stream) > 0) {
payload = read_file(server->json_input_file_to_stream);
char* start = payload;
int maxSize = server->chunk_size;
int totalSent = 0;
while(totalSent < strlen(payload)) {
if(strlen(start) < server->chunk_size) {
maxSize = strlen(start);
}
mk_http_send(request, start, maxSize, NULL);

start += maxSize;
totalSent += maxSize;

flb_time_msleep(300);
}

// ensure we send an end of json delimeter so kubernetes_events plugin knows there's a json message
if(payload[strlen(payload)-1] != '\n') {
mk_http_send(request, "\n", 1, NULL);
}
flb_sds_destroy(payload);
}

mk_http_done(request); /* this will end the connection and send the end-chunk */
}
else {
payload = read_file(server->json_input_file);

/* we don't use chunk_size in the non-streamed requests, but we could */
mk_http_status(request, 200);
mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16);
mk_http_send(request, payload, strlen(payload), NULL);
mk_http_done(request);
}

if (payload) {
flb_sds_destroy(payload);
}

}

struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename)
struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename,
const char* stream_filename, int chunk_size)
{
int vid;
char tmp[32];
Expand All @@ -181,6 +219,15 @@ struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename)
sprintf(server->json_input_file, "%s/%s.json",
IN_KUBERNETES_EVENTS_DATA_PATH, filename);

/* setup info for streamed events, if any */
server->chunk_size = chunk_size;
if (strlen(stream_filename) > 0) {
sprintf(server->json_input_file_to_stream, "%s/%s.json",
IN_KUBERNETES_EVENTS_DATA_PATH, stream_filename);
} else {
memset(server->json_input_file_to_stream, 0, sizeof(server->json_input_file_to_stream));
}

/* Create HTTP server context */
server->ctx = mk_create();
if (!server->ctx) {
Expand Down Expand Up @@ -284,19 +331,19 @@ void flb_test_events_v1_with_lastTimestamp()

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, "", 0
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)filename;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename
);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

Expand All @@ -321,19 +368,19 @@ void flb_test_events_v1_with_creationTimestamp()

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, "", 0
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)filename;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename
);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

Expand All @@ -348,9 +395,50 @@ void flb_test_events_v1_with_creationTimestamp()
test_ctx_destroy(ctx);
}

void flb_test_events_with_chunkedrecv()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;

int ret;
int num;
const char *filename = "eventlist_v1_with_lastTimestamp";
const char *stream_filename = "watch_v1_with_lastTimestamp";

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, stream_filename, 1000
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

// waiting to flush
flb_time_msleep(5000);

num = get_output_num();
if (!TEST_CHECK(num >= 2)) {
TEST_MSG("2 output records are expected found %d", num);
}

mock_k8s_api_destroy(k8s_server);
test_ctx_destroy(ctx);
}

TEST_LIST = {
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
{NULL, NULL}
};

0 comments on commit bc07686

Please sign in to comment.