Skip to content

Commit

Permalink
fixes: lots of fixes for file descriptor leaks.
Browse files Browse the repository at this point in the history
Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Jan 17, 2024
1 parent 71746b3 commit ae8fa99
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 43 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char *
#endif

int flb_log_worker_init(struct flb_worker *worker);
int flb_log_work_destroy(struct flb_worker *worker);
int flb_errno_print(int errnum, const char *file, int line);

#ifdef __FLB_FILENAME__
Expand Down
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ int flb_sched_event_handler(struct flb_config *config, struct mk_event *event);
struct flb_sched *flb_sched_create(struct flb_config *config,
struct mk_event_loop *evl);

int flb_sched_destroy(struct flb_sched *sched);
int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched);

struct flb_sched_timer *flb_sched_timer_create(struct flb_sched *sched);
int flb_sched_timer_destroy(struct flb_sched_timer *timer);
Expand Down
55 changes: 24 additions & 31 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ struct flb_in_calyptia_fleet_config {
};

static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
const char *url,
time_t timestamp);

Expand Down Expand Up @@ -878,14 +877,20 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config
}

static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
flb_sds_t url)
{
struct flb_http_client *client;
struct flb_connection *u_conn;
size_t b_sent;
int ret = -1;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return NULL;
}

u_conn = flb_upstream_conn_get(ctx->u);
if (u_conn == NULL) {
flb_plg_error(ctx->ins, "unable to get upstream connection");
return NULL;
}

Expand Down Expand Up @@ -920,23 +925,24 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config
goto http_do_error;
}

flb_upstream_conn_release(u_conn);
return client;

http_do_error:
flb_http_client_destroy(client);
http_client_error:
flb_upstream_conn_release(u_conn);
return NULL;
}

static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
struct flb_config *config)
{
struct flb_http_client *client;
flb_sds_t url;
flb_sds_t project_id;

if (ctx == NULL || u_conn == NULL || config == NULL) {
if (ctx == NULL || config == NULL) {
return -1;
}

Expand All @@ -955,7 +961,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s",
project_id, ctx->fleet_name);

client = fleet_http_do(ctx, u_conn, url);
client = fleet_http_do(ctx, url);
flb_sds_destroy(url);

if (!client) {
Expand All @@ -981,7 +987,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
}

static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
flb_sds_t url,
const char *hdr,
const char *dst,
Expand All @@ -997,11 +1002,11 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
time_t last_modified;
flb_sds_t fname;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return -1;
}

client = fleet_http_do(ctx, u_conn, url);
client = fleet_http_do(ctx, url);

if (client == NULL) {
return -1;
Expand Down Expand Up @@ -1740,8 +1745,7 @@ static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx)
return rc;
}

static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn)
static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_sds_t cfgname;
flb_sds_t cfgnewname;
Expand Down Expand Up @@ -1778,14 +1782,14 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
flb_sds_destroy(hdrname);

/* create the base file. */
ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header,
ret = get_calyptia_file(ctx, ctx->fleet_url, header,
NULL, &time_last_modified);

flb_sds_destroy(header);

/* new file created! */
if (ret == 1) {
get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified);
get_calyptia_files(ctx, ctx->fleet_files_url, time_last_modified);

cfgname = time_fleet_config_filename(ctx, time_last_modified);

Expand Down Expand Up @@ -1820,30 +1824,20 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
void *in_context)
{
struct flb_in_calyptia_fleet_config *ctx = in_context;
struct flb_connection *u_conn;
int ret = -1;

u_conn = flb_upstream_conn_get(ctx->u);

if (!u_conn) {
flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u",
ctx->ins->host.name, ctx->ins->host.port);
goto conn_error;
}

if (ctx->fleet_id == NULL) {

if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) {
if (get_calyptia_fleet_id_by_name(ctx, config) == -1) {
flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
goto calyptia_error;
}
goto fleet_id_error;
}
}

ret = get_calyptia_fleet_config(ctx, u_conn);
ret = get_calyptia_fleet_config(ctx);

calyptia_error:
flb_upstream_conn_release(u_conn);
conn_error:
fleet_id_error:
FLB_INPUT_RETURN(ret);
}

Expand Down Expand Up @@ -2104,18 +2098,17 @@ static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx,
}

