diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 41b1b8a4d64..ced8ec83739 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -176,7 +176,7 @@ static int flush_callback(struct flb_ml_parser *parser, /* Emit record with original tag */ flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag); ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, - ctx->ins_emitter); + ctx->ins_emitter, ctx->i_ins); return ret; } @@ -526,7 +526,8 @@ static void partial_timer_cb(struct flb_config *config, void *data) ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), packer->log_encoder.output_buffer, packer->log_encoder.output_length, - ctx->ins_emitter); + ctx->ins_emitter, + ctx->i_ins); if (ret < 0) { /* this shouldn't happen in normal execution */ flb_plg_warn(ctx->ins, @@ -741,6 +742,15 @@ static int cb_ml_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } + if (ctx->i_ins == NULL){ + ctx->i_ins = i_ins; + } + if (ctx->i_ins != i_ins) { + flb_plg_trace(ctx->ins, "input instance changed from %s to %s", + ctx->i_ins->name, i_ins->name); + ctx->i_ins = i_ins; + } + /* 'partial_message' mode */ if (ctx->partial_mode == FLB_TRUE) { return ml_filter_partial(data, bytes, tag, tag_len, diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h index 59bf6c7e826..cae8fb64166 100644 --- a/plugins/filter_multiline/ml.h +++ b/plugins/filter_multiline/ml.h @@ -73,6 +73,7 @@ struct ml_ctx { size_t emitter_mem_buf_limit; /* Emitter buffer limit */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ struct flb_config *config; /* Fluent Bit context */ + struct flb_input_instance *i_ins; /* Fluent Bit input instance (last used)*/ #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; @@ -82,6 +83,7 @@ struct ml_ctx { /* Register external function to emit records, check 'plugins/in_emitter' */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in); + struct flb_input_instance *in, + struct flb_input_instance *i_ins); #endif diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index 01b0f168fe2..c8bfe029350 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -355,7 +355,8 @@ static int ingest_inline(struct flb_rewrite_tag *ctx, */ static int process_record(const char *tag, int tag_len, msgpack_object map, const void *buf, size_t buf_size, int *keep, - struct flb_rewrite_tag *ctx, int *matched) + struct flb_rewrite_tag *ctx, int *matched, + struct flb_input_instance *i_ins) { int ret; flb_sds_t out_tag; @@ -404,7 +405,7 @@ static int process_record(const char *tag, int tag_len, msgpack_object map, if (!ret) { /* Emit record with new tag */ ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size, - ctx->ins_emitter); + ctx->ins_emitter, i_ins); } else { ret = 0; @@ -489,7 +490,7 @@ static int cb_rewrite_tag_filter(const void *data, size_t bytes, * If a record was emitted, the variable 'keep' will define if the record must * be preserved or not. */ - is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched); + is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched, i_ins); if (is_emitted == FLB_TRUE) { /* A record with the new tag was emitted */ emitted_num++; diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index 11c0535fde1..d73b49f12eb 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -57,7 +57,8 @@ struct flb_rewrite_tag { /* Register external function to emit records, check 'plugins/in_emitter' */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in); + struct flb_input_instance *in, + struct flb_input_instance *i_ins); int in_emitter_get_collector_id(struct flb_input_instance *in); diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 532a629b924..a474a1c9c3f 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -32,7 +32,7 @@ #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 /* return values */ -#define FLB_EMITTER_BUSY 3 +#define FLB_EMITTER_BUSY -2 struct em_chunk { flb_sds_t tag; @@ -41,12 +41,18 @@ struct em_chunk { struct mk_list _head; }; +struct input_ref { + struct flb_input_instance *i_ins; + struct mk_list _head; +}; + struct flb_emitter { int coll_fd; /* collector id */ struct mk_list chunks; /* list of all pending chunks */ struct flb_input_instance *ins; /* input instance */ struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ int ring_buffer_size; /* size of the ring buffer */ + struct mk_list i_ins_list; /* input instance */ }; struct em_chunk *em_chunk_create(const char *tag, int tag_len, @@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec, struct flb_emitter *ctx = (struct flb_emitter *) in->context; int ret; + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.", + ctx->ins->name); + return FLB_EMITTER_BUSY; + } + /* Associate this backlog chunk to this instance into the engine */ ret = flb_input_log_append(in, ec->tag, flb_sds_len(ec->tag), @@ -111,16 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec, */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in) + struct flb_input_instance *in, + struct flb_input_instance *i_ins) { struct em_chunk temporary_chunk; struct mk_list *head; + struct input_ref *i_ref; + bool ref_found; + struct mk_list *tmp; + struct em_chunk *ec; struct flb_emitter *ctx; ctx = (struct flb_emitter *) in->context; ec = NULL; + ref_found = false; + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + if(i_ref->i_ins == i_ins){ + ref_found = true; + break; + } + } + if (!ref_found){ + i_ref = flb_malloc(sizeof(struct input_ref)); + if (!i_ref) { + flb_errno(); + return FLB_FILTER_NOTOUCH; + } + i_ref->i_ins = i_ins; + mk_list_add(&i_ref->_head, &ctx->i_ins_list); + /* If in_emitter is paused, but new input plugin is not paused, pause it */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && + flb_input_buf_paused(i_ins) == FLB_FALSE) { + flb_input_pause(i_ins); + } + } + + /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record."); @@ -268,6 +309,8 @@ static int cb_emitter_init(struct flb_input_instance *in, ctx->ins = in; mk_list_init(&ctx->chunks); + mk_list_init(&ctx->i_ins_list); + ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -312,13 +355,29 @@ static int cb_emitter_init(struct flb_input_instance *in, static void cb_emitter_pause(void *data, struct flb_config *config) { struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_pause(i_ref->i_ins); + } } static void cb_emitter_resume(void *data, struct flb_config *config) { struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_resume(i_ref->i_ins); + } } static int cb_emitter_exit(void *data, struct flb_config *config) @@ -328,6 +387,7 @@ static int cb_emitter_exit(void *data, struct flb_config *config) struct flb_emitter *ctx = data; struct em_chunk *echunk; struct em_chunk ec; + struct input_ref *i_ref; int ret; @@ -346,6 +406,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config) flb_ring_buffer_destroy(ctx->msgs); } + mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + mk_list_del(&i_ref->_head); + flb_free(i_ref); + } + + flb_free(ctx); return 0; } diff --git a/src/flb_input.c b/src/flb_input.c index a990a9d2805..7b614ccdb44 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -1729,6 +1729,7 @@ int flb_input_resume(struct flb_input_instance *ins) flb_input_thread_instance_resume(ins); } else { + flb_info("[input] resume %s", flb_input_name(ins)); ins->p->cb_resume(ins->context, ins->config); } } diff --git a/tests/runtime/filter_multiline.c b/tests/runtime/filter_multiline.c index 18253a5b2c7..b60e3bf2b0d 100644 --- a/tests/runtime/filter_multiline.c +++ b/tests/runtime/filter_multiline.c @@ -682,6 +682,98 @@ static void flb_test_ml_buffered_16_streams() filter_test_destroy(ctx); } +/* This test will test the pausing of in_emitter */ +static void flb_test_ml_buffered_16_streams_pausing() +{ + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + int i_ffds[16] = {0}; + int ffd_num = sizeof(i_ffds)/sizeof(int); + int ret; + int i; + int j; + int bytes; + int len; + char line_buf[2048] = {0}; + char tag_buf[32] = {0}; + int line_num; + int num; + + char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"}; + + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + .ignore_min_line_num = 64, + }; + + char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property", + " at com.example.myproject.Author.getBookIds(xx.java:38)", + " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)", + "Caused by: java.lang.NullPointerException", + " at com.example.myproject.Book.getId(Book.java:22)", + " at com.example.myproject.Author.getBookIds(Author.java:35)", + " ... 1 more", + "single line"}; + + cb_data.cb = cb_check_str_list; + cb_data.data = (void *)&expected; + + clear_output_num(); + + line_num = sizeof(ml_logs)/sizeof(char*); + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data); + if (!ctx) { + exit(EXIT_FAILURE); + } + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "5", + NULL); + + i_ffds[0] = ctx->i_ffd; + for (i=1; iflb, (char *) "lib", NULL); + TEST_CHECK(i_ffds[i] >= 0); + sprintf(&tag_buf[0], "test%d", i); + flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL); + } + + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "multiline.key_content", "log", + "multiline.parser", "java", + "buffer", "on", + "debug_flush", "on", + "emitter_mem_buf_limit", "1k", + NULL); + TEST_CHECK(ret == 0); + + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + for (i=0; iflb, i_ffds[j], &line_buf[0], len); + TEST_CHECK(bytes == len); + } + } + sleep(3); + + num = get_output_num(); + if (!TEST_CHECK(num == ffd_num)) { + TEST_MSG("output error. got %d expect %d", num, ffd_num); + } + + filter_test_destroy(ctx); +} + @@ -695,5 +787,7 @@ TEST_LIST = { {"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat }, {"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids }, + + {"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing }, {NULL, NULL} };