Skip to content

Commit

Permalink
* Added compression condition for multiline.gz
Browse files Browse the repository at this point in the history
* Added a new compress flag

* Compress by default

* Free changes

* Changes to GZIP

Signed-off-by: Tanmaya Panda <[email protected]>
  • Loading branch information
ag-ramachandran authored and tanmaya-panda1 committed Feb 20, 2024
1 parent 8ad934e commit 1230105
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
36 changes: 34 additions & 2 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int
msgpack_pack_object(&mp_pck, *log_event.body);
}

/* Convert from msgpack to JSON */
/* Convert from msgpack to JSON - This is the record to be written*/
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);

/* Cleanup */
Expand Down Expand Up @@ -386,10 +386,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);
Expand All @@ -411,7 +415,27 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
flb_plg_trace(ctx->ins, "payload size before compression %d", json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
flb_plg_debug(ctx->ins, "enabled payload gzip compression");
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
Expand All @@ -422,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand Down Expand Up @@ -491,7 +519,11 @@ static struct flb_config_map config_map[] = {
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the ingestion endpoint connection timeout in seconds"},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)."
},
/* EOF */

{0}};

struct flb_output_plugin out_azure_kusto_plugin = {
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct flb_azure_kusto {

int ingestion_endpoint_connect_timeout;

/* compress payload */
int compression_enabled;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand Down
14 changes: 11 additions & 3 deletions plugins/out_azure_kusto/azure_kusto_ingest.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx,
size_t blob_uri_size;
char *blob_sas;
size_t blob_sas_size;
const char *extension;

if (ctx->compression_enabled) {
extension = ".multijson.gz";
}
else {
extension = ".multijson";
}

ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3,
(void **)&blob_uri, &blob_uri_size);
Expand All @@ -109,11 +117,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx,

/* uri will be https://<blob_host>/<container_uri>/<blob_id>.multijson?<sas_token> */
uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size +
flb_sds_len(blob_id) + 21);
flb_sds_len(blob_id) + 11 + strlen(extension));

if (uri) {
flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s",
u_node->host, blob_uri, blob_id, blob_sas);
flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s",
u_node->host, blob_uri, blob_id, extension ,blob_sas);
flb_plg_debug(ctx->ins, "created blob uri %s", uri);
}
else {
Expand Down

0 comments on commit 1230105

Please sign in to comment.