Skip to content

Commit

Permalink
in_calyptia_fleet: use a single header file to store the base configu…
Browse files Browse the repository at this point in the history
…ration.

Use a header.conf file that is refreshed on each invocation of fluent-bit
to track the main configuration for the fleet agent. This file is then
included by the downloade fleet configuration files.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Nov 6, 2023
1 parent c22915e commit fecc7e2
Showing 1 changed file with 155 additions and 76 deletions.
231 changes: 155 additions & 76 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <msgpack.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_custom.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_error.h>
Expand All @@ -34,6 +35,7 @@
#include <fluent-bit/flb_strptime.h>
#include <fluent-bit/flb_reload.h>
#include <fluent-bit/flb_lib.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/config_format/flb_cf_fluentbit.h>
#include <fluent-bit/flb_base64.h>

Expand Down Expand Up @@ -389,21 +391,17 @@ static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
return ret;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
static int is_timestamped_fleet_config_path(struct flb_in_calyptia_fleet_config *ctx, const char *path)
{
char *fname;
char *end;
long val;

if (cfg == NULL) {
if (path == NULL) {
return FLB_FALSE;
}

if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);
fname = strrchr(path, PATH_SEPARATOR[0]);

if (fname == NULL) {
return FLB_FALSE;
Expand All @@ -427,6 +425,19 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
return FLB_FALSE;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
if (cfg == NULL) {
return FLB_FALSE;
}

if (cfg->conf_path_file == NULL) {
return FLB_FALSE;
}

return is_timestamped_fleet_config_path(ctx, cfg->conf_path_file);
}

static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
{
if (cfg == NULL) {
Expand Down Expand Up @@ -679,15 +690,6 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf
return FLB_TRUE;
}

static char *tls_setting_string(int use_tls)
{
if (use_tls) {
return "On";
}

return "Off";
}

static msgpack_object *msgpack_lookup_map_key(msgpack_object *obj, const char *keyname)
{
int idx;
Expand Down Expand Up @@ -1329,7 +1331,8 @@ static int calyptia_config_delete_old_dir(const char *cfgpath)

static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx)
{
struct cfl_array *inis;
struct cfl_array *confs;
struct cfl_array *tconfs;
flb_sds_t glob_files = NULL;
int idx;

Expand All @@ -1347,22 +1350,42 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx)
return -1;
}

inis = read_glob(glob_files);
if (inis == NULL) {
confs = read_glob(glob_files);
if (confs == NULL) {
flb_sds_destroy(glob_files);
return -1;
}

qsort(inis->entries, inis->entry_count,
tconfs = cfl_array_create(1);
if (tconfs == NULL) {
flb_sds_destroy(glob_files);
cfl_array_destroy(confs);
return -1;
}

if (cfl_array_resizable(tconfs, FLB_TRUE) != 0) {
flb_sds_destroy(glob_files);
cfl_array_destroy(confs);
return -1;
}

for (idx = 0; idx < confs->entry_count; idx++) {
if (is_timestamped_fleet_config_path(ctx, confs->entries[idx]->data.as_string) == FLB_TRUE) {
cfl_array_append_string(tconfs, confs->entries[idx]->data.as_string);
}
}

qsort(tconfs->entries, tconfs->entry_count,
sizeof(struct cfl_variant *),
cfl_array_qsort_conf_files);

for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) {
unlink(inis->entries[idx]->data.as_string);
calyptia_config_delete_old_dir(inis->entries[idx]->data.as_string);
for (idx = 0; idx < (((ssize_t)tconfs->entry_count) -3); idx++) {
unlink(tconfs->entries[idx]->data.as_string);
calyptia_config_delete_old_dir(tconfs->entries[idx]->data.as_string);
}

cfl_array_destroy(inis);
cfl_array_destroy(confs);
cfl_array_destroy(tconfs);
flb_sds_destroy(glob_files);

return 0;
Expand All @@ -1373,6 +1396,8 @@ static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config
struct cfl_array *inis;
flb_sds_t glob_conf_files = NULL;
flb_sds_t cfgnewname = NULL;
const char *curconf;
int idx;

if (ctx == NULL) {
return NULL;
Expand Down Expand Up @@ -1402,8 +1427,13 @@ static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config
sizeof(struct cfl_variant *),
cfl_array_qsort_conf_files);

cfgnewname = flb_sds_create_len(inis->entries[inis->entry_count-1]->data.as_string,
strlen(inis->entries[inis->entry_count-1]->data.as_string));
for (idx = inis->entry_count-1; idx >= 0; idx--) {
curconf = inis->entries[idx]->data.as_string;
if (is_timestamped_fleet_config_path(ctx, curconf)) {
cfgnewname = flb_sds_create(curconf);
break;
}
}

cfl_array_destroy(inis);
flb_sds_destroy(glob_conf_files);
Expand Down Expand Up @@ -1487,12 +1517,6 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
unlink(cfgoldname);
}

if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
if (rename(cfgcurname, cfgoldname)) {
goto error;
}
}

if (exists_new_fleet_config(ctx) == FLB_TRUE) {
if (rename(cfgnewname, cfgcurname)) {
goto error;
Expand Down Expand Up @@ -1580,12 +1604,100 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx,
}
#endif

static void fleet_config_get_properties(flb_sds_t *buf, struct mk_list *props)
{
struct mk_list *head;
struct flb_kv *kv;

mk_list_foreach(head, props) {
kv = mk_list_entry(head, struct flb_kv, _head);

if (kv->key != NULL && kv->val != NULL) {
flb_sds_printf(buf, " %s ", kv->key);
flb_sds_cat_safe(buf, kv->val, strlen(kv->val));
flb_sds_cat_safe(buf, "\n", 1);
}
}
}

flb_sds_t fleet_config_get(struct flb_in_calyptia_fleet_config *ctx)
{
flb_sds_t buf;
struct mk_list *head;
struct flb_custom_instance *c_ins;

buf = flb_sds_create_size(2048);

if (!buf) {
return NULL;
}

/* [INPUT] */
mk_list_foreach(head, &ctx->config->customs) {
c_ins = mk_list_entry(head, struct flb_custom_instance, _head);
if (strcasecmp(c_ins->p->name, "calyptia")) {
continue;
}
flb_sds_printf(&buf, "[CUSTOM]\n");
flb_sds_printf(&buf, " name %s\n", c_ins->p->name);

fleet_config_get_properties(&buf, &c_ins->properties);

if (ctx->fleet_id && flb_config_prop_get("fleet_id", &c_ins->properties) == NULL) {
flb_sds_printf(&buf, " fleet_id %s\n", ctx->fleet_id);
}
}
flb_sds_printf(&buf, "\n");

return buf;
}

static int create_fleet_header(struct flb_in_calyptia_fleet_config *ctx)
{
flb_sds_t hdrname;
FILE *fp;
flb_sds_t header;
int rc = FLB_FALSE;


hdrname = fleet_config_filename(ctx, "header");
if (hdrname == NULL) {
goto hdrname_error;
}

header = fleet_config_get(ctx);
if (header == NULL) {
goto header_error;
}

fp = fopen(hdrname, "w+");
if (fp == NULL) {
goto file_open_error;
}

if (fwrite(header, strlen(header), 1, fp) < 1) {
goto file_error;
}

rc = FLB_TRUE;

file_error:
fclose(fp);
file_open_error:
flb_sds_destroy(header);
header_error:
flb_sds_destroy(hdrname);
hdrname_error:
return rc;
}

static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
struct flb_connection *u_conn)
{
flb_sds_t cfgname;
flb_sds_t cfgnewname;
flb_sds_t header;
flb_sds_t hdrname;
time_t time_last_modified;
int ret = -1;

Expand All @@ -1599,50 +1711,11 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
flb_sds_printf(&ctx->fleet_files_url, "/v1/fleets/%s/files", ctx->fleet_id);
}

header = flb_sds_create_size(4096);

if (ctx->fleet_name == NULL) {
flb_sds_printf(&header,
"[CUSTOM]\n"
" Name calyptia\n"
" api_key %s\n"
" fleet_id %s\n"
" add_label fleet_id %s\n"
" fleet.config_dir %s\n"
" calyptia_host %s\n"
" calyptia_port %d\n"
" calyptia_tls %s\n",
ctx->api_key,
ctx->fleet_id,
ctx->fleet_id,
ctx->config_dir,
ctx->ins->host.name,
ctx->ins->host.port,
tls_setting_string(ctx->ins->use_tls)
);
}
else {
flb_sds_printf(&header,
"[CUSTOM]\n"
" Name calyptia\n"
" api_key %s\n"
" fleet_name %s\n"
" fleet_id %s\n"
" add_label fleet_id %s\n"
" fleet.config_dir %s\n"
" calyptia_host %s\n"
" calyptia_port %d\n"
" calyptia_tls %s\n",
ctx->api_key,
ctx->fleet_name,
ctx->fleet_id,
ctx->fleet_id,
ctx->config_dir,
ctx->ins->host.name,
ctx->ins->host.port,
tls_setting_string(ctx->ins->use_tls)
);
}
create_fleet_header(ctx);

hdrname = fleet_config_filename(ctx, "header");
header = flb_sds_create_size(32);
flb_sds_printf(&header, "@include %s\n\n", hdrname);

/* create the base file. */
ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header,
Expand Down Expand Up @@ -2066,6 +2139,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
}
ctx->ins = in;
ctx->collect_fd = -1;
ctx->config = config;


/* Load the config map */
Expand Down Expand Up @@ -2126,6 +2200,11 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
/* Set the context */
flb_input_set_context(in, ctx);

/* refresh calyptia settings before attempting to load the fleet
* configuration file.
*/
create_fleet_header(ctx);

/* if we load a new configuration then we will be reloaded anyways */
if (load_fleet_config(ctx) == FLB_TRUE) {
return 0;
Expand Down

0 comments on commit fecc7e2

Please sign in to comment.