diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 65e2bd2c219..4b79d80c4bb 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -164,8 +164,8 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); - flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); - flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10); + flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); /* Send HTTP request */ @@ -226,8 +226,6 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi int io_flags = FLB_IO_TLS; struct flb_azure_kusto *ctx; - flb_plg_debug(ins, "inside azure kusto init"); - /* Create config context */ ctx = flb_azure_kusto_conf_create(ins, config); if (!ctx) { @@ -400,7 +398,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Load or refresh ingestion resources */ ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "after flushing bytes xxxx %d", ret); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot load ingestion resources"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -409,13 +407,12 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Reformat msgpack to JSON payload */ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size); - flb_plg_trace(ctx->ins, "after kusto format xxxx %d", ret); + flb_plg_trace(ctx->ins, "format: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot reformat data into json"); FLB_OUTPUT_RETURN(FLB_RETRY); } - flb_plg_trace(ctx->ins, "payload size before compression %d", json_size); /* Map buffer */ final_payload = json; final_payload_size = json_size; @@ -430,11 +427,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, } else { is_compressed = FLB_TRUE; - flb_plg_debug(ctx->ins, "enabled payload gzip compression"); /* JSON buffer will be cleared at cleanup: */ } } - flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); + flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size); ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { @@ -517,16 +513,18 @@ static struct flb_config_map config_map[] = { "The key name of the time. If 'include_time_key' is false, " "This property is ignored"}, {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), - "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds"}, + offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds." + "The default is 60 seconds."}, {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, - offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." - }, + offsetof(struct flb_azure_kusto, compression_enabled), + "Enable HTTP payload compression (gzip)." + "The default is true."}, {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, - offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), - "Set the azure kusto ingestion resources refresh interval" - }, - /* EOF */ + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + "The default is 3600 seconds."}, + /* EOF */ {0}}; struct flb_output_plugin out_azure_kusto_plugin = { diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 052097101c9..30c38ab3492 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -161,7 +161,6 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi jsmn_parser parser; jsmntok_t *t; jsmntok_t *tokens = NULL; - //int tok_size = 100; int ret = -1; int i; int blob_count = 0; @@ -199,11 +198,16 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - //tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); /* Dynamically allocate memory for tokens based on response length */ tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); + if (!tokens) { + flb_errno(); /* Log the error using flb_errno() */ + flb_plg_error(ctx->ins, "failed to allocate memory for tokens"); + return -1; + } + if (tokens) { ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response)); @@ -427,12 +431,12 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, * in kusto DM for .get ingestion resources upon expiry * */ int azure_kusto_generate_random_integer() { - // Seed the random number generator + /* Seed the random number generator */ int pid = getpid(); unsigned long address = (unsigned long)&address; unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); srand(seed); - // Generate a random integer in the range [-600, 600] + /* Generate a random integer in the range [-600, 600] */ int random_integer = rand() % 1201 - 600; return random_integer; }