Skip to content

Commit

Permalink
fixed multiple file input
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Jan 27, 2024
1 parent 8733b25 commit f9c9699
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 18 deletions.
44 changes: 44 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
ctx->o->access_token);
}

// if (output){
// flb_sds_destroy(output);
// }

if (pthread_mutex_unlock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
if (output) {
Expand Down Expand Up @@ -126,16 +130,36 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
struct flb_http_client *c;
flb_sds_t resp = NULL;

flb_plg_debug(ctx->ins, "before getting upstream connection");

//flb_upstream_init();

flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
flb_plg_debug(ctx->ins, "identity_token: %s", ctx->resources->identity_token);
flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time);

ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ;
ctx->u->base.net.keepalive_max_recycle = ctx->keep_alive_max_connection_recycle;

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
flb_plg_debug(ctx->ins, "inside execute ingest csl commnad :: after flb_upstream_conn_get");

if (!u_conn) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}

if (u_conn) {
token = get_azure_kusto_token(ctx);
flb_plg_debug(ctx->ins, "after get azure kusto token");

if (token) {
/* Compose request body */
body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
strlen(csl));
flb_plg_debug(ctx->ins, "after flb sds create size");

if (body) {
flb_sds_snprintf(&body, flb_sds_alloc(body),
Expand Down Expand Up @@ -212,6 +236,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
int io_flags = FLB_IO_TLS;
struct flb_azure_kusto *ctx;

flb_plg_debug(ins, "inside azure kusto init");

/* Create config context */
ctx = flb_azure_kusto_conf_create(ins, config);
if (!ctx) {
Expand All @@ -231,6 +257,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
*/
pthread_mutex_init(&ctx->token_mutex, NULL);
pthread_mutex_init(&ctx->resources_mutex, NULL);
pthread_mutex_init(&ctx->blob_mutex, NULL);

/*
* Create upstream context for Kusto Ingestion endpoint
Expand All @@ -241,6 +268,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
return -1;
}

// ctx->u->base.net.keepalive = FLB_FALSE;

/* Create oauth2 context */
ctx->o =
flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
Expand All @@ -250,6 +279,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
}
flb_output_upstream_set(ctx->u, ins);

flb_plg_debug(ctx->ins, "azure kusto init completed");

return 0;
}

Expand Down Expand Up @@ -377,6 +408,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,

/* Load or refresh ingestion resources */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "after flushing bytes xxxx %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand All @@ -385,12 +417,14 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Reformat msgpack to JSON payload */
ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
event_chunk->size, (void **)&json, &json_size);
flb_plg_trace(ctx->ins, "after kusto format xxxx %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot reformat data into json");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
flb_sds_destroy(json);
Expand All @@ -417,6 +451,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);

flb_azure_kusto_conf_destroy(ctx);

return 0;
Expand Down Expand Up @@ -462,6 +500,12 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, time_key),
"The key name of the time. If 'include_time_key' is false, "
"This property is ignored"},
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the ingestion endpoint connection timeout in seconds"},
{FLB_CONFIG_MAP_INT, "keep_alive_max_connection_recycle", FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, keep_alive_max_connection_recycle),
"Set the max connection recycle value of keep alive connections"},
/* EOF */
{0}};

Expand Down
9 changes: 9 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@

#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600

#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"
#define FLB_AZURE_KUSTO_KEEP_ALIVE_MAX_RECYCLE "20"


struct flb_azure_kusto_resources {
struct flb_upstream_ha *blob_ha;
struct flb_upstream_ha *queue_ha;
Expand All @@ -70,6 +74,9 @@ struct flb_azure_kusto {
flb_sds_t table_name;
flb_sds_t ingestion_mapping_reference;

int ingestion_endpoint_connect_timeout;
int keep_alive_max_connection_recycle;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand All @@ -94,6 +101,8 @@ struct flb_azure_kusto {
/* mutex for loading reosurces */
pthread_mutex_t resources_mutex;

pthread_mutex_t blob_mutex;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
47 changes: 29 additions & 18 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
return identity_token;
}



int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_config *config)
{
Expand All @@ -426,11 +428,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_upstream_ha *queue_ha = NULL;
time_t now;

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}

now = time(NULL);

/* check if we have all resources and they are not stale */
Expand All @@ -451,36 +448,50 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");

if (blob_ha) {

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
ret =
parse_storage_resources(ctx, config, response, blob_ha, queue_ha);

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (ret == 0) {
flb_sds_destroy(response);
response = NULL;

response =
execute_ingest_csl_command(ctx, ".get kusto identity token");

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
if (response) {
identity_token =
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
ret = flb_azure_kusto_resources_clear(ctx->resources);
// ret = flb_azure_kusto_resources_clear(ctx->resources);

if (ret != -1) {
// if (ret != -1) {
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

ret = 0;
}
else {
flb_plg_error(
ctx->ins,
"error destroying previous ingestion resources");
}
//}
// else {
// flb_plg_error(
// ctx->ins,
// "error destroying previous ingestion resources");
// }
}
else {
flb_plg_error(ctx->ins,
Expand All @@ -492,6 +503,11 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
flb_plg_error(ctx->ins, "error getting kusto identity token");
ret = -1;
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins,
Expand Down Expand Up @@ -525,11 +541,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
}
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

return ret;
}

Expand Down
Loading

0 comments on commit f9c9699

Please sign in to comment.