diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 897ea0098ee..4aee2fee667 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -22,6 +22,7 @@ #include #include +#include /* Lib engine status */ #define FLB_LIB_ERROR -1 @@ -66,6 +67,16 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void *, size_t, void *), void *out_callback_data, void *test_ctx); +FLB_EXPORT int flb_output_set_test_with_ctx_callback( + flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_callback) (void *, int, int, + void *, size_t, void *), + void *out_callback_data, + void *test_ctx, + void *(*test_ctx_callback) ( + struct flb_config *, + struct flb_input_instance *, + void *, void *)); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index c8b007806f2..bfa1cd0ee2b 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -143,9 +143,24 @@ struct flb_test_out_formatter { */ void *rt_data; - /* optional context for flush callback */ + /* optional context for "flush context callback" */ void *flush_ctx; + /* + * Callback + * ========= + * Optional "flush context callback": it references the function that extracts + * optional flush context for "formatter callback". + */ + void *(*flush_ctx_callback) (/* Fluent Bit context */ + struct flb_config *, + /* plugin that ingested the records */ + struct flb_input_instance *, + /* plugin instance context */ + void *plugin_context, + /* context for "flush context callback" */ + void *flush_ctx); + /* * Callback * ========= diff --git a/src/flb_engine_dispatch.c b/src/flb_engine_dispatch.c index b161888e679..903984a5ad3 100644 --- a/src/flb_engine_dispatch.c +++ b/src/flb_engine_dispatch.c @@ -101,18 +101,23 @@ int flb_engine_dispatch_retry(struct flb_task_retry *retry, static void test_run_formatter(struct flb_config *config, struct flb_input_instance *i_ins, struct flb_output_instance *o_ins, - struct flb_task *task, - void *flush_ctx) + struct flb_task *task) { int ret; void *out_buf = NULL; size_t out_size = 0; struct flb_test_out_formatter *otf; struct flb_event_chunk *evc; + void *flush_ctx; otf = &o_ins->test_formatter; evc = task->event_chunk; + flush_ctx = otf->flush_ctx; + if (otf->flush_ctx_callback) { + flush_ctx = otf->flush_ctx_callback(config, i_ins, o_ins->context, flush_ctx); + } + /* Invoke the output plugin formatter test callback */ ret = otf->callback(config, i_ins, @@ -176,9 +181,7 @@ static int tasks_start(struct flb_input_instance *in, out->test_formatter.callback != NULL) { /* Run the formatter test */ - test_run_formatter(config, in, out, - task, - out->test_formatter.flush_ctx); + test_run_formatter(config, in, out, task); /* Remove the route */ mk_list_del(&route->_head); diff --git a/src/flb_lib.c b/src/flb_lib.c index b9674e06824..df5cedf6b49 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -549,6 +549,18 @@ int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*out_callback) (void *, int, int, void *, size_t, void *), void *out_callback_data, void *test_ctx) +{ + return flb_output_set_test_with_ctx_callback(ctx, ffd, test_name, out_callback, + out_callback_data, test_ctx, NULL); +} + +int flb_output_set_test_with_ctx_callback(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_callback) (void *, int, int, void *, size_t, void *), + void *out_callback_data, + void *test_ctx, + void *(*test_ctx_callback) (struct flb_config *, + struct flb_input_instance *, + void *, void *)) { struct flb_output_instance *o_ins; @@ -570,6 +582,7 @@ int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, o_ins->test_formatter.rt_out_callback = out_callback; o_ins->test_formatter.rt_data = out_callback_data; o_ins->test_formatter.flush_ctx = test_ctx; + o_ins->test_formatter.flush_ctx_callback = test_ctx_callback; } else { return -1;