Skip to content

Commit

Permalink
in_process_exporter_metrics: Implement tunring on/off metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Oct 25, 2023
1 parent 6708c73 commit e86b866
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 83 deletions.
8 changes: 8 additions & 0 deletions plugins/in_process_exporter_metrics/pe.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_pe, process_regex_exclude_list_text),
"exclude list regular expression"
},

{
FLB_CONFIG_MAP_CLIST, "metrics",
PE_DEFAULT_ENABLED_METRICS,
0, FLB_TRUE, offsetof(struct flb_pe, metrics),
"Comma separated list of keys to enable metrics."
},

/* EOF */
{0}
};
Expand Down
20 changes: 17 additions & 3 deletions plugins/in_process_exporter_metrics/pe.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,28 @@
#include <fluent-bit/flb_hash_table.h>
#include <fluent-bit/flb_metrics.h>

#define PE_DEFAULT_ENABLED_METRICS "cpu,io,memory,state,context_switches,fd,start_time,thread_wchan,thread"

#define METRIC_CPU (1 << 0)
#define METRIC_IO (1 << 1)
#define METRIC_MEMORY (1 << 2)
#define METRIC_STATE (1 << 3)
#define METRIC_CTXT (1 << 4)
#define METRIC_FD (1 << 5)
#define METRIC_START_TIME (1 << 6)
#define METRIC_THREAD_WCHAN (1 << 7)
#define METRIC_THREAD (1 << 8)

struct flb_pe {
/* configuration */
flb_sds_t path_procfs;
int scrape_interval;

int coll_fd; /* collector fd */
struct cmt *cmt; /* cmetrics context */
struct flb_input_instance *ins; /* input instance */
int coll_fd; /* collector fd */
struct cmt *cmt; /* cmetrics context */
struct flb_input_instance *ins; /* input instance */
struct mk_list *metrics; /* enabled metrics */
int enabled_flag; /* indicate enabled metrics */

/*
* Metrics Contexts
Expand Down
49 changes: 48 additions & 1 deletion plugins/in_process_exporter_metrics/pe_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ struct flb_pe *flb_pe_config_create(struct flb_input_instance *ins,
{
int ret;
struct flb_pe *ctx;
struct mk_list *head;
struct flb_slist_entry *entry;

ctx = flb_calloc(1, sizeof(struct flb_pe));
if (!ctx) {
Expand All @@ -42,6 +44,52 @@ struct flb_pe *flb_pe_config_create(struct flb_input_instance *ins,
return NULL;
}

/* Check and initialize enabled metrics */
if (ctx->metrics) {
mk_list_foreach(head, ctx->metrics) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
if (strncasecmp(entry->str, "cpu", 3) == 0) {
ctx->enabled_flag |= METRIC_CPU;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "io", 2) == 0) {
ctx->enabled_flag |= METRIC_IO;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "memory", 6) == 0) {
ctx->enabled_flag |= METRIC_MEMORY;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "state", 5) == 0) {
ctx->enabled_flag |= METRIC_STATE;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "context_switches", 16) == 0) {
ctx->enabled_flag |= METRIC_CTXT;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "fd", 2) == 0) {
ctx->enabled_flag |= METRIC_FD;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "start_time", 9) == 0) {
ctx->enabled_flag |= METRIC_START_TIME;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "thread_wchan", 12) == 0) {
ctx->enabled_flag |= METRIC_THREAD_WCHAN;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else if (strncasecmp(entry->str, "thread", 6) == 0) {
ctx->enabled_flag |= METRIC_THREAD;
flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
}
else {
flb_plg_warn(ctx->ins, "Unknown metrics: %s", entry->str);
}
}
}

/* mount points */
flb_plg_info(ins, "path.procfs = %s", ctx->path_procfs);

Expand All @@ -52,7 +100,6 @@ struct flb_pe *flb_pe_config_create(struct flb_input_instance *ins,
return NULL;
}


return ctx;
}

Expand Down
182 changes: 103 additions & 79 deletions plugins/in_process_exporter_metrics/pe_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -1036,102 +1036,123 @@ static int process_update(struct flb_pe *ctx)
entry = flb_slist_entry_get(&split_list, 1);
ppid_str = entry->str;

/* node_processes_state
* Note: we don't use hash table for it. Because we need to update
* every state of the processes due to architecture reasons of cmetrics.
*/
cmt_gauge_set(ctx->states, ts, pstate.running, 4, (char *[]){ name, pid_str, ppid_str, "R" });
cmt_gauge_set(ctx->states, ts, pstate.interruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "S" });
cmt_gauge_set(ctx->states, ts, pstate.uninterruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "D" });
cmt_gauge_set(ctx->states, ts, pstate.zombie, 4, (char *[]){ name, pid_str, ppid_str, "Z" });
cmt_gauge_set(ctx->states, ts, pstate.stopped, 4, (char *[]){ name, pid_str, ppid_str, "T" });
cmt_gauge_set(ctx->states, ts, pstate.idle, 4, (char *[]){ name, pid_str, ppid_str, "I" });

/* CPU Seconds (user) */
entry = flb_slist_entry_get(&split_list, 11);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "user" });
}

