From cba27c1e94e0f32e98a23ac29445af6332ffdeb3 Mon Sep 17 00:00:00 2001 From: luoyy8 Date: Mon, 16 Dec 2024 06:02:06 +0800 Subject: [PATCH] in_kafka: add parser support and improve performance When processing record, the in_kafka plugin lack of parser ability, This commit added support to parse payload. It has been tested with kafka. When processing record, the in_kafka plugin currently will commit every single message poll from kafka. which is good at normal case. But with this default behavier, the performance is strictly limited. This commit has add auto_commit as an option when the performance matter more. It has been tested with kafka. There is a sample benchmark as following. topic_size topic_offset time_cost before 251 MB 506701 1m30s after 251 MB 506701 10s Signed-off-by: luoyy8 --- plugins/in_kafka/in_kafka.c | 112 +++++++++++++++++++++++++++++++++--- plugins/in_kafka/in_kafka.h | 3 + 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..3672555ae3f 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -60,6 +60,9 @@ static int process_message(struct flb_in_kafka_config *ctx, { struct flb_log_event_encoder *log_encoder = ctx->log_encoder; int ret; + struct flb_time out_time; + void *out_buf; + size_t out_size; ret = flb_log_event_encoder_begin_record(log_encoder); @@ -128,12 +131,38 @@ static int process_message(struct flb_in_kafka_config *ctx, if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (rkm->payload) { - if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON || + /* Reset time for each line */ + flb_time_zero(&out_time); + + /* Use the defined parser */ + ret = flb_parser_do(ctx->parser, rkm->payload, rkm->len, + &out_buf, &out_size, &out_time); + + if (ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + ret = flb_log_event_encoder_set_timestamp(log_encoder, &out_time); + + if (ret == FLB_EVENT_ENCODER_SUCCESS){ + ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder, + out_buf, + out_size); + } + + flb_free(out_buf); + } + else { + flb_plg_warn(ctx->ins, + "failed to parse payload, fluentbit error code : %d, return to default behaver\n", + ret); + if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON || try_json(log_encoder, rkm)) { - ret = flb_log_event_encoder_append_body_string(log_encoder, - rkm->payload, - rkm->len); - } + ret = flb_log_event_encoder_append_body_string(log_encoder, + rkm->payload, + rkm->len); + } + } } else { ret = flb_log_event_encoder_append_body_null(log_encoder); @@ -164,6 +193,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); if (!rkm) { + flb_plg_debug(ins, "no message polled, break collection loop"); break; } @@ -177,12 +207,33 @@ static int in_kafka_collect(struct flb_input_instance *ins, flb_plg_debug(ins, "kafka message received"); ret = process_message(ctx, rkm); - + flb_plg_debug(ins, + "encode kafka message error, \ + topic: %s, offset: %s, partition: %s\n", + rd_kafka_topic_name(rkm -> rkt), + rkm->offset, + rkm->partition); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if ( ctx -> auto_commit ) { + rd_kafka_offset_store_message(rkm); + } + else{ + rd_kafka_commit_message(ctx -> kafka.rk,rkm,0); + } + } + else{ + flb_plg_warn(ins, + "encode kafka message error, \ + topic: %s, offset: %s, partition: %s\n", + rd_kafka_topic_name(rkm -> rkt), + rkm->offset, + rkm->partition); + rd_kafka_message_destroy(rkm); + continue; + } + rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); - /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && ctx->log_encoder->output_length > ctx->polling_threshold + 512) { @@ -237,6 +288,17 @@ static int in_kafka_init(struct flb_input_instance *ins, return -1; } + /* parser settings, need to set after flb_input_config_map_set call */ + if (ctx->parser_name) { + flb_plg_debug(ctx->ins, "request parser '%s'", ctx->parser_name); + ctx->parser = flb_parser_get(ctx->parser_name, config); + if (!ctx->parser) { + flb_plg_error(ctx->ins, "requested parser '%s' not found", + ctx->parser_name); + return -1; + } + } + kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1); if (!kafka_conf) { flb_plg_error(ins, "Could not initialize kafka config object"); @@ -263,6 +325,25 @@ static int in_kafka_init(struct flb_input_instance *ins, rd_kafka_err2str(err), conf_val); goto init_error; } + + /* disable auto_offset_store, manully store/commit offset latter */ + res = rd_kafka_conf_set(kafka_conf, "enable.auto.offset.store", "false", + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up enable.auto.offset.store: %s, val = %s", + rd_kafka_err2str(err), "false"); + goto init_error; + } + + if (!ctx ->auto_commit){ + res = rd_kafka_conf_set(kafka_conf, "enable.auto.commit", "false", + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up enable.auto.commit : %s, val = %s", + rd_kafka_err2str(err), "false"); + goto init_error; + } + } } else { ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED; @@ -402,6 +483,12 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, format_str), "Set the data format which will be used for parsing records." }, + { + FLB_CONFIG_MAP_STR, "parser", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, parser_name), + "Set the data parser which will be used for \ + parsing kafka message payload." + }, { FLB_CONFIG_MAP_STR, "brokers", (char *)NULL, 0, FLB_FALSE, 0, @@ -428,6 +515,13 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "auto_commit", false, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, auto_commit), + "Set if enable the auto_commit \ + Set it to true when the throughtput \ + is the game" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..1b82cbe63ed 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -48,6 +48,9 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + flb_sds_t parser_name; /* Bame of the parser */ + struct flb_parser *parser; /* Parser */ + bool auto_commit; /* Auto commit switch */ }; #endif