Skip to content

Commit

Permalink
input_chunk: add total_records test
Browse files Browse the repository at this point in the history
Adds a test that checks whether the input chunk that was created during
the test has a `total_records` value set.

Signed-off-by: braydonk <[email protected]>
  • Loading branch information
braydonk authored and edsiper committed Nov 28, 2023
1 parent 949a0b1 commit 81fe698
Showing 1 changed file with 89 additions and 1 deletion.
90 changes: 89 additions & 1 deletion tests/internal/input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void flb_test_input_chunk_fs_chunks_size_real()
flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, 256, "dummy", 4, (void *)buf, 256);
msgpack_sbuffer_destroy(&mp_sbuf);

/* clean up test chunks */
/* Check each test chunk for size discrepancy */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
if (cio_chunk_get_real_size(ic->chunk) != cio_chunk_get_content_size(ic->chunk)) {
Expand Down Expand Up @@ -506,11 +506,99 @@ void flb_test_input_chunk_fs_chunks_size_real()
flb_config_exit(cfg);
}

/* This tests uses the subsystems of the engine directly
* to avoid threading issues when submitting chunks.
*/
void flb_test_input_chunk_correct_total_records(void)
{
int records;
struct flb_input_instance *i_ins;
struct flb_output_instance *o_ins;
struct mk_list *tmp;
struct mk_list *head;
struct flb_input_chunk *ic;
struct flb_task *task;
struct flb_config *cfg;
struct cio_ctx *cio;
msgpack_sbuffer mp_sbuf;
char buf[262144];
struct mk_event_loop *evl;
struct cio_options opts = {0};

flb_init_env();
cfg = flb_config_init();
evl = mk_event_loop_create(256);

TEST_CHECK(evl != NULL);
cfg->evl = evl;

flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL);

i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE);
i_ins->storage_type = CIO_STORE_FS;

cio_options_init(&opts);

opts.root_path = "/tmp/input-chunk-fs_chunks-size_real";
opts.log_cb = log_cb;
opts.log_level = CIO_LOG_DEBUG;
opts.flags = CIO_OPEN;

cio = cio_create(&opts);
flb_storage_input_create(cio, i_ins);
flb_input_init_all(cfg);

o_ins = flb_output_new(cfg, "http", NULL, FLB_TRUE);
// not the right way to do this
o_ins->id = 1;
TEST_CHECK_(o_ins != NULL, "unable to instance output");
flb_output_set_property(o_ins, "match", "*");
flb_output_set_property(o_ins, "storage.total_limit_size", "1M");

TEST_CHECK_((flb_router_io_set(cfg) != -1), "unable to router");

/* fill up the chunk ... */
memset((void *)buf, 0x41, sizeof(buf));
msgpack_sbuffer_init(&mp_sbuf);
gen_buf(&mp_sbuf, buf, sizeof(buf));

records = flb_mp_count(buf, sizeof(buf));
flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, records, "dummy", 4, (void *)buf, sizeof(buf));
msgpack_sbuffer_destroy(&mp_sbuf);

/* Check each chunk's total records */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
TEST_CHECK_(ic->total_records > 0, "found input chunk with 0 total records");
}

/* FORCE clean up test tasks*/
mk_list_foreach_safe(head, tmp, &i_ins->tasks) {
task = mk_list_entry(head, struct flb_task, _head);
flb_info("[task] cleanup test task");
flb_task_destroy(task, FLB_TRUE);
}

/* clean up test chunks */
mk_list_foreach_safe(head, tmp, &i_ins->chunks) {
ic = mk_list_entry(head, struct flb_input_chunk, _head);
flb_input_chunk_destroy(ic, FLB_TRUE);
}

cio_destroy(cio);
flb_router_exit(cfg);
flb_input_exit_all(cfg);
flb_output_exit(cfg);
flb_config_exit(cfg);
}


/* Test list */
TEST_LIST = {
{"input_chunk_exceed_limit", flb_test_input_chunk_exceed_limit},
{"input_chunk_buffer_valid", flb_test_input_chunk_buffer_valid},
{"input_chunk_dropping_chunks", flb_test_input_chunk_dropping_chunks},
{"input_chunk_fs_chunk_size_real", flb_test_input_chunk_fs_chunks_size_real},
{"input_chunk_correct_total_records", flb_test_input_chunk_correct_total_records},
{NULL, NULL}
};

0 comments on commit 81fe698

Please sign in to comment.