Skip to content

Commit

Permalink
in_calyptia_fleet: improve configuration reloading. (#7925)
Browse files Browse the repository at this point in the history
- use current configuration timestamp to evaluate if a reload is required.
- increase minimum interval to 15 seconds, pause collector sooner.
- use symlinks instead of hardlinks and check the filename to properly account for the timestamp.
- in_calyptia_fleet: initialize ctx->collect_fd to -1.
- in_calyptia_fleet: check return value from strtol()

---------

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan authored Sep 18, 2023
1 parent 47b42dd commit 64b70ed
Showing 1 changed file with 139 additions and 30 deletions.
169 changes: 139 additions & 30 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#define CALYPTIA_H_CTYPE "Content-Type"
#define CALYPTIA_H_CTYPE_JSON "application/json"

#define DEFAULT_INTERVAL_SEC "3"
#define DEFAULT_INTERVAL_SEC "15"
#define DEFAULT_INTERVAL_NSEC "0"

#define CALYPTIA_HOST "cloud-api.calyptia.com"
Expand All @@ -61,6 +61,11 @@ struct flb_in_calyptia_fleet_config {
int interval_sec;
int interval_nsec;

/* Grabbed from the cfg_path, used to check if configuration has
* has been updated.
*/
long config_timestamp;

flb_sds_t api_key;
flb_sds_t fleet_id;
flb_sds_t fleet_name;
Expand Down Expand Up @@ -103,12 +108,12 @@ static char *find_case_header(struct flb_http_client *cli, const char *header)

ptr+=2;

// no space left for header
/* no space left for header */
if (ptr + strlen(header)+2 >= cli->resp.payload) {
return NULL;
}

// matched header and the delimiter
/* matched header and the delimiter */
if (strncasecmp(ptr, header, strlen(header)) == 0) {

if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') {
Expand Down Expand Up @@ -159,6 +164,11 @@ static int case_header_lookup(struct flb_http_client *cli,
return -1;
}

/* sanity check that the header_len does not exceed the headers. */
if (ptr + header_len + 2 > end) {
return -1;
}

ptr += header_len + 2;

*out_val = ptr;
Expand Down Expand Up @@ -257,13 +267,49 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
return ret;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
char *fname;
char *end;
long val;

if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);

if (fname == NULL) {
return FLB_FALSE;
}

fname++;

errno = 0;
val = strtol(fname, &end, 10);

if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) ||
(errno != 0 && val == 0)) {
flb_errno();
return FLB_FALSE;
}

if (strcmp(end, ".ini") == 0) {
return FLB_TRUE;
}

return FLB_FALSE;
}

static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

return is_new_fleet_config(ctx, cfg) || is_cur_fleet_config(ctx, cfg);
return is_new_fleet_config(ctx, cfg) ||
is_cur_fleet_config(ctx, cfg) ||
is_timestamped_fleet_config(ctx, cfg);
}

static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
Expand Down Expand Up @@ -296,7 +342,7 @@ static void *do_reload(void *data)
{
struct reload_ctx *reload = (struct reload_ctx *)data;

// avoid reloading the current configuration... just use our new one!
/* avoid reloading the current configuration... just use our new one! */
flb_context_set(reload->flb);
reload->flb->config->enable_hot_reload = FLB_TRUE;
reload->flb->config->conf_path_file = reload->cfg_path;
Expand Down Expand Up @@ -359,19 +405,33 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf
pthread_attr_t ptha;
flb_ctx_t *flb = flb_context_get();

if (ctx->collect_fd > 0) {
flb_input_collector_pause(ctx->collect_fd, ctx->ins);
}

if (flb == NULL) {
flb_plg_error(ctx->ins, "unable to get fluent-bit context.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

// fix execution in valgrind...
// otherwise flb_reload errors out with:
// [error] [reload] given flb context is NULL
/* fix execution in valgrind...
* otherwise flb_reload errors out with:
* [error] [reload] given flb context is NULL
*/
flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath);

if (test_config_is_valid(cfgpath) == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to load configuration.");

if (ctx->collect_fd > 0) {
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

return FLB_FALSE;
}

Expand Down Expand Up @@ -447,8 +507,7 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx,
}

project_id = flb_sds_create_len(cur->val.via.str.ptr,
cur->val.via.str.size);

cur->val.via.str.size);
msgpack_unpacked_destroy(&result);
flb_free(pack);

Expand Down Expand Up @@ -724,7 +783,6 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
goto http_error;
}

