Skip to content

Commit

Permalink
proof of concept for input plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
braydonk committed Aug 14, 2024
1 parent 945d333 commit bc455dc
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 164 deletions.
6 changes: 3 additions & 3 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct flb_config {
int is_running; /* service running ? */
double flush; /* Flush timeout */

/*
* Maximum grace time on shutdown. If set to -1, the engine will
/*
* Maximum grace time on shutdown. If set to -1, the engine will
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert null to nan ? */
Expand Down
81 changes: 44 additions & 37 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ struct flb_input_instance {
/* flag to pause input when storage is full */
int storage_pause_on_chunks_overlimit;

struct flb_input_metrics *input_metrics;

/*
* Input network info:
*
Expand Down Expand Up @@ -309,43 +311,6 @@ struct flb_input_instance {
/* List of downstreams */
struct mk_list downstreams;

/*
* CMetrics
* --------
*
* All metrics available for an input plugin instance.
*/
struct cmt *cmt; /* parent context */
struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
struct cmt_counter *cmt_records; /* metric: input_records_total */

/* is the input instance overlimit ?: 1 or 0 */
struct cmt_gauge *cmt_storage_overlimit;

/* is the input instance paused or not ?: 1 or 0 */
struct cmt_gauge *cmt_ingestion_paused;

/* memory bytes used by chunks */
struct cmt_gauge *cmt_storage_memory_bytes;

/* total number of chunks */
struct cmt_gauge *cmt_storage_chunks;

/* total number of chunks up in memory */
struct cmt_gauge *cmt_storage_chunks_up;

/* total number of chunks down */
struct cmt_gauge *cmt_storage_chunks_down;

/* number of chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy;

/* total bytes used by chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy_bytes;

/* memory ring buffer (memrb) metrics */
struct cmt_counter *cmt_memrb_dropped_chunks;
struct cmt_counter *cmt_memrb_dropped_bytes;

/*
* Indexes for generated chunks: simple hash tables that keeps the latest
Expand Down Expand Up @@ -385,6 +350,46 @@ struct flb_input_instance {
struct flb_config *config;
};

struct flb_input_metrics {
/*
* CMetrics
* --------
*
* All metrics available for an input plugin instance.
*/
struct cmt *cmt; /* parent context */
struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
struct cmt_counter *cmt_records; /* metric: input_records_total */

/* is the input instance overlimit ?: 1 or 0 */
struct cmt_gauge *cmt_storage_overlimit;

/* is the input instance paused or not ?: 1 or 0 */
struct cmt_gauge *cmt_ingestion_paused;

/* memory bytes used by chunks */
struct cmt_gauge *cmt_storage_memory_bytes;

/* total number of chunks */
struct cmt_gauge *cmt_storage_chunks;

/* total number of chunks up in memory */
struct cmt_gauge *cmt_storage_chunks_up;

/* total number of chunks down */
struct cmt_gauge *cmt_storage_chunks_down;

/* number of chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy;

/* total bytes used by chunks in a busy state */
struct cmt_gauge *cmt_storage_chunks_busy_bytes;

/* memory ring buffer (memrb) metrics */
struct cmt_counter *cmt_memrb_dropped_chunks;
struct cmt_counter *cmt_memrb_dropped_bytes;
};

struct flb_input_collector {
struct mk_event event;
struct mk_event_loop *evl; /* event loop */
Expand Down Expand Up @@ -733,6 +738,8 @@ int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *in
int flb_input_downstream_set(struct flb_downstream *stream,
struct flb_input_instance *ins);

struct flb_input_metrics *flb_input_metrics_create();

/* processors */
int flb_input_instance_processors_load(struct flb_input_instance *ins, struct flb_cf_group *processors);

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_fluentbit_metrics/metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ static int in_metrics_init(struct flb_input_instance *in,
ctx->coll_fd_runtime = ret;

/* Internal metrics */
ctx->c = cmt_counter_create(ctx->ins->cmt,
ctx->c = cmt_counter_create(ctx->ins->input_metrics->cmt,
"fluentbit", "input_metrics", "scrapes_total",
"Number of total metrics scrapes",
1, (char *[]) {"name"});
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_podman_metrics/podman_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ static int create_counter(struct flb_in_metrics *ctx, struct cmt_counter **count
/* if counter was not yet created, it means that this function is called for the first time per counter type */
if (*counter == NULL) {
flb_plg_debug(ctx->ins, "Creating counter for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name);
*counter = cmt_counter_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
*counter = cmt_counter_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
}

/* Allow setting value that is not grater that current one (if, for example, memory usage stays exactly the same) */
Expand Down Expand Up @@ -279,7 +279,7 @@ static int create_gauge(struct flb_in_metrics *ctx, struct cmt_gauge **gauge, fl
/* if gauge was not yet created, it means that this function is called for the first time per counter type */
if (*gauge == NULL) {
flb_plg_debug(ctx->ins, "Creating gauge for %s, %s_%s_%s", name, COUNTER_PREFIX, metric_prefix, metric_name);
*gauge = cmt_gauge_create(ctx->ins->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
*gauge = cmt_gauge_create(ctx->ins->input_metrics->cmt, COUNTER_PREFIX, metric_prefix, metric_name, description, label_count, fields);
}

flb_plg_debug(ctx->ins, "Set gauge for %s, %s_%s_%s: %lu", name, COUNTER_PREFIX, metric_prefix, metric_name, value);
Expand Down Expand Up @@ -386,7 +386,7 @@ static int scrape_metrics(struct flb_config *config, struct flb_in_metrics *ctx)
return -1;
}

if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->cmt) == -1) {
if (flb_input_metrics_append(ctx->ins, NULL, 0, ctx->ins->input_metrics->cmt) == -1) {
flb_plg_error(ctx->ins, "Could not append metrics");
return -1;
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,19 +396,19 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
#endif

#ifdef FLB_HAVE_METRICS
ctx->cmt_files_opened = cmt_counter_create(ins->cmt,
ctx->cmt_files_opened = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_opened_total",
"Total number of opened files",
1, (char *[]) {"name"});

ctx->cmt_files_closed = cmt_counter_create(ins->cmt,
ctx->cmt_files_closed = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_closed_total",
"Total number of closed files",
1, (char *[]) {"name"});

ctx->cmt_files_rotated = cmt_counter_create(ins->cmt,
ctx->cmt_files_rotated = cmt_counter_create(ins->input_metrics->cmt,
"fluentbit", "input",
"files_rotated_total",
"Total number of rotated files",
Expand Down
Loading

0 comments on commit bc455dc

Please sign in to comment.