Skip to content

Commit

Permalink
tests: internal: sp: add test case for conv_from_str_to_num
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 authored and edsiper committed Sep 23, 2023
1 parent 3325727 commit 8f408d9
Showing 1 changed file with 136 additions and 3 deletions.
139 changes: 136 additions & 3 deletions tests/internal/stream_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/stream_processor/flb_sp.h>
#include <fluent-bit/stream_processor/flb_sp_parser.h>
#include <fluent-bit/stream_processor/flb_sp_stream.h>
#include <fluent-bit/stream_processor/flb_sp_window.h>
#include <msgpack.h>

#include "flb_tests_internal.h"
#include "include/sp_invalid_queries.h"
Expand Down Expand Up @@ -107,7 +109,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task,
if (task->aggregate_keys == FLB_TRUE) {
ret = sp_process_data_aggr(data_buf->buffer, data_buf->size,
tag, tag_len,
task, sp);
task, sp, FLB_TRUE);
if (ret == -1) {
flb_error("[sp] error error processing records for '%s'",
task->name);
Expand All @@ -119,8 +121,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task,
task->name);
return -1;
}

if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
if (task->window.type == FLB_SP_WINDOW_DEFAULT || task->window.type == FLB_SP_WINDOW_TUMBLING) {
package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task);
}

Expand Down Expand Up @@ -751,11 +752,143 @@ static void test_snapshot()
#endif
}

static void test_conv_from_str_to_num()
{
struct flb_config *config = NULL;
struct flb_sp *sp = NULL;
struct flb_sp_task *task = NULL;
struct sp_buffer out_buf;
struct sp_buffer data_buf;
msgpack_sbuffer sbuf;
msgpack_packer pck;
msgpack_unpacked result;
size_t off = 0;
char json[4096] = {0};
int ret;

#ifdef _WIN32
WSADATA wsa_data;

WSAStartup(0x0201, &wsa_data);
#endif
out_buf.buffer = NULL;

config = flb_config_init();
config->evl = mk_event_loop_create(256);

ret = flb_storage_create(config);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("flb_storage_create failed");
flb_config_exit(config);
return;
}

sp = flb_sp_create(config);
if (!TEST_CHECK(sp != NULL)) {
TEST_MSG("[sp test] cannot create stream processor context");
goto test_conv_from_str_to_num_end;
}

task = flb_sp_task_create(sp, "tail.0", "CREATE STREAM test WITH (tag=\'test\') AS SELECT word, num, COUNT(*) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) GROUP BY word, num;");
if (!TEST_CHECK(task != NULL)) {
TEST_MSG("[sp test] wrong check 'conv', fix it!");
goto test_conv_from_str_to_num_end;
}

/* Create input data */
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
msgpack_pack_array(&pck, 2);
flb_pack_time_now(&pck);
msgpack_pack_map(&pck, 2);

msgpack_pack_str(&pck, 4);
msgpack_pack_str_body(&pck, "word", 4);
msgpack_pack_str(&pck, 4);
msgpack_pack_str_body(&pck, "hoge", 4);

msgpack_pack_str(&pck, 3);
msgpack_pack_str_body(&pck, "num", 3);
msgpack_pack_str(&pck, 6);
msgpack_pack_str_body(&pck, "123456", 6);

data_buf.buffer = sbuf.data;
data_buf.size = sbuf.size;

out_buf.buffer = NULL;
out_buf.size = 0;

/* Exec stream processor */
ret = flb_sp_do_test(sp, task, "tail.0", strlen("tail.0"), &data_buf, &out_buf);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("flb_sp_do_test failed");
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

if (!TEST_CHECK(out_buf.size > 0)) {
TEST_MSG("out_buf size is 0");
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}


/* Check output buffer. It should contain a number 123456 not a string "123456" */

msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, out_buf.buffer, out_buf.size, &off);
if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) {
TEST_MSG("failed to unpack ret=%d", ret);
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

ret = flb_msgpack_to_json(&json[0], sizeof(json), &result.data);
if (!TEST_CHECK(ret > 0)) {
TEST_MSG("flb_msgpack_to_json failed");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

if (!TEST_CHECK(strstr(json,"123456") != NULL)) {
TEST_MSG("number not found");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}
if (!TEST_CHECK(strstr(json,"\"123456\"") == NULL)) {
TEST_MSG("output should be number type");
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);
goto test_conv_from_str_to_num_end;
}

msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&sbuf);

test_conv_from_str_to_num_end:
if (out_buf.buffer != NULL) {
flb_free(out_buf.buffer);
}

#ifdef _WIN32
WSACleanup();
#endif
if (sp != NULL) {
flb_sp_destroy(sp);
}
flb_storage_destroy(config);
flb_config_exit(config);
}

TEST_LIST = {
{ "invalid_queries", invalid_queries},
{ "select_keys", test_select_keys},
{ "select_subkeys", test_select_subkeys},
{ "window", test_window},
{ "snapshot", test_snapshot},
{ "conv_from_str_to_num", test_conv_from_str_to_num},
{ NULL }
};

0 comments on commit 8f408d9

Please sign in to comment.