diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index dfa083bc10f..52ba724f19a 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -358,7 +358,7 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int msgpack_pack_object(&mp_pck, *log_event.body); } - /* Convert from msgpack to JSON */ + /* Convert from msgpack to JSON - This is the record to be written*/ out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); /* Cleanup */ @@ -386,10 +386,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, size_t json_size; size_t tag_len; struct flb_azure_kusto *ctx = out_context; + int is_compressed = FLB_FALSE; (void)i_ins; (void)config; + void *final_payload = NULL; + size_t final_payload_size = 0; + flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); tag_len = flb_sds_len(event_chunk->tag); @@ -411,7 +415,27 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); + flb_plg_trace(ctx->ins, "payload size before compression %d", json_size); + /* Map buffer */ + final_payload = json; + final_payload_size = json_size; + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json, json_size, + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + flb_sds_destroy(json); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + else { + is_compressed = FLB_TRUE; + flb_plg_debug(ctx->ins, "enabled payload gzip compression"); + /* JSON buffer will be cleared at cleanup: */ + } + } + flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size); + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot perform queued ingestion"); @@ -422,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Cleanup */ flb_sds_destroy(json); + /* release compressed payload */ + if (is_compressed == FLB_TRUE) { + flb_free(final_payload); + } /* Done */ FLB_OUTPUT_RETURN(FLB_OK); } @@ -491,7 +519,11 @@ static struct flb_config_map config_map[] = { {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), "Set the ingestion endpoint connection timeout in seconds"}, + {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." + }, /* EOF */ + {0}}; struct flb_output_plugin out_azure_kusto_plugin = { diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index c97e21f863f..9136f25294a 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -75,6 +75,9 @@ struct flb_azure_kusto { int ingestion_endpoint_connect_timeout; + /* compress payload */ + int compression_enabled; + /* records configuration */ flb_sds_t log_key; int include_tag_key; diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 05e3cefa4c1..ada9007636f 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -92,6 +92,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, size_t blob_uri_size; char *blob_sas; size_t blob_sas_size; + const char *extension; + + if (ctx->compression_enabled) { + extension = ".multijson.gz"; + } + else { + extension = ".multijson"; + } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, (void **)&blob_uri, &blob_uri_size); @@ -109,11 +117,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, /* uri will be https:////.multijson? */ uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size + - flb_sds_len(blob_id) + 21); + flb_sds_len(blob_id) + 11 + strlen(extension)); if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s", - u_node->host, blob_uri, blob_id, blob_sas); + flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s", + u_node->host, blob_uri, blob_id, extension ,blob_sas); flb_plg_debug(ctx->ins, "created blob uri %s", uri); } else {