Skip to content

Commit

Permalink
custom_calyptia: honour interval in secs and nano secs.
Browse files Browse the repository at this point in the history
This change makes the custom calyptia plugin honour the
configured interval (both in secs and nanosecs) that is
cascaded to the instantiated input fleet plugin.

Signed-off-by: Jorge Niedbalski <[email protected]>
  • Loading branch information
Jorge Niedbalski committed Nov 26, 2024
1 parent faf7da1 commit 68ecb95
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 99 deletions.
159 changes: 65 additions & 94 deletions plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,7 @@

#include <fluent-bit/flb_hash.h>

struct calyptia {
/* config map options */
flb_sds_t api_key;
flb_sds_t store_path;
flb_sds_t cloud_host;
flb_sds_t cloud_port;
flb_sds_t machine_id;
int machine_id_auto_configured;

/* used for reporting chunk trace records. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */

int cloud_tls;
int cloud_tls_verify;

/* config reader for 'add_label' */
struct mk_list *add_labels;

/* instances */
struct flb_input_instance *i;
struct flb_output_instance *o;
struct flb_input_instance *fleet;
struct flb_custom_instance *ins;

/* Fleet configuration */
flb_sds_t fleet_id; /* fleet-id */
flb_sds_t fleet_name;
flb_sds_t fleet_config_dir; /* fleet configuration directory */
flb_sds_t fleet_max_http_buffer_size;
int fleet_interval_sec;
int fleet_interval_nsec;
};
#include "calyptia.h"

/*
* Check if the key belongs to a sensitive data field, if so report it. We never
Expand Down Expand Up @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx)
return buf;
}

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet)
{
if (!fleet) {
flb_plg_error(ctx->ins, "invalid fleet input instance");
return -1;
}

if (ctx->fleet_name) {
flb_input_set_property(fleet, "fleet_name", ctx->fleet_name);
}

if (ctx->fleet_id) {
flb_input_set_property(fleet, "fleet_id", ctx->fleet_id);
}

flb_input_set_property(fleet, "api_key", ctx->api_key);
flb_input_set_property(fleet, "host", ctx->cloud_host);
flb_input_set_property(fleet, "port", ctx->cloud_port);

/* Set TLS properties */
flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off");
flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off");

/* Optional configurations */
if (ctx->fleet_config_dir) {
flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir);
}

if (ctx->fleet_max_http_buffer_size) {
flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
}

if (ctx->machine_id) {
flb_input_set_property(fleet, "machine_id", ctx->machine_id);
}

if (ctx->fleet_interval_sec) {
flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec);
}

if (ctx->fleet_interval_nsec) {
flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec);
}

return 0;
}

static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx)
{
int ret;
Expand Down Expand Up @@ -387,16 +401,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx)
}

static int cb_calyptia_init(struct flb_custom_instance *ins,
struct flb_config *config,
void *data)
struct flb_config *config,
void *data)
{
int ret;
struct calyptia *ctx;
int is_fleet_mode;
(void) data;

ctx = flb_calloc(1, sizeof(struct calyptia));

if (!ctx) {
flb_errno();
return -1;
Expand All @@ -405,7 +417,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

/* Load the config map */
ret = flb_custom_config_map_set(ins, (void *) ctx);

if (ret == -1) {
flb_free(ctx);
return -1;
Expand All @@ -416,21 +427,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

/* If no machine_id has been provided via a configuration option get it from the local machine-id. */
if (!ctx->machine_id) {
/* machine id */
ctx->machine_id = get_machine_id(ctx);

if (ctx->machine_id == NULL) {
flb_plg_error(ctx->ins, "unable to retrieve machine_id");
flb_free(ctx);
return -1;
}

ctx->machine_id_auto_configured = 1;
}

/* input collector */
ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE);

if (!ctx->i) {
flb_plg_error(ctx->ins, "could not load metrics collector");
flb_free(ctx);
Expand All @@ -439,76 +446,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins,

flb_input_set_property(ctx->i, "tag", "_calyptia_cloud");
flb_input_set_property(ctx->i, "scrape_on_start", "true");
// This scrape interval should be configurable.
flb_input_set_property(ctx->i, "scrape_interval", "30");

if (ctx->fleet_name || ctx->fleet_id) {
is_fleet_mode = FLB_TRUE;
}
else {
is_fleet_mode = FLB_FALSE;
}

/* output cloud connector */
if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) ||
(is_fleet_mode == FLB_FALSE)) {
/* Setup cloud output if needed */
if (ctx->fleet_id != NULL || !ctx->fleet_name) {
ctx->o = setup_cloud_output(config, ctx);

if (ctx->o == NULL) {
flb_free(ctx);
return -1;
}
/* Set fleet_id for output if present */
if (ctx->fleet_id) {
flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
}
}

/* Setup fleet input if needed */
if (ctx->fleet_id || ctx->fleet_name) {
ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);

ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);
if (!ctx->fleet) {
flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin");
return -1;
}

if (ctx->fleet_name) {
flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name);
}

