From 59f22d39ad8c064844cd5bde0f444fff53560cd0 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Sun, 28 Jan 2024 16:48:44 +0530 Subject: [PATCH] formatted code Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 14 +---- plugins/out_azure_kusto/azure_kusto_conf.c | 42 ++++++-------- plugins/out_azure_kusto/azure_kusto_ingest.c | 60 ++++---------------- 3 files changed, 28 insertions(+), 88 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index ee47e326e2d..1f3d8a75604 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -98,10 +98,6 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) ctx->o->access_token); } - // if (output){ - // flb_sds_destroy(output); - // } - if (pthread_mutex_unlock(&ctx->token_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); if (output) { @@ -132,8 +128,6 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "before getting upstream connection"); - //flb_upstream_init(); - flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); @@ -145,21 +139,19 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); - flb_plg_debug(ctx->ins, "inside execute ingest csl commnad :: after flb_upstream_conn_get"); if (!u_conn) { - FLB_OUTPUT_RETURN(FLB_RETRY); + FLB_OUTPUT_RETURN(FLB_RETRY); } if (u_conn) { token = get_azure_kusto_token(ctx); - flb_plg_debug(ctx->ins, "after get azure kusto token"); + flb_plg_debug(ctx->ins, "after get azure kusto token"); if (token) { /* Compose request body */ body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 + strlen(csl)); - flb_plg_debug(ctx->ins, "after flb sds create size"); if (body) { flb_sds_snprintf(&body, flb_sds_alloc(body), @@ -268,8 +260,6 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } - // ctx->u->base.net.keepalive = FLB_FALSE; - /* Create oauth2 context */ ctx->o = flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index d1f9c0a48db..c4195f2aadf 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -449,17 +449,17 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, if (blob_ha) { - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } ret = parse_storage_resources(ctx, config, response, blob_ha, queue_ha); - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } if (ret == 0) { flb_sds_destroy(response); @@ -468,46 +468,36 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, response = execute_ingest_csl_command(ctx, ".get kusto identity token"); - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } if (response) { + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } identity_token = parse_ingestion_identity_token(ctx, response); if (identity_token) { - // ret = flb_azure_kusto_resources_clear(ctx->resources); - - // if (ret != -1) { ctx->resources->blob_ha = blob_ha; ctx->resources->queue_ha = queue_ha; ctx->resources->identity_token = identity_token; ctx->resources->load_time = now; ret = 0; - //} - // else { - // flb_plg_error( - // ctx->ins, - // "error destroying previous ingestion resources"); - // } } else { flb_plg_error(ctx->ins, "error parsing ingestion identity token"); ret = -1; } + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } } else { flb_plg_error(ctx->ins, "error getting kusto identity token"); ret = -1; } - - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } } else { flb_plg_error(ctx->ins, diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 028b5757d16..11217edc330 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -143,19 +143,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t gmtime_r(&now, &tm); len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); - - flb_plg_debug(ctx->ins,"inside blob before locking mutex"); - - - - // if (pthread_mutex_lock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error locking mutex"); - // return -1; - // } - - - flb_plg_debug(ctx->ins,"inside blob after locking mutex"); - u_node = flb_upstream_ha_node_get(ctx->resources->blob_ha); if (!u_node) { flb_plg_error(ctx->ins, "error getting blob upstream"); @@ -163,40 +150,21 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } - // if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - // } - - flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; u_conn = flb_upstream_conn_get(u_node->u); - - //if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - //} - if (!u_conn) { - flb_plg_error(ctx->ins,"cannot create upstream connection for blob commit"); - return FLB_RETRY; + flb_plg_error(ctx->ins,"cannot create upstream connection for blob commit"); + return FLB_RETRY; } - - flb_plg_debug(ctx->ins,"inside blob after retrieving connection"); - if (u_conn) { - - flb_plg_debug(ctx->ins,"inside blob before create blob uri"); + flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); - - flb_plg_debug(ctx->ins,"inside blob after create blob uri"); - if (uri) { flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, @@ -233,7 +201,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t } flb_http_client_destroy(c); - flb_upstream_conn_release(u_conn); } else { flb_plg_error(ctx->ins, @@ -255,12 +222,6 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting blob container upstream connection"); } - - //if (pthread_mutex_unlock(&ctx->blob_mutex)) { - // flb_plg_error(ctx->ins, "error unlocking mutex"); - // return -1; - //} - return uri; } @@ -284,12 +245,12 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (uuid) { message = flb_sds_create(NULL); - flb_plg_debug(ctx->ins,"uuid :: %s",uuid); - flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); - flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); - flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); - flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); - flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); + flb_plg_debug(ctx->ins,"uuid :: %s",uuid); + flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); + flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); + flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); + flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); + flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); if (message) { message_len = @@ -422,8 +383,7 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; u_node->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle; - - + u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) {