Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/gzip compress #8504

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,22 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
struct flb_http_client *c;
flb_sds_t resp = NULL;

flb_plg_debug(ctx->ins, "before getting upstream connection");

flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token);
flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time);

ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ;

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);

if (u_conn) {
token = get_azure_kusto_token(ctx);
flb_plg_debug(ctx->ins, "after get azure kusto token");

if (token) {
/* Compose request body */
Expand All @@ -152,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token,
flb_sds_len(token));
flb_http_add_header(c, "x-ms-client-version", 19, "Kusto.Fluent-Bit:1.0.0", 22);
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);
flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);

/* Send HTTP request */
Expand Down Expand Up @@ -212,6 +226,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
int io_flags = FLB_IO_TLS;
struct flb_azure_kusto *ctx;

flb_plg_debug(ins, "inside azure kusto init");

/* Create config context */
ctx = flb_azure_kusto_conf_create(ins, config);
if (!ctx) {
Expand All @@ -231,6 +247,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
*/
pthread_mutex_init(&ctx->token_mutex, NULL);
pthread_mutex_init(&ctx->resources_mutex, NULL);
pthread_mutex_init(&ctx->blob_mutex, NULL);

/*
* Create upstream context for Kusto Ingestion endpoint
Expand All @@ -250,6 +267,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
}
flb_output_upstream_set(ctx->u, ins);

flb_plg_debug(ctx->ins, "azure kusto init completed");

return 0;
}

Expand Down Expand Up @@ -339,7 +358,7 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int
msgpack_pack_object(&mp_pck, *log_event.body);
}

/* Convert from msgpack to JSON */
/* Convert from msgpack to JSON - This is the record to be written*/
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);

/* Cleanup */
Expand Down Expand Up @@ -367,16 +386,21 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);

/* Load or refresh ingestion resources */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "after flushing bytes xxxx %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand All @@ -385,12 +409,34 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Reformat msgpack to JSON payload */
ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
event_chunk->size, (void **)&json, &json_size);
flb_plg_trace(ctx->ins, "after kusto format xxxx %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot reformat data into json");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
flb_plg_trace(ctx->ins, "payload size before compression %d", json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
flb_plg_debug(ctx->ins, "enabled payload gzip compression");
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size after compression %d", final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
flb_sds_destroy(json);
Expand All @@ -400,6 +446,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -417,6 +467,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);

flb_azure_kusto_conf_destroy(ctx);

return 0;
Expand Down Expand Up @@ -462,7 +516,14 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, time_key),
"The key name of the time. If 'include_time_key' is false, "
"This property is ignored"},
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the ingestion endpoint connection timeout in seconds"},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)."
},
/* EOF */

{0}};

struct flb_output_plugin out_azure_kusto_plugin = {
Expand Down
12 changes: 11 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri"
#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas"

#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600
#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 7200

#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"


struct flb_azure_kusto_resources {
struct flb_upstream_ha *blob_ha;
Expand All @@ -70,6 +73,11 @@ struct flb_azure_kusto {
flb_sds_t table_name;
flb_sds_t ingestion_mapping_reference;

int ingestion_endpoint_connect_timeout;

/* compress payload */
int compression_enabled;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand All @@ -94,6 +102,8 @@ struct flb_azure_kusto {
/* mutex for loading reosurces */
pthread_mutex_t resources_mutex;

pthread_mutex_t blob_mutex;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
51 changes: 28 additions & 23 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
{
jsmn_parser parser;
jsmntok_t *t;
jsmntok_t *tokens;
int tok_size = 100;
jsmntok_t *tokens = NULL;
//int tok_size = 100;
int ret = -1;
int i;
int blob_count = 0;
Expand Down Expand Up @@ -199,10 +199,14 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
}

jsmn_init(&parser);
tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);

//tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);

// Dynamically allocate memory for tokens based on response length
tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response)));

if (tokens) {
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response));

if (ret > 0) {
/* skip all tokens until we reach "Rows" */
Expand Down Expand Up @@ -416,6 +420,8 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
return identity_token;
}



int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_config *config)
{
Expand All @@ -426,11 +432,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_upstream_ha *queue_ha = NULL;
time_t now;

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}

now = time(NULL);

/* check if we have all resources and they are not stale */
Expand All @@ -451,9 +452,19 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");

if (blob_ha) {

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
ret =
parse_storage_resources(ctx, config, response, blob_ha, queue_ha);

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (ret == 0) {
flb_sds_destroy(response);
response = NULL;
Expand All @@ -462,31 +473,30 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
execute_ingest_csl_command(ctx, ".get kusto identity token");

if (response) {
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
identity_token =
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
ret = flb_azure_kusto_resources_clear(ctx->resources);

if (ret != -1) {
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

ret = 0;
}
else {
flb_plg_error(
ctx->ins,
"error destroying previous ingestion resources");
}
}
else {
flb_plg_error(ctx->ins,
"error parsing ingestion identity token");
ret = -1;
}
if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins, "error getting kusto identity token");
Expand Down Expand Up @@ -525,11 +535,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
}
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

return ret;
}

Expand Down
Loading