diff --git a/include/fluent-bit/flb_log.h b/include/fluent-bit/flb_log.h index 45f60d1e098..48e8ff74966 100644 --- a/include/fluent-bit/flb_log.h +++ b/include/fluent-bit/flb_log.h @@ -225,6 +225,7 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char * #endif int flb_log_worker_init(struct flb_worker *worker); +int flb_log_worker_destroy(struct flb_worker *worker); int flb_errno_print(int errnum, const char *file, int line); #ifdef __FLB_FILENAME__ diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f56f4d6b15c..cbacc38de76 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -39,7 +39,7 @@ #include #include -// Glob support +/* Glob support */ #ifndef _MSC_VER #include #endif @@ -102,8 +102,18 @@ struct flb_in_calyptia_fleet_config { int collect_fd; }; +struct reload_ctx { + flb_ctx_t *flb; + flb_sds_t cfg_path; +}; + +static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname); + +#define new_fleet_config_filename(a) fleet_config_filename((a), "new") +#define cur_fleet_config_filename(a) fleet_config_filename((a), "cur") +#define old_fleet_config_filename(a) fleet_config_filename((a), "old") + static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, const char *url, time_t timestamp); @@ -139,18 +149,15 @@ static char *find_case_header(struct flb_http_client *cli, const char *header) headstart = strstr(cli->resp.data, "\r\n"); - if (headstart == NULL) { return NULL; } /* Lookup the beginning of the header */ for (ptr = headstart; ptr != NULL && ptr+2 < cli->resp.payload; ptr = strstr(ptr, "\r\n")) { - if (ptr + 4 < cli->resp.payload && strcmp(ptr, "\r\n\r\n") == 0) { return NULL; } - ptr+=2; /* no space left for header */ @@ -160,7 +167,6 @@ static char *find_case_header(struct flb_http_client *cli, const char *header) /* matched header and the delimiter */ if (strncasecmp(ptr, header, strlen(header)) == 0) { - if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') { return ptr; } @@ -185,9 +191,7 @@ static int case_header_lookup(struct flb_http_client *cli, ptr = find_case_header(cli, header); end = strstr(cli->resp.data, "\r\n\r\n"); - if (!ptr) { - if (end) { /* The headers are complete but the header is not there */ return -1; @@ -204,7 +208,6 @@ static int case_header_lookup(struct flb_http_client *cli, /* Lookup CRLF (end of line \r\n) */ crlf = strstr(ptr, "\r\n"); - if (!crlf) { return -1; } @@ -222,10 +225,6 @@ static int case_header_lookup(struct flb_http_client *cli, return 0; } -struct reload_ctx { - flb_ctx_t *flb; - flb_sds_t cfg_path; -}; static flb_sds_t generate_base_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) { @@ -269,11 +268,6 @@ static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, return cfgname; } - -#define new_fleet_config_filename(a) fleet_config_filename((a), "new") -#define cur_fleet_config_filename(a) fleet_config_filename((a), "cur") -#define old_fleet_config_filename(a) fleet_config_filename((a), "old") - static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t) { char s_last_modified[32]; @@ -388,9 +382,7 @@ static int is_timestamped_fleet_config_path(struct flb_in_calyptia_fleet_config errno = 0; val = strtol(fname, &end, 10); - - if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || - (errno != 0 && val == 0)) { + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || (errno != 0 && val == 0)) { flb_errno(); return FLB_FALSE; } @@ -433,9 +425,8 @@ static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_ static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { - flb_sds_t cfgnewname; int ret = FLB_FALSE; - + flb_sds_t cfgnewname; cfgnewname = new_fleet_config_filename(ctx); if (cfgnewname == NULL) { @@ -444,8 +435,8 @@ static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx) } ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; - flb_sds_destroy(cfgnewname); + return ret; } @@ -469,9 +460,8 @@ static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx) static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { - flb_sds_t cfgoldname; int ret = FLB_FALSE; - + flb_sds_t cfgoldname; cfgoldname = old_fleet_config_filename(ctx); if (cfgoldname == NULL) { @@ -480,8 +470,8 @@ static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx) } ret = access(cfgoldname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; - flb_sds_destroy(cfgoldname); + return ret; } @@ -496,6 +486,9 @@ static void *do_reload(void *data) /* avoid reloading the current configuration... just use our new one! */ flb_context_set(reload->flb); reload->flb->config->enable_hot_reload = FLB_TRUE; + if (reload->flb->config->conf_path_file) { + flb_sds_destroy(reload->flb->config->conf_path_file); + } reload->flb->config->conf_path_file = reload->cfg_path; flb_free(reload); @@ -519,7 +512,6 @@ static int test_config_is_valid(struct flb_in_calyptia_fleet_config *ctx, } conf = flb_cf_create(); - if (conf == NULL) { flb_plg_debug(ctx->ins, "unable to create conf during validation test: %s", cfgpath); @@ -527,7 +519,6 @@ static int test_config_is_valid(struct flb_in_calyptia_fleet_config *ctx, } conf = flb_cf_create_from_file(conf, cfgpath); - if (conf == NULL) { flb_plg_debug(ctx->ins, "unable to create conf from file during validation test: %s", @@ -574,7 +565,6 @@ static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, } fname = basename(realname); - flb_plg_debug(ctx->ins, "parsing configuration timestamp from path: %s", fname); errno = 0; @@ -726,7 +716,7 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, struct flb_pack_state pack_state; size_t off = 0; msgpack_unpacked result; - msgpack_object *projectID; + msgpack_object *tmp; flb_sds_t project_id = NULL; if (ctx == NULL || payload == NULL) { @@ -749,22 +739,20 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { - projectID = msgpack_lookup_map_key(&result.data, "ProjectID"); - - if (projectID == NULL) { + tmp = msgpack_lookup_map_key(&result.data, "ProjectID"); + if (tmp == NULL) { flb_plg_error(ctx->ins, "unable to find fleet by name"); msgpack_unpacked_destroy(&result); return NULL; } - if (projectID->type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "invalid fleet ID"); + if (tmp->type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "invalid fleet ID data type"); msgpack_unpacked_destroy(&result); return NULL; } - project_id = flb_sds_create_len(projectID->via.str.ptr, - projectID->via.str.size); + project_id = flb_sds_create_len(tmp->via.str.ptr, tmp->via.str.size); break; } @@ -851,7 +839,6 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config } api_token_sep = strchr(ctx->api_key, '.'); - if (api_token_sep == NULL) { return NULL; } @@ -878,14 +865,20 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config } static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, flb_sds_t url) { - struct flb_http_client *client; - size_t b_sent; int ret = -1; + size_t b_sent; + struct flb_connection *u_conn; + struct flb_http_client *client; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { + return NULL; + } + + u_conn = flb_upstream_conn_get(ctx->u); + if (u_conn == NULL) { + flb_plg_error(ctx->ins, "unable to get upstream connection"); return NULL; } @@ -904,7 +897,6 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config ctx->api_key, flb_sds_len(ctx->api_key)); ret = flb_http_do(client, &b_sent); - if (ret != 0) { flb_plg_error(ctx->ins, "http do error"); goto http_do_error; @@ -920,28 +912,28 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config goto http_do_error; } + flb_upstream_conn_release(u_conn); return client; http_do_error: flb_http_client_destroy(client); http_client_error: + flb_upstream_conn_release(u_conn); return NULL; } static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, struct flb_config *config) { struct flb_http_client *client; flb_sds_t url; flb_sds_t project_id; - if (ctx == NULL || u_conn == NULL || config == NULL) { + if (ctx == NULL || config == NULL) { return -1; } project_id = get_project_id_from_api_key(ctx); - if (project_id == NULL) { return -1; } @@ -955,7 +947,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", project_id, ctx->fleet_name); - client = fleet_http_do(ctx, u_conn, url); + client = fleet_http_do(ctx, url); flb_sds_destroy(url); if (!client) { @@ -981,7 +973,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct } static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, flb_sds_t url, const char *hdr, const char *dst, @@ -997,18 +988,17 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, time_t last_modified; flb_sds_t fname; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, url); - + client = fleet_http_do(ctx, url); if (client == NULL) { return -1; } ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"), - &fbit_last_modified, &fbit_last_modified_len); + &fbit_last_modified, &fbit_last_modified_len); if (ret < 0) { goto client_error; @@ -1019,7 +1009,8 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, last_modified = mktime(&tm_last_modified.tm); fname = time_fleet_config_filename(ctx, last_modified); - } else { + } + else { fname = flb_sds_create_len(dst, strlen(dst)); } @@ -1165,7 +1156,6 @@ static char *dirname(char *path) { char *ptr; - ptr = strrchr(path, '\\'); if (ptr == NULL) { @@ -1191,7 +1181,6 @@ static struct cfl_array *read_glob_win(const char *path, struct cfl_array *list) } star = strchr(path, '*'); - if (star == NULL) { flb_error("path has no wild card: %s", path); return NULL; @@ -1353,7 +1342,6 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { unlink(files->entries[idx]->data.as_string); } - cfl_array_destroy(files); } /* attempt to delete the main directory */ @@ -1740,8 +1728,7 @@ static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx) return rc; } -static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn) +static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_sds_t cfgname; flb_sds_t cfgnewname; @@ -1778,14 +1765,12 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, flb_sds_destroy(hdrname); /* create the base file. */ - ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, - NULL, &time_last_modified); - + ret = get_calyptia_file(ctx, ctx->fleet_url, header, NULL, &time_last_modified); flb_sds_destroy(header); /* new file created! */ if (ret == 1) { - get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); + get_calyptia_files(ctx, ctx->fleet_files_url, time_last_modified); cfgname = time_fleet_config_filename(ctx, time_last_modified); @@ -1819,31 +1804,19 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { - struct flb_in_calyptia_fleet_config *ctx = in_context; - struct flb_connection *u_conn; int ret = -1; - - u_conn = flb_upstream_conn_get(ctx->u); - - if (!u_conn) { - flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u", - ctx->ins->host.name, ctx->ins->host.port); - goto conn_error; - } + struct flb_in_calyptia_fleet_config *ctx = in_context; if (ctx->fleet_id == NULL) { - - if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) { + if (get_calyptia_fleet_id_by_name(ctx, config) == -1) { flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); - goto calyptia_error; + goto fleet_id_error; } } - ret = get_calyptia_fleet_config(ctx, u_conn); + ret = get_calyptia_fleet_config(ctx); -calyptia_error: - flb_upstream_conn_release(u_conn); -conn_error: +fleet_id_error: FLB_INPUT_RETURN(ret); } @@ -2104,19 +2077,17 @@ static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, } static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, const char *url, time_t timestamp) { struct flb_http_client *client; int ret = -1; - if (ctx == NULL || u_conn == NULL || url == NULL) { + if (ctx == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url); - + client = fleet_http_do(ctx, ctx->fleet_files_url); if (client == NULL) { return -1; } @@ -2155,7 +2126,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, /* Allocate space for the configuration */ ctx = flb_calloc(1, sizeof(struct flb_in_calyptia_fleet_config)); - if (!ctx) { flb_errno(); return -1; @@ -2166,8 +2136,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, /* Load the config map */ - ret = flb_input_config_map_set(in, (void *)ctx); - + ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { flb_free(ctx); flb_plg_error(in, "unable to load configuration"); diff --git a/plugins/out_forward/forward.c b/plugins/out_forward/forward.c index 82e83b2d305..ea7904691c9 100644 --- a/plugins/out_forward/forward.c +++ b/plugins/out_forward/forward.c @@ -1574,9 +1574,7 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk, if (!u_conn) { flb_plg_error(ctx->ins, "no upstream connections available"); msgpack_sbuffer_destroy(&mp_sbuf); - if (fc->time_as_integer == FLB_TRUE) { - flb_free(out_buf); - } + flb_free(out_buf); flb_free(flush_ctx); FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -1590,9 +1588,7 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk, flb_plg_error(ctx->ins, "no unix socket connection available"); msgpack_sbuffer_destroy(&mp_sbuf); - if (fc->time_as_integer == FLB_TRUE) { - flb_free(out_buf); - } + flb_free(out_buf); flb_free(flush_ctx); FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -1622,9 +1618,7 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk, } msgpack_sbuffer_destroy(&mp_sbuf); - if (fc->time_as_integer == FLB_TRUE) { - flb_free(out_buf); - } + flb_free(out_buf); flb_free(flush_ctx); FLB_OUTPUT_RETURN(FLB_RETRY); } diff --git a/src/flb_config.c b/src/flb_config.c index 94612cd42bb..54249fa8d33 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -417,8 +417,7 @@ void flb_config_exit(struct flb_config *config) /* Pipe */ if (config->ch_data[0]) { - mk_event_closesocket(config->ch_data[0]); - mk_event_closesocket(config->ch_data[1]); + flb_pipe_destroy(config->ch_data); } /* Channel manager */ diff --git a/src/flb_engine.c b/src/flb_engine.c index 48338367f78..5ec76d788e0 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -726,9 +726,9 @@ int flb_engine_start(struct flb_config *config) * to the local event loop 'evl'. */ ret = mk_event_channel_create(config->evl, - &config->ch_self_events[0], - &config->ch_self_events[1], - &config->event_thread_init); + &config->ch_self_events[0], + &config->ch_self_events[1], + &config->event_thread_init); if (ret == -1) { flb_error("[engine] could not create engine thread channel"); return -1; @@ -1135,6 +1135,12 @@ int flb_engine_shutdown(struct flb_config *config) flb_hs_destroy(config->http_ctx); } #endif + if (config->evl) { + mk_event_channel_destroy(config->evl, + config->ch_self_events[0], + config->ch_self_events[1], + &config->event_thread_init); + } return 0; } diff --git a/src/flb_log.c b/src/flb_log.c index 9ef027aca0f..f9c7bfbc15d 100644 --- a/src/flb_log.c +++ b/src/flb_log.c @@ -303,6 +303,12 @@ int flb_log_cache_check_suppress(struct flb_log_cache *cache, char *msg_buf, siz return FLB_TRUE; } +int flb_log_worker_destroy(struct flb_worker *worker) +{ + flb_pipe_destroy(worker->log); + return 0; +} + int flb_log_worker_init(struct flb_worker *worker) { int ret; @@ -321,16 +327,14 @@ int flb_log_worker_init(struct flb_worker *worker) ret = mk_event_add(log->evl, worker->log[0], FLB_LOG_EVENT, MK_EVENT_READ, &worker->event); if (ret == -1) { - close(worker->log[0]); - close(worker->log[1]); + flb_pipe_destroy(worker->log); return -1; } /* Log cache to reduce noise */ cache = flb_log_cache_create(10, FLB_LOG_CACHE_ENTRIES); if (!cache) { - close(worker->log[0]); - close(worker->log[1]); + flb_pipe_destroy(worker->log); return -1; } worker->log_cache = cache; @@ -688,7 +692,9 @@ int flb_log_destroy(struct flb_log *log, struct flb_config *config) flb_pipe_destroy(log->ch_mng); if (log->worker->log_cache) { flb_log_cache_destroy(log->worker->log_cache); + log->worker->log_cache = NULL; } + flb_log_worker_destroy(log->worker); flb_free(log->worker); flb_free(log); diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 62cc5a04716..5fca6be4bf3 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -344,6 +344,10 @@ static void output_thread(void *data) } } + mk_event_channel_destroy(th_ins->evl, + th_ins->ch_thread_events[0], + th_ins->ch_thread_events[1], + &event_local); /* * Final cleanup, destroy all resources associated with: * @@ -363,6 +367,12 @@ static void output_thread(void *data) if (params) { flb_free(params); } + + mk_event_channel_destroy(th_ins->evl, + th_ins->ch_parent_events[0], + th_ins->ch_parent_events[1], + th_ins); + mk_event_loop_destroy(th_ins->evl); flb_bucket_queue_destroy(th_ins->evl_bktq); diff --git a/src/flb_scheduler.c b/src/flb_scheduler.c index 7e42550e5c3..01725eb03e8 100644 --- a/src/flb_scheduler.c +++ b/src/flb_scheduler.c @@ -603,6 +603,7 @@ int flb_sched_destroy(struct flb_sched *sched) /* Delete timers */ mk_list_foreach_safe(head, tmp, &sched->timers) { timer = mk_list_entry(head, struct flb_sched_timer, _head); + mk_event_timeout_destroy(sched->evl, &timer->event); flb_sched_timer_destroy(timer); c++; } diff --git a/src/flb_worker.c b/src/flb_worker.c index 067689596a1..87bc006433d 100644 --- a/src/flb_worker.c +++ b/src/flb_worker.c @@ -144,7 +144,9 @@ void flb_worker_destroy(struct flb_worker *worker) if (worker->log_cache) { flb_log_cache_destroy(worker->log_cache); + worker->log_cache = NULL; } + flb_log_worker_destroy(worker); mk_list_del(&worker->_head); flb_free(worker);