From d5fc4e970e102064b71e5864831dad9663edd3de Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sun, 29 Jan 2023 21:22:45 +0900 Subject: [PATCH] ml: add mutex for flb_ml_stream_group To fix following race. ================== WARNING: ThreadSanitizer: data race (pid=15218) Read of size 8 at 0x7b2000008398 by thread T1: #0 flb_ml_flush_stream_group /home/taka/git/fluent-bit/src/multiline/flb_ml.c:1025 (fluent-bit+0x1930ba) #1 flb_ml_flush_parser_instance /home/taka/git/fluent-bit/src/multiline/flb_ml.c:116 (fluent-bit+0x1907ce) #2 flb_ml_flush_pending /home/taka/git/fluent-bit/src/multiline/flb_ml.c:136 (fluent-bit+0x1908e4) #3 cb_ml_flush_timer /home/taka/git/fluent-bit/src/multiline/flb_ml.c:162 (fluent-bit+0x1909e9) #4 flb_sched_event_handler /home/taka/git/fluent-bit/src/flb_scheduler.c:428 (fluent-bit+0x139c4a) #5 flb_engine_start /home/taka/git/fluent-bit/src/flb_engine.c:937 (fluent-bit+0x12612e) #6 flb_lib_worker /home/taka/git/fluent-bit/src/flb_lib.c:629 (fluent-bit+0xa0ccd) Previous write of size 8 at 0x7b2000008398 by thread T3: #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc) #1 flb_calloc /home/taka/git/fluent-bit/include/fluent-bit/flb_mem.h:89 (fluent-bit+0x2272f4) #2 stream_group_create /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:52 (fluent-bit+0x22c3fb) #3 flb_ml_stream_group_get /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:126 (fluent-bit+0x22c799) #4 package_content /home/taka/git/fluent-bit/src/multiline/flb_ml.c:229 (fluent-bit+0x190bf6) #5 process_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:499 (fluent-bit+0x191782) #6 ml_append_try_parser /home/taka/git/fluent-bit/src/multiline/flb_ml.c:659 (fluent-bit+0x191f1a) #7 flb_ml_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:699 (fluent-bit+0x1920f7) #8 process_content /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:456 (fluent-bit+0x342854) #9 flb_tail_file_chunk /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:1341 (fluent-bit+0x346ef7) #10 in_tail_collect_static /home/taka/git/fluent-bit/plugins/in_tail/tail.c:188 (fluent-bit+0x324ee3) #11 input_collector_fd /home/taka/git/fluent-bit/src/flb_input_thread.c:168 (fluent-bit+0xe0369) #12 engine_handle_event /home/taka/git/fluent-bit/src/flb_input_thread.c:183 (fluent-bit+0xe10e5) #13 input_thread /home/taka/git/fluent-bit/src/flb_input_thread.c:384 (fluent-bit+0xe10e5) #14 step_callback /home/taka/git/fluent-bit/src/flb_worker.c:43 (fluent-bit+0x157a02) Location is heap block of size 120 at 0x7b2000008380 allocated by thread T3: #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc) #1 flb_calloc /home/taka/git/fluent-bit/include/fluent-bit/flb_mem.h:89 (fluent-bit+0x2272f4) #2 stream_group_create /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:52 (fluent-bit+0x22c3fb) #3 flb_ml_stream_group_get /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:126 (fluent-bit+0x22c799) #4 package_content /home/taka/git/fluent-bit/src/multiline/flb_ml.c:229 (fluent-bit+0x190bf6) #5 process_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:499 (fluent-bit+0x191782) #6 ml_append_try_parser /home/taka/git/fluent-bit/src/multiline/flb_ml.c:659 (fluent-bit+0x191f1a) #7 flb_ml_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:699 (fluent-bit+0x1920f7) #8 process_content /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:456 (fluent-bit+0x342854) #9 flb_tail_file_chunk /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:1341 (fluent-bit+0x346ef7) #10 in_tail_collect_static /home/taka/git/fluent-bit/plugins/in_tail/tail.c:188 (fluent-bit+0x324ee3) #11 input_collector_fd /home/taka/git/fluent-bit/src/flb_input_thread.c:168 (fluent-bit+0xe0369) #12 engine_handle_event /home/taka/git/fluent-bit/src/flb_input_thread.c:183 (fluent-bit+0xe10e5) #13 input_thread /home/taka/git/fluent-bit/src/flb_input_thread.c:384 (fluent-bit+0xe10e5) #14 step_callback /home/taka/git/fluent-bit/src/flb_worker.c:43 (fluent-bit+0x157a02) Thread T1 'flb-pipeline' (tid=15220, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 mk_utils_worker_spawn /home/taka/git/fluent-bit/lib/monkey/mk_core/mk_utils.c:284 (fluent-bit+0xbfb422) #2 flb_main /home/taka/git/fluent-bit/src/fluent-bit.c:1231 (fluent-bit+0x871d1) #3 main /home/taka/git/fluent-bit/src/fluent-bit.c:1257 (fluent-bit+0x8732e) Thread T3 'flb-in-tail.0-w' (tid=15222, running) created by thread T1 at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 mk_utils_worker_spawn /home/taka/git/fluent-bit/lib/monkey/mk_core/mk_utils.c:284 (fluent-bit+0xbfb422) #2 flb_tp_thread_start /home/taka/git/fluent-bit/src/flb_thread_pool.c:123 (fluent-bit+0x170237) #3 flb_input_thread_instance_init /home/taka/git/fluent-bit/src/flb_input_thread.c:543 (fluent-bit+0xe1cca) #4 flb_input_instance_init /home/taka/git/fluent-bit/src/flb_input.c:1129 (fluent-bit+0xd73a4) #5 flb_input_init_all /home/taka/git/fluent-bit/src/flb_input.c:1216 (fluent-bit+0xd7877) #6 flb_engine_start /home/taka/git/fluent-bit/src/flb_engine.c:717 (fluent-bit+0x1252fe) #7 flb_lib_worker /home/taka/git/fluent-bit/src/flb_lib.c:629 (fluent-bit+0xa0ccd) SUMMARY: ThreadSanitizer: data race /home/taka/git/fluent-bit/src/multiline/flb_ml.c:1025 in flb_ml_flush_stream_group ================== Signed-off-by: Takahiro Yamashita --- include/fluent-bit/multiline/flb_ml.h | 3 +++ src/multiline/flb_ml.c | 21 ++++++++++++++++++--- src/multiline/flb_ml_stream.c | 2 ++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/include/fluent-bit/multiline/flb_ml.h b/include/fluent-bit/multiline/flb_ml.h index 2e891503af1..deafde3ca44 100644 --- a/include/fluent-bit/multiline/flb_ml.h +++ b/include/fluent-bit/multiline/flb_ml.h @@ -28,6 +28,7 @@ #include #include #include +#include /* Types available */ #define FLB_ML_REGEX 1 /* pattern is a regular expression */ @@ -101,6 +102,8 @@ struct flb_ml_stream_group { msgpack_packer mp_pck; /* temporary msgpack packer */ struct flb_time mp_time; /* multiline time parsed from first line */ + pthread_mutex_t pth_mutex; + struct mk_list _head; }; diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index 417c84e0ab5..b7cc370496a 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -113,7 +114,9 @@ void flb_ml_flush_parser_instance(struct flb_ml *ml, /* Iterate stream groups */ mk_list_foreach(head_group, &mst->groups) { group = mk_list_entry(head_group, struct flb_ml_stream_group, _head); + pthread_mutex_lock(&group->pth_mutex); flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush); + pthread_mutex_unlock(&group->pth_mutex); } } } @@ -227,6 +230,7 @@ static int package_content(struct flb_ml_stream *mst, /* Get stream group */ stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group); + pthread_mutex_lock(&stream_group->pth_mutex); if (!mst->last_stream_group) { mst->last_stream_group = stream_group; } @@ -236,6 +240,7 @@ static int package_content(struct flb_ml_stream *mst, mst->last_stream_group = stream_group; } } + pthread_mutex_unlock(&stream_group->pth_mutex); /* Set the parser type */ type = parser->type; @@ -254,9 +259,11 @@ static int package_content(struct flb_ml_stream *mst, } if (type == FLB_ML_REGEX) { + pthread_mutex_lock(&stream_group->pth_mutex); ret = flb_ml_rule_process(parser, mst, stream_group, full_map, buf, size, tm, val_content, val_pattern); + pthread_mutex_unlock(&stream_group->pth_mutex); if (ret == -1) { processed = FLB_FALSE; } @@ -276,7 +283,7 @@ static int package_content(struct flb_ml_stream *mst, else { rule_match = match_negate(parser, FLB_FALSE); } - + pthread_mutex_lock(&stream_group->pth_mutex); if (stream_group->mp_sbuf.size == 0) { flb_ml_register_context(stream_group, tm, full_map); } @@ -298,6 +305,7 @@ static int package_content(struct flb_ml_stream *mst, if (rule_match) { flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); } + pthread_mutex_unlock(&stream_group->pth_mutex); processed = FLB_TRUE; } } @@ -310,7 +318,7 @@ static int package_content(struct flb_ml_stream *mst, else { rule_match = match_negate(parser, FLB_FALSE); } - + pthread_mutex_lock(&stream_group->pth_mutex); if (stream_group->mp_sbuf.size == 0) { flb_ml_register_context(stream_group, tm, full_map); } @@ -332,6 +340,7 @@ static int package_content(struct flb_ml_stream *mst, if (rule_match) { flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); } + pthread_mutex_unlock(&stream_group->pth_mutex); processed = FLB_TRUE; } @@ -342,13 +351,16 @@ static int package_content(struct flb_ml_stream *mst, * process it as a raw text generating a single record with the given * content. */ + if (!processed && type == FLB_ML_TYPE_TEXT) { + pthread_mutex_lock(&stream_group->pth_mutex); flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); /* Concatenate value */ flb_sds_cat_safe(&stream_group->buf, buf, size); breakline_prepare(parser_i, stream_group); flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE); + pthread_mutex_unlock(&stream_group->pth_mutex); } else { return FLB_FALSE; @@ -762,7 +774,9 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id, /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); flb_sds_cat_safe(&st_group->buf, buf, size); + pthread_mutex_lock(&st_group->pth_mutex); flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE); + pthread_mutex_unlock(&st_group->pth_mutex); } return 0; @@ -874,7 +888,7 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id, /* Get stream group */ st_group = flb_ml_stream_group_get(mst->parser, mst, NULL); - + pthread_mutex_lock(&st_group->pth_mutex); /* Append record content to group msgpack buffer */ msgpack_pack_array(&st_group->mp_pck, 2); @@ -892,6 +906,7 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id, /* Update last flush time */ st_group->last_flush = time_ms_now(); + pthread_mutex_unlock(&st_group->pth_mutex); } return 0; diff --git a/src/multiline/flb_ml_stream.c b/src/multiline/flb_ml_stream.c index d3f9f10a2ee..c0ea86c6662 100644 --- a/src/multiline/flb_ml_stream.c +++ b/src/multiline/flb_ml_stream.c @@ -63,6 +63,8 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst /* status */ group->first_line = FLB_TRUE; + pthread_mutex_init(&group->pth_mutex, NULL); + /* multiline buffer */ group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE); if (!group->buf) {