diff --git a/plugins/in_node_exporter_metrics/CMakeLists.txt b/plugins/in_node_exporter_metrics/CMakeLists.txt index 16584dc0609..0a6e4d18247 100644 --- a/plugins/in_node_exporter_metrics/CMakeLists.txt +++ b/plugins/in_node_exporter_metrics/CMakeLists.txt @@ -11,6 +11,7 @@ set(src ne_loadavg.c ne_filefd_linux.c ne_textfile.c + ne_processes.c ne_utils.c ne_config.c ne.c diff --git a/plugins/in_node_exporter_metrics/ne.c b/plugins/in_node_exporter_metrics/ne.c index d7781795788..b241cc09f6b 100644 --- a/plugins/in_node_exporter_metrics/ne.c +++ b/plugins/in_node_exporter_metrics/ne.c @@ -41,6 +41,7 @@ #include "ne_netdev.h" #include "ne_textfile.h" #include "ne_systemd.h" +#include "ne_processes.h" static int ne_timer_cpu_metrics_cb(struct flb_input_instance *ins, struct flb_config *config, void *in_context) @@ -182,6 +183,15 @@ static int ne_timer_systemd_metrics_cb(struct flb_input_instance *ins, return 0; } +static int ne_timer_processes_metrics_cb(struct flb_input_instance *ins, + struct flb_config *config, void *in_context) +{ + struct flb_ne *ctx = in_context; + + ne_processes_update(ctx); + + return 0; +} struct flb_ne_callback { char *name; void (*func)(char *, void *, void *); @@ -329,6 +339,13 @@ static void ne_systemd_update_cb(char *name, void *p1, void *p2) ne_systemd_update(ctx); } +static void ne_processes_update_cb(char *name, void *p1, void *p2) +{ + struct flb_ne *ctx = p1; + + ne_processes_update(ctx); +} + static int ne_update_cb(struct flb_ne *ctx, char *name) { int ret; @@ -356,6 +373,7 @@ struct flb_ne_callback ne_callbacks[] = { { "filefd", ne_filefd_update_cb }, { "textfile", ne_textfile_update_cb }, { "systemd", ne_systemd_update_cb }, + { "processes", ne_processes_update_cb }, { 0 } }; @@ -392,6 +410,7 @@ static int in_ne_init(struct flb_input_instance *in, ctx->coll_filefd_fd = -1; ctx->coll_textfile_fd = -1; ctx->coll_systemd_fd = -1; + ctx->coll_processes_fd = -1; ctx->callback = flb_callback_create(in->name); if (!ctx->callback) { @@ -701,6 +720,26 @@ static int in_ne_init(struct flb_input_instance *in, } ne_systemd_init(ctx); } + else if (strncmp(entry->str, "processes", 9) == 0) { + if (ctx->processes_scrape_interval == 0) { + flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str); + metric_idx = 14; + } + else if (ctx->processes_scrape_interval > 0) { + /* Create the filefd collector */ + ret = flb_input_set_collector_time(in, + ne_timer_processes_metrics_cb, + ctx->processes_scrape_interval, 0, + config); + if (ret == -1) { + flb_plg_error(ctx->ins, + "could not set systemd collector for Node Exporter Metrics plugin"); + return -1; + } + ctx->coll_processes_fd = ret; + } + ne_processes_init(ctx); + } else { flb_plg_warn(ctx->ins, "Unknown metrics: %s", entry->str); metric_idx = -1; @@ -786,6 +825,9 @@ static int in_ne_exit(void *data, struct flb_config *config) else if (strncmp(entry->str, "systemd", 7) == 0) { ne_systemd_exit(ctx); } + else if (strncmp(entry->str, "processes", 9) == 0) { + ne_processes_exit(ctx); + } else { flb_plg_warn(ctx->ins, "Unknown metrics: %s", entry->str); } @@ -817,6 +859,9 @@ static int in_ne_exit(void *data, struct flb_config *config) if (ctx->coll_systemd_fd != -1) { ne_systemd_exit(ctx); } + if (ctx->coll_processes_fd != -1) { + ne_processes_exit(ctx); + } flb_ne_config_destroy(ctx); @@ -870,6 +915,9 @@ static void in_ne_pause(void *data, struct flb_config *config) if (ctx->coll_systemd_fd != -1) { flb_input_collector_pause(ctx->coll_systemd_fd, ctx->ins); } + if (ctx->coll_processes_fd != -1) { + flb_input_collector_pause(ctx->coll_processes_fd, ctx->ins); + } } static void in_ne_resume(void *data, struct flb_config *config) @@ -919,6 +967,9 @@ static void in_ne_resume(void *data, struct flb_config *config) if (ctx->coll_systemd_fd != -1) { flb_input_collector_resume(ctx->coll_systemd_fd, ctx->ins); } + if (ctx->coll_processes_fd != -1) { + flb_input_collector_resume(ctx->coll_processes_fd, ctx->ins); + } } /* Configuration properties map */ @@ -1013,6 +1064,12 @@ static struct flb_config_map config_map[] = { "scrape interval to collect systemd metrics from the node." }, + { + FLB_CONFIG_MAP_TIME, "collector.processes.scrape_interval", "0", + 0, FLB_TRUE, offsetof(struct flb_ne, processes_scrape_interval), + "scrape interval to collect processes metrics from the node." + }, + { FLB_CONFIG_MAP_CLIST, "metrics", "cpu,cpufreq,meminfo,diskstats,filesystem,uname,stat,time,loadavg,vmstat,netdev,filefd,systemd", diff --git a/plugins/in_node_exporter_metrics/ne.h b/plugins/in_node_exporter_metrics/ne.h index ba6e89caa94..253df338f89 100644 --- a/plugins/in_node_exporter_metrics/ne.h +++ b/plugins/in_node_exporter_metrics/ne.h @@ -66,6 +66,7 @@ struct flb_ne { int filefd_scrape_interval; int textfile_scrape_interval; int systemd_scrape_interval; + int processes_scrape_interval; int coll_cpu_fd; /* collector fd (cpu) */ int coll_cpufreq_fd; /* collector fd (cpufreq) */ @@ -81,6 +82,7 @@ struct flb_ne { int coll_filefd_fd; /* collector fd (filefd) */ int coll_textfile_fd; /* collector fd (textfile) */ int coll_systemd_fd ; /* collector fd (systemd) */ + int coll_processes_fd ; /* collector fd (processes) */ /* * Metrics Contexts @@ -186,6 +188,14 @@ struct flb_ne { struct flb_regex *systemd_regex_exclude_list; double libsystemd_version; char *libsystemd_version_text; + + /* processes */ + struct cmt_gauge *processes_thread_alloc; + struct cmt_gauge *processes_threads_limit; + struct cmt_gauge *processes_threads_state; + struct cmt_gauge *processes_procs_state; + struct cmt_gauge *processes_pid_used; + struct cmt_gauge *processes_pid_max; }; #endif diff --git a/plugins/in_node_exporter_metrics/ne_processes.c b/plugins/in_node_exporter_metrics/ne_processes.c new file mode 100644 index 00000000000..eb8f13d5190 --- /dev/null +++ b/plugins/in_node_exporter_metrics/ne_processes.c @@ -0,0 +1,22 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef __linux__ +#include "ne_processes_linux.c" +#endif diff --git a/plugins/in_node_exporter_metrics/ne_processes.h b/plugins/in_node_exporter_metrics/ne_processes.h new file mode 100644 index 00000000000..05514bd4bf0 --- /dev/null +++ b/plugins/in_node_exporter_metrics/ne_processes.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_NE_PROCESSES_H +#define FLB_IN_NE_PROCESSES_H + +#include "ne.h" + +int ne_processes_init(struct flb_ne *ctx); +int ne_processes_update(struct flb_ne *ctx); +int ne_processes_exit(struct flb_ne *ctx); + +#endif diff --git a/plugins/in_node_exporter_metrics/ne_processes_linux.c b/plugins/in_node_exporter_metrics/ne_processes_linux.c new file mode 100644 index 00000000000..431c8f268ff --- /dev/null +++ b/plugins/in_node_exporter_metrics/ne_processes_linux.c @@ -0,0 +1,416 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "ne.h" +#include "ne_utils.h" + +#include + +static int processes_configure(struct flb_ne *ctx) +{ + struct cmt_gauge *g; + + /* node_processes_threads_max */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "threads", + "Allocated threads in the system", + 0, NULL); + if (!g) { + return -1; + } + ctx->processes_thread_alloc = g; + + /* node_processes_threads_max */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "max_threads", + "Limit of threads in the system", + 0, NULL); + if (!g) { + return -1; + } + ctx->processes_threads_limit = g; + + /* node_processes_threads_state */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "threads_state", + "The number of processes in each thread state", + 1, (char *[]) {"thread_state"}); + if (!g) { + return -1; + } + ctx->processes_threads_state = g; + + /* node_processes_state */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "state", + "The number of processes in each state", + 1, (char *[]) {"state"}); + if (!g) { + return -1; + } + ctx->processes_procs_state = g; + + /* node_processes_pids */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "pids", + "The number of PIDs in the system", + 0, NULL); + if (!g) { + return -1; + } + ctx->processes_pid_used = g; + + /* node_processes_max_processeses */ + g = cmt_gauge_create(ctx->cmt, "node", "processes", "max_processeses", + "Limit of PID in the system", + 0, NULL); + if (!g) { + return -1; + } + ctx->processes_pid_max = g; + + return 0; +} + +struct proc_state { + int64_t running; + int64_t interruptible_sleeping; + int64_t uninterruptible_sleeping; + int64_t zombie; + int64_t stopped; + int64_t idle; +}; + +static int update_processes_proc_state(struct flb_ne *ctx, struct proc_state *state, char* state_str) +{ + if (strcmp(state_str, "R") == 0) { + state->running++; + } + else if (strcmp(state_str, "S") == 0) { + state->interruptible_sleeping++; + } + else if (strcmp(state_str, "D") == 0) { + state->uninterruptible_sleeping++; + } + else if (strcmp(state_str, "Z") == 0) { + state->zombie++; + } + else if (strcmp(state_str, "T") == 0) { + state->stopped++; + } + else if (strcmp(state_str, "I") == 0) { + state->idle++; + } + + return 0; +} + +static int check_path_for_proc(struct flb_ne *ctx, const char *prefix, const char *path) +{ + int len; + flb_sds_t p; + + /* Compose the proc path */ + p = flb_sds_create(prefix); + if (!p) { + return -1; + } + + if (path) { + flb_sds_cat_safe(&p, "/", 1); + len = strlen(path); + flb_sds_cat_safe(&p, path, len); + } + + if (access(p, F_OK) == -1 && + (errno == ENOENT || errno == ESRCH)) { + flb_plg_debug(ctx->ins, "error reading stat for path %s. errno = %d", p, errno); + flb_sds_destroy(p); + + return -1; + } + + flb_sds_destroy(p); + return 0; +} + +static int processes_thread_update(struct flb_ne *ctx, flb_sds_t pid_str, flb_sds_t pstate_str, + struct proc_state *tstate) +{ + int ret; + flb_sds_t tmp; + flb_sds_t tid_str; + flb_sds_t state_str; + const char *pattern = "/[0-9]*"; + struct mk_list *head; + struct mk_list *ehead; + struct mk_list thread_list; + struct mk_list stat_list; + struct mk_list split_list; + struct flb_slist_entry *thread; + struct flb_slist_entry *entry; + char thread_procfs[PATH_MAX]; + + snprintf(thread_procfs, sizeof(thread_procfs) - 1, "%s/%s/task", ctx->path_procfs, pid_str); + + /* scan thread entries */ + ret = ne_utils_path_scan(ctx, thread_procfs, pattern, NE_SCAN_DIR, &thread_list); + if (ret != 0) { + return -1; + } + + if (mk_list_size(&thread_list) == 0) { + return 0; + } + + /* thread entries */ + mk_list_foreach(head, &thread_list) { + thread = mk_list_entry(head, struct flb_slist_entry, _head); + tid_str = thread->str + strlen(thread_procfs) + 1; + + /* When pid and tid are equal, the state of the thread should be the same + * for pid's. */ + if (strcmp(tid_str, pid_str) == 0) { + update_processes_proc_state(ctx, tstate, pstate_str); + continue; + } + + if (check_path_for_proc(ctx, thread->str, "stat") != 0) { + continue; + } + + mk_list_init(&stat_list); + ret = ne_utils_file_read_lines(thread->str, "/stat", &stat_list); + if (ret == -1) { + continue; + } + + mk_list_foreach(ehead, &stat_list) { + entry = mk_list_entry(ehead, struct flb_slist_entry, _head); + + /* split with the close parenthesis. + * The entry of processes stat will start after that. */ + tmp = strstr(entry->str, ")"); + if (tmp == NULL) { + continue; + } + + mk_list_init(&split_list); + ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1); + if (ret == -1) { + continue; + } + + /* Thread State */ + entry = flb_slist_entry_get(&split_list, 0); + state_str = entry->str; + update_processes_proc_state(ctx, tstate, state_str); + + flb_slist_destroy(&split_list); + } + flb_slist_destroy(&stat_list); + } + + flb_slist_destroy(&thread_list); + + return 0; +} + +static int processes_update(struct flb_ne *ctx) +{ + int ret; + flb_sds_t tmp; + flb_sds_t pid_str; + flb_sds_t state_str; + flb_sds_t thread_str; + struct mk_list *head; + struct mk_list *ehead; + struct mk_list procfs_list; + struct mk_list stat_list; + struct mk_list split_list; + struct flb_slist_entry *process; + struct flb_slist_entry *entry; + uint64_t val; + uint64_t ts; + const char *pattern = "/[0-9]*"; + int64_t pids = 0; + int64_t threads = 0; + struct proc_state pstate = { + .running = 0, + .interruptible_sleeping = 0, + .uninterruptible_sleeping = 0, + .zombie = 0, + .stopped = 0, + .idle = 0 + }; + struct proc_state tstate = { + .running = 0, + .interruptible_sleeping = 0, + .uninterruptible_sleeping = 0, + .zombie = 0, + .stopped = 0, + .idle = 0 + }; + + mk_list_init(&procfs_list); + + ts = cfl_time_now(); + + ret = ne_utils_file_read_uint64(ctx->path_procfs, "/sys", "kernel", "threads-max", &val); + if (ret == -1) { + return -1; + } + + /* node_processes_threads_max */ + if (ret == 0) { + cmt_gauge_set(ctx->processes_threads_limit, ts, + (double)val, 0, NULL); + } + + ret = ne_utils_file_read_uint64(ctx->path_procfs, "/sys", "kernel", "pid_max", &val); + if (ret == -1) { + return -1; + } + + /* node_processes_max_processes */ + if (ret == 0) { + cmt_gauge_set(ctx->processes_pid_max, ts, + (double)val, 0, NULL); + } + + /* scan pid entries */ + ret = ne_utils_path_scan(ctx, ctx->path_procfs, pattern, NE_SCAN_DIR, &procfs_list); + if (ret != 0) { + return -1; + } + + if (mk_list_size(&procfs_list) == 0) { + return 0; + } + + /* PID entries */ + mk_list_foreach(head, &procfs_list) { + process = mk_list_entry(head, struct flb_slist_entry, _head); + pid_str = process->str + strlen(ctx->path_procfs) + 1; + + if (check_path_for_proc(ctx, process->str, "stat") != 0) { + continue; + } + + mk_list_init(&stat_list); + ret = ne_utils_file_read_lines(process->str, "/stat", &stat_list); + if (ret == -1) { + continue; + } + + mk_list_foreach(ehead, &stat_list) { + entry = mk_list_entry(ehead, struct flb_slist_entry, _head); + + /* split with the close parenthesis. + * The entry of processes stat will start after that. */ + tmp = strstr(entry->str, ")"); + if (tmp == NULL) { + continue; + } + + mk_list_init(&split_list); + ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1); + if (ret == -1) { + continue; + } + + /* State */ + entry = flb_slist_entry_get(&split_list, 0); + state_str = entry->str; + update_processes_proc_state(ctx, &pstate, state_str); + + /* Threads */ + entry = flb_slist_entry_get(&split_list, 17); + thread_str = entry->str; + + /* Collect the number of threads */ + if (ne_utils_str_to_uint64(thread_str, &val) != -1) { + threads += val; + } + + /* Collect the states of threads */ + ret = processes_thread_update(ctx, pid_str, state_str, &tstate); + if (ret != 0) { + flb_slist_destroy(&split_list); + continue; + } + + flb_slist_destroy(&split_list); + } + flb_slist_destroy(&stat_list); + + pids++; + } + + /* node_processes_state + * Note: we don't use hash table for it. Because we need to update + * every state of the processes due to architecture reasons of cmetrics. + */ + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.running, 1, (char *[]){ "R" }); + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.interruptible_sleeping, 1, (char *[]){ "S" }); + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.uninterruptible_sleeping, 1, (char *[]){ "D" }); + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.zombie, 1, (char *[]){ "Z" }); + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.stopped, 1, (char *[]){ "T" }); + cmt_gauge_set(ctx->processes_procs_state, ts, pstate.idle, 1, (char *[]){ "I" }); + + /* node_processes_threads_state + * Note: we don't use hash table for it. Because we need to update + * every state of the processes due to architecture reasons of cmetrics. + */ + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.running, 1, (char *[]){ "R" }); + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.interruptible_sleeping, 1, (char *[]){ "S" }); + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.uninterruptible_sleeping, 1, (char *[]){ "D" }); + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.zombie, 1, (char *[]){ "Z" }); + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.stopped, 1, (char *[]){ "T" }); + cmt_gauge_set(ctx->processes_threads_state, ts, tstate.idle, 1, (char *[]){ "I" }); + + /* node_processes_threads */ + cmt_gauge_set(ctx->processes_thread_alloc, ts, + (double)threads, 0, NULL); + + /* node_processes_pids */ + cmt_gauge_set(ctx->processes_pid_used, ts, + (double)pids, 0, NULL); + + + flb_slist_destroy(&procfs_list); + + return 0; +} + +int ne_processes_init(struct flb_ne *ctx) +{ + processes_configure(ctx); + return 0; +} + +int ne_processes_update(struct flb_ne *ctx) +{ + processes_update(ctx); + return 0; +} + +int ne_processes_exit(struct flb_ne *ctx) +{ + return 0; +}