Skip to content

Commit

Permalink
out_azure_kusto : fixed return values and added fluentbit version to …
Browse files Browse the repository at this point in the history
…azure kusto http headers

out_azure_kusto : removed extra comma
out_azure_kusto : made refresh interval configurable
out_azure_kusto : randomized refresh interval
out_azure_kusto : azure kusto ingestion resources dynamic parsing
out_azure_kusto : added kusto http header and randomizing ingestion resources loading
out_azure_kusto : added additional mutexes

Signed-off-by: Tanmaya Panda <[email protected]>
  • Loading branch information
tanmaya-panda1 committed Jul 5, 2024
1 parent c3ee584 commit b00f91d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 123 deletions.
68 changes: 44 additions & 24 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/* Fluent Bit
* ==========
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,9 @@
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_version.h>

#include "azure_kusto.h"
#include "azure_kusto_conf.h"
Expand Down Expand Up @@ -98,10 +99,6 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
ctx->o->access_token);
}

// if (output){
// flb_sds_destroy(output);
// }

if (pthread_mutex_unlock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
if (output) {
Expand Down Expand Up @@ -132,34 +129,24 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs

flb_plg_debug(ctx->ins, "before getting upstream connection");

//flb_upstream_init();

flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token);
flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time);

ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ;
ctx->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle;

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
flb_plg_debug(ctx->ins, "inside execute ingest csl commnad :: after flb_upstream_conn_get");

if (!u_conn) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}

if (u_conn) {
token = get_azure_kusto_token(ctx);
flb_plg_debug(ctx->ins, "after get azure kusto token");
flb_plg_debug(ctx->ins, "after get azure kusto token");

if (token) {
/* Compose request body */
body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
strlen(csl));
flb_plg_debug(ctx->ins, "after flb sds create size");

if (body) {
flb_sds_snprintf(&body, flb_sds_alloc(body),
Expand All @@ -176,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token,
flb_sds_len(token));
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);
flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);

/* Send HTTP request */
Expand Down Expand Up @@ -268,8 +258,6 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
return -1;
}

// ctx->u->base.net.keepalive = FLB_FALSE;

/* Create oauth2 context */
ctx->o =
flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
Expand Down Expand Up @@ -398,10 +386,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);
Expand All @@ -423,7 +415,27 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
flb_plg_trace(ctx->ins, "payload size before compression %d", json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
flb_plg_debug(ctx->ins, "enabled payload gzip compression");
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
Expand All @@ -434,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand Down Expand Up @@ -502,10 +518,14 @@ static struct flb_config_map config_map[] = {
"This property is ignored"},
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the ingestion endpoint connection timeout in seconds"},
{FLB_CONFIG_MAP_INT, "keep_alive_max_connection_recycle", FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, keep_alive_max_connection_recycle),
"Set the max connection recycle value of keep alive connections"},
"Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)."
},
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
"Set the azure kusto ingestion resources refresh interval"
},
/* EOF */
{0}};

Expand Down
11 changes: 7 additions & 4 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/* Fluent Bit
* ==========
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,10 +49,9 @@
#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri"
#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas"

#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600
#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600"

#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"
#define FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE "20"


struct flb_azure_kusto_resources {
Expand All @@ -75,7 +74,11 @@ struct flb_azure_kusto {
flb_sds_t ingestion_mapping_reference;

int ingestion_endpoint_connect_timeout;
int keep_alive_max_connection_recycle;

/* compress payload */
int compression_enabled;

int ingestion_resources_refresh_interval;

/* records configuration */
flb_sds_t log_key;
Expand Down
82 changes: 47 additions & 35 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/* Fluent Bit
* ==========
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -160,8 +160,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
{
jsmn_parser parser;
jsmntok_t *t;
jsmntok_t *tokens;
int tok_size = 100;
jsmntok_t *tokens = NULL;
//int tok_size = 100;
int ret = -1;
int i;
int blob_count = 0;
Expand Down Expand Up @@ -199,10 +199,13 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
}

jsmn_init(&parser);
tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);
//tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);

/* Dynamically allocate memory for tokens based on response length */
tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response)));

if (tokens) {
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response));

if (ret > 0) {
/* skip all tokens until we reach "Rows" */
Expand Down Expand Up @@ -416,7 +419,23 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
return identity_token;
}




/**
* This method returns random integers from range -600 to +600 which needs to be added
* to the kusto ingestion resources refresh interval to even out the spikes
* in kusto DM for .get ingestion resources upon expiry
* */
int azure_kusto_generate_random_integer() {
// Seed the random number generator
int pid = getpid();
unsigned long address = (unsigned long)&address;
unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0);
srand(seed);
// Generate a random integer in the range [-600, 600]
int random_integer = rand() % 1201 - 600;
return random_integer;
}

int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_config *config)
Expand All @@ -428,17 +447,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_upstream_ha *queue_ha = NULL;
time_t now;

int generated_random_integer = azure_kusto_generate_random_integer();
flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer);

now = time(NULL);

/* check if we have all resources and they are not stale */
if (ctx->resources->blob_ha && ctx->resources->queue_ha &&
ctx->resources->identity_token &&
now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) {
now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) {
flb_plg_debug(ctx->ins, "resources are already loaded and are not stale");
ret = 0;
}
else {
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs");
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer);
response = execute_ingest_csl_command(ctx, ".get ingestion resources");

if (response) {
Expand All @@ -448,18 +470,18 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");

if (blob_ha) {
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
ret =
parse_storage_resources(ctx, config, response, blob_ha, queue_ha);

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (ret == 0) {
flb_sds_destroy(response);
Expand All @@ -468,46 +490,36 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
response =
execute_ingest_csl_command(ctx, ".get kusto identity token");

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
if (response) {
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
identity_token =
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
// ret = flb_azure_kusto_resources_clear(ctx->resources);

// if (ret != -1) {
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

ret = 0;
//}
// else {
// flb_plg_error(
// ctx->ins,
// "error destroying previous ingestion resources");
// }
}
else {
flb_plg_error(ctx->ins,
"error parsing ingestion identity token");
ret = -1;
}
if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins, "error getting kusto identity token");
ret = -1;
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins,
Expand Down
Loading

0 comments on commit b00f91d

Please sign in to comment.