Skip to content

Commit

Permalink
in_kafka: Register pause/resume callbacks to handle back pressure cor…
Browse files Browse the repository at this point in the history
…rectly

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Nov 14, 2023
1 parent 9b9c800 commit cc1e554
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
18 changes: 18 additions & 0 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

ctx->coll_fd = ret;

ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);

if (ctx->log_encoder == NULL) {
Expand All @@ -306,6 +308,20 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

static void in_kafka_pause(void *data, struct flb_config *config)
{
struct flb_in_kafka_config *ctx = data;

flb_input_collector_pause(ctx->coll_fd, ctx->ins);
}

static void in_kafka_resume(void *data, struct flb_config *config)
{
struct flb_in_kafka_config *ctx = data;

flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}

/* Cleanup serial input */
static int in_kafka_exit(void *in_context, struct flb_config *config)
{
Expand Down Expand Up @@ -377,6 +393,8 @@ struct flb_input_plugin in_kafka_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_kafka_collect,
.cb_flush_buf = NULL,
.cb_pause = in_kafka_pause,
.cb_resume = in_kafka_resume,
.cb_exit = in_kafka_exit,
.config_map = config_map
};
1 change: 1 addition & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct flb_in_kafka_config {
int poll_ms;
int format;
char *format_str;
int coll_fd;
};

#endif

0 comments on commit cc1e554

Please sign in to comment.