Skip to content

Commit

Permalink
ml: add mutex for flb_ml_stream_group
Browse files Browse the repository at this point in the history
To fix following race.

==================
WARNING: ThreadSanitizer: data race (pid=15218)
  Read of size 8 at 0x7b2000008398 by thread T1:
    #0 flb_ml_flush_stream_group /home/taka/git/fluent-bit/src/multiline/flb_ml.c:1025 (fluent-bit+0x1930ba)
    #1 flb_ml_flush_parser_instance /home/taka/git/fluent-bit/src/multiline/flb_ml.c:116 (fluent-bit+0x1907ce)
    #2 flb_ml_flush_pending /home/taka/git/fluent-bit/src/multiline/flb_ml.c:136 (fluent-bit+0x1908e4)
    #3 cb_ml_flush_timer /home/taka/git/fluent-bit/src/multiline/flb_ml.c:162 (fluent-bit+0x1909e9)
    #4 flb_sched_event_handler /home/taka/git/fluent-bit/src/flb_scheduler.c:428 (fluent-bit+0x139c4a)
    #5 flb_engine_start /home/taka/git/fluent-bit/src/flb_engine.c:937 (fluent-bit+0x12612e)
    #6 flb_lib_worker /home/taka/git/fluent-bit/src/flb_lib.c:629 (fluent-bit+0xa0ccd)

  Previous write of size 8 at 0x7b2000008398 by thread T3:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
    #1 flb_calloc /home/taka/git/fluent-bit/include/fluent-bit/flb_mem.h:89 (fluent-bit+0x2272f4)
    #2 stream_group_create /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:52 (fluent-bit+0x22c3fb)
    #3 flb_ml_stream_group_get /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:126 (fluent-bit+0x22c799)
    #4 package_content /home/taka/git/fluent-bit/src/multiline/flb_ml.c:229 (fluent-bit+0x190bf6)
    #5 process_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:499 (fluent-bit+0x191782)
    #6 ml_append_try_parser /home/taka/git/fluent-bit/src/multiline/flb_ml.c:659 (fluent-bit+0x191f1a)
    #7 flb_ml_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:699 (fluent-bit+0x1920f7)
    #8 process_content /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:456 (fluent-bit+0x342854)
    #9 flb_tail_file_chunk /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:1341 (fluent-bit+0x346ef7)
    #10 in_tail_collect_static /home/taka/git/fluent-bit/plugins/in_tail/tail.c:188 (fluent-bit+0x324ee3)
    #11 input_collector_fd /home/taka/git/fluent-bit/src/flb_input_thread.c:168 (fluent-bit+0xe0369)
    #12 engine_handle_event /home/taka/git/fluent-bit/src/flb_input_thread.c:183 (fluent-bit+0xe10e5)
    #13 input_thread /home/taka/git/fluent-bit/src/flb_input_thread.c:384 (fluent-bit+0xe10e5)
    #14 step_callback /home/taka/git/fluent-bit/src/flb_worker.c:43 (fluent-bit+0x157a02)

  Location is heap block of size 120 at 0x7b2000008380 allocated by thread T3:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
    #1 flb_calloc /home/taka/git/fluent-bit/include/fluent-bit/flb_mem.h:89 (fluent-bit+0x2272f4)
    #2 stream_group_create /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:52 (fluent-bit+0x22c3fb)
    #3 flb_ml_stream_group_get /home/taka/git/fluent-bit/src/multiline/flb_ml_stream.c:126 (fluent-bit+0x22c799)
    #4 package_content /home/taka/git/fluent-bit/src/multiline/flb_ml.c:229 (fluent-bit+0x190bf6)
    #5 process_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:499 (fluent-bit+0x191782)
    #6 ml_append_try_parser /home/taka/git/fluent-bit/src/multiline/flb_ml.c:659 (fluent-bit+0x191f1a)
    #7 flb_ml_append /home/taka/git/fluent-bit/src/multiline/flb_ml.c:699 (fluent-bit+0x1920f7)
    #8 process_content /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:456 (fluent-bit+0x342854)
    #9 flb_tail_file_chunk /home/taka/git/fluent-bit/plugins/in_tail/tail_file.c:1341 (fluent-bit+0x346ef7)
    #10 in_tail_collect_static /home/taka/git/fluent-bit/plugins/in_tail/tail.c:188 (fluent-bit+0x324ee3)
    #11 input_collector_fd /home/taka/git/fluent-bit/src/flb_input_thread.c:168 (fluent-bit+0xe0369)
    #12 engine_handle_event /home/taka/git/fluent-bit/src/flb_input_thread.c:183 (fluent-bit+0xe10e5)
    #13 input_thread /home/taka/git/fluent-bit/src/flb_input_thread.c:384 (fluent-bit+0xe10e5)
    #14 step_callback /home/taka/git/fluent-bit/src/flb_worker.c:43 (fluent-bit+0x157a02)

  Thread T1 'flb-pipeline' (tid=15220, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 mk_utils_worker_spawn /home/taka/git/fluent-bit/lib/monkey/mk_core/mk_utils.c:284 (fluent-bit+0xbfb422)
    #2 flb_main /home/taka/git/fluent-bit/src/fluent-bit.c:1231 (fluent-bit+0x871d1)
    #3 main /home/taka/git/fluent-bit/src/fluent-bit.c:1257 (fluent-bit+0x8732e)

  Thread T3 'flb-in-tail.0-w' (tid=15222, running) created by thread T1 at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 mk_utils_worker_spawn /home/taka/git/fluent-bit/lib/monkey/mk_core/mk_utils.c:284 (fluent-bit+0xbfb422)
    #2 flb_tp_thread_start /home/taka/git/fluent-bit/src/flb_thread_pool.c:123 (fluent-bit+0x170237)
    #3 flb_input_thread_instance_init /home/taka/git/fluent-bit/src/flb_input_thread.c:543 (fluent-bit+0xe1cca)
    #4 flb_input_instance_init /home/taka/git/fluent-bit/src/flb_input.c:1129 (fluent-bit+0xd73a4)
    #5 flb_input_init_all /home/taka/git/fluent-bit/src/flb_input.c:1216 (fluent-bit+0xd7877)
    #6 flb_engine_start /home/taka/git/fluent-bit/src/flb_engine.c:717 (fluent-bit+0x1252fe)
    #7 flb_lib_worker /home/taka/git/fluent-bit/src/flb_lib.c:629 (fluent-bit+0xa0ccd)

SUMMARY: ThreadSanitizer: data race /home/taka/git/fluent-bit/src/multiline/flb_ml.c:1025 in flb_ml_flush_stream_group
==================

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Feb 1, 2023
1 parent b0b655d commit d5fc4e9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/multiline/flb_ml.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_pthread.h>

/* Types available */
#define FLB_ML_REGEX 1 /* pattern is a regular expression */
Expand Down Expand Up @@ -101,6 +102,8 @@ struct flb_ml_stream_group {
msgpack_packer mp_pck; /* temporary msgpack packer */
struct flb_time mp_time; /* multiline time parsed from first line */

pthread_mutex_t pth_mutex;

struct mk_list _head;
};

Expand Down
21 changes: 18 additions & 3 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_pthread.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_rule.h>
#include <fluent-bit/multiline/flb_ml_group.h>
Expand Down Expand Up @@ -113,7 +114,9 @@ void flb_ml_flush_parser_instance(struct flb_ml *ml,
/* Iterate stream groups */
mk_list_foreach(head_group, &mst->groups) {
group = mk_list_entry(head_group, struct flb_ml_stream_group, _head);
pthread_mutex_lock(&group->pth_mutex);
flb_ml_flush_stream_group(parser_i->ml_parser, mst, group, forced_flush);
pthread_mutex_unlock(&group->pth_mutex);
}
}
}
Expand Down Expand Up @@ -227,6 +230,7 @@ static int package_content(struct flb_ml_stream *mst,

/* Get stream group */
stream_group = flb_ml_stream_group_get(mst->parser, mst, val_group);
pthread_mutex_lock(&stream_group->pth_mutex);
if (!mst->last_stream_group) {
mst->last_stream_group = stream_group;
}
Expand All @@ -236,6 +240,7 @@ static int package_content(struct flb_ml_stream *mst,
mst->last_stream_group = stream_group;
}
}
pthread_mutex_unlock(&stream_group->pth_mutex);

