Skip to content

Commit

Permalink
custom_calyptia: cascade register_retry_on_flush variables.
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Niedbalski <[email protected]>
  • Loading branch information
Jorge Niedbalski committed Nov 28, 2024
1 parent 6cb6a04 commit 4b6637d
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 28 deletions.
12 changes: 11 additions & 1 deletion plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
flb_output_set_property(cloud, "match", "_calyptia_cloud");
flb_output_set_property(cloud, "api_key", ctx->api_key);

if (ctx->register_retry_on_flush) {
flb_output_set_property(cloud, "register_retry_on_flush", "true");
} else {
flb_output_set_property(cloud, "register_retry_on_flush", "false");
}

if (ctx->store_path) {
flb_output_set_property(cloud, "store_path", ctx->store_path);
}
Expand Down Expand Up @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for reporting to calyptia cloud."
},
#endif /* FLB_HAVE_CHUNK_TRACE */

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct calyptia {
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
bool register_retry_on_flush; /* retry registration on flush if failed */
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
Expand Down
53 changes: 43 additions & 10 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
int ret;
size_t b_sent;

if( !ctx || !c ) {
return FLB_ERROR;
}

/* Ensure agent_token is not empty when required */
if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
!ctx->agent_token) {
flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
return FLB_ERROR;
}

/* append headers */
if (type == CALYPTIA_ACTION_REGISTER) {
// When registering a new agent api key is required
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "api_key is missing");
return FLB_ERROR;
}
flb_http_add_header(c,
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
Expand Down Expand Up @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
flb_free(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
Expand Down Expand Up @@ -905,17 +936,18 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_http_client *c = NULL;
struct flb_calyptia *ctx = out_context;
struct cmt *cmt;
flb_sds_t json;
(void) i_ins;
(void) config;

if (!ctx->agent_id && ctx->register_retry_on_flush) {
flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration");
if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
if (register_agent(ctx, config) != FLB_OK) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}
else if (!ctx->agent_id) {
flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false");
else if (!ctx->agent_id || !ctx->agent_token) {
flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
FLB_OUTPUT_RETURN(FLB_ERROR);
}

Expand Down Expand Up @@ -981,12 +1013,13 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
flb_sds_t json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (json == NULL) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand Down
38 changes: 21 additions & 17 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,37 @@ if(FLB_OUT_LIB)
endif()

if (FLB_CUSTOM_CALYPTIA)
# Define common variables for calyptia tests
set(CALYPTIA_TEST_LINK_LIBS
fluent-bit-static
${CMAKE_THREAD_LIBS_INIT}
)

# Add calyptia input properties test
set(TEST_TARGET "flb-rt-calyptia_input_properties")
add_executable(${TEST_TARGET}
set(CALYPTIA_TESTS
"custom_calyptia_test.c"
"custom_calyptia_registration_retry_test.c"
"custom_calyptia_input_test.c"
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)
foreach(TEST_SOURCE ${CALYPTIA_TESTS})
get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE)

add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)
set(TEST_TARGET "flb-rt-${TEST_NAME}")
add_executable(${TEST_TARGET}
${TEST_SOURCE}
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
endforeach()
endif()

if(FLB_IN_EBPF)
Expand Down Expand Up @@ -222,10 +230,6 @@ if(FLB_IN_LIB)

endif()

if (FLB_CUSTOM_CALYPTIA)
FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c")
endif()

if (FLB_PROCESSOR_METRICS_SELECTOR)
FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c")
endif()
Expand Down
Loading

0 comments on commit 4b6637d

Please sign in to comment.