From 6243a8a5efc56e69c2eb50384016394b32e494d7 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Sun, 28 Jan 2024 17:02:24 +0530 Subject: [PATCH] added blob_uri and queue_uri mutex Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto_ingest.c | 22 ++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 11217edc330..d48170ffeff 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -162,9 +162,19 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } if (u_conn) { + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (uri) { flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, @@ -379,16 +389,24 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting queue upstream"); return -1; } - u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; - + u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) { + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } uri = azure_kusto_create_queue_uri(ctx, u_node); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (uri) { payload = create_ingestion_message(ctx, blob_uri, payload_size);