// Wed, 21 Oct 2015 07:28:00 GMT
flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
time_last_modified = mktime(&tm_last_modified.tm);

Expand Down Expand Up @@ -807,22 +865,24 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
flb_sds_destroy(cfgoldname);
}

link(cfgname, cfgnewname);
symlink(cfgname, cfgnewname);
}

// FORCE THE RELOAD!!!
if (ctx->config_timestamp < time_last_modified) {
flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld",
ctx->config_timestamp, time_last_modified);
flb_plg_info(ctx->ins, "force the reloading of the configuration file=%d.", ctx->event_fd);
flb_sds_destroy(cfgname);
flb_sds_destroy(data);

if (execute_reload(ctx, cfgnewname) == FLB_FALSE) {
if (execute_reload(ctx, cfgname) == FLB_FALSE) {
cfgoldname = old_fleet_config_filename(ctx);
cfgcurname = cur_fleet_config_filename(ctx);
rename(cfgoldname, cfgcurname);
flb_sds_destroy(cfgcurname);
flb_sds_destroy(cfgoldname);
goto reload_error;
}
else {
else {
FLB_INPUT_RETURN(0);
}
}
Expand All @@ -838,10 +898,11 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
FLB_INPUT_RETURN(ret);
}

// recursively create directories, based on:
// https://stackoverflow.com/a/2336245
// who found it at:
// http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
/* recursively create directories, based on:
* https://stackoverflow.com/a/2336245
* who found it at:
* http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
*/
static int _mkdir(const char *dir, int perms) {
char tmp[255];
char *ptr = NULL;
Expand Down Expand Up @@ -907,23 +968,62 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_ctx_t *flb_ctx = flb_context_get();

char *fname;
char *ext;
long timestamp;
char realname[4096];
ssize_t len;

if (create_fleet_directory(ctx) != 0) {
return -1;
}

// check if we are already using the fleet configuration file.
/* check if we are already using the fleet configuration file. */
if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
// check which one and load it
/* check which one and load it */
if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, cur_fleet_config_filename(ctx));
return execute_reload(ctx, cur_fleet_config_filename(ctx));
}
else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
execute_reload(ctx, new_fleet_config_filename(ctx));
else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
return execute_reload(ctx, new_fleet_config_filename(ctx));
}
}
return 0;
else {
if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) {
len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname));

if (len > sizeof(realname)) {
return FLB_FALSE;
}

fname = basename(realname);
}
else {
fname = basename(flb_ctx->config->conf_path_file);
}

if (fname == NULL) {
return FLB_FALSE;
}

errno = 0;
timestamp = strtol(fname, &ext, 10);

if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) ||
(errno != 0 && timestamp == 0)) {
flb_errno();
return FLB_FALSE;
}

/* unable to parse the timstamp */
if (errno == ERANGE) {
return FLB_FALSE;
}

ctx->config_timestamp = timestamp;
}

return FLB_FALSE;
}

static int in_calyptia_fleet_init(struct flb_input_instance *in,
Expand Down Expand Up @@ -953,6 +1053,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
return -1;
}
ctx->ins = in;
ctx->collect_fd = -1;


/* Load the config map */
Expand Down Expand Up @@ -1004,9 +1105,18 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
}

if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
}

/* Set the context */
flb_input_set_context(in, ctx);


/* if we load a new configuration then we will be reloaded anyways */
if (load_fleet_config(ctx) == FLB_TRUE) {
return 0;
}

/* Set our collector based on time */
ret = flb_input_set_collector_time(in,
in_calyptia_fleet_collect,
Expand All @@ -1015,14 +1125,13 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
config);

if (ret == -1) {
flb_plg_error(ctx->ins, "could not set collector for Health input plugin");
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
flb_free(ctx);
return -1;
}

ctx->collect_fd = ret;

load_fleet_config(ctx);
return 0;
}

Expand Down

0 comments on commit 64b70ed

Please sign in to comment.