Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[out_azure_kusto] introduced gzip compression #8505

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);
Expand All @@ -390,7 +394,28 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
flb_plg_trace(ctx->ins, "payload size before compression %d", json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
flb_plg_debug(ctx->ins, "enabled payload gzip compression");
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
flb_sds_destroy(json);
Expand All @@ -400,6 +425,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand Down Expand Up @@ -462,6 +491,9 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, time_key),
"The key name of the time. If 'include_time_key' is false, "
"This property is ignored"},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)."
},
/* EOF */
{0}};

Expand Down
3 changes: 3 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ struct flb_azure_kusto {
flb_sds_t table_name;
flb_sds_t ingestion_mapping_reference;

/* compress payload */
int compression_enabled;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand Down
14 changes: 11 additions & 3 deletions plugins/out_azure_kusto/azure_kusto_ingest.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx,
size_t blob_uri_size;
char *blob_sas;
size_t blob_sas_size;
const char *extension;

if (ctx->compression_enabled) {
extension = ".multijson.gz";
}
else {
extension = ".multijson";
}

ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3,
(void **)&blob_uri, &blob_uri_size);
Expand All @@ -109,11 +117,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx,

/* uri will be https://<blob_host>/<container_uri>/<blob_id>.multijson?<sas_token> */
uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size +
flb_sds_len(blob_id) + 21);
flb_sds_len(blob_id) + 11 + strlen(extension));

if (uri) {
flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s",
u_node->host, blob_uri, blob_id, blob_sas);
flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s",
u_node->host, blob_uri, blob_id, extension ,blob_sas);
flb_plg_debug(ctx->ins, "created blob uri %s", uri);
}
else {
Expand Down
Loading