static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn,
const char *url,
time_t timestamp)
{
struct flb_http_client *client;
int ret = -1;

if (ctx == NULL || u_conn == NULL || url == NULL) {
if (ctx == NULL || url == NULL) {
return -1;
}

client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url);
client = fleet_http_do(ctx, ctx->fleet_files_url);

if (client == NULL) {
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ void flb_config_exit(struct flb_config *config)

/* Release scheduler */
if (config->sched) {
flb_sched_destroy(config->sched);
flb_sched_destroy(config->evl, config->sched);
}

#ifdef FLB_HAVE_HTTP_SERVER
Expand Down
10 changes: 7 additions & 3 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,9 @@ int flb_engine_start(struct flb_config *config)
* to the local event loop 'evl'.
*/
ret = mk_event_channel_create(config->evl,
&config->ch_self_events[0],
&config->ch_self_events[1],
&config->event_thread_init);
&config->ch_self_events[0],
&config->ch_self_events[1],
&config->event_thread_init);
if (ret == -1) {
flb_error("[engine] could not create engine thread channel");
return -1;
Expand Down Expand Up @@ -1135,6 +1135,10 @@ int flb_engine_shutdown(struct flb_config *config)
flb_hs_destroy(config->http_ctx);
}
#endif
mk_event_channel_destroy(config->evl,
config->ch_self_events[0],
config->ch_self_events[1],
&config->event_thread_init);

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ static void input_thread(void *data)

/* Create the bucket queue (FLB_ENGINE_PRIORITY_COUNT priorities) */
flb_bucket_queue_destroy(evl_bktq);
flb_sched_destroy(sched);
flb_sched_destroy(thi->evl, sched);
input_thread_instance_destroy(thi);
}

Expand Down
9 changes: 9 additions & 0 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ int flb_log_cache_check_suppress(struct flb_log_cache *cache, char *msg_buf, siz
return FLB_TRUE;
}

int flb_log_work_destroy(struct flb_worker *worker)
{
close(worker->log[0]);
close(worker->log[1]);

return 0;
}

int flb_log_worker_init(struct flb_worker *worker)
{
int ret;
Expand Down Expand Up @@ -689,6 +697,7 @@ int flb_log_destroy(struct flb_log *log, struct flb_config *config)
if (log->worker->log_cache) {
flb_log_cache_destroy(log->worker->log_cache);
}
flb_log_work_destroy(log->worker);
flb_free(log->worker);
flb_free(log);

Expand Down
12 changes: 11 additions & 1 deletion src/flb_output_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ static void output_thread(void *data)
}
}

mk_event_channel_destroy(th_ins->evl,
th_ins->ch_thread_events[0],
th_ins->ch_thread_events[1],
&event_local);
/*
* Final cleanup, destroy all resources associated with:
*
Expand All @@ -358,11 +362,17 @@ static void output_thread(void *data)
flb_upstream_conn_active_destroy_list(&th_ins->upstreams);
flb_upstream_conn_pending_destroy_list(&th_ins->upstreams);

flb_sched_destroy(sched);
flb_sched_destroy(th_ins->evl, sched);
params = FLB_TLS_GET(out_flush_params);
if (params) {
flb_free(params);
}

mk_event_channel_destroy(th_ins->evl,
th_ins->ch_parent_events[0],
th_ins->ch_parent_events[1],
th_ins);

mk_event_loop_destroy(th_ins->evl);
flb_bucket_queue_destroy(th_ins->evl_bktq);

Expand Down
3 changes: 2 additions & 1 deletion src/flb_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ struct flb_sched *flb_sched_create(struct flb_config *config,
}

/* Release all resources used by the Scheduler */
int flb_sched_destroy(struct flb_sched *sched)
int flb_sched_destroy(struct mk_event_loop *evl, struct flb_sched *sched)
{
int c = 0;
struct mk_list *tmp;
Expand Down Expand Up @@ -603,6 +603,7 @@ int flb_sched_destroy(struct flb_sched *sched)
/* Delete timers */
mk_list_foreach_safe(head, tmp, &sched->timers) {
timer = mk_list_entry(head, struct flb_sched_timer, _head);
mk_event_timeout_destroy(evl, &timer->event);
flb_sched_timer_destroy(timer);
c++;
}
Expand Down
4 changes: 1 addition & 3 deletions src/flb_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ void flb_worker_destroy(struct flb_worker *worker)
return;
}

if (worker->log_cache) {
flb_log_cache_destroy(worker->log_cache);
}
flb_log_work_destroy(worker);

mk_list_del(&worker->_head);
flb_free(worker);
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/multiline.c
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ static void test_issue_5504()
TEST_CHECK(ml->last_flush > last_flush);

/* Cleanup */
flb_sched_destroy(config->sched);
flb_sched_destroy(config->evl, config->sched);
config->sched = sched;
mk_event_loop_destroy(config->evl);
config->evl = evl;
Expand Down

0 comments on commit ae8fa99

Please sign in to comment.