Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_kafka: Make back pressure workable and add a mechanism of polling threshold #8174

Merged
71 changes: 68 additions & 3 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ static int in_kafka_collect(struct flb_input_instance *ins,
break;
}

if (rkm->err) {
flb_plg_warn(ins, "consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}

flb_plg_debug(ins, "kafka message received");

ret = process_message(ctx, rkm);
Expand All @@ -175,12 +182,20 @@ static int in_kafka_collect(struct flb_input_instance *ins,

/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);

/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
break;
}
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_input_log_append(ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
ret = 0;
}
else {
Expand All @@ -203,8 +218,10 @@ static int in_kafka_init(struct flb_input_instance *ins,
rd_kafka_conf_t *kafka_conf = NULL;
rd_kafka_topic_partition_list_t *kafka_topics = NULL;
rd_kafka_resp_err_t err;
rd_kafka_conf_res_t res;
char errstr[512];
(void) data;
char conf_val[16];

/* Allocate space for the configuration context */
ctx = flb_malloc(sizeof(struct flb_in_kafka_config));
Expand All @@ -226,6 +243,31 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

if (ctx->buffer_max_size > 0) {
ctx->polling_threshold = ctx->buffer_max_size;

snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold - 512);
res = rd_kafka_conf_set(kafka_conf, "fetch.max.bytes", conf_val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up fetch.max.bytes: %s, val = %s",
rd_kafka_err2str(err), conf_val);
goto init_error;
}

snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold);
res = rd_kafka_conf_set(kafka_conf, "receive.message.max.bytes", conf_val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up receive.message.max.bytes: %s, val = %s",
rd_kafka_err2str(err), conf_val);
goto init_error;
}
}
else {
ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED;
}

ctx->kafka.rk = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_conf, errstr,
sizeof(errstr));

Expand Down Expand Up @@ -281,6 +323,8 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

ctx->coll_fd = ret;

ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);

if (ctx->log_encoder == NULL) {
Expand All @@ -306,6 +350,20 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

static void in_kafka_pause(void *data, struct flb_config *config)
{
struct flb_in_kafka_config *ctx = data;

flb_input_collector_pause(ctx->coll_fd, ctx->ins);
}

static void in_kafka_resume(void *data, struct flb_config *config)
{
struct flb_in_kafka_config *ctx = data;

flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}

/* Cleanup serial input */
static int in_kafka_exit(void *in_context, struct flb_config *config)
{
Expand Down Expand Up @@ -365,6 +423,11 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the librdkafka options"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_KAFKA_BUFFER_MAX_SIZE,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
"Set the maximum size of chunk"
},
/* EOF */
{0}
};
Expand All @@ -377,6 +440,8 @@ struct flb_input_plugin in_kafka_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_kafka_collect,
.cb_flush_buf = NULL,
.cb_pause = in_kafka_pause,
.cb_resume = in_kafka_resume,
.cb_exit = in_kafka_exit,
.config_map = config_map
};
5 changes: 5 additions & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

#define FLB_IN_KAFKA_DEFAULT_POLL_MS "500"
#define FLB_IN_KAFKA_DEFAULT_FORMAT "none"
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"

enum {
FLB_IN_KAFKA_FORMAT_NONE,
Expand All @@ -43,6 +45,9 @@ struct flb_in_kafka_config {
int poll_ms;
int format;
char *format_str;
int coll_fd;
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
};

#endif
Loading