Skip to content

Commit

Permalink
in_forward: Synchronize the status of pause/resume with mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Dec 9, 2024
1 parent e2ca174 commit 65c5bc2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
48 changes: 28 additions & 20 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,22 +351,27 @@ static int in_fw_init(struct flb_input_instance *ins,

ctx->coll_fd = ret;

pthread_mutex_init(&ctx->conn_mutex, NULL);

return 0;
}

static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
if (pthread_mutex_lock(&ctx->conn_mutex)) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
}
pthread_mutex_unlock(&ctx->conn_mutex);
}

/*
Expand All @@ -387,23 +392,26 @@ static void in_fw_resume(void *data, struct flb_config *config) {
struct fw_conn *conn;
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
connection = flb_downstream_conn_get(ctx->downstream);
if (pthread_mutex_lock(&ctx->conn_mutex)) {
connection = flb_downstream_conn_get(ctx->downstream);
if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");

if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");
return;
}

return;
}
conn = fw_conn_add(connection, ctx);
conn = fw_conn_add(connection, ctx);
if (!conn) {
flb_plg_error(ctx->ins, "could not add connection");

if (!conn) {
flb_plg_error(ctx->ins, "could not add connection");
return;
}

return;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
ctx->is_paused = FLB_FALSE;
}
pthread_mutex_unlock(&ctx->conn_mutex);

ctx->is_paused = FLB_FALSE;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct flb_in_fw_config {
struct flb_log_event_decoder *log_decoder;
struct flb_log_event_encoder *log_encoder;

pthread_mutex_t conn_mutex;

/* Plugin is paused */
int is_paused;
};
Expand Down

0 comments on commit 65c5bc2

Please sign in to comment.