Skip to content

Commit

Permalink
in_forward: Synchronize the status of pause/resume with mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Nov 19, 2024
1 parent eeec6b9 commit ea70cf9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
48 changes: 28 additions & 20 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,22 +351,27 @@ static int in_fw_init(struct flb_input_instance *ins,

ctx->coll_fd = ret;

pthread_mutex_init(&ctx->conn_mutex, NULL);

return 0;
}

static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
if (pthread_mutex_lock(&ctx->conn_mutex)) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
}
pthread_mutex_unlock(&ctx->conn_mutex);
}

/*
Expand All @@ -387,23 +392,26 @@ static void in_fw_resume(void *data, struct flb_config *config) {
struct fw_conn *conn;
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
connection = flb_downstream_conn_get(ctx->downstream);
if (pthread_mutex_lock(&ctx->conn_mutex)) {
connection = flb_downstream_conn_get(ctx->downstream);
if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");

if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");
return;
}

return;
}
conn = fw_conn_add(connection, ctx);
conn = fw_conn_add(connection, ctx);
if (!conn) {
flb_plg_error(ctx->ins, "could not add connection");

if (!conn) {
flb_plg_error(ctx->ins, "could not add connection");
return;
}

return;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
ctx->is_paused = FLB_FALSE;
}
pthread_mutex_unlock(&ctx->conn_mutex);

ctx->is_paused = FLB_FALSE;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct flb_in_fw_config {
struct flb_log_event_decoder *log_decoder;
struct flb_log_event_encoder *log_encoder;

pthread_mutex_t conn_mutex;

/* Plugin is paused */
int is_paused;
};
Expand Down

0 comments on commit ea70cf9

Please sign in to comment.