diff --git a/include/fluent-bit/flb_log.h b/include/fluent-bit/flb_log.h index 45f60d1e098..979fd612677 100644 --- a/include/fluent-bit/flb_log.h +++ b/include/fluent-bit/flb_log.h @@ -225,6 +225,7 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char * #endif int flb_log_worker_init(struct flb_worker *worker); +int flb_log_work_destroy(struct flb_worker *worker); int flb_errno_print(int errnum, const char *file, int line); #ifdef __FLB_FILENAME__ diff --git a/include/fluent-bit/flb_scheduler.h b/include/fluent-bit/flb_scheduler.h index 52c72c019d3..c744d01a80f 100644 --- a/include/fluent-bit/flb_scheduler.h +++ b/include/fluent-bit/flb_scheduler.h @@ -127,7 +127,7 @@ int flb_sched_event_handler(struct flb_config *config, struct mk_event *event); struct flb_sched *flb_sched_create(struct flb_config *config, struct mk_event_loop *evl); -int flb_sched_destroy(struct flb_sched *sched); +int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched); struct flb_sched_timer *flb_sched_timer_create(struct flb_sched *sched); int flb_sched_timer_destroy(struct flb_sched_timer *timer); diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f56f4d6b15c..44e91b7401b 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -103,7 +103,6 @@ struct flb_in_calyptia_fleet_config { }; static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, const char *url, time_t timestamp); @@ -878,14 +877,20 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config } static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, flb_sds_t url) { struct flb_http_client *client; + struct flb_connection *u_conn; size_t b_sent; int ret = -1; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { + return NULL; + } + + u_conn = flb_upstream_conn_get(ctx->u); + if (u_conn == NULL) { + flb_plg_error(ctx->ins, "unable to get upstream connection"); return NULL; } @@ -920,23 +925,24 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config goto http_do_error; } + flb_upstream_conn_release(u_conn); return client; http_do_error: flb_http_client_destroy(client); http_client_error: + flb_upstream_conn_release(u_conn); return NULL; } static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, struct flb_config *config) { struct flb_http_client *client; flb_sds_t url; flb_sds_t project_id; - if (ctx == NULL || u_conn == NULL || config == NULL) { + if (ctx == NULL || config == NULL) { return -1; } @@ -955,7 +961,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", project_id, ctx->fleet_name); - client = fleet_http_do(ctx, u_conn, url); + client = fleet_http_do(ctx, url); flb_sds_destroy(url); if (!client) { @@ -981,7 +987,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct } static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, flb_sds_t url, const char *hdr, const char *dst, @@ -997,11 +1002,11 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, time_t last_modified; flb_sds_t fname; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, url); + client = fleet_http_do(ctx, url); if (client == NULL) { return -1; @@ -1740,8 +1745,7 @@ static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx) return rc; } -static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn) +static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_sds_t cfgname; flb_sds_t cfgnewname; @@ -1778,14 +1782,14 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, flb_sds_destroy(hdrname); /* create the base file. */ - ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, + ret = get_calyptia_file(ctx, ctx->fleet_url, header, NULL, &time_last_modified); flb_sds_destroy(header); /* new file created! */ if (ret == 1) { - get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); + get_calyptia_files(ctx, ctx->fleet_files_url, time_last_modified); cfgname = time_fleet_config_filename(ctx, time_last_modified); @@ -1820,30 +1824,20 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, void *in_context) { struct flb_in_calyptia_fleet_config *ctx = in_context; - struct flb_connection *u_conn; int ret = -1; - u_conn = flb_upstream_conn_get(ctx->u); - - if (!u_conn) { - flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u", - ctx->ins->host.name, ctx->ins->host.port); - goto conn_error; - } if (ctx->fleet_id == NULL) { - if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) { + if (get_calyptia_fleet_id_by_name(ctx, config) == -1) { flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); - goto calyptia_error; - } + goto fleet_id_error; + } } - ret = get_calyptia_fleet_config(ctx, u_conn); + ret = get_calyptia_fleet_config(ctx); -calyptia_error: - flb_upstream_conn_release(u_conn); -conn_error: +fleet_id_error: FLB_INPUT_RETURN(ret); } @@ -2104,18 +2098,17 @@ static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, } static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, const char *url, time_t timestamp) { struct flb_http_client *client; int ret = -1; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url); + client = fleet_http_do(ctx, ctx->fleet_files_url); if (client == NULL) { return -1; diff --git a/src/flb_config.c b/src/flb_config.c index 94612cd42bb..9aa38dad756 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -478,7 +478,7 @@ void flb_config_exit(struct flb_config *config) /* Release scheduler */ if (config->sched) { - flb_sched_destroy(config->sched); + flb_sched_destroy(config->evl, config->sched); } #ifdef FLB_HAVE_HTTP_SERVER diff --git a/src/flb_engine.c b/src/flb_engine.c index 48338367f78..e7e2b7cdd09 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -726,9 +726,9 @@ int flb_engine_start(struct flb_config *config) * to the local event loop 'evl'. */ ret = mk_event_channel_create(config->evl, - &config->ch_self_events[0], - &config->ch_self_events[1], - &config->event_thread_init); + &config->ch_self_events[0], + &config->ch_self_events[1], + &config->event_thread_init); if (ret == -1) { flb_error("[engine] could not create engine thread channel"); return -1; @@ -1135,6 +1135,12 @@ int flb_engine_shutdown(struct flb_config *config) flb_hs_destroy(config->http_ctx); } #endif + if (config->evl) { + mk_event_channel_destroy(config->evl, + config->ch_self_events[0], + config->ch_self_events[1], + &config->event_thread_init); + } return 0; } diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index bf073296de2..61b6ec77eff 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -457,7 +457,7 @@ static void input_thread(void *data) /* Create the bucket queue (FLB_ENGINE_PRIORITY_COUNT priorities) */ flb_bucket_queue_destroy(evl_bktq); - flb_sched_destroy(sched); + flb_sched_destroy(thi->evl, sched); input_thread_instance_destroy(thi); } diff --git a/src/flb_log.c b/src/flb_log.c index 9ef027aca0f..9336821c7fe 100644 --- a/src/flb_log.c +++ b/src/flb_log.c @@ -303,6 +303,12 @@ int flb_log_cache_check_suppress(struct flb_log_cache *cache, char *msg_buf, siz return FLB_TRUE; } +int flb_log_work_destroy(struct flb_worker *worker) +{ + flb_pipe_destroy(worker->log); + return 0; +} + int flb_log_worker_init(struct flb_worker *worker) { int ret; @@ -689,6 +695,7 @@ int flb_log_destroy(struct flb_log *log, struct flb_config *config) if (log->worker->log_cache) { flb_log_cache_destroy(log->worker->log_cache); } + flb_log_work_destroy(log->worker); flb_free(log->worker); flb_free(log); diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 62cc5a04716..9cc5ec0c659 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -344,6 +344,10 @@ static void output_thread(void *data) } } + mk_event_channel_destroy(th_ins->evl, + th_ins->ch_thread_events[0], + th_ins->ch_thread_events[1], + &event_local); /* * Final cleanup, destroy all resources associated with: * @@ -358,11 +362,17 @@ static void output_thread(void *data) flb_upstream_conn_active_destroy_list(&th_ins->upstreams); flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); - flb_sched_destroy(sched); + flb_sched_destroy(th_ins->evl, sched); params = FLB_TLS_GET(out_flush_params); if (params) { flb_free(params); } + + mk_event_channel_destroy(th_ins->evl, + th_ins->ch_parent_events[0], + th_ins->ch_parent_events[1], + th_ins); + mk_event_loop_destroy(th_ins->evl); flb_bucket_queue_destroy(th_ins->evl_bktq); diff --git a/src/flb_scheduler.c b/src/flb_scheduler.c index 7e42550e5c3..82b1629d575 100644 --- a/src/flb_scheduler.c +++ b/src/flb_scheduler.c @@ -575,7 +575,7 @@ struct flb_sched *flb_sched_create(struct flb_config *config, } /* Release all resources used by the Scheduler */ -int flb_sched_destroy(struct flb_sched *sched) +int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched) { int c = 0; struct mk_list *tmp; @@ -603,6 +603,7 @@ int flb_sched_destroy(struct flb_sched *sched) /* Delete timers */ mk_list_foreach_safe(head, tmp, &sched->timers) { timer = mk_list_entry(head, struct flb_sched_timer, _head); + mk_event_timeout_destroy(evl, &timer->event); flb_sched_timer_destroy(timer); c++; } diff --git a/src/flb_worker.c b/src/flb_worker.c index 067689596a1..6d4313d3c45 100644 --- a/src/flb_worker.c +++ b/src/flb_worker.c @@ -142,9 +142,7 @@ void flb_worker_destroy(struct flb_worker *worker) return; } - if (worker->log_cache) { - flb_log_cache_destroy(worker->log_cache); - } + flb_log_work_destroy(worker); mk_list_del(&worker->_head); flb_free(worker); diff --git a/tests/internal/multiline.c b/tests/internal/multiline.c index e175e1171aa..62fe73b023b 100644 --- a/tests/internal/multiline.c +++ b/tests/internal/multiline.c @@ -1445,7 +1445,7 @@ static void test_issue_5504() TEST_CHECK(ml->last_flush > last_flush); /* Cleanup */ - flb_sched_destroy(config->sched); + flb_sched_destroy(config->evl, config->sched); config->sched = sched; mk_event_loop_destroy(config->evl); config->evl = evl;