Skip to content

Commit

Permalink
in_emitter: Pause source input plugins on pause
Browse files Browse the repository at this point in the history
This commit will pause the inputs (sending to multiline
or rewrite_tag) to not loose any in-flight records.

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work committed Mar 14, 2024
1 parent feb4243 commit b845530
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 11 deletions.
14 changes: 12 additions & 2 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static int flush_callback(struct flb_ml_parser *parser,
/* Emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag);
ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size,
ctx->ins_emitter);
ctx->ins_emitter, ctx->i_ins);

return ret;
}
Expand Down Expand Up @@ -526,7 +526,8 @@ static void partial_timer_cb(struct flb_config *config, void *data)
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
packer->log_encoder.output_buffer,
packer->log_encoder.output_length,
ctx->ins_emitter);
ctx->ins_emitter,
ctx->i_ins);
if (ret < 0) {
/* this shouldn't happen in normal execution */
flb_plg_warn(ctx->ins,
Expand Down Expand Up @@ -741,6 +742,15 @@ static int cb_ml_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

if (ctx->i_ins == NULL){
ctx->i_ins = i_ins;
}
if (ctx->i_ins != i_ins) {
flb_plg_trace(ctx->ins, "input instance changed from %s to %s",
ctx->i_ins->name, i_ins->name);
ctx->i_ins = i_ins;
}

/* 'partial_message' mode */
if (ctx->partial_mode == FLB_TRUE) {
return ml_filter_partial(data, bytes, tag, tag_len,
Expand Down
4 changes: 3 additions & 1 deletion plugins/filter_multiline/ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct ml_ctx {
size_t emitter_mem_buf_limit; /* Emitter buffer limit */
struct flb_input_instance *ins_emitter; /* emitter input plugin instance */
struct flb_config *config; /* Fluent Bit context */
struct flb_input_instance *i_ins; /* Fluent Bit input instance (last used)*/

#ifdef FLB_HAVE_METRICS
struct cmt_counter *cmt_emitted;
Expand All @@ -82,6 +83,7 @@ struct ml_ctx {
/* Register external function to emit records, check 'plugins/in_emitter' */
int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in);
struct flb_input_instance *in,
struct flb_input_instance *i_ins);

#endif
7 changes: 4 additions & 3 deletions plugins/filter_rewrite_tag/rewrite_tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ static int ingest_inline(struct flb_rewrite_tag *ctx,
*/
static int process_record(const char *tag, int tag_len, msgpack_object map,
const void *buf, size_t buf_size, int *keep,
struct flb_rewrite_tag *ctx, int *matched)
struct flb_rewrite_tag *ctx, int *matched,
struct flb_input_instance *i_ins)
{
int ret;
flb_sds_t out_tag;
Expand Down Expand Up @@ -404,7 +405,7 @@ static int process_record(const char *tag, int tag_len, msgpack_object map,
if (!ret) {
/* Emit record with new tag */
ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size,
ctx->ins_emitter);
ctx->ins_emitter, i_ins);
}
else {
ret = 0;
Expand Down Expand Up @@ -489,7 +490,7 @@ static int cb_rewrite_tag_filter(const void *data, size_t bytes,
* If a record was emitted, the variable 'keep' will define if the record must
* be preserved or not.
*/
is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched);
is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched, i_ins);
if (is_emitted == FLB_TRUE) {
/* A record with the new tag was emitted */
emitted_num++;
Expand Down
3 changes: 2 additions & 1 deletion plugins/filter_rewrite_tag/rewrite_tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ struct flb_rewrite_tag {
/* Register external function to emit records, check 'plugins/in_emitter' */
int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in);
struct flb_input_instance *in,
struct flb_input_instance *i_ins);
int in_emitter_get_collector_id(struct flb_input_instance *in);


Expand Down
77 changes: 73 additions & 4 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

/* return values */
#define FLB_EMITTER_BUSY 3
#define FLB_EMITTER_BUSY -2

struct em_chunk {
flb_sds_t tag;
Expand All @@ -41,12 +41,18 @@ struct em_chunk {
struct mk_list _head;
};

struct input_ref {
struct flb_input_instance *i_ins;
struct mk_list _head;
};

struct flb_emitter {
int coll_fd; /* collector id */
struct mk_list chunks; /* list of all pending chunks */
struct flb_input_instance *ins; /* input instance */
struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */
int ring_buffer_size; /* size of the ring buffer */
struct mk_list i_ins_list; /* instance list of linked/sending inputs */
};

struct em_chunk *em_chunk_create(const char *tag, int tag_len,
Expand Down Expand Up @@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
struct flb_emitter *ctx = (struct flb_emitter *) in->context;
int ret;

if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.",
ctx->ins->name);
return FLB_EMITTER_BUSY;
}

/* Associate this backlog chunk to this instance into the engine */
ret = flb_input_log_append(in,
ec->tag, flb_sds_len(ec->tag),
Expand All @@ -111,15 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
*/
int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in)
struct flb_input_instance *in,
struct flb_input_instance *i_ins)
{
struct em_chunk temporary_chunk;
struct mk_list *head;
struct input_ref *i_ref;
bool ref_found;
struct mk_list *tmp;

struct em_chunk *ec;
struct flb_emitter *ctx;

ctx = (struct flb_emitter *) in->context;
ec = NULL;
/* Iterate over list of already known (source) inputs */
/* If new, add it to the list to be able to pause it later on */
ref_found = false;
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
if(i_ref->i_ins == i_ins){
ref_found = true;
break;
}
}
if (!ref_found) {
i_ref = flb_malloc(sizeof(struct input_ref));
if (!i_ref) {
flb_errno();
return FLB_FILTER_NOTOUCH;
}
i_ref->i_ins = i_ins;
mk_list_add(&i_ref->_head, &ctx->i_ins_list);
/* If in_emitter is paused, but new input plugin is not paused, pause it */
if (flb_input_buf_paused(ctx->ins) == FLB_TRUE &&
flb_input_buf_paused(i_ins) == FLB_FALSE) {
flb_input_pause(i_ins);
}
}


/* Restricted by mem_buf_limit */
if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
Expand Down Expand Up @@ -268,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in,
ctx->ins = in;
mk_list_init(&ctx->chunks);

mk_list_init(&ctx->i_ins_list);


ret = flb_input_config_map_set(in, (void *) ctx);
if (ret == -1) {
Expand All @@ -294,7 +338,7 @@ static int cb_emitter_init(struct flb_input_instance *in,
}
}
else{
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config);
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config);
if (ret < 0) {
flb_error("[in_emitter] could not create collector");
flb_free(ctx);
Expand All @@ -312,13 +356,31 @@ static int cb_emitter_init(struct flb_input_instance *in,
static void cb_emitter_pause(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
struct mk_list *tmp;
struct mk_list *head;
struct input_ref *i_ref;

/* Pause all known senders */
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
flb_input_pause(i_ref->i_ins);
}
}

static void cb_emitter_resume(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
struct mk_list *tmp;
struct mk_list *head;
struct input_ref *i_ref;

/* Resume all known senders */
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
flb_input_resume(i_ref->i_ins);
}
}

static int cb_emitter_exit(void *data, struct flb_config *config)
Expand All @@ -328,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config)
struct flb_emitter *ctx = data;
struct em_chunk *echunk;
struct em_chunk ec;
struct input_ref *i_ref;
int ret;


mk_list_foreach_safe(head, tmp, &ctx->chunks) {
echunk = mk_list_entry(head, struct em_chunk, _head);
mk_list_del(&echunk->_head);
Expand All @@ -346,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config)
flb_ring_buffer_destroy(ctx->msgs);
}

mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
mk_list_del(&i_ref->_head);
flb_free(i_ref);
}


flb_free(ctx);
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,7 @@ int flb_input_resume(struct flb_input_instance *ins)
flb_input_thread_instance_resume(ins);
}
else {
flb_info("[input] resume %s", flb_input_name(ins));
ins->p->cb_resume(ins->context, ins->config);
}
}
Expand Down
94 changes: 94 additions & 0 deletions tests/runtime/filter_multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,98 @@ static void flb_test_ml_buffered_16_streams()
filter_test_destroy(ctx);
}

/* This test will test the pausing of in_emitter */
static void flb_test_ml_buffered_16_streams_pausing()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int i_ffds[16] = {0};
int ffd_num = sizeof(i_ffds)/sizeof(int);
int ret;
int i;
int j;
int bytes;
int len;
char line_buf[2048] = {0};
char tag_buf[32] = {0};
int line_num;
int num;

char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"};

struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
.ignore_min_line_num = 64,
};

char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property",
" at com.example.myproject.Author.getBookIds(xx.java:38)",
" at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
"Caused by: java.lang.NullPointerException",
" at com.example.myproject.Book.getId(Book.java:22)",
" at com.example.myproject.Author.getBookIds(Author.java:35)",
" ... 1 more",
"single line"};

cb_data.cb = cb_check_str_list;
cb_data.data = (void *)&expected;

clear_output_num();

line_num = sizeof(ml_logs)/sizeof(char*);

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}
flb_service_set(ctx->flb,
"Flush", "0.200000000",
"Grace", "5",
NULL);

i_ffds[0] = ctx->i_ffd;
for (i=1; i<ffd_num; i++) {
i_ffds[i] = flb_input(ctx->flb, (char *) "lib", NULL);
TEST_CHECK(i_ffds[i] >= 0);
sprintf(&tag_buf[0], "test%d", i);
flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL);
}

/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"multiline.key_content", "log",
"multiline.parser", "java",
"buffer", "on",
"debug_flush", "on",
"emitter_mem_buf_limit", "1k",
NULL);
TEST_CHECK(ret == 0);


/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

for (i=0; i<line_num; i++) {
sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs[i]);
len = strlen(line_buf);
for (j=0; j<ffd_num; j++) {
bytes = flb_lib_push(ctx->flb, i_ffds[j], &line_buf[0], len);
TEST_CHECK(bytes == len);
}
}
sleep(5);

num = get_output_num();
if (!TEST_CHECK(num == ffd_num)) {
TEST_MSG("output error. got %d expect %d", num, ffd_num);
}

filter_test_destroy(ctx);
}




Expand All @@ -695,5 +787,7 @@ TEST_LIST = {

{"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat },
{"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids },

{"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing },
{NULL, NULL}
};

0 comments on commit b845530

Please sign in to comment.