Skip to content

Commit

Permalink
in_emitter: Fix to prevent single record chunks and do pause on mem_b…
Browse files Browse the repository at this point in the history
…uf_limit

The current code creates a situation, where only one record per chunk
 is created. In case of a non-existing ring-buffer, the old mechanism is used.

Also the in_emitter plugin continued to accept records even after the
set emitter_mem_buf_limit was reached. This commit implements a
check if the plugin was paused and returns accordingly.

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work committed Feb 12, 2024
1 parent 9652b0d commit feb4243
Showing 1 changed file with 62 additions and 5 deletions.
67 changes: 62 additions & 5 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

/* return values */
#define FLB_EMITTER_BUSY 3

struct em_chunk {
flb_sds_t tag;
struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */
Expand All @@ -39,6 +42,7 @@ struct em_chunk {
};

struct flb_emitter {
int coll_fd; /* collector id */
struct mk_list chunks; /* list of all pending chunks */
struct flb_input_instance *ins; /* input instance */
struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */
Expand Down Expand Up @@ -97,7 +101,6 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
em_chunk_destroy(ec);
return -1;
}
/* Release the echunk */
em_chunk_destroy(ec);
return 0;
}
Expand All @@ -118,6 +121,12 @@ int in_emitter_add_record(const char *tag, int tag_len,
ctx = (struct flb_emitter *) in->context;
ec = NULL;

/* Restricted by mem_buf_limit */
if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record.");
return FLB_EMITTER_BUSY;
}

/* Use the ring buffer first if it exists */
if (ctx->msgs) {
memset(&temporary_chunk, 0, sizeof(struct em_chunk));
Expand Down Expand Up @@ -161,8 +170,7 @@ int in_emitter_add_record(const char *tag, int tag_len,

/* Append raw msgpack data */
msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size);

return do_in_emitter_add_record(ec, in);
return 0;
}

/*
Expand Down Expand Up @@ -191,6 +199,34 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in,
return ret;
}

static int cb_queue_chunks(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
int ret;
struct mk_list *tmp;
struct mk_list *head;
struct em_chunk *echunk;
struct flb_emitter *ctx;

/* Get context */
ctx = (struct flb_emitter *) data;

/* Try to enqueue chunks under our limits */
mk_list_foreach_safe(head, tmp, &ctx->chunks) {
echunk = mk_list_entry(head, struct em_chunk, _head);

/* Associate this backlog chunk to this instance into the engine */
ret = do_in_emitter_add_record(echunk, in);
if (ret == -1) {
flb_error("[in_emitter] error registering chunk with tag: %s",
echunk->tag);
continue;
}
}

return 0;
}

static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx)
{
if (ctx->ring_buffer_size <= 0) {
Expand Down Expand Up @@ -257,13 +293,34 @@ static int cb_emitter_init(struct flb_input_instance *in,
return -1;
}
}
else{
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config);
if (ret < 0) {
flb_error("[in_emitter] could not create collector");
flb_free(ctx);
return -1;
}
ctx->coll_fd = ret;
}

/* export plugin context */
flb_input_set_context(in, ctx);

return 0;
}

static void cb_emitter_pause(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
}

static void cb_emitter_resume(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}

static int cb_emitter_exit(void *data, struct flb_config *config)
{
struct mk_list *tmp;
Expand Down Expand Up @@ -312,8 +369,8 @@ struct flb_input_plugin in_emitter_plugin = {
.cb_ingest = NULL,
.cb_flush_buf = NULL,
.config_map = config_map,
.cb_pause = NULL,
.cb_resume = NULL,
.cb_pause = cb_emitter_pause,
.cb_resume = cb_emitter_resume,
.cb_exit = cb_emitter_exit,

/* This plugin can only be configured and invoked by the Engine only */
Expand Down

0 comments on commit feb4243

Please sign in to comment.