diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 62886d1346c..532a629b924 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -31,6 +31,9 @@ #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 +/* return values */ +#define FLB_EMITTER_BUSY 3 + struct em_chunk { flb_sds_t tag; struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */ @@ -39,6 +42,7 @@ struct em_chunk { }; struct flb_emitter { + int coll_fd; /* collector id */ struct mk_list chunks; /* list of all pending chunks */ struct flb_input_instance *ins; /* input instance */ struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ @@ -97,7 +101,6 @@ int static do_in_emitter_add_record(struct em_chunk *ec, em_chunk_destroy(ec); return -1; } - /* Release the echunk */ em_chunk_destroy(ec); return 0; } @@ -118,6 +121,12 @@ int in_emitter_add_record(const char *tag, int tag_len, ctx = (struct flb_emitter *) in->context; ec = NULL; + /* Restricted by mem_buf_limit */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record."); + return FLB_EMITTER_BUSY; + } + /* Use the ring buffer first if it exists */ if (ctx->msgs) { memset(&temporary_chunk, 0, sizeof(struct em_chunk)); @@ -161,8 +170,7 @@ int in_emitter_add_record(const char *tag, int tag_len, /* Append raw msgpack data */ msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); - - return do_in_emitter_add_record(ec, in); + return 0; } /* @@ -191,6 +199,34 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, return ret; } +static int cb_queue_chunks(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct em_chunk *echunk; + struct flb_emitter *ctx; + + /* Get context */ + ctx = (struct flb_emitter *) data; + + /* Try to enqueue chunks under our limits */ + mk_list_foreach_safe(head, tmp, &ctx->chunks) { + echunk = mk_list_entry(head, struct em_chunk, _head); + + /* Associate this backlog chunk to this instance into the engine */ + ret = do_in_emitter_add_record(echunk, in); + if (ret == -1) { + flb_error("[in_emitter] error registering chunk with tag: %s", + echunk->tag); + continue; + } + } + + return 0; +} + static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx) { if (ctx->ring_buffer_size <= 0) { @@ -257,6 +293,15 @@ static int cb_emitter_init(struct flb_input_instance *in, return -1; } } + else{ + ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); + if (ret < 0) { + flb_error("[in_emitter] could not create collector"); + flb_free(ctx); + return -1; + } + ctx->coll_fd = ret; + } /* export plugin context */ flb_input_set_context(in, ctx); @@ -264,6 +309,18 @@ static int cb_emitter_init(struct flb_input_instance *in, return 0; } +static void cb_emitter_pause(void *data, struct flb_config *config) +{ + struct flb_emitter *ctx = data; + flb_input_collector_pause(ctx->coll_fd, ctx->ins); +} + +static void cb_emitter_resume(void *data, struct flb_config *config) +{ + struct flb_emitter *ctx = data; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); +} + static int cb_emitter_exit(void *data, struct flb_config *config) { struct mk_list *tmp; @@ -312,8 +369,8 @@ struct flb_input_plugin in_emitter_plugin = { .cb_ingest = NULL, .cb_flush_buf = NULL, .config_map = config_map, - .cb_pause = NULL, - .cb_resume = NULL, + .cb_pause = cb_emitter_pause, + .cb_resume = cb_emitter_resume, .cb_exit = cb_emitter_exit, /* This plugin can only be configured and invoked by the Engine only */