Skip to content

Commit

Permalink
output: add support for scheduled timer jobs with coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley committed Nov 29, 2023
1 parent 94be88a commit bbaa4d8
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 1 deletion.
213 changes: 212 additions & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ struct flb_output_instance {
struct mk_list flush_list;
struct mk_list flush_list_destroy;

/* similar to flush coroutine list above, timer coroutine list */
struct mk_list timer_coro_list;
struct mk_list timer_coro_list_destroy;

/* Keep a reference to the original context this instance belongs to */
struct flb_config *config;
};
Expand Down Expand Up @@ -470,6 +474,28 @@ struct flb_output_flush {
struct mk_list _head; /* Link to flb_task->threads */
};

/*
* stores timer coros on the timer_coro_list, if the output uses them
*/
struct flb_output_timer_coro {
struct flb_config *config; /* FLB context */
struct flb_output_instance *o_ins; /* output instance */
struct flb_output_coro_timer_data *timer_data; /* callback info */
struct flb_coro *coro; /* parent coro addr */
struct mk_list _head; /* Link to timer_coro_list */
};

/*
* If the output uses timer coros, then this is used as the callback data
* passed to flb_sched_timer_cb_create
*/
struct flb_output_coro_timer_data {
struct flb_output_instance *ins; /* associate coro with this output instance */
flb_sds_t job_name; /* used on engine shutdown, print pending "custom" jobs */
void (*cb) (struct flb_config *config, void *data); /* call this output callback in the coro */
void *data; /* opaque data to pass to the above cb */
};

static FLB_INLINE int flb_output_is_threaded(struct flb_output_instance *ins)
{
return ins->is_threaded;
Expand All @@ -485,6 +511,19 @@ static FLB_INLINE void flb_output_flush_destroy(struct flb_output_flush *out_flu
flb_free(out_flush);
}

/*
* See below note for flb_out_flush_params
* this is equivalent for timer coroutines
*/
struct flb_out_timer_coro_params {
struct flb_output_timer_coro *output_timer; /* output flush */
struct flb_output_coro_timer_data *timer_data; /* callback info */
struct flb_config *config; /* Fluent Bit context */
struct flb_coro *coro; /* coroutine context */
};

extern FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params);

/*
* libco do not support parameters in the entrypoint function due to the
* complexity of implementation in terms of architecture and compiler, but
Expand Down Expand Up @@ -599,6 +638,125 @@ static FLB_INLINE void output_pre_cb_flush(void)
persisted_params.config);
}

/* same as above but for timer coros */
static FLB_INLINE void output_pre_timer_cb(void)
{
struct flb_coro *coro;
struct flb_out_timer_coro_params *params;
struct flb_output_instance *o_ins;
struct flb_out_thread_instance *th_ins;
struct flb_output_timer_coro *timer_coro;


params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params);
if (!params) {
flb_error("[output] no timer coro params defined, unexpected");
return;
}

coro = params->coro;
timer_coro = params->output_timer;
o_ins = params->output_timer->o_ins;

/* Run the callback provided by the output plugin */
timer_data->cb(params->config, params->timer_data->data);

/* move coro to destroy queue */
if (flb_output_is_threaded(o_ins) == FLB_TRUE) {
th_ins = flb_output_thread_instance_get();
pthread_mutex_lock(&th_ins->timer_mutex);
mk_list_del(&timer_coro->_head);
mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list_destroy);
pthread_mutex_unlock(&th_ins->timer_mutex);
}
else {
mk_list_del(&timer_coro->_head);
mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list_destroy);
}

/* timer coro is complete; yield back to caller/control code */
flb_coro_yield(coro, FLB_TRUE);
}

