From 68ecb955bddc0b207173ebe4e70bf09f87c3ab7f Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Tue, 26 Nov 2024 15:06:09 +0100 Subject: [PATCH] custom_calyptia: honour interval in secs and nano secs. This change makes the custom calyptia plugin honour the configured interval (both in secs and nanosecs) that is cascaded to the instantiated input fleet plugin. Signed-off-by: Jorge Niedbalski --- plugins/custom_calyptia/calyptia.c | 159 +++++++--------- plugins/custom_calyptia/calyptia.h | 59 ++++++ plugins/in_calyptia_fleet/in_calyptia_fleet.c | 13 +- tests/runtime/CMakeLists.txt | 26 +++ tests/runtime/custom_calyptia_input_test.c | 172 ++++++++++++++++++ 5 files changed, 330 insertions(+), 99 deletions(-) create mode 100644 plugins/custom_calyptia/calyptia.h create mode 100644 tests/runtime/custom_calyptia_input_test.c diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index c3554157598..76f5868efdc 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -30,40 +30,7 @@ #include -struct calyptia { - /* config map options */ - flb_sds_t api_key; - flb_sds_t store_path; - flb_sds_t cloud_host; - flb_sds_t cloud_port; - flb_sds_t machine_id; - int machine_id_auto_configured; - -/* used for reporting chunk trace records. */ -#ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t pipeline_id; -#endif /* FLB_HAVE_CHUNK_TRACE */ - - int cloud_tls; - int cloud_tls_verify; - - /* config reader for 'add_label' */ - struct mk_list *add_labels; - - /* instances */ - struct flb_input_instance *i; - struct flb_output_instance *o; - struct flb_input_instance *fleet; - struct flb_custom_instance *ins; - - /* Fleet configuration */ - flb_sds_t fleet_id; /* fleet-id */ - flb_sds_t fleet_name; - flb_sds_t fleet_config_dir; /* fleet configuration directory */ - flb_sds_t fleet_max_http_buffer_size; - int fleet_interval_sec; - int fleet_interval_nsec; -}; +#include "calyptia.h" /* * Check if the key belongs to a sensitive data field, if so report it. We never @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx) return buf; } +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet) +{ + if (!fleet) { + flb_plg_error(ctx->ins, "invalid fleet input instance"); + return -1; + } + + if (ctx->fleet_name) { + flb_input_set_property(fleet, "fleet_name", ctx->fleet_name); + } + + if (ctx->fleet_id) { + flb_input_set_property(fleet, "fleet_id", ctx->fleet_id); + } + + flb_input_set_property(fleet, "api_key", ctx->api_key); + flb_input_set_property(fleet, "host", ctx->cloud_host); + flb_input_set_property(fleet, "port", ctx->cloud_port); + + /* Set TLS properties */ + flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off"); + flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off"); + + /* Optional configurations */ + if (ctx->fleet_config_dir) { + flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir); + } + + if (ctx->fleet_max_http_buffer_size) { + flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); + } + + if (ctx->machine_id) { + flb_input_set_property(fleet, "machine_id", ctx->machine_id); + } + + if (ctx->fleet_interval_sec) { + flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec); + } + + if (ctx->fleet_interval_nsec) { + flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec); + } + + return 0; +} + static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx) { int ret; @@ -387,16 +401,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx) } static int cb_calyptia_init(struct flb_custom_instance *ins, - struct flb_config *config, - void *data) + struct flb_config *config, + void *data) { int ret; struct calyptia *ctx; - int is_fleet_mode; (void) data; ctx = flb_calloc(1, sizeof(struct calyptia)); - if (!ctx) { flb_errno(); return -1; @@ -405,7 +417,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* Load the config map */ ret = flb_custom_config_map_set(ins, (void *) ctx); - if (ret == -1) { flb_free(ctx); return -1; @@ -416,21 +427,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* If no machine_id has been provided via a configuration option get it from the local machine-id. */ if (!ctx->machine_id) { - /* machine id */ ctx->machine_id = get_machine_id(ctx); - if (ctx->machine_id == NULL) { flb_plg_error(ctx->ins, "unable to retrieve machine_id"); flb_free(ctx); return -1; } - ctx->machine_id_auto_configured = 1; } /* input collector */ ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE); - if (!ctx->i) { flb_plg_error(ctx->ins, "could not load metrics collector"); flb_free(ctx); @@ -439,76 +446,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, flb_input_set_property(ctx->i, "tag", "_calyptia_cloud"); flb_input_set_property(ctx->i, "scrape_on_start", "true"); + // This scrape interval should be configurable. flb_input_set_property(ctx->i, "scrape_interval", "30"); - if (ctx->fleet_name || ctx->fleet_id) { - is_fleet_mode = FLB_TRUE; - } - else { - is_fleet_mode = FLB_FALSE; - } - - /* output cloud connector */ - if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) || - (is_fleet_mode == FLB_FALSE)) { + /* Setup cloud output if needed */ + if (ctx->fleet_id != NULL || !ctx->fleet_name) { ctx->o = setup_cloud_output(config, ctx); - if (ctx->o == NULL) { flb_free(ctx); return -1; } + /* Set fleet_id for output if present */ + if (ctx->fleet_id) { + flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + } } + /* Setup fleet input if needed */ if (ctx->fleet_id || ctx->fleet_name) { - ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); - + ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); if (!ctx->fleet) { flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin"); return -1; } - if (ctx->fleet_name) { - flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name); - } - - if (ctx->fleet_id) { - flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); - flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id); - } - - flb_input_set_property(ctx->fleet, "api_key", ctx->api_key); - flb_input_set_property(ctx->fleet, "host", ctx->cloud_host); - flb_input_set_property(ctx->fleet, "port", ctx->cloud_port); - - if (ctx->cloud_tls == 1) { - flb_input_set_property(ctx->fleet, "tls", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls", "off"); - } - - if (ctx->cloud_tls_verify == 1) { - flb_input_set_property(ctx->fleet, "tls.verify", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls.verify", "off"); - } - - if (ctx->fleet_config_dir) { - flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir); - } - - if (ctx->fleet_max_http_buffer_size) { - flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); - } - if (ctx->machine_id) { - flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id); + ret = set_fleet_input_properties(ctx, ctx->fleet); + if (ret == -1) { + return -1; } } if (ctx->o) { flb_router_connect(ctx->i, ctx->o); } + flb_plg_info(ins, "custom initialized!"); return 0; } @@ -587,12 +558,12 @@ static struct flb_config_map config_map[] = { "Base path for the configuration directory." }, { - FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec), "Set the collector interval" }, { - FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec), "Set the collector interval (nanoseconds)" }, diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h new file mode 100644 index 00000000000..e1f4dd36770 --- /dev/null +++ b/plugins/custom_calyptia/calyptia.h @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_CALYPTIA_H +#define FLB_CALYPTIA_H + +struct calyptia { + /* config map options */ + flb_sds_t api_key; + flb_sds_t store_path; + flb_sds_t cloud_host; + flb_sds_t cloud_port; + flb_sds_t machine_id; + int machine_id_auto_configured; + +/* used for reporting chunk trace records. */ +#ifdef FLB_HAVE_CHUNK_TRACE + flb_sds_t pipeline_id; +#endif /* FLB_HAVE_CHUNK_TRACE */ + + int cloud_tls; + int cloud_tls_verify; + + /* config reader for 'add_label' */ + struct mk_list *add_labels; + + /* instances */ + struct flb_input_instance *i; + struct flb_output_instance *o; + struct flb_input_instance *fleet; + struct flb_custom_instance *ins; + + /* Fleet configuration */ + flb_sds_t fleet_id; /* fleet-id */ + flb_sds_t fleet_name; + flb_sds_t fleet_config_dir; /* fleet configuration directory */ + flb_sds_t fleet_max_http_buffer_size; + flb_sds_t fleet_interval_sec; + flb_sds_t fleet_interval_nsec; +}; + +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); +#endif /* FLB_CALYPTIA_H */ diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f61068ceede..28e3fc8719c 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -2233,7 +2233,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, ctx->collect_fd = -1; ctx->fleet_id_found = FLB_FALSE; - /* Load the config map */ ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -2278,14 +2277,16 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return -1; } + /* Log initial interval values */ + flb_plg_debug(ctx->ins, "initial collector interval: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); + if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) { /* Illegal settings. Override them. */ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); - } - - if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) { - ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + flb_plg_info(ctx->ins, "invalid interval settings, using defaults: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); } /* Set the context */ @@ -2328,6 +2329,8 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, } ctx->collect_fd = ret; + flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec", + ctx->interval_sec, ctx->interval_nsec); return 0; } diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index ea1a1da3199..f355294ed9f 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -60,6 +60,32 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c") endif() +if (FLB_CUSTOM_CALYPTIA) + # Define common variables for calyptia tests + set(CALYPTIA_TEST_LINK_LIBS + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} + ) + + # Add calyptia input properties test + set(TEST_TARGET "flb-rt-calyptia_input_properties") + add_executable(${TEST_TARGET} + "custom_calyptia_input_test.c" + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) + + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) +endif() + if(FLB_IN_EBPF) # Define common variables set(EBPF_TEST_INCLUDE_DIRS diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c new file mode 100644 index 00000000000..ffc774c83d9 --- /dev/null +++ b/tests/runtime/custom_calyptia_input_test.c @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/custom_calyptia/calyptia.h" + +/* Test context structure */ +struct test_context { + struct calyptia *ctx; + struct flb_input_instance *fleet; + struct flb_config *config; +}; + +/* Initialize test context */ +static struct test_context *init_test_context() +{ + struct test_context *t_ctx = flb_calloc(1, sizeof(struct test_context)); + if (!t_ctx) { + return NULL; + } + + t_ctx->config = flb_config_init(); + if (!t_ctx->config) { + flb_free(t_ctx); + return NULL; + } + + t_ctx->ctx = flb_calloc(1, sizeof(struct calyptia)); + if (!t_ctx->ctx) { + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize plugin instance for logging */ + t_ctx->ctx->ins = flb_calloc(1, sizeof(struct flb_custom_instance)); + if (!t_ctx->ctx->ins) { + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize test values in ctx */ + t_ctx->ctx->api_key = flb_strdup("test_api_key"); + t_ctx->ctx->fleet_config_dir = flb_strdup("/test/config/dir"); + t_ctx->ctx->fleet_id = flb_strdup("test_fleet_id"); + t_ctx->ctx->fleet_name = flb_strdup("test_fleet"); + t_ctx->ctx->machine_id = flb_strdup("test_machine_id"); + t_ctx->ctx->fleet_max_http_buffer_size = flb_strdup("1024"); + t_ctx->ctx->fleet_interval_sec = flb_strdup("60"); + t_ctx->ctx->fleet_interval_nsec = flb_strdup("500000000"); + + t_ctx->fleet = flb_input_new(t_ctx->config, "calyptia_fleet", NULL, FLB_FALSE); + if (!t_ctx->fleet) { + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + return t_ctx; +} + +static void cleanup_test_context(struct test_context *t_ctx) +{ + if (!t_ctx) { + return; + } + + if (t_ctx->fleet) { + /* Input instance cleanup */ + flb_input_instance_destroy(t_ctx->fleet); + } + + if (t_ctx->ctx) { + if (t_ctx->ctx->api_key) flb_free(t_ctx->ctx->api_key); + if (t_ctx->ctx->fleet_config_dir) flb_free(t_ctx->ctx->fleet_config_dir); + if (t_ctx->ctx->fleet_id) flb_free(t_ctx->ctx->fleet_id); + if (t_ctx->ctx->fleet_name) flb_free(t_ctx->ctx->fleet_name); + if (t_ctx->ctx->machine_id) flb_free(t_ctx->ctx->machine_id); + if (t_ctx->ctx->fleet_max_http_buffer_size) flb_free(t_ctx->ctx->fleet_max_http_buffer_size); + if (t_ctx->ctx->fleet_interval_sec) flb_free(t_ctx->ctx->fleet_interval_sec); + if (t_ctx->ctx->fleet_interval_nsec) flb_free(t_ctx->ctx->fleet_interval_nsec); + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + } + + if (t_ctx->config) { + /* Destroy the config which will cleanup any remaining instances */ + flb_config_exit(t_ctx->config); + } + + flb_free(t_ctx); +} + +void test_set_fleet_input_properties() +{ + struct test_context *t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + /* Test setting properties */ + int ret = set_fleet_input_properties(t_ctx->ctx, t_ctx->fleet); + TEST_CHECK(ret == 0); + + /* Verify properties were set correctly */ + const char *value; + + /* Check api_key */ + value = flb_input_get_property("api_key", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("api_key expected=%s got=%s", t_ctx->ctx->api_key, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->api_key) == 0); + + /* Check config_dir */ + value = flb_input_get_property("config_dir", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("config_dir expected=%s got=%s", t_ctx->ctx->fleet_config_dir, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_config_dir) == 0); + + /* Check fleet_id */ + value = flb_input_get_property("fleet_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_id expected=%s got=%s", t_ctx->ctx->fleet_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_id) == 0); + + /* Check fleet_name */ + value = flb_input_get_property("fleet_name", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_name expected=%s got=%s", t_ctx->ctx->fleet_name, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_name) == 0); + + /* Check machine_id */ + value = flb_input_get_property("machine_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("machine_id expected=%s got=%s", t_ctx->ctx->machine_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->machine_id) == 0); + + /* Check max_http_buffer_size */ + value = flb_input_get_property("max_http_buffer_size", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("max_http_buffer_size expected=%s got=%s", t_ctx->ctx->fleet_max_http_buffer_size, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_max_http_buffer_size) == 0); + + // /* Check interval_sec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + // /* Check interval_nsec */ + value = flb_input_get_property("interval_nsec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_nsec expected=%s got=%s", t_ctx->ctx->fleet_interval_nsec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_nsec) == 0); + + ret = set_fleet_input_properties(t_ctx->ctx, NULL); + TEST_CHECK(ret == -1); + + cleanup_test_context(t_ctx); +} + +/* Define test list */ +TEST_LIST = { + {"set_fleet_input_properties", test_set_fleet_input_properties}, + {NULL, NULL} +}; \ No newline at end of file