diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index caca271e857..bb8dfb99992 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -42,7 +42,7 @@ #define CALYPTIA_H_CTYPE "Content-Type" #define CALYPTIA_H_CTYPE_JSON "application/json" -#define DEFAULT_INTERVAL_SEC "3" +#define DEFAULT_INTERVAL_SEC "15" #define DEFAULT_INTERVAL_NSEC "0" #define CALYPTIA_HOST "cloud-api.calyptia.com" @@ -61,6 +61,11 @@ struct flb_in_calyptia_fleet_config { int interval_sec; int interval_nsec; + /* Grabbed from the cfg_path, used to check if configuration has + * has been updated. + */ + long config_timestamp; + flb_sds_t api_key; flb_sds_t fleet_id; flb_sds_t fleet_name; @@ -103,12 +108,12 @@ static char *find_case_header(struct flb_http_client *cli, const char *header) ptr+=2; - // no space left for header + /* no space left for header */ if (ptr + strlen(header)+2 >= cli->resp.payload) { return NULL; } - // matched header and the delimiter + /* matched header and the delimiter */ if (strncasecmp(ptr, header, strlen(header)) == 0) { if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') { @@ -159,6 +164,11 @@ static int case_header_lookup(struct flb_http_client *cli, return -1; } + /* sanity check that the header_len does not exceed the headers. */ + if (ptr + header_len + 2 > end) { + return -1; + } + ptr += header_len + 2; *out_val = ptr; @@ -257,13 +267,49 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct return ret; } +static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) +{ + char *fname; + char *end; + long val; + + if (cfg->conf_path_file == NULL) { + return FLB_FALSE; + } + + fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]); + + if (fname == NULL) { + return FLB_FALSE; + } + + fname++; + + errno = 0; + val = strtol(fname, &end, 10); + + if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || + (errno != 0 && val == 0)) { + flb_errno(); + return FLB_FALSE; + } + + if (strcmp(end, ".ini") == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) { if (cfg->conf_path_file == NULL) { return FLB_FALSE; } - return is_new_fleet_config(ctx, cfg) || is_cur_fleet_config(ctx, cfg); + return is_new_fleet_config(ctx, cfg) || + is_cur_fleet_config(ctx, cfg) || + is_timestamped_fleet_config(ctx, cfg); } static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx) @@ -296,7 +342,7 @@ static void *do_reload(void *data) { struct reload_ctx *reload = (struct reload_ctx *)data; - // avoid reloading the current configuration... just use our new one! + /* avoid reloading the current configuration... just use our new one! */ flb_context_set(reload->flb); reload->flb->config->enable_hot_reload = FLB_TRUE; reload->flb->config->conf_path_file = reload->cfg_path; @@ -359,19 +405,33 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf pthread_attr_t ptha; flb_ctx_t *flb = flb_context_get(); + if (ctx->collect_fd > 0) { + flb_input_collector_pause(ctx->collect_fd, ctx->ins); + } if (flb == NULL) { flb_plg_error(ctx->ins, "unable to get fluent-bit context."); + + if (ctx->collect_fd > 0) { + flb_input_collector_resume(ctx->collect_fd, ctx->ins); + } + return FLB_FALSE; } - // fix execution in valgrind... - // otherwise flb_reload errors out with: - // [error] [reload] given flb context is NULL + /* fix execution in valgrind... + * otherwise flb_reload errors out with: + * [error] [reload] given flb context is NULL + */ flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath); if (test_config_is_valid(cfgpath) == FLB_FALSE) { flb_plg_error(ctx->ins, "unable to load configuration."); + + if (ctx->collect_fd > 0) { + flb_input_collector_resume(ctx->collect_fd, ctx->ins); + } + return FLB_FALSE; } @@ -447,8 +507,7 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, } project_id = flb_sds_create_len(cur->val.via.str.ptr, - cur->val.via.str.size); - + cur->val.via.str.size); msgpack_unpacked_destroy(&result); flb_free(pack); @@ -724,7 +783,6 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, goto http_error; } - // Wed, 21 Oct 2015 07:28:00 GMT flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); time_last_modified = mktime(&tm_last_modified.tm); @@ -807,14 +865,16 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, flb_sds_destroy(cfgoldname); } - link(cfgname, cfgnewname); + symlink(cfgname, cfgnewname); + } - // FORCE THE RELOAD!!! + if (ctx->config_timestamp < time_last_modified) { + flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld", + ctx->config_timestamp, time_last_modified); flb_plg_info(ctx->ins, "force the reloading of the configuration file=%d.", ctx->event_fd); - flb_sds_destroy(cfgname); flb_sds_destroy(data); - if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { + if (execute_reload(ctx, cfgname) == FLB_FALSE) { cfgoldname = old_fleet_config_filename(ctx); cfgcurname = cur_fleet_config_filename(ctx); rename(cfgoldname, cfgcurname); @@ -822,7 +882,7 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, flb_sds_destroy(cfgoldname); goto reload_error; } - else { + else { FLB_INPUT_RETURN(0); } } @@ -838,10 +898,11 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, FLB_INPUT_RETURN(ret); } -// recursively create directories, based on: -// https://stackoverflow.com/a/2336245 -// who found it at: -// http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html +/* recursively create directories, based on: + * https://stackoverflow.com/a/2336245 + * who found it at: + * http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html + */ static int _mkdir(const char *dir, int perms) { char tmp[255]; char *ptr = NULL; @@ -907,23 +968,62 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); - + char *fname; + char *ext; + long timestamp; + char realname[4096]; + ssize_t len; if (create_fleet_directory(ctx) != 0) { return -1; } - // check if we are already using the fleet configuration file. + /* check if we are already using the fleet configuration file. */ if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) { - // check which one and load it + /* check which one and load it */ if (exists_cur_fleet_config(ctx) == FLB_TRUE) { - execute_reload(ctx, cur_fleet_config_filename(ctx)); + return execute_reload(ctx, cur_fleet_config_filename(ctx)); } - else if (exists_new_fleet_config(ctx) == FLB_TRUE) { - execute_reload(ctx, new_fleet_config_filename(ctx)); + else if (exists_new_fleet_config(ctx) == FLB_TRUE) { + return execute_reload(ctx, new_fleet_config_filename(ctx)); } } - return 0; + else { + if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) { + len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname)); + + if (len > sizeof(realname)) { + return FLB_FALSE; + } + + fname = basename(realname); + } + else { + fname = basename(flb_ctx->config->conf_path_file); + } + + if (fname == NULL) { + return FLB_FALSE; + } + + errno = 0; + timestamp = strtol(fname, &ext, 10); + + if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || + (errno != 0 && timestamp == 0)) { + flb_errno(); + return FLB_FALSE; + } + + /* unable to parse the timstamp */ + if (errno == ERANGE) { + return FLB_FALSE; + } + + ctx->config_timestamp = timestamp; + } + + return FLB_FALSE; } static int in_calyptia_fleet_init(struct flb_input_instance *in, @@ -953,6 +1053,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return -1; } ctx->ins = in; + ctx->collect_fd = -1; /* Load the config map */ @@ -1004,9 +1105,18 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); } + if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) { + ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + } + /* Set the context */ flb_input_set_context(in, ctx); - + + /* if we load a new configuration then we will be reloaded anyways */ + if (load_fleet_config(ctx) == FLB_TRUE) { + return 0; + } + /* Set our collector based on time */ ret = flb_input_set_collector_time(in, in_calyptia_fleet_collect, @@ -1015,14 +1125,13 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, config); if (ret == -1) { - flb_plg_error(ctx->ins, "could not set collector for Health input plugin"); + flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin"); flb_free(ctx); return -1; } ctx->collect_fd = ret; - load_fleet_config(ctx); return 0; }