Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes: lots of fixes for file descriptor leaks. #8371

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char *
#endif

int flb_log_worker_init(struct flb_worker *worker);
int flb_log_work_destroy(struct flb_worker *worker);
int flb_errno_print(int errnum, const char *file, int line);

#ifdef __FLB_FILENAME__
Expand Down
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ int flb_sched_event_handler(struct flb_config *config, struct mk_event *event);
struct flb_sched *flb_sched_create(struct flb_config *config,
struct mk_event_loop *evl);

int flb_sched_destroy(struct flb_sched *sched);
int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched);

struct flb_sched_timer *flb_sched_timer_create(struct flb_sched *sched);
int flb_sched_timer_destroy(struct flb_sched_timer *timer);
Expand Down
55 changes: 24 additions & 31 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ struct flb_in_calyptia_fleet_config {
};

static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
const char *url,
time_t timestamp);

Expand Down Expand Up @@ -878,14 +877,20 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config
}

static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
flb_sds_t url)
{
struct flb_http_client *client;
struct flb_connection *u_conn;
size_t b_sent;
int ret = -1;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return NULL;
}

u_conn = flb_upstream_conn_get(ctx->u);
if (u_conn == NULL) {
flb_plg_error(ctx->ins, "unable to get upstream connection");
return NULL;
}

Expand Down Expand Up @@ -920,23 +925,24 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config
goto http_do_error;
}

flb_upstream_conn_release(u_conn);
return client;

http_do_error:
flb_http_client_destroy(client);
http_client_error:
flb_upstream_conn_release(u_conn);
return NULL;
}

static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
struct flb_config *config)
{
struct flb_http_client *client;
flb_sds_t url;
flb_sds_t project_id;

if (ctx == NULL || u_conn == NULL || config == NULL) {
if (ctx == NULL || config == NULL) {
return -1;
}

Expand All @@ -955,7 +961,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s",
project_id, ctx->fleet_name);

client = fleet_http_do(ctx, u_conn, url);
client = fleet_http_do(ctx, url);
flb_sds_destroy(url);

if (!client) {
Expand All @@ -981,7 +987,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
}

static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
flb_sds_t url,
const char *hdr,
const char *dst,
Expand All @@ -997,11 +1002,11 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
time_t last_modified;
flb_sds_t fname;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return -1;
}

client = fleet_http_do(ctx, u_conn, url);
client = fleet_http_do(ctx, url);

if (client == NULL) {
return -1;
Expand Down Expand Up @@ -1740,8 +1745,7 @@ static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx)
return rc;
}

static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn)
static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_sds_t cfgname;
flb_sds_t cfgnewname;
Expand Down Expand Up @@ -1778,14 +1782,14 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
flb_sds_destroy(hdrname);

/* create the base file. */
ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header,
ret = get_calyptia_file(ctx, ctx->fleet_url, header,
NULL, &time_last_modified);

flb_sds_destroy(header);

/* new file created! */
if (ret == 1) {
get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified);
get_calyptia_files(ctx, ctx->fleet_files_url, time_last_modified);

cfgname = time_fleet_config_filename(ctx, time_last_modified);

Expand Down Expand Up @@ -1820,30 +1824,20 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
void *in_context)
{
struct flb_in_calyptia_fleet_config *ctx = in_context;
struct flb_connection *u_conn;
int ret = -1;

u_conn = flb_upstream_conn_get(ctx->u);

if (!u_conn) {
flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u",
ctx->ins->host.name, ctx->ins->host.port);
goto conn_error;
}

if (ctx->fleet_id == NULL) {

if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) {
if (get_calyptia_fleet_id_by_name(ctx, config) == -1) {
flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
goto calyptia_error;
}
goto fleet_id_error;
}
}

ret = get_calyptia_fleet_config(ctx, u_conn);
ret = get_calyptia_fleet_config(ctx);

calyptia_error:
flb_upstream_conn_release(u_conn);
conn_error:
fleet_id_error:
FLB_INPUT_RETURN(ret);
}

Expand Down Expand Up @@ -2104,18 +2098,17 @@ static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx,
}

static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
const char *url,
time_t timestamp)
{
struct flb_http_client *client;
int ret = -1;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return -1;
}

client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url);
client = fleet_http_do(ctx, ctx->fleet_files_url);

