Skip to content

Commit

Permalink
in_kafka: add parser support and improve performance
Browse files Browse the repository at this point in the history
When processing record, the in_kafka plugin lack of
parser ability, This commit added support to
parse payload. It has been tested with kafka.

When processing record, the in_kafka plugin currently
will commit every single message poll from kafka.
which is good at normal case. But with this default
behavier, the performance is strictly limited. This
commit has add auto_commit as an option when
the performance matter more. It has been tested
with kafka. There is a sample benchmark as following.
              topic_size topic_offset time_cost
before  251 MB     506701         1m30s
after     251 MB     506701         10s

Signed-off-by: luoyy8 <[email protected]>
  • Loading branch information
luoyy8 committed Dec 15, 2024
1 parent 362ca1f commit cba27c1
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 9 deletions.
112 changes: 103 additions & 9 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ static int process_message(struct flb_in_kafka_config *ctx,
{
struct flb_log_event_encoder *log_encoder = ctx->log_encoder;
int ret;
struct flb_time out_time;
void *out_buf;
size_t out_size;

ret = flb_log_event_encoder_begin_record(log_encoder);

Expand Down Expand Up @@ -128,12 +131,38 @@ static int process_message(struct flb_in_kafka_config *ctx,

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (rkm->payload) {
if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON ||
/* Reset time for each line */
flb_time_zero(&out_time);

/* Use the defined parser */
ret = flb_parser_do(ctx->parser, rkm->payload, rkm->len,
&out_buf, &out_size, &out_time);

if (ret >= 0) {
if (flb_time_to_nanosec(&out_time) == 0L) {
flb_time_get(&out_time);
}
ret = flb_log_event_encoder_set_timestamp(log_encoder, &out_time);

if (ret == FLB_EVENT_ENCODER_SUCCESS){
ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder,
out_buf,
out_size);
}

flb_free(out_buf);
}
else {
flb_plg_warn(ctx->ins,
"failed to parse payload, fluentbit error code : %d, return to default behaver\n",
ret);
if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON ||
try_json(log_encoder, rkm)) {
ret = flb_log_event_encoder_append_body_string(log_encoder,
rkm->payload,
rkm->len);
}
ret = flb_log_event_encoder_append_body_string(log_encoder,
rkm->payload,
rkm->len);
}
}
}
else {
ret = flb_log_event_encoder_append_body_null(log_encoder);
Expand Down Expand Up @@ -164,6 +193,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);

if (!rkm) {
flb_plg_debug(ins, "no message polled, break collection loop");
break;
}

Expand All @@ -177,12 +207,33 @@ static int in_kafka_collect(struct flb_input_instance *ins,
flb_plg_debug(ins, "kafka message received");

ret = process_message(ctx, rkm);

flb_plg_debug(ins,
"encode kafka message error, \
topic: %s, offset: %s, partition: %s\n",
rd_kafka_topic_name(rkm -> rkt),
rkm->offset,
rkm->partition);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if ( ctx -> auto_commit ) {
rd_kafka_offset_store_message(rkm);
}
else{
rd_kafka_commit_message(ctx -> kafka.rk,rkm,0);
}
}
else{
flb_plg_warn(ins,
"encode kafka message error, \
topic: %s, offset: %s, partition: %s\n",
rd_kafka_topic_name(rkm -> rkt),
rkm->offset,
rkm->partition);
rd_kafka_message_destroy(rkm);
continue;
}

rd_kafka_message_destroy(rkm);

/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);

/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
Expand Down Expand Up @@ -237,6 +288,17 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

/* parser settings, need to set after flb_input_config_map_set call */
if (ctx->parser_name) {
flb_plg_debug(ctx->ins, "request parser '%s'", ctx->parser_name);
ctx->parser = flb_parser_get(ctx->parser_name, config);
if (!ctx->parser) {
flb_plg_error(ctx->ins, "requested parser '%s' not found",
ctx->parser_name);
return -1;
}
}

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
flb_plg_error(ins, "Could not initialize kafka config object");
Expand All @@ -263,6 +325,25 @@ static int in_kafka_init(struct flb_input_instance *ins,
rd_kafka_err2str(err), conf_val);
goto init_error;
}

/* disable auto_offset_store, manully store/commit offset latter */
res = rd_kafka_conf_set(kafka_conf, "enable.auto.offset.store", "false",
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up enable.auto.offset.store: %s, val = %s",
rd_kafka_err2str(err), "false");
goto init_error;
}

if (!ctx ->auto_commit){
res = rd_kafka_conf_set(kafka_conf, "enable.auto.commit", "false",
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up enable.auto.commit : %s, val = %s",
rd_kafka_err2str(err), "false");
goto init_error;
}
}
}
else {
ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED;
Expand Down Expand Up @@ -402,6 +483,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, format_str),
"Set the data format which will be used for parsing records."
},
{
FLB_CONFIG_MAP_STR, "parser", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, parser_name),
"Set the data parser which will be used for \
parsing kafka message payload."
},
{
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
0, FLB_FALSE, 0,
Expand All @@ -428,6 +515,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
"Set the maximum size of chunk"
},
{
FLB_CONFIG_MAP_BOOL, "auto_commit", false,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, auto_commit),
"Set if enable the auto_commit \
Set it to true when the throughtput \
is the game"
},
/* EOF */
{0}
};
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ struct flb_in_kafka_config {
int coll_fd;
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
flb_sds_t parser_name; /* Bame of the parser */
struct flb_parser *parser; /* Parser */
bool auto_commit; /* Auto commit switch */
};

#endif

0 comments on commit cba27c1

Please sign in to comment.