Skip to content

Commit

Permalink
out_exit: add time_count (seconds) and record_count parameters for ou…
Browse files Browse the repository at this point in the history
…t_exit.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed May 30, 2024
1 parent 4db79b4 commit 3f6dd33
Showing 1 changed file with 85 additions and 9 deletions.
94 changes: 85 additions & 9 deletions plugins/out_exit/exit.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_log_event_decoder.h>

#define FLB_EXIT_FLUSH_COUNT "1"
#define FLB_EXIT_FLUSH_COUNT "-1"
#define FLB_EXIT_RECORD_COUNT "-1"
#define FLB_EXIT_TIME_COUNT "-1"

struct flb_exit {
int is_running;
int count;
struct flb_time start_time;

/* config */
int flush_count;
int record_count;
int time_count;
struct flb_output_instance *ins;
};

static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *config,
Expand All @@ -43,15 +50,28 @@ static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *conf
flb_errno();
return -1;
}
ctx->count = 0;
ctx->ins = ins;
ctx->is_running = FLB_TRUE;
flb_time_get(&ctx->start_time);

ctx->flush_count = -1;
ctx->record_count = -1;
ctx->time_count = -1;

ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

if (ctx->flush_count == -1 &&
ctx->record_count == -1 &&
ctx->time_count == -1) {
flb_plg_error(ctx->ins, "no count set for flush, record or time, set at least one");
flb_free(ctx);
return -1;
}

flb_output_set_context(ins, ctx);

return 0;
Expand All @@ -66,11 +86,57 @@ static void cb_exit_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) out_context;
struct flb_exit *ctx = out_context;

ctx->count++;
if (ctx->is_running == FLB_TRUE && ctx->count >= ctx->flush_count) {
flb_engine_exit(config);
ctx->is_running = FLB_FALSE;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
struct flb_time now;
struct flb_time run;
int result;

if (ctx->is_running == FLB_TRUE) {
if (ctx->flush_count > 0) {
ctx->flush_count--;
}

if (ctx->record_count > 0 && event_chunk->type == FLB_EVENT_TYPE_LOGS) {
result = flb_log_event_decoder_init(&log_decoder,
(char *) event_chunk->data,
event_chunk->size);
if (result != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", result);

FLB_OUTPUT_RETURN(FLB_RETRY);
}

while (flb_log_event_decoder_next(&log_decoder,
&log_event) == FLB_EVENT_DECODER_SUCCESS) {
if (ctx->record_count > 0) {
ctx->record_count--;
}
}

result = flb_log_event_decoder_get_last_result(&log_decoder);
flb_log_event_decoder_destroy(&log_decoder);

if (result != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins, "Log event decoder error : %d", result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

FLB_OUTPUT_RETURN(FLB_OK);
}

if (ctx->time_count > 0) {
flb_time_get(&now);
flb_time_diff(&now, &ctx->start_time, &run);
}

if (ctx->flush_count == 0 ||
ctx->record_count == 0 ||
(ctx->time_count && flb_time_to_millisec(&run) > (ctx->time_count*1000))) {
flb_engine_exit(config);
ctx->is_running = FLB_FALSE;
}
}

FLB_OUTPUT_RETURN(FLB_OK);
Expand All @@ -90,7 +156,17 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_INT, "flush_count", FLB_EXIT_FLUSH_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, flush_count),
NULL
"number of flushes before exiting"
},
{
FLB_CONFIG_MAP_INT, "record_count", FLB_EXIT_RECORD_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, record_count),
"number of records received before exiting"
},
{
FLB_CONFIG_MAP_INT, "time_count", FLB_EXIT_TIME_COUNT,
0, FLB_TRUE, offsetof(struct flb_exit, time_count),
"number of seconds before exiting (will trigger upon receiving a flush)"
},

/* EOF */
Expand Down

0 comments on commit 3f6dd33

Please sign in to comment.