Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_azure_kusto : fix multiple files tail issue and timeout issue #8430

Merged
merged 10 commits into from
Nov 26, 2024
68 changes: 65 additions & 3 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_version.h>

#include "azure_kusto.h"
#include "azure_kusto_conf.h"
Expand Down Expand Up @@ -126,11 +127,21 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
struct flb_http_client *c;
flb_sds_t resp = NULL;

flb_plg_debug(ctx->ins, "before getting upstream connection");

flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time);

ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ;

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);

if (u_conn) {
token = get_azure_kusto_token(ctx);
flb_plg_debug(ctx->ins, "after get azure kusto token");

if (token) {
/* Compose request body */
Expand All @@ -152,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token,
flb_sds_len(token));
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10);
flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10);
flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);

/* Send HTTP request */
Expand Down Expand Up @@ -231,6 +245,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
*/
pthread_mutex_init(&ctx->token_mutex, NULL);
pthread_mutex_init(&ctx->resources_mutex, NULL);
pthread_mutex_init(&ctx->blob_mutex, NULL);

/*
* Create upstream context for Kusto Ingestion endpoint
Expand All @@ -250,6 +265,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
}
flb_output_upstream_set(ctx->u, ins);

flb_plg_debug(ctx->ins, "azure kusto init completed");

return 0;
}

Expand Down Expand Up @@ -367,16 +384,21 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);

/* Load or refresh ingestion resources */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand All @@ -385,12 +407,32 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Reformat msgpack to JSON payload */
ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
event_chunk->size, (void **)&json, &json_size);
flb_plg_trace(ctx->ins, "format: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot reformat data into json");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
flb_sds_destroy(json);
Expand All @@ -400,6 +442,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -417,6 +463,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);

flb_azure_kusto_conf_destroy(ctx);

return 0;
Expand Down Expand Up @@ -462,7 +512,19 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, time_key),
"The key name of the time. If 'include_time_key' is false, "
"This property is ignored"},
/* EOF */
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
tanmaya-panda1 marked this conversation as resolved.
Show resolved Hide resolved
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds."
"The default is 60 seconds."},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled),
"Enable HTTP payload compression (gzip)."
"The default is true."},
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
"Set the azure kusto ingestion resources refresh interval"
"The default is 3600 seconds."},
/* EOF */
{0}};

struct flb_output_plugin out_azure_kusto_plugin = {
Expand Down
14 changes: 13 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri"
#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas"

#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600
#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600"

#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"


struct flb_azure_kusto_resources {
struct flb_upstream_ha *blob_ha;
Expand All @@ -70,6 +73,13 @@ struct flb_azure_kusto {
flb_sds_t table_name;
flb_sds_t ingestion_mapping_reference;

int ingestion_endpoint_connect_timeout;

/* compress payload */
int compression_enabled;

int ingestion_resources_refresh_interval;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand All @@ -94,6 +104,8 @@ struct flb_azure_kusto {
/* mutex for loading reosurces */
pthread_mutex_t resources_mutex;

pthread_mutex_t blob_mutex;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
75 changes: 51 additions & 24 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
{
jsmn_parser parser;
jsmntok_t *t;
jsmntok_t *tokens;
int tok_size = 100;
jsmntok_t *tokens = NULL;
int ret = -1;
int i;
int blob_count = 0;
Expand Down Expand Up @@ -200,10 +199,18 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
}

jsmn_init(&parser);
tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);

/* Dynamically allocate memory for tokens based on response length */
tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response)));

tanmaya-panda1 marked this conversation as resolved.
Show resolved Hide resolved
if (!tokens) {
flb_errno(); /* Log the error using flb_errno() */
flb_plg_error(ctx->ins, "failed to allocate memory for tokens");
return -1;
}

if (tokens) {
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response));

if (ret > 0) {
/* skip all tokens until we reach "Rows" */
Expand Down Expand Up @@ -417,6 +424,24 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
return identity_token;
}



/**
* This method returns random integers from range -600 to +600 which needs to be added
* to the kusto ingestion resources refresh interval to even out the spikes
* in kusto DM for .get ingestion resources upon expiry
* */
int azure_kusto_generate_random_integer() {
/* Seed the random number generator */
int pid = getpid();
unsigned long address = (unsigned long)&address;
unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0);
srand(seed);
/* Generate a random integer in the range [-600, 600] */
int random_integer = rand() % 1201 - 600;
return random_integer;
}

int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_config *config)
{
Expand All @@ -427,22 +452,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_upstream_ha *queue_ha = NULL;
time_t now;

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
int generated_random_integer = azure_kusto_generate_random_integer();
flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer);

now = time(NULL);

/* check if we have all resources and they are not stale */
if (ctx->resources->blob_ha && ctx->resources->queue_ha &&
ctx->resources->identity_token &&
now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) {
now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) {
flb_plg_debug(ctx->ins, "resources are already loaded and are not stale");
ret = 0;
}
else {
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs");
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer);
response = execute_ingest_csl_command(ctx, ".get ingestion resources");

if (response) {
Expand All @@ -452,9 +475,19 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");

if (blob_ha) {

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
ret =
parse_storage_resources(ctx, config, response, blob_ha, queue_ha);

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (ret == 0) {
flb_sds_destroy(response);
response = NULL;
Expand All @@ -463,31 +496,30 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
execute_ingest_csl_command(ctx, ".get kusto identity token");

if (response) {
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
identity_token =
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
ret = flb_azure_kusto_resources_clear(ctx->resources);

if (ret != -1) {
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

ret = 0;
}
else {
flb_plg_error(
ctx->ins,
"error destroying previous ingestion resources");
}
}
else {
flb_plg_error(ctx->ins,
"error parsing ingestion identity token");
ret = -1;
}
if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins, "error getting kusto identity token");
Expand Down Expand Up @@ -526,11 +558,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
}
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

return ret;
}

Expand Down
Loading
Loading