From ea70cf98b0bbafd350ad6813c9bce445e5f9b0e1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 19 Nov 2024 15:25:15 +0900 Subject: [PATCH] in_forward: Synchronize the status of pause/resume with mutex Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw.c | 48 ++++++++++++++++++++++++----------------- plugins/in_forward/fw.h | 2 ++ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 1b4b3436e9d..31f37c771bc 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -351,6 +351,8 @@ static int in_fw_init(struct flb_input_instance *ins, ctx->coll_fd = ret; + pthread_mutex_init(&ctx->conn_mutex, NULL); + return 0; } @@ -358,15 +360,18 @@ static void in_fw_pause(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { - /* - * This is the case when we are not in a shutdown phase, but - * backpressure built up, and the plugin needs to - * pause the ingestion. The plugin should close all the connections - * and wait for the ingestion to resume. - */ - flb_input_collector_pause(ctx->coll_fd, ctx->ins); - fw_conn_del_all(ctx); - ctx->is_paused = FLB_TRUE; + if (pthread_mutex_lock(&ctx->conn_mutex)) { + /* + * This is the case when we are not in a shutdown phase, but + * backpressure built up, and the plugin needs to + * pause the ingestion. The plugin should close all the connections + * and wait for the ingestion to resume. + */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + fw_conn_del_all(ctx); + ctx->is_paused = FLB_TRUE; + } + pthread_mutex_unlock(&ctx->conn_mutex); } /* @@ -387,23 +392,26 @@ static void in_fw_resume(void *data, struct flb_config *config) { struct fw_conn *conn; struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { - connection = flb_downstream_conn_get(ctx->downstream); + if (pthread_mutex_lock(&ctx->conn_mutex)) { + connection = flb_downstream_conn_get(ctx->downstream); + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); - if (connection == NULL) { - flb_plg_error(ctx->ins, "could not accept new connection"); + return; + } - return; - } - conn = fw_conn_add(connection, ctx); + conn = fw_conn_add(connection, ctx); + if (!conn) { + flb_plg_error(ctx->ins, "could not add connection"); - if (!conn) { - flb_plg_error(ctx->ins, "could not add connection"); + return; + } - return; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); + ctx->is_paused = FLB_FALSE; } + pthread_mutex_unlock(&ctx->conn_mutex); - ctx->is_paused = FLB_FALSE; - flb_input_collector_resume(ctx->coll_fd, ctx->ins); } } diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 4cd270e27a2..dd675d93dd5 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -72,6 +72,8 @@ struct flb_in_fw_config { struct flb_log_event_decoder *log_decoder; struct flb_log_event_encoder *log_encoder; + pthread_mutex_t conn_mutex; + /* Plugin is paused */ int is_paused; };