From c3ee584e11c5e34e08ba1b34485d11ee80628d2e Mon Sep 17 00:00:00 2001 From: root Date: Sat, 27 Jan 2024 22:54:04 +0000 Subject: [PATCH 1/5] fixed multiple file input Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 44 ++++++++++ plugins/out_azure_kusto/azure_kusto.h | 9 ++ plugins/out_azure_kusto/azure_kusto_conf.c | 47 +++++++---- plugins/out_azure_kusto/azure_kusto_ingest.c | 88 ++++++++++++++++++++ 4 files changed, 170 insertions(+), 18 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4b8ad9b82e0..ee47e326e2d 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -98,6 +98,10 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) ctx->o->access_token); } + // if (output){ + // flb_sds_destroy(output); + // } + if (pthread_mutex_unlock(&ctx->token_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); if (output) { @@ -126,16 +130,36 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs struct flb_http_client *c; flb_sds_t resp = NULL; + flb_plg_debug(ctx->ins, "before getting upstream connection"); + + //flb_upstream_init(); + + flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); + flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); + flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); + flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token); + flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); + + ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; + ctx->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); + flb_plg_debug(ctx->ins, "inside execute ingest csl commnad :: after flb_upstream_conn_get"); + + if (!u_conn) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } if (u_conn) { token = get_azure_kusto_token(ctx); + flb_plg_debug(ctx->ins, "after get azure kusto token"); if (token) { /* Compose request body */ body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 + strlen(csl)); + flb_plg_debug(ctx->ins, "after flb sds create size"); if (body) { flb_sds_snprintf(&body, flb_sds_alloc(body), @@ -212,6 +236,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi int io_flags = FLB_IO_TLS; struct flb_azure_kusto *ctx; + flb_plg_debug(ins, "inside azure kusto init"); + /* Create config context */ ctx = flb_azure_kusto_conf_create(ins, config); if (!ctx) { @@ -231,6 +257,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi */ pthread_mutex_init(&ctx->token_mutex, NULL); pthread_mutex_init(&ctx->resources_mutex, NULL); + pthread_mutex_init(&ctx->blob_mutex, NULL); /* * Create upstream context for Kusto Ingestion endpoint @@ -241,6 +268,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } + // ctx->u->base.net.keepalive = FLB_FALSE; + /* Create oauth2 context */ ctx->o = flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); @@ -250,6 +279,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi } flb_output_upstream_set(ctx->u, ins); + flb_plg_debug(ctx->ins, "azure kusto init completed"); + return 0; } @@ -377,6 +408,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Load or refresh ingestion resources */ ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "after flushing bytes xxxx %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot load ingestion resources"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -385,12 +417,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Reformat msgpack to JSON payload */ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size); + flb_plg_trace(ctx->ins, "after kusto format xxxx %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot reformat data into json"); FLB_OUTPUT_RETURN(FLB_RETRY); } ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot perform queued ingestion"); flb_sds_destroy(json); @@ -417,6 +451,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) ctx->u = NULL; } + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); return 0; @@ -462,6 +500,12 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, time_key), "The key name of the time. If 'include_time_key' is false, " "This property is ignored"}, + {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), + "Set the ingestion endpoint connection timeout in seconds"}, + {FLB_CONFIG_MAP_INT, "keep_alive_max_connection_recycle", FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, keep_alive_max_connection_recycle), + "Set the max connection recycle value of keep alive connections"}, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index ac4eedfd0a8..cd0199ba0d1 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -51,6 +51,10 @@ #define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600 +#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" +#define FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE "20" + + struct flb_azure_kusto_resources { struct flb_upstream_ha *blob_ha; struct flb_upstream_ha *queue_ha; @@ -70,6 +74,9 @@ struct flb_azure_kusto { flb_sds_t table_name; flb_sds_t ingestion_mapping_reference; + int ingestion_endpoint_connect_timeout; + int keep_alive_max_connection_recycle; + /* records configuration */ flb_sds_t log_key; int include_tag_key; @@ -94,6 +101,8 @@ struct flb_azure_kusto { /* mutex for loading reosurces */ pthread_mutex_t resources_mutex; + pthread_mutex_t blob_mutex; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 5303fef672d..d1f9c0a48db 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -416,6 +416,8 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, return identity_token; } + + int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) { @@ -426,11 +428,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_upstream_ha *queue_ha = NULL; time_t now; - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } - now = time(NULL); /* check if we have all resources and they are not stale */ @@ -451,9 +448,19 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); if (blob_ha) { + + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } ret = parse_storage_resources(ctx, config, response, blob_ha, queue_ha); + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (ret == 0) { flb_sds_destroy(response); response = NULL; @@ -461,26 +468,30 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, response = execute_ingest_csl_command(ctx, ".get kusto identity token"); + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } if (response) { identity_token = parse_ingestion_identity_token(ctx, response); if (identity_token) { - ret = flb_azure_kusto_resources_clear(ctx->resources); + // ret = flb_azure_kusto_resources_clear(ctx->resources); - if (ret != -1) { + // if (ret != -1) { ctx->resources->blob_ha = blob_ha; ctx->resources->queue_ha = queue_ha; ctx->resources->identity_token = identity_token; ctx->resources->load_time = now; ret = 0; - } - else { - flb_plg_error( - ctx->ins, - "error destroying previous ingestion resources"); - } + //} + // else { + // flb_plg_error( + // ctx->ins, + // "error destroying previous ingestion resources"); + // } } else { flb_plg_error(ctx->ins, @@ -492,6 +503,11 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, flb_plg_error(ctx->ins, "error getting kusto identity token"); ret = -1; } + + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } } else { flb_plg_error(ctx->ins, @@ -525,11 +541,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, } } - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } - return ret; } diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index d38d92e7fd8..028b5757d16 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -137,21 +137,66 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t char tmp[64]; int len; + struct timespec ts; + now = time(NULL); gmtime_r(&now, &tm); len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); + + flb_plg_debug(ctx->ins,"inside blob before locking mutex"); + + + + // if (pthread_mutex_lock(&ctx->blob_mutex)) { + // flb_plg_error(ctx->ins, "error locking mutex"); + // return -1; + // } + + + flb_plg_debug(ctx->ins,"inside blob after locking mutex"); + u_node = flb_upstream_ha_node_get(ctx->resources->blob_ha); if (!u_node) { flb_plg_error(ctx->ins, "error getting blob upstream"); return NULL; } + + // if (pthread_mutex_unlock(&ctx->blob_mutex)) { + // flb_plg_error(ctx->ins, "error unlocking mutex"); + // return -1; + // } + + + flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); + u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; + u_conn = flb_upstream_conn_get(u_node->u); + + //if (pthread_mutex_unlock(&ctx->blob_mutex)) { + // flb_plg_error(ctx->ins, "error unlocking mutex"); + // return -1; + //} + + if (!u_conn) { + flb_plg_error(ctx->ins,"cannot create upstream connection for blob commit"); + return FLB_RETRY; + } + + + flb_plg_debug(ctx->ins,"inside blob after retrieving connection"); + if (u_conn) { + + flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); + + flb_plg_debug(ctx->ins,"inside blob after create blob uri"); + if (uri) { flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, @@ -188,6 +233,7 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); } else { flb_plg_error(ctx->ins, @@ -209,6 +255,12 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting blob container upstream connection"); } + + //if (pthread_mutex_unlock(&ctx->blob_mutex)) { + // flb_plg_error(ctx->ins, "error unlocking mutex"); + // return -1; + //} + return uri; } @@ -222,10 +274,23 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t size_t b64_len; size_t message_len; + + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + uuid = generate_uuid(); if (uuid) { message = flb_sds_create(NULL); + flb_plg_debug(ctx->ins,"uuid :: %s",uuid); + flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); + flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); + flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); + flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); + flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); + if (message) { message_len = flb_sds_snprintf(&message, 0, @@ -281,6 +346,12 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error generating unique ingestion UUID"); } + + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + return message; } @@ -348,6 +419,11 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t return -1; } + + u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; + + u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) { @@ -466,9 +542,21 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, flb_sds_t blob_id; flb_sds_t blob_uri; + + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return NULL; + } + /* flb________ */ blob_id = azure_kusto_create_blob_id(ctx, tag, tag_len); + + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return NULL; + } + if (blob_id) { blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); From b00f91d336ced2e1d7d8a274902b72327098c93c Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Sun, 28 Jan 2024 16:48:44 +0530 Subject: [PATCH 2/5] out_azure_kusto : fixed return values and added fluentbit version to azure kusto http headers out_azure_kusto : removed extra comma out_azure_kusto : made refresh interval configurable out_azure_kusto : randomized refresh interval out_azure_kusto : azure kusto ingestion resources dynamic parsing out_azure_kusto : added kusto http header and randomizing ingestion resources loading out_azure_kusto : added additional mutexes Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 68 +++++++++----- plugins/out_azure_kusto/azure_kusto.h | 11 ++- plugins/out_azure_kusto/azure_kusto_conf.c | 82 +++++++++------- plugins/out_azure_kusto/azure_kusto_ingest.c | 98 ++++++++------------ 4 files changed, 136 insertions(+), 123 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index ee47e326e2d..65e2bd2c219 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,9 @@ #include #include #include -#include #include +#include +#include #include "azure_kusto.h" #include "azure_kusto_conf.h" @@ -98,10 +99,6 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) ctx->o->access_token); } - // if (output){ - // flb_sds_destroy(output); - // } - if (pthread_mutex_unlock(&ctx->token_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); if (output) { @@ -132,34 +129,24 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "before getting upstream connection"); - //flb_upstream_init(); - flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); - flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token); flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; - ctx->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); - flb_plg_debug(ctx->ins, "inside execute ingest csl commnad :: after flb_upstream_conn_get"); - - if (!u_conn) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } if (u_conn) { token = get_azure_kusto_token(ctx); - flb_plg_debug(ctx->ins, "after get azure kusto token"); + flb_plg_debug(ctx->ins, "after get azure kusto token"); if (token) { /* Compose request body */ body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 + strlen(csl)); - flb_plg_debug(ctx->ins, "after flb sds create size"); if (body) { flb_sds_snprintf(&body, flb_sds_alloc(body), @@ -176,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Accept", 6, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); /* Send HTTP request */ @@ -268,8 +258,6 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } - // ctx->u->base.net.keepalive = FLB_FALSE; - /* Create oauth2 context */ ctx->o = flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); @@ -398,10 +386,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, size_t json_size; size_t tag_len; struct flb_azure_kusto *ctx = out_context; + int is_compressed = FLB_FALSE; (void)i_ins; (void)config; + void *final_payload = NULL; + size_t final_payload_size = 0; + flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); tag_len = flb_sds_len(event_chunk->tag); @@ -423,7 +415,27 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); + flb_plg_trace(ctx->ins, "payload size before compression %d", json_size); + /* Map buffer */ + final_payload = json; + final_payload_size = json_size; + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json, json_size, + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + flb_sds_destroy(json); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } + flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot perform queued ingestion"); @@ -434,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Cleanup */ flb_sds_destroy(json); + /* release compressed payload */ + if (is_compressed == FLB_TRUE) { + flb_free(final_payload); + } /* Done */ FLB_OUTPUT_RETURN(FLB_OK); } @@ -502,10 +518,14 @@ static struct flb_config_map config_map[] = { "This property is ignored"}, {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the ingestion endpoint connection timeout in seconds"}, - {FLB_CONFIG_MAP_INT, "keep_alive_max_connection_recycle", FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, keep_alive_max_connection_recycle), - "Set the max connection recycle value of keep alive connections"}, + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"}, + {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." + }, + {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + }, /* EOF */ {0}}; diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index cd0199ba0d1..9e3eb7b3182 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,10 +49,9 @@ #define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri" #define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas" -#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600 +#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600" #define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" -#define FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE "20" struct flb_azure_kusto_resources { @@ -75,7 +74,11 @@ struct flb_azure_kusto { flb_sds_t ingestion_mapping_reference; int ingestion_endpoint_connect_timeout; - int keep_alive_max_connection_recycle; + + /* compress payload */ + int compression_enabled; + + int ingestion_resources_refresh_interval; /* records configuration */ flb_sds_t log_key; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index d1f9c0a48db..052097101c9 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2022 The Fluent Bit Authors + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -160,8 +160,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi { jsmn_parser parser; jsmntok_t *t; - jsmntok_t *tokens; - int tok_size = 100; + jsmntok_t *tokens = NULL; + //int tok_size = 100; int ret = -1; int i; int blob_count = 0; @@ -199,10 +199,13 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + //tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + + /* Dynamically allocate memory for tokens based on response length */ + tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); if (tokens) { - ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); + ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response)); if (ret > 0) { /* skip all tokens until we reach "Rows" */ @@ -416,7 +419,23 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, return identity_token; } - + + +/** + * This method returns random integers from range -600 to +600 which needs to be added + * to the kusto ingestion resources refresh interval to even out the spikes + * in kusto DM for .get ingestion resources upon expiry + * */ +int azure_kusto_generate_random_integer() { + // Seed the random number generator + int pid = getpid(); + unsigned long address = (unsigned long)&address; + unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); + srand(seed); + // Generate a random integer in the range [-600, 600] + int random_integer = rand() % 1201 - 600; + return random_integer; +} int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) @@ -428,17 +447,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_upstream_ha *queue_ha = NULL; time_t now; + int generated_random_integer = azure_kusto_generate_random_integer(); + flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer); + now = time(NULL); /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && - now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) { + now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs"); + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { @@ -448,18 +470,18 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); if (blob_ha) { - - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } + + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } ret = parse_storage_resources(ctx, config, response, blob_ha, queue_ha); - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } if (ret == 0) { flb_sds_destroy(response); @@ -468,46 +490,36 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, response = execute_ingest_csl_command(ctx, ".get kusto identity token"); - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } if (response) { + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } identity_token = parse_ingestion_identity_token(ctx, response); if (identity_token) { - // ret = flb_azure_kusto_resources_clear(ctx->resources); - - // if (ret != -1) { ctx->resources->blob_ha = blob_ha; ctx->resources->queue_ha = queue_ha; ctx->resources->identity_token = identity_token; ctx->resources->load_time = now; ret = 0; - //} - // else { - // flb_plg_error( - // ctx->ins, - // "error destroying previous ingestion resources"); - // } } else { flb_plg_error(ctx->ins, "error parsing ingestion identity token"); ret = -1; } + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } } else { flb_plg_error(ctx->ins, "error getting kusto identity token"); ret = -1; } - - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } } else { flb_plg_error(ctx->ins, diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 028b5757d16..14f714bcb48 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -143,19 +144,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t gmtime_r(&now, &tm); len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); - - flb_plg_debug(ctx->ins,"inside blob before locking mutex"); - - - - // if (pthread_mutex_lock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error locking mutex"); - // return -1; - // } - - - flb_plg_debug(ctx->ins,"inside blob after locking mutex"); - u_node = flb_upstream_ha_node_get(ctx->resources->blob_ha); if (!u_node) { flb_plg_error(ctx->ins, "error getting blob upstream"); @@ -163,39 +151,24 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } - // if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - // } - - flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; - u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; u_conn = flb_upstream_conn_get(u_node->u); - - //if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - //} - - if (!u_conn) { - flb_plg_error(ctx->ins,"cannot create upstream connection for blob commit"); - return FLB_RETRY; - } - - - flb_plg_debug(ctx->ins,"inside blob after retrieving connection"); - if (u_conn) { - - flb_plg_debug(ctx->ins,"inside blob before create blob uri"); - uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; + } + flb_plg_debug(ctx->ins,"inside blob before create blob uri"); + uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); - flb_plg_debug(ctx->ins,"inside blob after create blob uri"); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; + } if (uri) { flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); @@ -208,6 +181,10 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, @@ -233,7 +210,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); } else { flb_plg_error(ctx->ins, @@ -255,12 +231,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting blob container upstream connection"); } - - //if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - //} - return uri; } @@ -276,20 +246,19 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; } uuid = generate_uuid(); if (uuid) { message = flb_sds_create(NULL); - flb_plg_debug(ctx->ins,"uuid :: %s",uuid); - flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); - flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); - flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); - flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); - flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); + flb_plg_debug(ctx->ins,"uuid :: %s",uuid); + flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); + flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); + flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); + flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); if (message) { message_len = @@ -348,8 +317,8 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; } return message; @@ -418,17 +387,23 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting queue upstream"); return -1; } - u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; - u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; - u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) { + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } uri = azure_kusto_create_queue_uri(ctx, u_node); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (uri) { payload = create_ingestion_message(ctx, blob_uri, payload_size); @@ -442,6 +417,9 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t 20); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, @@ -545,7 +523,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_lock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } /* flb____
____ */ @@ -554,7 +532,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_unlock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } if (blob_id) { From a70b60f76f3fc8e3e60fe80f32e520dd4bdc19f8 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 19 Feb 2024 11:23:44 +0530 Subject: [PATCH 3/5] out_azure_kusto: introduced gzip compression Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_ingest.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 14f714bcb48..a5ec2aebb8f 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -93,6 +93,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, size_t blob_uri_size; char *blob_sas; size_t blob_sas_size; + const char *extension; + + if (ctx->compression_enabled) { + extension = ".multijson.gz"; + } + else { + extension = ".multijson"; + } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, (void **)&blob_uri, &blob_uri_size); @@ -110,11 +118,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, /* uri will be https:////.multijson? */ uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size + - flb_sds_len(blob_id) + 21); + flb_sds_len(blob_id) + 11 + strlen(extension)); if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s", - u_node->host, blob_uri, blob_id, blob_sas); + flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s", + u_node->host, blob_uri, blob_id, extension ,blob_sas); flb_plg_debug(ctx->ins, "created blob uri %s", uri); } else { From 90d65ef60d621e1d0210f665e522201aff24a6a3 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Wed, 7 Aug 2024 21:18:09 +0530 Subject: [PATCH 4/5] out_azure_kusto: added tracing headers Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_ingest.c | 25 ++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 13c60cfcbc2..de854fbc85c 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -270,18 +270,19 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (message) { message_len = - flb_sds_snprintf(&message, 0, - "{\"Id\": \"%s\", \"BlobPath\": \"%s\", " - "\"RawDataSize\": %lu, \"DatabaseName\": " - "\"%s\", \"TableName\": \"%s\"," - "\"AdditionalProperties\": { \"format\": \"multijson\", " - "\"authorizationContext\": " - "\"%s\", \"jsonMappingReference\": \"%s\" }}%c", - uuid, blob_uri, payload_size, ctx->database_name, - ctx->table_name, ctx->resources->identity_token, - ctx->ingestion_mapping_reference == NULL - ? "" - : ctx->ingestion_mapping_reference, 0); + flb_sds_snprintf(&message, 0, + "{\"Id\": \"%s\", \"BlobPath\": \"%s\", " + "\"RawDataSize\": %lu, \"DatabaseName\": " + "\"%s\", \"TableName\": \"%s\", " + "\"ClientVersionForTracing\": \"Kusto.Fluent-Bit:%s\", " + "\"ApplicationForTracing\": \"%s\", " + "\"AdditionalProperties\": { \"format\": \"multijson\", " + "\"authorizationContext\": \"%s\", " + "\"jsonMappingReference\": \"%s\" }}%c", + uuid, blob_uri, payload_size, ctx->database_name, + ctx->table_name, FLB_VERSION_STR, "Kusto.Fluent-Bit", + ctx->resources->identity_token, + ctx->ingestion_mapping_reference == NULL ? "" : ctx->ingestion_mapping_reference, 0); if (message_len != -1) { flb_plg_debug(ctx->ins, "created ingestion message:\n%s", message); From bc0dc2536041027ddf4800b4cb25af0de8d6433c Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Fri, 4 Oct 2024 16:54:28 +0530 Subject: [PATCH 5/5] out_azure_kusto: fixed review comments Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 32 ++++++++++------------ plugins/out_azure_kusto/azure_kusto_conf.c | 12 +++++--- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 65e2bd2c219..4b79d80c4bb 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -164,8 +164,8 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); - flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); - flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10); + flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); /* Send HTTP request */ @@ -226,8 +226,6 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi int io_flags = FLB_IO_TLS; struct flb_azure_kusto *ctx; - flb_plg_debug(ins, "inside azure kusto init"); - /* Create config context */ ctx = flb_azure_kusto_conf_create(ins, config); if (!ctx) { @@ -400,7 +398,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Load or refresh ingestion resources */ ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "after flushing bytes xxxx %d", ret); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot load ingestion resources"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -409,13 +407,12 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Reformat msgpack to JSON payload */ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size); - flb_plg_trace(ctx->ins, "after kusto format xxxx %d", ret); + flb_plg_trace(ctx->ins, "format: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot reformat data into json"); FLB_OUTPUT_RETURN(FLB_RETRY); } - flb_plg_trace(ctx->ins, "payload size before compression %d", json_size); /* Map buffer */ final_payload = json; final_payload_size = json_size; @@ -430,11 +427,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, } else { is_compressed = FLB_TRUE; - flb_plg_debug(ctx->ins, "enabled payload gzip compression"); /* JSON buffer will be cleared at cleanup: */ } } - flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); + flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size); ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { @@ -517,16 +513,18 @@ static struct flb_config_map config_map[] = { "The key name of the time. If 'include_time_key' is false, " "This property is ignored"}, {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"}, + offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds." + "The default is 60 seconds."}, {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." - }, + offsetof(struct flb_azure_kusto, compression_enabled), + "Enable HTTP payload compression (gzip)." + "The default is true."}, {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), - "Set the azure kusto ingestion resources refresh interval" - }, - /* EOF */ + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + "The default is 3600 seconds."}, + /* EOF */ {0}}; struct flb_output_plugin out_azure_kusto_plugin = { diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 052097101c9..30c38ab3492 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -161,7 +161,6 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi jsmn_parser parser; jsmntok_t *t; jsmntok_t *tokens = NULL; - //int tok_size = 100; int ret = -1; int i; int blob_count = 0; @@ -199,11 +198,16 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - //tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); /* Dynamically allocate memory for tokens based on response length */ tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); + if (!tokens) { + flb_errno(); /* Log the error using flb_errno() */ + flb_plg_error(ctx->ins, "failed to allocate memory for tokens"); + return -1; + } + if (tokens) { ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response)); @@ -427,12 +431,12 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, * in kusto DM for .get ingestion resources upon expiry * */ int azure_kusto_generate_random_integer() { - // Seed the random number generator + /* Seed the random number generator */ int pid = getpid(); unsigned long address = (unsigned long)&address; unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); srand(seed); - // Generate a random integer in the range [-600, 600] + /* Generate a random integer in the range [-600, 600] */ int random_integer = rand() % 1201 - 600; return random_integer; }