diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 7154b324baf..2e2b5d04bef 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -463,7 +463,7 @@ void flb_test_input_chunk_fs_chunks_size_real() flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, 256, "dummy", 4, (void *)buf, 256); msgpack_sbuffer_destroy(&mp_sbuf); - /* clean up test chunks */ + /* Check each test chunk for size discrepancy */ mk_list_foreach_safe(head, tmp, &i_ins->chunks) { ic = mk_list_entry(head, struct flb_input_chunk, _head); if (cio_chunk_get_real_size(ic->chunk) != cio_chunk_get_content_size(ic->chunk)) { @@ -506,11 +506,99 @@ void flb_test_input_chunk_fs_chunks_size_real() flb_config_exit(cfg); } +/* This tests uses the subsystems of the engine directly + * to avoid threading issues when submitting chunks. + */ +void flb_test_input_chunk_correct_total_records(void) +{ + int records; + struct flb_input_instance *i_ins; + struct flb_output_instance *o_ins; + struct mk_list *tmp; + struct mk_list *head; + struct flb_input_chunk *ic; + struct flb_task *task; + struct flb_config *cfg; + struct cio_ctx *cio; + msgpack_sbuffer mp_sbuf; + char buf[262144]; + struct mk_event_loop *evl; + struct cio_options opts = {0}; + + flb_init_env(); + cfg = flb_config_init(); + evl = mk_event_loop_create(256); + + TEST_CHECK(evl != NULL); + cfg->evl = evl; + + flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL); + + i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE); + i_ins->storage_type = CIO_STORE_FS; + + cio_options_init(&opts); + + opts.root_path = "/tmp/input-chunk-fs_chunks-size_real"; + opts.log_cb = log_cb; + opts.log_level = CIO_LOG_DEBUG; + opts.flags = CIO_OPEN; + + cio = cio_create(&opts); + flb_storage_input_create(cio, i_ins); + flb_input_init_all(cfg); + + o_ins = flb_output_new(cfg, "http", NULL, FLB_TRUE); + // not the right way to do this + o_ins->id = 1; + TEST_CHECK_(o_ins != NULL, "unable to instance output"); + flb_output_set_property(o_ins, "match", "*"); + flb_output_set_property(o_ins, "storage.total_limit_size", "1M"); + + TEST_CHECK_((flb_router_io_set(cfg) != -1), "unable to router"); + + /* fill up the chunk ... */ + memset((void *)buf, 0x41, sizeof(buf)); + msgpack_sbuffer_init(&mp_sbuf); + gen_buf(&mp_sbuf, buf, sizeof(buf)); + + records = flb_mp_count(buf, sizeof(buf)); + flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, records, "dummy", 4, (void *)buf, sizeof(buf)); + msgpack_sbuffer_destroy(&mp_sbuf); + + /* Check each chunk's total records */ + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + TEST_CHECK_(ic->total_records > 0, "found input chunk with 0 total records"); + } + + /* FORCE clean up test tasks*/ + mk_list_foreach_safe(head, tmp, &i_ins->tasks) { + task = mk_list_entry(head, struct flb_task, _head); + flb_info("[task] cleanup test task"); + flb_task_destroy(task, FLB_TRUE); + } + + /* clean up test chunks */ + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + flb_input_chunk_destroy(ic, FLB_TRUE); + } + + cio_destroy(cio); + flb_router_exit(cfg); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); +} + + /* Test list */ TEST_LIST = { {"input_chunk_exceed_limit", flb_test_input_chunk_exceed_limit}, {"input_chunk_buffer_valid", flb_test_input_chunk_buffer_valid}, {"input_chunk_dropping_chunks", flb_test_input_chunk_dropping_chunks}, {"input_chunk_fs_chunk_size_real", flb_test_input_chunk_fs_chunks_size_real}, + {"input_chunk_correct_total_records", flb_test_input_chunk_correct_total_records}, {NULL, NULL} };