/*
* If the output uses scheduled timers with coroutines,
* this function is used as the callback for flb_sched_timer_cb_create
*/
static FLB_INLINE
void flb_output_coro_timer_cb(struct flb_config *config, void *data)
{
size_t stack_size;
struct flb_coro *coro;
struct flb_output_timer_coro *timer_coro;
struct flb_out_thread_instance *th_ins;
struct flb_output_coro_timer_data *ctx = (struct flb_output_coro_timer_data *) data;
struct flb_out_timer_coro_params *params;
struct flb_output_instance *o_ins;

/* Custom output coroutine info */
timer_coro = (struct flb_output_timer_coro *) flb_calloc(1, sizeof(struct flb_output_timer_coro));
if (!timer_coro) {
flb_errno();
return;
}

/* Create a new co-routine */
coro = flb_coro_create(timer_coro);
if (!coro) {
flb_free(timer_coro);
return;
}

o_ins = ctx->ins;
timer_coro->o_ins = o_ins;
timer_coro->config = config;
timer_coro->coro = coro;

coro->callee = co_create(config->coro_stack_size,
output_pre_timer_cb, &stack_size);

if (coro->callee == NULL) {
flb_coro_destroy(coro);
flb_free(timer_coro);
return;
}

#ifdef FLB_HAVE_VALGRIND
coro->valgrind_stack_id = \
VALGRIND_STACK_REGISTER(coro->callee, ((char *) coro->callee) + stack_size);
#endif

if (o_ins->is_threaded == FLB_TRUE) {
th_ins = flb_output_thread_instance_get();
pthread_mutex_lock(&th_ins->timer_mutex);
mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list);
pthread_mutex_unlock(&th_ins->timer_mutex);
}
else {
mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list);
}

params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params);
if (!params) {
params = (struct flb_out_timer_coro_params *) flb_calloc(1, sizeof(struct flb_out_flush_params));
if (!params) {
flb_errno();
return;
}
}

/* Callback parameters in order */
params->output_timer = timer_coro;
params->timer_data = ctx;
params->config = config;
params->coro = coro;

FLB_TLS_SET(timer_coro_params, params);
coro->caller = co_active();
flb_coro_resume(coro);
return;
}

void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush);
int flb_output_flush_id_get(struct flb_output_instance *ins);

Expand Down Expand Up @@ -1004,7 +1162,11 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
flb_output_flush_prepare_destroy(out_flush);
}

/* return the number of co-routines running in the instance */
/*
* return the number of flush co-routines running in the instance
* Currently, this function is only used for FLB_OUTPUT_NO_MULTIPLEX
* and does not count timer_coros, used by S3 output
*/
static inline int flb_output_coros_size(struct flb_output_instance *ins)
{
int size = 0;
Expand All @@ -1023,6 +1185,55 @@ static inline int flb_output_coros_size(struct flb_output_instance *ins)
return size;
}

/* Used in engine flb_running_count */
static inline int flb_output_timer_coros_size(struct flb_output_instance *ins)
{
int size = 0;

if (flb_output_is_threaded(ins) == FLB_TRUE) {
/*
* On threaded mode, we need to count the active co-routines of
* every running thread of the thread pool.
*/
size = flb_output_thread_pool_timer_coros_size(ins);
}
else {
size = mk_list_size(&ins->timer_coro_list);
}

return size;
}

static inline void flb_timer_coros_print(struct mk_list *timer_coro_list)
{
struct flb_output_timer_coro *timer_coro;
struct mk_list *tmp;
struct mk_list *head;
int n = mk_list_size(timer_coro_list);
if (n != 0) {
/* get one coro for the job_name */
mk_list_foreach_safe(head, tmp, timer_coro_list) {
timer_coro = mk_list_entry(head, struct flb_output_timer_coro, _head);
if (timer_coro != NULL) {
flb_info("[task] output=%s still running %d %s(s)",
timer_coro->o_ins->alias, n, timer_coro->timer_data->job_name);
break;
}
}
}
}

/* Used in engine flb_running_print */
static inline void flb_output_timer_coros_print(struct flb_output_instance *ins)
{
if (flb_output_is_threaded(ins) == FLB_TRUE) {
flb_output_thread_pool_timer_coros_print(ins);
}
else {
flb_timer_coros_print(&ins->timer_coro_list);
}
}

static inline void flb_output_return_do(int x)
{
struct flb_coro *coro;
Expand Down
7 changes: 7 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
#include <fluent-bit/flb_pack.h>

FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params);

void flb_output_prepare()
{
FLB_TLS_INIT(out_flush_params);
FLB_TLS_INIT(timer_coro_params);
}

/* Validate the the output address protocol */
Expand Down Expand Up @@ -480,6 +482,11 @@ void flb_output_exit(struct flb_config *config)
if (params) {
flb_free(params);
}
params = FLB_TLS_GET(timer_coro_params);
if (params) {
flb_free(params);
}

}

static inline int instance_id(struct flb_config *config)
Expand Down

0 comments on commit bbaa4d8

Please sign in to comment.