diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index c3554157598..76f5868efdc 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -30,40 +30,7 @@ #include -struct calyptia { - /* config map options */ - flb_sds_t api_key; - flb_sds_t store_path; - flb_sds_t cloud_host; - flb_sds_t cloud_port; - flb_sds_t machine_id; - int machine_id_auto_configured; - -/* used for reporting chunk trace records. */ -#ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t pipeline_id; -#endif /* FLB_HAVE_CHUNK_TRACE */ - - int cloud_tls; - int cloud_tls_verify; - - /* config reader for 'add_label' */ - struct mk_list *add_labels; - - /* instances */ - struct flb_input_instance *i; - struct flb_output_instance *o; - struct flb_input_instance *fleet; - struct flb_custom_instance *ins; - - /* Fleet configuration */ - flb_sds_t fleet_id; /* fleet-id */ - flb_sds_t fleet_name; - flb_sds_t fleet_config_dir; /* fleet configuration directory */ - flb_sds_t fleet_max_http_buffer_size; - int fleet_interval_sec; - int fleet_interval_nsec; -}; +#include "calyptia.h" /* * Check if the key belongs to a sensitive data field, if so report it. We never @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx) return buf; } +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet) +{ + if (!fleet) { + flb_plg_error(ctx->ins, "invalid fleet input instance"); + return -1; + } + + if (ctx->fleet_name) { + flb_input_set_property(fleet, "fleet_name", ctx->fleet_name); + } + + if (ctx->fleet_id) { + flb_input_set_property(fleet, "fleet_id", ctx->fleet_id); + } + + flb_input_set_property(fleet, "api_key", ctx->api_key); + flb_input_set_property(fleet, "host", ctx->cloud_host); + flb_input_set_property(fleet, "port", ctx->cloud_port); + + /* Set TLS properties */ + flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off"); + flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off"); + + /* Optional configurations */ + if (ctx->fleet_config_dir) { + flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir); + } + + if (ctx->fleet_max_http_buffer_size) { + flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); + } + + if (ctx->machine_id) { + flb_input_set_property(fleet, "machine_id", ctx->machine_id); + } + + if (ctx->fleet_interval_sec) { + flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec); + } + + if (ctx->fleet_interval_nsec) { + flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec); + } + + return 0; +} + static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx) { int ret; @@ -387,16 +401,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx) } static int cb_calyptia_init(struct flb_custom_instance *ins, - struct flb_config *config, - void *data) + struct flb_config *config, + void *data) { int ret; struct calyptia *ctx; - int is_fleet_mode; (void) data; ctx = flb_calloc(1, sizeof(struct calyptia)); - if (!ctx) { flb_errno(); return -1; @@ -405,7 +417,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* Load the config map */ ret = flb_custom_config_map_set(ins, (void *) ctx); - if (ret == -1) { flb_free(ctx); return -1; @@ -416,21 +427,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* If no machine_id has been provided via a configuration option get it from the local machine-id. */ if (!ctx->machine_id) { - /* machine id */ ctx->machine_id = get_machine_id(ctx); - if (ctx->machine_id == NULL) { flb_plg_error(ctx->ins, "unable to retrieve machine_id"); flb_free(ctx); return -1; } - ctx->machine_id_auto_configured = 1; } /* input collector */ ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE); - if (!ctx->i) { flb_plg_error(ctx->ins, "could not load metrics collector"); flb_free(ctx); @@ -439,76 +446,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, flb_input_set_property(ctx->i, "tag", "_calyptia_cloud"); flb_input_set_property(ctx->i, "scrape_on_start", "true"); + // This scrape interval should be configurable. flb_input_set_property(ctx->i, "scrape_interval", "30"); - if (ctx->fleet_name || ctx->fleet_id) { - is_fleet_mode = FLB_TRUE; - } - else { - is_fleet_mode = FLB_FALSE; - } - - /* output cloud connector */ - if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) || - (is_fleet_mode == FLB_FALSE)) { + /* Setup cloud output if needed */ + if (ctx->fleet_id != NULL || !ctx->fleet_name) { ctx->o = setup_cloud_output(config, ctx); - if (ctx->o == NULL) { flb_free(ctx); return -1; } + /* Set fleet_id for output if present */ + if (ctx->fleet_id) { + flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + } } + /* Setup fleet input if needed */ if (ctx->fleet_id || ctx->fleet_name) { - ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); - + ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); if (!ctx->fleet) { flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin"); return -1; } - if (ctx->fleet_name) { - flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name); - } - - if (ctx->fleet_id) { - flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); - flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id); - } - - flb_input_set_property(ctx->fleet, "api_key", ctx->api_key); - flb_input_set_property(ctx->fleet, "host", ctx->cloud_host); - flb_input_set_property(ctx->fleet, "port", ctx->cloud_port); - - if (ctx->cloud_tls == 1) { - flb_input_set_property(ctx->fleet, "tls", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls", "off"); - } - - if (ctx->cloud_tls_verify == 1) { - flb_input_set_property(ctx->fleet, "tls.verify", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls.verify", "off"); - } - - if (ctx->fleet_config_dir) { - flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir); - } - - if (ctx->fleet_max_http_buffer_size) { - flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); - } - if (ctx->machine_id) { - flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id); + ret = set_fleet_input_properties(ctx, ctx->fleet); + if (ret == -1) { + return -1; } } if (ctx->o) { flb_router_connect(ctx->i, ctx->o); } + flb_plg_info(ins, "custom initialized!"); return 0; } @@ -587,12 +558,12 @@ static struct flb_config_map config_map[] = { "Base path for the configuration directory." }, { - FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec), "Set the collector interval" }, { - FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec), "Set the collector interval (nanoseconds)" }, diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h new file mode 100644 index 00000000000..e1f4dd36770 --- /dev/null +++ b/plugins/custom_calyptia/calyptia.h @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_CALYPTIA_H +#define FLB_CALYPTIA_H + +struct calyptia { + /* config map options */ + flb_sds_t api_key; + flb_sds_t store_path; + flb_sds_t cloud_host; + flb_sds_t cloud_port; + flb_sds_t machine_id; + int machine_id_auto_configured; + +/* used for reporting chunk trace records. */ +#ifdef FLB_HAVE_CHUNK_TRACE + flb_sds_t pipeline_id; +#endif /* FLB_HAVE_CHUNK_TRACE */ + + int cloud_tls; + int cloud_tls_verify; + + /* config reader for 'add_label' */ + struct mk_list *add_labels; + + /* instances */ + struct flb_input_instance *i; + struct flb_output_instance *o; + struct flb_input_instance *fleet; + struct flb_custom_instance *ins; + + /* Fleet configuration */ + flb_sds_t fleet_id; /* fleet-id */ + flb_sds_t fleet_name; + flb_sds_t fleet_config_dir; /* fleet configuration directory */ + flb_sds_t fleet_max_http_buffer_size; + flb_sds_t fleet_interval_sec; + flb_sds_t fleet_interval_nsec; +}; + +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); +#endif /* FLB_CALYPTIA_H */ diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f61068ceede..28e3fc8719c 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -2233,7 +2233,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, ctx->collect_fd = -1; ctx->fleet_id_found = FLB_FALSE; - /* Load the config map */ ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -2278,14 +2277,16 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return -1; } + /* Log initial interval values */ + flb_plg_debug(ctx->ins, "initial collector interval: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); + if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) { /* Illegal settings. Override them. */ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); - } - - if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) { - ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + flb_plg_info(ctx->ins, "invalid interval settings, using defaults: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); } /* Set the context */ @@ -2328,6 +2329,8 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, } ctx->collect_fd = ret; + flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec", + ctx->interval_sec, ctx->interval_nsec); return 0; } diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index ea1a1da3199..f355294ed9f 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -60,6 +60,32 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c") endif() +if (FLB_CUSTOM_CALYPTIA) + # Define common variables for calyptia tests + set(CALYPTIA_TEST_LINK_LIBS + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} + ) + + # Add calyptia input properties test + set(TEST_TARGET "flb-rt-calyptia_input_properties") + add_executable(${TEST_TARGET} + "custom_calyptia_input_test.c" + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) + + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) +endif() + if(FLB_IN_EBPF) # Define common variables set(EBPF_TEST_INCLUDE_DIRS diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c new file mode 100644 index 00000000000..94d104ab4b8 --- /dev/null +++ b/tests/runtime/custom_calyptia_input_test.c @@ -0,0 +1,165 @@ +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/custom_calyptia/calyptia.h" + +/* Test context structure */ +struct test_context { + struct calyptia *ctx; + struct flb_input_instance *fleet; + struct flb_config *config; +}; + +/* Initialize test context */ +static struct test_context *init_test_context() +{ + struct test_context *t_ctx = flb_calloc(1, sizeof(struct test_context)); + if (!t_ctx) { + return NULL; + } + + t_ctx->config = flb_config_init(); + if (!t_ctx->config) { + flb_free(t_ctx); + return NULL; + } + + t_ctx->ctx = flb_calloc(1, sizeof(struct calyptia)); + if (!t_ctx->ctx) { + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize plugin instance for logging */ + t_ctx->ctx->ins = flb_calloc(1, sizeof(struct flb_custom_instance)); + if (!t_ctx->ctx->ins) { + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize test values in ctx */ + t_ctx->ctx->api_key = flb_strdup("test_api_key"); + t_ctx->ctx->fleet_config_dir = flb_strdup("/test/config/dir"); + t_ctx->ctx->fleet_id = flb_strdup("test_fleet_id"); + t_ctx->ctx->fleet_name = flb_strdup("test_fleet"); + t_ctx->ctx->machine_id = flb_strdup("test_machine_id"); + t_ctx->ctx->fleet_max_http_buffer_size = flb_strdup("1024"); + t_ctx->ctx->fleet_interval_sec = flb_strdup("60"); + t_ctx->ctx->fleet_interval_nsec = flb_strdup("500000000"); + + t_ctx->fleet = flb_input_new(t_ctx->config, "calyptia_fleet", NULL, FLB_FALSE); + if (!t_ctx->fleet) { + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + return t_ctx; +} + +/* Cleanup test context */ +static void cleanup_test_context(struct test_context *t_ctx) +{ + if (!t_ctx) { + return; + } + + if (t_ctx->ctx) { + if (t_ctx->ctx->api_key) flb_free(t_ctx->ctx->api_key); + if (t_ctx->ctx->fleet_config_dir) flb_free(t_ctx->ctx->fleet_config_dir); + if (t_ctx->ctx->fleet_id) flb_free(t_ctx->ctx->fleet_id); + if (t_ctx->ctx->fleet_name) flb_free(t_ctx->ctx->fleet_name); + if (t_ctx->ctx->machine_id) flb_free(t_ctx->ctx->machine_id); + if (t_ctx->ctx->fleet_max_http_buffer_size) flb_free(t_ctx->ctx->fleet_max_http_buffer_size); + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + } + + if (t_ctx->config) { + flb_config_exit(t_ctx->config); + } + + flb_free(t_ctx); +} + +void test_set_fleet_input_properties() +{ + struct test_context *t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + /* Test setting properties */ + int ret = set_fleet_input_properties(t_ctx->ctx, t_ctx->fleet); + TEST_CHECK(ret == 0); + + /* Verify properties were set correctly */ + const char *value; + + /* Check api_key */ + value = flb_input_get_property("api_key", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("api_key expected=%s got=%s", t_ctx->ctx->api_key, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->api_key) == 0); + + /* Check config_dir */ + value = flb_input_get_property("config_dir", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("config_dir expected=%s got=%s", t_ctx->ctx->fleet_config_dir, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_config_dir) == 0); + + /* Check fleet_id */ + value = flb_input_get_property("fleet_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_id expected=%s got=%s", t_ctx->ctx->fleet_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_id) == 0); + + /* Check fleet_name */ + value = flb_input_get_property("fleet_name", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_name expected=%s got=%s", t_ctx->ctx->fleet_name, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_name) == 0); + + /* Check machine_id */ + value = flb_input_get_property("machine_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("machine_id expected=%s got=%s", t_ctx->ctx->machine_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->machine_id) == 0); + + /* Check max_http_buffer_size */ + value = flb_input_get_property("max_http_buffer_size", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("max_http_buffer_size expected=%s got=%s", t_ctx->ctx->fleet_max_http_buffer_size, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_max_http_buffer_size) == 0); + + // /* Check interval_sec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + // /* Check interval_nsec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + ret = set_fleet_input_properties(t_ctx->ctx, NULL); + TEST_CHECK(ret == -1); + + cleanup_test_context(t_ctx); +} + +/* Define test list */ +TEST_LIST = { + {"set_fleet_input_properties", test_set_fleet_input_properties}, + {NULL, NULL} +}; \ No newline at end of file