if (ctx->fleet_id) {
flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id);
}

flb_input_set_property(ctx->fleet, "api_key", ctx->api_key);
flb_input_set_property(ctx->fleet, "host", ctx->cloud_host);
flb_input_set_property(ctx->fleet, "port", ctx->cloud_port);

if (ctx->cloud_tls == 1) {
flb_input_set_property(ctx->fleet, "tls", "on");
}
else {
flb_input_set_property(ctx->fleet, "tls", "off");
}

if (ctx->cloud_tls_verify == 1) {
flb_input_set_property(ctx->fleet, "tls.verify", "on");
}
else {
flb_input_set_property(ctx->fleet, "tls.verify", "off");
}

if (ctx->fleet_config_dir) {
flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir);
}

if (ctx->fleet_max_http_buffer_size) {
flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
}
if (ctx->machine_id) {
flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id);
ret = set_fleet_input_properties(ctx, ctx->fleet);
if (ret == -1) {
return -1;
}
}

if (ctx->o) {
flb_router_connect(ctx->i, ctx->o);
}

flb_plg_info(ins, "custom initialized!");
return 0;
}
Expand Down Expand Up @@ -587,12 +558,12 @@ static struct flb_config_map config_map[] = {
"Base path for the configuration directory."
},
{
FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1",
FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1",
0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec),
"Set the collector interval"
},
{
FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1",
FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1",
0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec),
"Set the collector interval (nanoseconds)"
},
Expand Down
59 changes: 59 additions & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_CALYPTIA_H
#define FLB_CALYPTIA_H

struct calyptia {
/* config map options */
flb_sds_t api_key;
flb_sds_t store_path;
flb_sds_t cloud_host;
flb_sds_t cloud_port;
flb_sds_t machine_id;
int machine_id_auto_configured;

/* used for reporting chunk trace records. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */

int cloud_tls;
int cloud_tls_verify;

/* config reader for 'add_label' */
struct mk_list *add_labels;

/* instances */
struct flb_input_instance *i;
struct flb_output_instance *o;
struct flb_input_instance *fleet;
struct flb_custom_instance *ins;

/* Fleet configuration */
flb_sds_t fleet_id; /* fleet-id */
flb_sds_t fleet_name;
flb_sds_t fleet_config_dir; /* fleet configuration directory */
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
#endif /* FLB_CALYPTIA_H */
13 changes: 8 additions & 5 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -2233,7 +2233,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
ctx->collect_fd = -1;
ctx->fleet_id_found = FLB_FALSE;


/* Load the config map */
ret = flb_input_config_map_set(in, (void *) ctx);
if (ret == -1) {
Expand Down Expand Up @@ -2278,14 +2277,16 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
return -1;
}

/* Log initial interval values */
flb_plg_debug(ctx->ins, "initial collector interval: sec=%d nsec=%d",
ctx->interval_sec, ctx->interval_nsec);

if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) {
/* Illegal settings. Override them. */
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
}

if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
flb_plg_info(ctx->ins, "invalid interval settings, using defaults: sec=%d nsec=%d",
ctx->interval_sec, ctx->interval_nsec);
}

/* Set the context */
Expand Down Expand Up @@ -2328,6 +2329,8 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
}

ctx->collect_fd = ret;
flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec",
ctx->interval_sec, ctx->interval_nsec);

return 0;
}
Expand Down
26 changes: 26 additions & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c")
endif()

if (FLB_CUSTOM_CALYPTIA)
# Define common variables for calyptia tests
set(CALYPTIA_TEST_LINK_LIBS
fluent-bit-static
${CMAKE_THREAD_LIBS_INIT}
)

# Add calyptia input properties test
set(TEST_TARGET "flb-rt-calyptia_input_properties")
add_executable(${TEST_TARGET}
"custom_calyptia_input_test.c"
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)

add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
endif()

if(FLB_IN_EBPF)
# Define common variables
set(EBPF_TEST_INCLUDE_DIRS
Expand Down
Loading

0 comments on commit 68ecb95

Please sign in to comment.