From 8ad934e1e1191fe2e10896d037637a0c8f6c9951 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Mon, 12 Feb 2024 10:59:23 +0530 Subject: [PATCH 1/7] header and randomizing commits --- plugins/out_azure_kusto/azure_kusto.c | 7 +++---- plugins/out_azure_kusto/azure_kusto.h | 4 +--- plugins/out_azure_kusto/azure_kusto_conf.c | 12 ++++++++---- plugins/out_azure_kusto/azure_kusto_ingest.c | 9 +++++++-- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4b42d81bf17..6e8779db1de 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -135,7 +135,6 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; - ctx->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); @@ -164,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Accept", 6, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); /* Send HTTP request */ @@ -489,9 +491,6 @@ static struct flb_config_map config_map[] = { {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), "Set the ingestion endpoint connection timeout in seconds"}, - {FLB_CONFIG_MAP_INT, "keep_alive_max_connection_recycle", FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, keep_alive_max_connection_recycle), - "Set the max connection recycle value of keep alive connections"}, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index cd0199ba0d1..6f9e8507104 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -49,10 +49,9 @@ #define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri" #define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas" -#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600 +#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 7200 #define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" -#define FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE "20" struct flb_azure_kusto_resources { @@ -75,7 +74,6 @@ struct flb_azure_kusto { flb_sds_t ingestion_mapping_reference; int ingestion_endpoint_connect_timeout; - int keep_alive_max_connection_recycle; /* records configuration */ flb_sds_t log_key; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index c4195f2aadf..61ffeb0fb35 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -160,8 +160,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi { jsmn_parser parser; jsmntok_t *t; - jsmntok_t *tokens; - int tok_size = 100; + jsmntok_t *tokens = NULL; + //int tok_size = 100; int ret = -1; int i; int blob_count = 0; @@ -199,10 +199,14 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + + //tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + + // Dynamically allocate memory for tokens based on response length + tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); if (tokens) { - ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); + ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response)); if (ret > 0) { /* skip all tokens until we reach "Rows" */ diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index deb641c69c2..3c82bd02855 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -152,7 +152,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; - u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; u_conn = flb_upstream_conn_get(u_node->u); @@ -181,6 +180,10 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, @@ -386,7 +389,6 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t } u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; - u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; u_conn = flb_upstream_conn_get(u_node->u); @@ -415,6 +417,9 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t 20); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, From a35ac29bacf42e89014959a861ca3dedc3ff4668 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Tue, 13 Feb 2024 02:11:03 +0530 Subject: [PATCH 2/7] azure kusto ingestion resources dynamic parsing Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_conf.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 61ffeb0fb35..9a513eb4648 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -199,10 +199,9 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - //tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); - // Dynamically allocate memory for tokens based on response length + /* Dynamically allocate memory for tokens based on response length */ tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); if (tokens) { @@ -420,7 +419,7 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, return identity_token; } - + int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) @@ -452,7 +451,7 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); if (blob_ha) { - + if (pthread_mutex_lock(&ctx->resources_mutex)) { flb_plg_error(ctx->ins, "error locking mutex"); return -1; From a7e871c9582415b90da0b4af52a3408cdedb96d6 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 19 Feb 2024 11:23:44 +0530 Subject: [PATCH 3/7] * introduced gzip compression Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 35 ++++++++++++++++++-- plugins/out_azure_kusto/azure_kusto.h | 5 ++- plugins/out_azure_kusto/azure_kusto_ingest.c | 14 ++++++-- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 6e8779db1de..952cb375b4c 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -386,10 +386,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, size_t json_size; size_t tag_len; struct flb_azure_kusto *ctx = out_context; + int is_compressed = FLB_FALSE; (void)i_ins; (void)config; + void *final_payload = NULL; + size_t final_payload_size = 0; + flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); tag_len = flb_sds_len(event_chunk->tag); @@ -411,7 +415,27 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); + flb_plg_trace(ctx->ins, "payload size before compression %d", json_size); + /* Map buffer */ + final_payload = json; + final_payload_size = json_size; + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json, json_size, + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + flb_sds_destroy(json); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } + flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size); + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot perform queued ingestion"); @@ -422,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Cleanup */ flb_sds_destroy(json); + /* release compressed payload */ + if (is_compressed == FLB_TRUE) { + flb_free(final_payload); + } /* Done */ FLB_OUTPUT_RETURN(FLB_OK); } @@ -491,6 +519,9 @@ static struct flb_config_map config_map[] = { {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), "Set the ingestion endpoint connection timeout in seconds"}, + {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." + }, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 6f9e8507104..9136f25294a 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,6 +75,9 @@ struct flb_azure_kusto { int ingestion_endpoint_connect_timeout; + /* compress payload */ + int compression_enabled; + /* records configuration */ flb_sds_t log_key; int include_tag_key; diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 3c82bd02855..df0397e5a08 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -92,6 +92,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, size_t blob_uri_size; char *blob_sas; size_t blob_sas_size; + const char *extension; + + if (ctx->compression_enabled) { + extension = ".multijson.gz"; + } + else { + extension = ".multijson"; + } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, (void **)&blob_uri, &blob_uri_size); @@ -109,11 +117,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, /* uri will be https:////.multijson? */ uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size + - flb_sds_len(blob_id) + 21); + flb_sds_len(blob_id) + 11 + strlen(extension)); if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s", - u_node->host, blob_uri, blob_id, blob_sas); + flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s", + u_node->host, blob_uri, blob_id, extension ,blob_sas); flb_plg_debug(ctx->ins, "created blob uri %s", uri); } else { From b4246c02563ef6588efdc38f75b8039b7954c50e Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Wed, 21 Feb 2024 02:09:59 +0530 Subject: [PATCH 4/7] randomized refresh interval Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_conf.c | 23 ++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 9a513eb4648..79ff26cba28 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -421,6 +421,22 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, +/** + * This method returns random integers from range -600 to +600 which needs to be added + * to the kusto ingestion resources refresh interval to even out the spikes + * in kusto DM for .get ingestion resources upon expiry + * */ +int azure_kusto_generate_random_integer() { + // Seed the random number generator + int pid = getpid(); + unsigned long address = (unsigned long)&address; + unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); + srand(seed); + // Generate a random integer in the range [-600, 600] + int random_integer = rand() % 1201 - 600; + return random_integer; +} + int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) { @@ -431,17 +447,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_upstream_ha *queue_ha = NULL; time_t now; + int generated_random_integer = azure_kusto_generate_random_integer(); + flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer); + now = time(NULL); /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && - now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) { + now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs"); + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ,FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { From 27718ce705d3c621f706be8f5be73bc269ca809a Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Wed, 21 Feb 2024 10:25:19 +0530 Subject: [PATCH 5/7] made refresh interval configurable Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 4 ++++ plugins/out_azure_kusto/azure_kusto.h | 4 +++- plugins/out_azure_kusto/azure_kusto_conf.c | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 952cb375b4c..4431246fcd7 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -522,6 +522,10 @@ static struct flb_config_map config_map[] = { {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." }, + {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + }, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 9136f25294a..9e3eb7b3182 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -49,7 +49,7 @@ #define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri" #define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas" -#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 7200 +#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600" #define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" @@ -78,6 +78,8 @@ struct flb_azure_kusto { /* compress payload */ int compression_enabled; + int ingestion_resources_refresh_interval; + /* records configuration */ flb_sds_t log_key; int include_tag_key; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 79ff26cba28..e9d77c6ce34 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -455,12 +455,12 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && - now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC + generated_random_integer) { + now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ,FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC + generated_random_integer); + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ,ctx->ingestion_resources_refresh_interval + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { From 6ffc9e717e1fa1c5064b62b979eea622e55f0d6f Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Wed, 21 Feb 2024 12:45:45 +0530 Subject: [PATCH 6/7] removed extra comma Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_conf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index e9d77c6ce34..052097101c9 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -460,7 +460,7 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ,ctx->ingestion_resources_refresh_interval + generated_random_integer); + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { From 39c200bd8d847d790fc31fe88c1c0b0b6a534714 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Thu, 4 Jul 2024 17:23:08 +0530 Subject: [PATCH 7/7] added flb version and fixed return types Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 10 ++++---- plugins/out_azure_kusto/azure_kusto_ingest.c | 26 ++++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4431246fcd7..3523426b69a 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -22,8 +22,9 @@ #include #include #include -#include #include +#include +#include #include "azure_kusto.h" #include "azure_kusto_conf.h" @@ -131,7 +132,6 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); - flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token); flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; @@ -163,7 +163,7 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Accept", 6, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); @@ -434,7 +434,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* JSON buffer will be cleared at cleanup: */ } } - flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size); + flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { @@ -518,7 +518,7 @@ static struct flb_config_map config_map[] = { "This property is ignored"}, {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the ingestion endpoint connection timeout in seconds"}, + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"}, {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." }, diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index ada9007636f..13c60cfcbc2 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -165,16 +166,16 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t if (u_conn) { if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; } flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; } if (uri) { @@ -188,7 +189,7 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); @@ -253,8 +254,8 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; } uuid = generate_uuid(); @@ -266,7 +267,6 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); - flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); if (message) { message_len = @@ -325,8 +325,8 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; } return message; @@ -425,7 +425,7 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t 20); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); @@ -531,7 +531,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_lock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } /* flb________ */ @@ -540,7 +540,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_unlock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } if (blob_id) {