From e2ca174d912fb8f49c133e55bd485200a6f8926f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 18 Nov 2024 19:10:48 +0900 Subject: [PATCH 1/4] in_forward: Recreate connection when resumed Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index e692f674cd1..8b98b56b010 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -383,8 +383,25 @@ static void in_fw_pause(void *data, struct flb_config *config) } static void in_fw_resume(void *data, struct flb_config *config) { + struct flb_connection *connection; + struct fw_conn *conn; struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { + connection = flb_downstream_conn_get(ctx->downstream); + + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); + + return; + } + conn = fw_conn_add(connection, ctx); + + if (!conn) { + flb_plg_error(ctx->ins, "could not add connection"); + + return; + } + ctx->is_paused = FLB_FALSE; flb_input_collector_resume(ctx->coll_fd, ctx->ins); } From 65c5bc20aff05a9f7f666b85084793979c4d1d13 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 19 Nov 2024 15:25:15 +0900 Subject: [PATCH 2/4] in_forward: Synchronize the status of pause/resume with mutex Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw.c | 48 ++++++++++++++++++++++++----------------- plugins/in_forward/fw.h | 2 ++ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 8b98b56b010..faefc4d82a8 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -351,6 +351,8 @@ static int in_fw_init(struct flb_input_instance *ins, ctx->coll_fd = ret; + pthread_mutex_init(&ctx->conn_mutex, NULL); + return 0; } @@ -358,15 +360,18 @@ static void in_fw_pause(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { - /* - * This is the case when we are not in a shutdown phase, but - * backpressure built up, and the plugin needs to - * pause the ingestion. The plugin should close all the connections - * and wait for the ingestion to resume. - */ - flb_input_collector_pause(ctx->coll_fd, ctx->ins); - fw_conn_del_all(ctx); - ctx->is_paused = FLB_TRUE; + if (pthread_mutex_lock(&ctx->conn_mutex)) { + /* + * This is the case when we are not in a shutdown phase, but + * backpressure built up, and the plugin needs to + * pause the ingestion. The plugin should close all the connections + * and wait for the ingestion to resume. + */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + fw_conn_del_all(ctx); + ctx->is_paused = FLB_TRUE; + } + pthread_mutex_unlock(&ctx->conn_mutex); } /* @@ -387,23 +392,26 @@ static void in_fw_resume(void *data, struct flb_config *config) { struct fw_conn *conn; struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { - connection = flb_downstream_conn_get(ctx->downstream); + if (pthread_mutex_lock(&ctx->conn_mutex)) { + connection = flb_downstream_conn_get(ctx->downstream); + if (connection == NULL) { + flb_plg_error(ctx->ins, "could not accept new connection"); - if (connection == NULL) { - flb_plg_error(ctx->ins, "could not accept new connection"); + return; + } - return; - } - conn = fw_conn_add(connection, ctx); + conn = fw_conn_add(connection, ctx); + if (!conn) { + flb_plg_error(ctx->ins, "could not add connection"); - if (!conn) { - flb_plg_error(ctx->ins, "could not add connection"); + return; + } - return; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); + ctx->is_paused = FLB_FALSE; } + pthread_mutex_unlock(&ctx->conn_mutex); - ctx->is_paused = FLB_FALSE; - flb_input_collector_resume(ctx->coll_fd, ctx->ins); } } diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 032371b7d3a..d51e1c47925 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -73,6 +73,8 @@ struct flb_in_fw_config { struct flb_log_event_decoder *log_decoder; struct flb_log_event_encoder *log_encoder; + pthread_mutex_t conn_mutex; + /* Plugin is paused */ int is_paused; }; From 5138f8aee0701b3b43c52ceb9affb103ff976a4b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 9 Dec 2024 20:33:48 +0900 Subject: [PATCH 3/4] in_forward: Remove a needless add event for handling sockets Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw.c | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index faefc4d82a8..6618d948323 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -388,30 +388,13 @@ static void in_fw_pause(void *data, struct flb_config *config) } static void in_fw_resume(void *data, struct flb_config *config) { - struct flb_connection *connection; - struct fw_conn *conn; struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { if (pthread_mutex_lock(&ctx->conn_mutex)) { - connection = flb_downstream_conn_get(ctx->downstream); - if (connection == NULL) { - flb_plg_error(ctx->ins, "could not accept new connection"); - - return; - } - - conn = fw_conn_add(connection, ctx); - if (!conn) { - flb_plg_error(ctx->ins, "could not add connection"); - - return; - } - flb_input_collector_resume(ctx->coll_fd, ctx->ins); ctx->is_paused = FLB_FALSE; } pthread_mutex_unlock(&ctx->conn_mutex); - } } From 201fc3553026aeb60c1010224a2950bbdc8facaf Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 12 Dec 2024 14:40:19 +0900 Subject: [PATCH 4/4] in_forward: Synchronize is_paused state and deleting conns Signed-off-by: Hiroshi Hatake --- plugins/in_forward/fw.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 6618d948323..1e4d83089af 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -360,14 +360,14 @@ static void in_fw_pause(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { + /* + * This is the case when we are not in a shutdown phase, but + * backpressure built up, and the plugin needs to + * pause the ingestion. The plugin should close all the connections + * and wait for the ingestion to resume. + */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); if (pthread_mutex_lock(&ctx->conn_mutex)) { - /* - * This is the case when we are not in a shutdown phase, but - * backpressure built up, and the plugin needs to - * pause the ingestion. The plugin should close all the connections - * and wait for the ingestion to resume. - */ - flb_input_collector_pause(ctx->coll_fd, ctx->ins); fw_conn_del_all(ctx); ctx->is_paused = FLB_TRUE; } @@ -390,8 +390,8 @@ static void in_fw_pause(void *data, struct flb_config *config) static void in_fw_resume(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; if (config->is_running == FLB_TRUE) { + flb_input_collector_resume(ctx->coll_fd, ctx->ins); if (pthread_mutex_lock(&ctx->conn_mutex)) { - flb_input_collector_resume(ctx->coll_fd, ctx->ins); ctx->is_paused = FLB_FALSE; } pthread_mutex_unlock(&ctx->conn_mutex);