if (client == NULL) {
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ void flb_config_exit(struct flb_config *config)

/* Release scheduler */
if (config->sched) {
flb_sched_destroy(config->sched);
flb_sched_destroy(config->evl, config->sched);
}

#ifdef FLB_HAVE_HTTP_SERVER
Expand Down
12 changes: 9 additions & 3 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,9 @@ int flb_engine_start(struct flb_config *config)
* to the local event loop 'evl'.
*/
ret = mk_event_channel_create(config->evl,
&config->ch_self_events[0],
&config->ch_self_events[1],
&config->event_thread_init);
&config->ch_self_events[0],
&config->ch_self_events[1],
&config->event_thread_init);
if (ret == -1) {
flb_error("[engine] could not create engine thread channel");
return -1;
Expand Down Expand Up @@ -1135,6 +1135,12 @@ int flb_engine_shutdown(struct flb_config *config)
flb_hs_destroy(config->http_ctx);
}
#endif
if (config->evl) {
mk_event_channel_destroy(config->evl,
config->ch_self_events[0],
config->ch_self_events[1],
&config->event_thread_init);
}

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ static void input_thread(void *data)

/* Create the bucket queue (FLB_ENGINE_PRIORITY_COUNT priorities) */
flb_bucket_queue_destroy(evl_bktq);
flb_sched_destroy(sched);
flb_sched_destroy(thi->evl, sched);
input_thread_instance_destroy(thi);
}

Expand Down
7 changes: 7 additions & 0 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ int flb_log_cache_check_suppress(struct flb_log_cache *cache, char *msg_buf, siz
return FLB_TRUE;
}

int flb_log_work_destroy(struct flb_worker *worker)
{
flb_pipe_destroy(worker->log);
return 0;
}

int flb_log_worker_init(struct flb_worker *worker)
{
int ret;
Expand Down Expand Up @@ -689,6 +695,7 @@ int flb_log_destroy(struct flb_log *log, struct flb_config *config)
if (log->worker->log_cache) {
flb_log_cache_destroy(log->worker->log_cache);
}
flb_log_work_destroy(log->worker);
flb_free(log->worker);
flb_free(log);

Expand Down
12 changes: 11 additions & 1 deletion src/flb_output_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ static void output_thread(void *data)
}
}

mk_event_channel_destroy(th_ins->evl,
th_ins->ch_thread_events[0],
th_ins->ch_thread_events[1],
&event_local);
/*
* Final cleanup, destroy all resources associated with:
*
Expand All @@ -358,11 +362,17 @@ static void output_thread(void *data)
flb_upstream_conn_active_destroy_list(&th_ins->upstreams);
flb_upstream_conn_pending_destroy_list(&th_ins->upstreams);

flb_sched_destroy(sched);
flb_sched_destroy(th_ins->evl, sched);
params = FLB_TLS_GET(out_flush_params);
if (params) {
flb_free(params);
}

mk_event_channel_destroy(th_ins->evl,
th_ins->ch_parent_events[0],
th_ins->ch_parent_events[1],
th_ins);

mk_event_loop_destroy(th_ins->evl);
flb_bucket_queue_destroy(th_ins->evl_bktq);

Expand Down
3 changes: 2 additions & 1 deletion src/flb_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ struct flb_sched *flb_sched_create(struct flb_config *config,
}

/* Release all resources used by the Scheduler */
int flb_sched_destroy(struct flb_sched *sched)
int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched)
{
int c = 0;
struct mk_list *tmp;
Expand Down Expand Up @@ -603,6 +603,7 @@ int flb_sched_destroy(struct flb_sched *sched)
/* Delete timers */
mk_list_foreach_safe(head, tmp, &sched->timers) {
timer = mk_list_entry(head, struct flb_sched_timer, _head);
mk_event_timeout_destroy(evl, &timer->event);
flb_sched_timer_destroy(timer);
c++;
}
Expand Down
4 changes: 1 addition & 3 deletions src/flb_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ void flb_worker_destroy(struct flb_worker *worker)
return;
}

if (worker->log_cache) {
flb_log_cache_destroy(worker->log_cache);
}
flb_log_work_destroy(worker);

mk_list_del(&worker->_head);
flb_free(worker);
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ static void test_issue_5504()
TEST_CHECK(ml->last_flush > last_flush);

/* Cleanup */
flb_sched_destroy(config->sched);
flb_sched_destroy(config->evl, config->sched);
config->sched = sched;
mk_event_loop_destroy(config->evl);
config->evl = evl;
Expand Down
Loading