diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4431246fcd7..3523426b69a 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -22,8 +22,9 @@ #include #include #include -#include #include +#include +#include #include "azure_kusto.h" #include "azure_kusto_conf.h" @@ -131,7 +132,6 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); - flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token); flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; @@ -163,7 +163,7 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Accept", 6, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); @@ -434,7 +434,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* JSON buffer will be cleared at cleanup: */ } } - flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size); + flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { @@ -518,7 +518,7 @@ static struct flb_config_map config_map[] = { "This property is ignored"}, {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the ingestion endpoint connection timeout in seconds"}, + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"}, {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." }, diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index ada9007636f..13c60cfcbc2 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -165,16 +166,16 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t if (u_conn) { if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; } flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; } if (uri) { @@ -188,7 +189,7 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); @@ -253,8 +254,8 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_lock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; } uuid = generate_uuid(); @@ -266,7 +267,6 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); - flb_plg_debug(ctx->ins,"identity token :: %s", ctx->resources->identity_token); if (message) { message_len = @@ -325,8 +325,8 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t if (pthread_mutex_unlock(&ctx->blob_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; } return message; @@ -425,7 +425,7 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t 20); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); - flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); @@ -531,7 +531,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_lock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } /* flb________ */ @@ -540,7 +540,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, if (pthread_mutex_unlock(&ctx->blob_mutex)) { flb_plg_error(ctx->ins, "error unlocking mutex"); - return NULL; + return -1; } if (blob_id) {