From 32ec8a2ed6a3a24d1daff69e9c1560da8029c8d4 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 11 Oct 2023 16:41:59 -0300 Subject: [PATCH] in_calyptia_fleet: download and load fleet files. Download fleet files to a sub-directory with the timestamp of the relevant configuration and allow it to be loaded by changing to that directory when fluent-bit is reloaded. * Recurse through fleet files and download each one. * Change to the fleet files directory on reload so they can be referred to from the main fleet configuration file. * Delete old fleet file directories when the configuration is deleted. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 377 ++++++++++++++++-- 1 file changed, 339 insertions(+), 38 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index efa540eb0f7..d4bf4c22288 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -85,6 +85,7 @@ struct flb_in_calyptia_fleet_config { flb_sds_t cloud_port; flb_sds_t fleet_url; + flb_sds_t fleet_files_url; struct flb_input_instance *ins; /* plugin instance */ struct flb_config *config; /* Fluent Bit context */ @@ -97,6 +98,13 @@ struct flb_in_calyptia_fleet_config { int collect_fd; }; +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp); + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); + static char *find_case_header(struct flb_http_client *cli, const char *header) { char *ptr; @@ -439,6 +447,59 @@ static int test_config_is_valid(flb_sds_t cfgpath) return ret; } +static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgpath, + long *config_timestamp) +{ + char *ext = NULL; + long timestamp; + char realname[4096]; + char *fname; + ssize_t len; + + if (config_timestamp == NULL || cfgpath == NULL) { + return FLB_FALSE; + } + + len = readlink(cfgpath, realname, sizeof(realname)); + + if (len > sizeof(realname)) { + return FLB_FALSE; + } + + fname = basename(realname); + + errno = 0; + timestamp = strtol(fname, &ext, 10); + + if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || + (errno != 0 && timestamp == 0)) { + flb_errno(); + return FLB_FALSE; + } + + /* unable to parse the timstamp */ + if (errno == ERANGE) { + return FLB_FALSE; + } + + *config_timestamp = timestamp; + + return FLB_TRUE; +} + +static int parse_config_timestamp(struct flb_in_calyptia_fleet_config *ctx, + long *config_timestamp) +{ + flb_ctx_t *flb_ctx = flb_context_get(); + + if (config_timestamp == NULL) { + return FLB_FALSE; + } + + return parse_config_name_timestamp(ctx, flb_ctx->config->conf_path_file, config_timestamp); +} + static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgpath) { struct reload_ctx *reload; @@ -446,6 +507,9 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf pthread_attr_t ptha; flb_ctx_t *flb = flb_context_get(); + + parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp); + if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); } @@ -480,6 +544,8 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf reload->flb = flb; reload->cfg_path = cfgpath; + fleet_cur_chdir(ctx); + pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); pthread_create(&pth, &ptha, do_reload, reload); @@ -778,6 +844,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct #ifdef FLB_SYSTEM_WINDOWS #define link(a, b) CreateHardLinkA(b, a, 0) +#define symlink(a, b) CreateSymLinkA(b, a, 0) ssize_t readlink(const char *path, char *realpath, size_t srealpath) { HANDLE hFile; @@ -803,10 +870,9 @@ ssize_t readlink(const char *path, char *realpath, size_t srealpath) { #endif -/* cb_collect callback */ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn, - const char *url, + flb_sds_t url, const char *hdr, const char *dst, time_t *time_last_modified) @@ -821,11 +887,11 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, time_t last_modified; flb_sds_t fname; - if (ctx == NULL || u_conn == NULL || url == NULL || dst == NULL) { + if (ctx == NULL || u_conn == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, ctx->fleet_url); + client = fleet_http_do(ctx, u_conn, url); if (client == NULL) { return -1; @@ -838,10 +904,14 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, goto client_error; } - flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); - last_modified = mktime(&tm_last_modified.tm); + if (dst == NULL) { + flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); + last_modified = mktime(&tm_last_modified.tm); - fname = time_fleet_config_filename(ctx, last_modified); + fname = time_fleet_config_filename(ctx, last_modified); + } else { + fname = flb_sds_create_len(dst, strlen(dst)); + } if (access(fname, F_OK) == 0) { ret = 0; @@ -881,6 +951,7 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, file_error: fclose(fp); client_error: + flb_sds_destroy(fname); file_exists: flb_http_client_destroy(client); return ret; @@ -1081,6 +1152,43 @@ static int cfl_array_qsort_ini(const void *arg_a, const void *arg_b) return strcmp(var_a->data.as_string, var_b->data.as_string); } +static int calyptia_config_delete_old_dir(const char *cfgpath) +{ + flb_sds_t cfg_glob; + char *ext; + struct cfl_array *files; + int idx; + + ext = strrchr(cfgpath, '.'); + if (ext == NULL) { + return FLB_FALSE; + } + + cfg_glob = flb_sds_create_len(cfgpath, ext - cfgpath); + if (cfg_glob == NULL) { + return FLB_FALSE; + } + + if (flb_sds_cat_safe(&cfg_glob, "/*", strlen("/*")) != 0) { + flb_sds_destroy(cfg_glob); + return FLB_FALSE; + } + + files = read_glob(cfg_glob); + if (files == NULL) { + flb_sds_destroy(cfg_glob); + return FLB_FALSE; + } + + for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { + unlink(files->entries[idx]->data.as_string); + } + + flb_sds_destroy(cfg_glob); + cfl_array_destroy(files); + return FLB_TRUE; +} + static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) { struct cfl_array *inis; @@ -1107,10 +1215,12 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) sizeof(struct cfl_variant *), cfl_array_qsort_ini); - for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 -3);idx++) { + for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) { unlink(inis->entries[idx]->data.as_string); + calyptia_config_delete_old_dir(inis->entries[idx]->data.as_string); } + cfl_array_destroy(inis); flb_sds_destroy(glob_ini); return 0; } @@ -1191,6 +1301,11 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); } + if (ctx->fleet_files_url == NULL) { + ctx->fleet_files_url = flb_sds_create_size(4096); + flb_sds_printf(&ctx->fleet_files_url, "/v1/fleets/%s/files", ctx->fleet_id); + } + header = flb_sds_create_size(4096); if (ctx->fleet_name == NULL) { @@ -1238,10 +1353,12 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, /* create the base file. */ ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, - ".ini", &time_last_modified); + NULL, &time_last_modified); /* new file created! */ if (ret == 1) { + get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); + cfgname = time_fleet_config_filename(ctx, time_last_modified); calyptia_config_add(ctx, cfgname); flb_sds_destroy(cfgname); @@ -1355,14 +1472,71 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) return 0; } +static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetdir; + flb_sds_t fleetcurdir; + + + if (generate_base_fleet_directory(ctx, &fleetdir) == NULL) { + return NULL; + } + + fleetcurdir = flb_sds_create_size(strlen(fleetdir) + 32); + + if (fleetcurdir == NULL) { + flb_sds_destroy(fleetdir); + return NULL; + } + + if (flb_sds_printf(&fleetcurdir, "%s/%ld", fleetdir, timestamp) == NULL) { + flb_sds_destroy(fleetdir); + flb_sds_destroy(fleetcurdir); + return NULL; + } + + flb_sds_destroy(fleetdir); + + return fleetcurdir; +} + +static int fleet_mkdir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetcurdir; + + fleetcurdir = fleet_gendir(ctx, timestamp); + + if (fleetcurdir == NULL) { + return -1; + } + + __mkdir(fleetcurdir, 0700); + flb_sds_destroy(fleetcurdir); + + return 0; +} + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t fleetcurdir; + int ret; + + fleetcurdir = fleet_gendir(ctx, ctx->config_timestamp); + flb_plg_info(ctx->ins, "changing to config dir: %s", fleetcurdir); + + if (fleetcurdir == NULL) { + return -1; + } + + ret = chdir(fleetcurdir); + flb_sds_destroy(fleetcurdir); + + return ret; +} + static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); - char *fname; - char *ext; - long timestamp; - char realname[4096]; - ssize_t len; if (create_fleet_directory(ctx) != 0) { return -1; @@ -1379,41 +1553,168 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) } } else { - if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) { - len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname)); + parse_config_timestamp(ctx, &ctx->config_timestamp); + } - if (len > sizeof(realname)) { - return FLB_FALSE; - } + return FLB_FALSE; +} - fname = basename(realname); - } - else { - fname = basename(flb_ctx->config->conf_path_file); - } +static int create_fleet_file(flb_sds_t fleetdir, + const char *name, + int nlen, + const char *b64_content, + int blen) +{ + flb_sds_t fname; + flb_sds_t dst; + size_t dlen = 2 * blen; + FILE *fp; + int ret; - if (fname == NULL) { - return FLB_FALSE; - } + fname = flb_sds_create_size(strlen(fleetdir) + nlen + 2); + if (fname == NULL) { + return -1; + } + + if (flb_sds_cat_safe(&fname, fleetdir, strlen(fleetdir)) < 0) { + flb_sds_destroy(fname); + return -1; + } + + if (flb_sds_cat_safe(&fname, "/", 1) < 0) { + flb_sds_destroy(fname); + return -1; + } + + if (flb_sds_cat_safe(&fname, name, nlen) < 0) { + flb_sds_destroy(fname); + return -1; + } + + fp = fopen(fname, "w+"); + if (fp == NULL) { + return -1; + } + + dst = flb_sds_create_size(dlen); + ret = flb_base64_decode((unsigned char *)dst, dlen, &dlen, + (unsigned char *)b64_content, blen); + + if (ret != 0) { + return -1; + } - errno = 0; - timestamp = strtol(fname, &ext, 10); + fwrite(dst, dlen, 1, fp); - if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || - (errno != 0 && timestamp == 0)) { - flb_errno(); - return FLB_FALSE; + fclose(fp); + flb_sds_destroy(dst); + flb_sds_destroy(fname); + + return 0; +} + +static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, + char *payload, size_t size, time_t timestamp) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + size_t off = 0; + int idx; + flb_sds_t fleetdir; + msgpack_unpacked result; + msgpack_object *map; + msgpack_object *name; + msgpack_object *contents; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + + fleetdir = fleet_gendir(ctx, timestamp); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + continue; } + for (idx = 0; idx < result.data.via.array.size; idx++) { + map = msgpack_lookup_array_offset(&result.data, idx); - /* unable to parse the timstamp */ - if (errno == ERANGE) { - return FLB_FALSE; + if (map == NULL) { + return -1; + } + + name = msgpack_lookup_map_key(map, "name"); + if (name == NULL) { + return -1; + } + if (name->type != MSGPACK_OBJECT_STR) { + return -1; + } + + contents = msgpack_lookup_map_key(map, "contents"); + if (contents == NULL) { + return -1; + } + if (contents->type != MSGPACK_OBJECT_STR) { + return -1; + } + + create_fleet_file(fleetdir, + name->via.str.ptr, + name->via.str.size, + contents->via.str.ptr, + contents->via.str.size); } + } - ctx->config_timestamp = timestamp; + msgpack_unpacked_destroy(&result); + flb_free(pack); + + return 0; +} + +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp) +{ + struct flb_http_client *client; + int ret = -1; + + if (ctx == NULL || u_conn == NULL || url == NULL) { + return -1; } - return FLB_FALSE; + client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url); + + if (client == NULL) { + return -1; + } + + fleet_mkdir(ctx, timestamp); + ret = create_fleet_files(ctx, client->resp.payload, client->resp.payload_size, timestamp); + if (ret != 0) { + goto file_error; + } + + ret = 1; + +file_error: + flb_http_client_destroy(client); + return ret; } static int in_calyptia_fleet_init(struct flb_input_instance *in,