/* Set the parser type */
type = parser->type;
Expand All @@ -254,9 +259,11 @@ static int package_content(struct flb_ml_stream *mst,

}
if (type == FLB_ML_REGEX) {
pthread_mutex_lock(&stream_group->pth_mutex);
ret = flb_ml_rule_process(parser, mst,
stream_group, full_map, buf, size, tm,
val_content, val_pattern);
pthread_mutex_unlock(&stream_group->pth_mutex);
if (ret == -1) {
processed = FLB_FALSE;
}
Expand All @@ -276,7 +283,7 @@ static int package_content(struct flb_ml_stream *mst,
else {
rule_match = match_negate(parser, FLB_FALSE);
}

pthread_mutex_lock(&stream_group->pth_mutex);
if (stream_group->mp_sbuf.size == 0) {
flb_ml_register_context(stream_group, tm, full_map);
}
Expand All @@ -298,6 +305,7 @@ static int package_content(struct flb_ml_stream *mst,
if (rule_match) {
flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
}
pthread_mutex_unlock(&stream_group->pth_mutex);
processed = FLB_TRUE;
}
}
Expand All @@ -310,7 +318,7 @@ static int package_content(struct flb_ml_stream *mst,
else {
rule_match = match_negate(parser, FLB_FALSE);
}

pthread_mutex_lock(&stream_group->pth_mutex);
if (stream_group->mp_sbuf.size == 0) {
flb_ml_register_context(stream_group, tm, full_map);
}
Expand All @@ -332,6 +340,7 @@ static int package_content(struct flb_ml_stream *mst,
if (rule_match) {
flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
}
pthread_mutex_unlock(&stream_group->pth_mutex);
processed = FLB_TRUE;
}

Expand All @@ -342,13 +351,16 @@ static int package_content(struct flb_ml_stream *mst,
* process it as a raw text generating a single record with the given
* content.
*/

if (!processed && type == FLB_ML_TYPE_TEXT) {
pthread_mutex_lock(&stream_group->pth_mutex);
flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);

/* Concatenate value */
flb_sds_cat_safe(&stream_group->buf, buf, size);
breakline_prepare(parser_i, stream_group);
flb_ml_flush_stream_group(parser, mst, stream_group, FLB_FALSE);
pthread_mutex_unlock(&stream_group->pth_mutex);
}
else {
return FLB_FALSE;
Expand Down Expand Up @@ -762,7 +774,9 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,
/* Get stream group */
st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);
flb_sds_cat_safe(&st_group->buf, buf, size);
pthread_mutex_lock(&st_group->pth_mutex);
flb_ml_flush_stream_group(parser_i->ml_parser, mst, st_group, FLB_FALSE);
pthread_mutex_unlock(&st_group->pth_mutex);
}

return 0;
Expand Down Expand Up @@ -874,7 +888,7 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,

/* Get stream group */
st_group = flb_ml_stream_group_get(mst->parser, mst, NULL);

pthread_mutex_lock(&st_group->pth_mutex);
/* Append record content to group msgpack buffer */
msgpack_pack_array(&st_group->mp_pck, 2);

Expand All @@ -892,6 +906,7 @@ int flb_ml_append_object(struct flb_ml *ml, uint64_t stream_id,

/* Update last flush time */
st_group->last_flush = time_ms_now();
pthread_mutex_unlock(&st_group->pth_mutex);
}

return 0;
Expand Down
2 changes: 2 additions & 0 deletions src/multiline/flb_ml_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst
/* status */
group->first_line = FLB_TRUE;

pthread_mutex_init(&group->pth_mutex, NULL);

/* multiline buffer */
group->buf = flb_sds_create_size(FLB_ML_BUF_SIZE);
if (!group->buf) {
Expand Down

0 comments on commit d5fc4e9

Please sign in to comment.