Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_calyptia: retry agent registration on flush callback #9656

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
flb_output_set_property(cloud, "match", "_calyptia_cloud");
flb_output_set_property(cloud, "api_key", ctx->api_key);

if (ctx->register_retry_on_flush) {
flb_output_set_property(cloud, "register_retry_on_flush", "true");
} else {
flb_output_set_property(cloud, "register_retry_on_flush", "false");
}

if (ctx->store_path) {
flb_output_set_property(cloud, "store_path", ctx->store_path);
}
Expand Down Expand Up @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for reporting to calyptia cloud."
},
#endif /* FLB_HAVE_CHUNK_TRACE */

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct calyptia {
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
bool register_retry_on_flush; /* retry registration on flush if failed */
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
Expand Down
238 changes: 145 additions & 93 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
int ret;
size_t b_sent;

if( !ctx || !c ) {
return FLB_ERROR;
}

/* Ensure agent_token is not empty when required */
if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
!ctx->agent_token) {
flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
return FLB_ERROR;
}

/* append headers */
if (type == CALYPTIA_ACTION_REGISTER) {
// When registering a new agent api key is required
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "api_key is missing");
return FLB_ERROR;
}
flb_http_add_header(c,
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
Expand Down Expand Up @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
flb_free(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
Expand Down Expand Up @@ -771,12 +802,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return ctx;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
{
int ret;

/* Try registration */
ret = api_agent_create(config, ctx);
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
if (ret != FLB_OK) {
flb_plg_warn(ctx->ins, "agent registration failed");
return FLB_ERROR;
}

/* Update endpoints */
flb_sds_len_set(ctx->metrics_endpoint, 0);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->pipeline_id) {
flb_sds_len_set(ctx->trace_endpoint, 0);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
}
#endif

flb_plg_info(ctx->ins, "agent registration successful");
return FLB_OK;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
struct flb_calyptia *ctx;
(void) data;
int ret;

/* create config context */
ctx = config_init(ins, config);
Expand All @@ -791,23 +850,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins,
*/
flb_output_set_http_debug_callbacks(ins);

/* register/update agent */
ret = api_agent_create(config, ctx);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "agent registration failed");
ret = register_agent(ctx, config);
if (ret != FLB_OK && !ctx->register_retry_on_flush) {
flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
return -1;
}

/* metrics endpoint */
ctx->metrics_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
#endif /* FLB_HAVE_CHUNK_TRACE */
return 0;
}

Expand All @@ -830,29 +878,79 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
cmt_destroy(cmt);
}

static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
static int cb_calyptia_exit(void *data, struct flb_config *config)
{
int ret = FLB_RETRY;
size_t off = 0;
size_t out_size = 0;
char *out_buf = NULL;
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

/* used to create records for reporting traces to the cloud. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t json;
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
}

static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
int ret;
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
size_t off = 0;
size_t out_size = 0;
char *out_buf = NULL;
struct flb_connection *u_conn;
struct flb_http_client *c = NULL;
struct flb_calyptia *ctx = out_context;
struct cmt *cmt;
flb_sds_t json;
(void) i_ins;
(void) config;

if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
if (register_agent(ctx, config) != FLB_OK) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}
else if (!ctx->agent_id || !ctx->agent_token) {
flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
if (!u_conn) {
Expand Down Expand Up @@ -890,7 +988,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
out_buf, out_size, NULL, 0, NULL, 0);
out_buf, out_size, NULL, 0, NULL, 0);
if (!c) {
if (out_buf != event_chunk->data) {
cmt_encode_msgpack_destroy(out_buf);
Expand All @@ -899,12 +997,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
/* perform request */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "metrics delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver metrics");
debug_payload(ctx, out_buf, out_size);
}
Expand All @@ -915,42 +1013,35 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (json == NULL) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
out_buf = (char *)json;
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
out_size = flb_sds_len(json);

if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id) == NULL) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
out_buf, out_size, NULL, 0, NULL, 0);
(char *) json, flb_sds_len(json),
NULL, 0, NULL, 0);

if (!c) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
flb_sds_destroy(ctx->metrics_endpoint);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "trace delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver trace");
debug_payload(ctx, out_buf, out_size);
debug_payload(ctx, (char *) json, flb_sds_len(json));
}
flb_sds_destroy(json);
}
Expand All @@ -961,51 +1052,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
if (c) {
flb_http_client_destroy(c);
}
FLB_OUTPUT_RETURN(ret);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
{
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
FLB_OUTPUT_RETURN(ret);
}

/* Configuration properties map */
Expand Down Expand Up @@ -1057,7 +1105,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for calyptia core traces."
},
#endif

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
niedbalski marked this conversation as resolved.
Show resolved Hide resolved
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/out_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct flb_calyptia {
flb_sds_t trace_endpoint;
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */
bool register_retry_on_flush; /* retry registration on flush if failed */
};

#endif
Loading
Loading