/* CPU Seconds (system) */
entry = flb_slist_entry_get(&split_list, 12);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "system" });
/* State */
if (ctx->enabled_flag & METRIC_STATE) {
/* node_processes_state
* Note: we don't use hash table for it. Because we need to update
* every state of the processes due to architecture reasons of cmetrics.
*/
cmt_gauge_set(ctx->states, ts, pstate.running, 4, (char *[]){ name, pid_str, ppid_str, "R" });
cmt_gauge_set(ctx->states, ts, pstate.interruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "S" });
cmt_gauge_set(ctx->states, ts, pstate.uninterruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "D" });
cmt_gauge_set(ctx->states, ts, pstate.zombie, 4, (char *[]){ name, pid_str, ppid_str, "Z" });
cmt_gauge_set(ctx->states, ts, pstate.stopped, 4, (char *[]){ name, pid_str, ppid_str, "T" });
cmt_gauge_set(ctx->states, ts, pstate.idle, 4, (char *[]){ name, pid_str, ppid_str, "I" });
}

/* CPU */
if (ctx->enabled_flag & METRIC_CPU) {
/* CPU Seconds (user) */
entry = flb_slist_entry_get(&split_list, 11);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "user" });
}

/* CPU Seconds (system) */
entry = flb_slist_entry_get(&split_list, 12);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "system" });
}
}

/* StartTime */
entry = flb_slist_entry_get(&split_list, 19);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->start_time, ts, (boot_time + val/USER_HZ), 3, (char *[]){ name, pid_str, ppid_str });
if (ctx->enabled_flag & METRIC_START_TIME) {
entry = flb_slist_entry_get(&split_list, 19);
tmp = entry->str;
/* Collect the number of cpu_seconds per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->start_time, ts, (boot_time + val/USER_HZ), 3, (char *[]){ name, pid_str, ppid_str });
}
}

/* Threads */
entry = flb_slist_entry_get(&split_list, 17);
thread_str = entry->str;
/* Collect the number of threads per process */
if (pe_utils_str_to_uint64(thread_str, &val) != -1) {
cmt_gauge_set(ctx->num_threads, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
}

/* Memory Size */
entry = flb_slist_entry_get(&split_list, 20);
tmp = entry->str;
/* Collect the number of Virtual Memory per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "virtual_memory" });
}

entry = flb_slist_entry_get(&split_list, 21);
tmp = entry->str;
/* Collect the number of RSS per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "rss" });
}

/* Major Page Faults */
entry = flb_slist_entry_get(&split_list, 9);
tmp = entry->str;
/* Collect the number of major page faults per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->major_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
}

/* Minor Page Faults */
entry = flb_slist_entry_get(&split_list, 7);
tmp = entry->str;
/* Collect the number of minor page faults per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->minor_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
if (ctx->enabled_flag & METRIC_THREAD) {
entry = flb_slist_entry_get(&split_list, 17);
thread_str = entry->str;
/* Collect the number of threads per process */
if (pe_utils_str_to_uint64(thread_str, &val) != -1) {
cmt_gauge_set(ctx->num_threads, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
}
}

/* Memory */
if (ctx->enabled_flag & METRIC_MEMORY) {
/* Memory Size */
entry = flb_slist_entry_get(&split_list, 20);
tmp = entry->str;
/* Collect the number of Virtual Memory per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "virtual_memory" });
}

entry = flb_slist_entry_get(&split_list, 21);
tmp = entry->str;
/* Collect the number of RSS per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "rss" });
}

/* Major Page Faults */
entry = flb_slist_entry_get(&split_list, 9);
tmp = entry->str;
/* Collect the number of major page faults per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->major_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
}

/* Minor Page Faults */
entry = flb_slist_entry_get(&split_list, 7);
tmp = entry->str;
/* Collect the number of minor page faults per process */
if (pe_utils_str_to_uint64(tmp, &val) != -1) {
cmt_counter_set(ctx->minor_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str });
}
}

/* Collect fds */
ret = process_proc_fds(ctx, ts, pid_str, ppid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect fds is failed on the pid = %s", pid_str);
if (ctx->enabled_flag & METRIC_FD) {
ret = process_proc_fds(ctx, ts, pid_str, ppid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect fds is failed on the pid = %s", pid_str);
}
}

/* Collect thread_wchan */
ret = process_proc_wchan(ctx, ts, pid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect wchan is failed on the pid = %s", pid_str);
if (ctx->enabled_flag & METRIC_THREAD_WCHAN) {
ret = process_proc_wchan(ctx, ts, pid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect thread_wchan is failed on the pid = %s", pid_str);
}
}

/* Collect IO status */
ret = process_proc_io(ctx, ts, pid_str, ppid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect process io procfs is failed on the pid = %s", pid_str);
if (ctx->enabled_flag & METRIC_IO) {
ret = process_proc_io(ctx, ts, pid_str, ppid_str, name, process);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect process io procfs is failed on the pid = %s", pid_str);
}
}

/* Collect the states of threads */
ret = process_thread_update(ctx, ts, pid_str, name);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect thread procfs is failed on the pid = %s", pid_str);
if (ctx->enabled_flag & METRIC_THREAD) {
ret = process_thread_update(ctx, ts, pid_str, name);
if (ret == -1) {
flb_plg_debug(ctx->ins, "collect thread procfs is failed on the pid = %s", pid_str);
}
}

cleanup:
Expand All @@ -1141,7 +1162,10 @@ static int process_update(struct flb_pe *ctx)
}
flb_slist_destroy(&stat_list);

process_proc_status(ctx, ts, pid_str, process);
/* Context Switches */
if (ctx->enabled_flag & METRIC_CTXT) {
process_proc_status(ctx, ts, pid_str, process);
}
}

flb_slist_destroy(&procfs_list);
Expand Down

0 comments on commit e86b866

Please sign in to comment.