Skip to content

Commit

Permalink
added blob_uri and queue_uri mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Tanmaya Panda <[email protected]>
  • Loading branch information
tanmaya-panda1 committed Jan 28, 2024
1 parent ff84a1b commit 2671c89
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions plugins/out_azure_kusto/azure_kusto_ingest.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,19 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t
}

if (u_conn) {
if (pthread_mutex_lock(&ctx->blob_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

flb_plg_debug(ctx->ins,"inside blob before create blob uri");
uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id);

if (pthread_mutex_unlock(&ctx->blob_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (uri) {
flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri);
c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0,
Expand Down Expand Up @@ -379,16 +389,24 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t
flb_plg_error(ctx->ins, "error getting queue upstream");
return -1;
}


u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout;
u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle;

u_conn = flb_upstream_conn_get(u_node->u);

if (u_conn) {
if (pthread_mutex_lock(&ctx->blob_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
uri = azure_kusto_create_queue_uri(ctx, u_node);

if (pthread_mutex_unlock(&ctx->blob_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (uri) {
payload = create_ingestion_message(ctx, blob_uri, payload_size);

Expand Down

0 comments on commit 2671c89

Please sign in to comment.