From cf4db8343c022ee8c2dd33e230599610d5cdb697 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 8 Nov 2023 07:29:29 -0600 Subject: [PATCH] in_process_exporter_metrics: implement process exporter metrics (#7943) --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 1 + .../CMakeLists.txt | 8 + plugins/in_process_exporter_metrics/pe.c | 174 +++ plugins/in_process_exporter_metrics/pe.h | 89 ++ .../in_process_exporter_metrics/pe_config.c | 117 ++ .../in_process_exporter_metrics/pe_config.h | 31 + .../in_process_exporter_metrics/pe_process.c | 1192 +++++++++++++++++ .../in_process_exporter_metrics/pe_process.h | 29 + .../in_process_exporter_metrics/pe_utils.c | 261 ++++ .../in_process_exporter_metrics/pe_utils.h | 39 + 11 files changed, 1942 insertions(+) create mode 100644 plugins/in_process_exporter_metrics/CMakeLists.txt create mode 100644 plugins/in_process_exporter_metrics/pe.c create mode 100644 plugins/in_process_exporter_metrics/pe.h create mode 100644 plugins/in_process_exporter_metrics/pe_config.c create mode 100644 plugins/in_process_exporter_metrics/pe_config.h create mode 100644 plugins/in_process_exporter_metrics/pe_process.c create mode 100644 plugins/in_process_exporter_metrics/pe_process.h create mode 100644 plugins/in_process_exporter_metrics/pe_utils.c create mode 100644 plugins/in_process_exporter_metrics/pe_utils.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 18af5099d6c..8007e617a4c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -209,6 +209,7 @@ option(FLB_IN_OPENTELEMETRY "Enable OpenTelemetry input plugin" option(FLB_IN_ELASTICSEARCH "Enable Elasticsearch (Bulk API) input plugin" Yes) option(FLB_IN_CALYPTIA_FLEET "Enable Calyptia Fleet input plugin" Yes) option(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" Yes) +option(FLB_IN_PROCESS_EXPORTER_METRICS "Enable process exporter metrics input plugin" Yes) option(FLB_OUT_AZURE "Enable Azure output plugin" Yes) option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes) 
option(FLB_OUT_AZURE_LOGS_INGESTION "Enable Azure Logs Ingestion output plugin" Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index df100ea3b73..88f2ed40da4 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -206,6 +206,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") REGISTER_IN_PLUGIN("in_docker") REGISTER_IN_PLUGIN("in_docker_events") REGISTER_IN_PLUGIN("in_podman_metrics") + REGISTER_IN_PLUGIN("in_process_exporter_metrics") endif() if(${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") diff --git a/plugins/in_process_exporter_metrics/CMakeLists.txt b/plugins/in_process_exporter_metrics/CMakeLists.txt new file mode 100644 index 00000000000..e322d9b9dff --- /dev/null +++ b/plugins/in_process_exporter_metrics/CMakeLists.txt @@ -0,0 +1,8 @@ +set(src + pe.c + pe_config.c + pe_process.c + pe_utils.c + ) + +FLB_PLUGIN(in_process_exporter_metrics "${src}" "") \ No newline at end of file diff --git a/plugins/in_process_exporter_metrics/pe.c b/plugins/in_process_exporter_metrics/pe.c new file mode 100644 index 00000000000..3ab79011836 --- /dev/null +++ b/plugins/in_process_exporter_metrics/pe.c @@ -0,0 +1,174 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "pe.h"
+#include "pe_config.h"
+
+#include "pe_process.h"
+
+/* Refresh the process metrics held in 'ctx'; 'ins' is currently unused. */
+static void update_metrics(struct flb_input_instance *ins, struct flb_pe *ctx)
+{
+    pe_process_update(ctx);
+}
+
+/*
+ * Update the metrics, this function is invoked every time 'scrape_interval'
+ * expires.
+ *
+ * A failure to append the metrics is logged but not propagated: the
+ * callback always returns 0.
+ */
+static int cb_pe_collect(struct flb_input_instance *ins,
+                         struct flb_config *config, void *in_context)
+{
+    int ret;
+    struct flb_pe *ctx = in_context;
+
+    update_metrics(ins, ctx);
+
+    /* Append the updated metrics */
+    ret = flb_input_metrics_append(ins, NULL, 0, ctx->cmt);
+    if (ret != 0) {
+        flb_plg_error(ins, "could not append metrics");
+    }
+
+    return 0;
+}
+
+/*
+ * Plugin initialization callback.
+ *
+ * Creates the plugin context, registers the time based collector, sets up
+ * the process metric collectors and performs a first metrics update.
+ *
+ * Returns 0 on success. On any failure the context is released, the
+ * instance context pointer is reset to NULL and -1 is returned.
+ */
+static int in_pe_init(struct flb_input_instance *in,
+                      struct flb_config *config, void *data)
+{
+    int ret;
+    struct flb_pe *ctx;
+
+    /* Create plugin context */
+    ctx = flb_pe_config_create(in, config);
+    if (!ctx) {
+        flb_errno();
+        return -1;
+    }
+
+    /* Initialize fds */
+    ctx->coll_fd = -1;
+
+    /* Associate context with the instance */
+    flb_input_set_context(in, ctx);
+
+    /* Create the collector */
+    ret = flb_input_set_collector_time(in,
+                                       cb_pe_collect,
+                                       ctx->scrape_interval, 0,
+                                       config);
+    if (ret == -1) {
+        flb_plg_error(ctx->ins,
+                      "could not set collector for Process Exporter Metrics plugin");
+        /* nothing from pe_process_init() exists yet: only drop the context */
+        flb_input_set_context(in, NULL);
+        flb_pe_config_destroy(ctx);
+        return -1;
+    }
+    ctx->coll_fd = ret;
+
+    /* Initialize process metric collectors */
+    ret = pe_process_init(ctx);
+    if (ret != 0) {
+        flb_plg_error(ctx->ins,
+                      "could not initialize process metric collectors");
+        /*
+         * Same teardown order as in_pe_exit(). NOTE(review): this relies on
+         * pe_process_exit() tolerating a partially initialized context, as
+         * in_pe_exit() already does -- confirm against pe_process.c.
+         */
+        flb_input_set_context(in, NULL);
+        pe_process_exit(ctx);
+        flb_pe_config_destroy(ctx);
+        return -1;
+    }
+
+    update_metrics(in, ctx);
+
+    return 0;
+}
+
+/* Plugin exit callback: release collector resources and the context. */
+static int in_pe_exit(void *data, struct flb_config *config)
+{
+    struct flb_pe *ctx = data;
+
+    if (!ctx) {
+        return 0;
+    }
+
+    pe_process_exit(ctx);
+
+    flb_pe_config_destroy(ctx);
+
+    return 0;
+}
+
+/* Pause callback: suspend the time based collector. */
+static void in_pe_pause(void *data, struct flb_config *config)
+{
+    struct flb_pe *ctx = data;
+
+    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
+}
+
+/* Resume callback: re-enable the time based collector. */
+static void in_pe_resume(void *data, struct flb_config *config)
+{
+    struct flb_pe *ctx = data;
+
+    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
+}
+
+/*
+ * Configuration properties map
+ *
+ * Every entry stores its value directly into struct flb_pe at the given
+ * offset; the third field of an entry is the default value (NULL = none).
+ */
+static struct flb_config_map config_map[] = {
+    {
+     FLB_CONFIG_MAP_TIME, "scrape_interval", "5",
+     0, FLB_TRUE, offsetof(struct flb_pe, scrape_interval),
+     "scrape interval to collect metrics from the node."
+    },
+
+    {
+     FLB_CONFIG_MAP_STR, "path.procfs", "/proc",
+     0, FLB_TRUE, offsetof(struct flb_pe, path_procfs),
+     "procfs mount point"
+    },
+
+    {
+     FLB_CONFIG_MAP_STR, "process_include_pattern", ".+",
+     0, FLB_TRUE, offsetof(struct flb_pe, process_regex_include_list_text),
+     "include list regular expression"
+    },
+
+    {
+     FLB_CONFIG_MAP_STR, "process_exclude_pattern", NULL,
+     0, FLB_TRUE, offsetof(struct flb_pe, process_regex_exclude_list_text),
+     "exclude list regular expression"
+    },
+
+    {
+     FLB_CONFIG_MAP_CLIST, "metrics",
+     PE_DEFAULT_ENABLED_METRICS,
+     0, FLB_TRUE, offsetof(struct flb_pe, metrics),
+     "Comma separated list of keys to enable metrics."
+    },
+
+    /* EOF */
+    {0}
+};
+
+/* Plugin registration: wires the callbacks defined in this file */
+struct flb_input_plugin in_process_exporter_metrics_plugin = {
+    .name = "process_exporter_metrics",
+    .description = "Process Exporter Metrics (Prometheus Compatible)",
+    .cb_init = in_pe_init,
+    .cb_pre_run = NULL,
+    .cb_collect = cb_pe_collect,
+    .cb_flush_buf = NULL,
+    .config_map = config_map,
+    .cb_pause = in_pe_pause,
+    .cb_resume = in_pe_resume,
+    .cb_exit = in_pe_exit,
+    .flags = FLB_INPUT_THREADED
+};
diff --git a/plugins/in_process_exporter_metrics/pe.h b/plugins/in_process_exporter_metrics/pe.h
new file mode 100644
index 00000000000..96bc37b96e2
--- /dev/null
+++ b/plugins/in_process_exporter_metrics/pe.h
@@ -0,0 +1,89 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_PROCESS_EXPORTER_H
+#define FLB_PROCESS_EXPORTER_H
+
+/* utils: scan content type expected */
+#define NE_SCAN_FILE 1
+#define NE_SCAN_DIR 2
+
+#include
+#include
+#include
+#include
+#include
+
+/* Default value of the 'metrics' configuration property */
+#define PE_DEFAULT_ENABLED_METRICS "cpu,io,memory,state,context_switches,fd,start_time,thread_wchan,thread"
+
+/* Bits stored in flb_pe.enabled_flag, one per keyword of the 'metrics' list */
+#define METRIC_CPU (1 << 0)
+#define METRIC_IO (1 << 1)
+#define METRIC_MEMORY (1 << 2)
+#define METRIC_STATE (1 << 3)
+#define METRIC_CTXT (1 << 4)
+#define METRIC_FD (1 << 5)
+#define METRIC_START_TIME (1 << 6)
+#define METRIC_THREAD_WCHAN (1 << 7)
+#define METRIC_THREAD (1 << 8)
+
+/* Plugin context shared by all the process exporter source files */
+struct flb_pe {
+    /* configuration */
+    flb_sds_t path_procfs;
+    int scrape_interval;
+
+    int coll_fd; /* collector fd */
+    struct cmt *cmt; /* cmetrics context */
+    struct flb_input_instance *ins; /* input instance */
+    struct mk_list *metrics; /* enabled metrics */
+    int enabled_flag; /* indicate enabled metrics */
+
+    /*
+     * Metrics Contexts
+     * ----------------
+     */
+
+    /* process */
+    struct cmt_gauge *memory_bytes;
+    struct cmt_gauge *start_time;
+    struct cmt_gauge *open_fds;
+    struct cmt_gauge *fd_ratio;
+    struct cmt_counter *cpu_seconds;
+    struct cmt_counter *read_bytes;
+    struct cmt_counter *write_bytes;
+    struct cmt_counter *major_page_faults;
+    struct cmt_counter *minor_page_faults;
+    struct cmt_counter *context_switches;
+    struct cmt_gauge *num_threads;
+    struct cmt_gauge *states;
+
+    /* thread */
+    struct cmt_gauge *thread_wchan;
+    struct cmt_counter *thread_cpu_seconds;
+    struct cmt_counter *thread_io_bytes;
+    struct cmt_counter *thread_major_page_faults;
+    struct cmt_counter *thread_minor_page_faults;
+    struct cmt_counter *thread_context_switches;
+
+    /* include/exclude patterns: raw text from the config and compiled form */
+    flb_sds_t process_regex_include_list_text;
+    flb_sds_t process_regex_exclude_list_text;
+    struct flb_regex *process_regex_include_list;
+    struct flb_regex *process_regex_exclude_list;
+};
+
+#endif
diff --git a/plugins/in_process_exporter_metrics/pe_config.c b/plugins/in_process_exporter_metrics/pe_config.c
new file mode 100644
index 00000000000..9d8fb2a4420
--- /dev/null
+++ b/plugins/in_process_exporter_metrics/pe_config.c
@@ -0,0 +1,117 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2015-2022 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include "pe.h"
+
+/* One keyword accepted by the 'metrics' option and the flag bit it enables */
+struct pe_metric_option {
+    const char *name;
+    size_t len;
+    int flag;
+};
+
+/* The length is derived from the literal so it can never drift from the text */
+#define PE_METRIC_OPTION(keyword, bit) { keyword, sizeof(keyword) - 1, bit }
+
+/*
+ * Keywords are matched case-insensitively by prefix, in table order, so a
+ * keyword that is a prefix of another one ("thread" / "thread_wchan") has to
+ * come after the longer one.
+ */
+static const struct pe_metric_option pe_metric_options[] = {
+    PE_METRIC_OPTION("cpu", METRIC_CPU),
+    PE_METRIC_OPTION("io", METRIC_IO),
+    PE_METRIC_OPTION("memory", METRIC_MEMORY),
+    PE_METRIC_OPTION("state", METRIC_STATE),
+    PE_METRIC_OPTION("context_switches", METRIC_CTXT),
+    PE_METRIC_OPTION("fd", METRIC_FD),
+    PE_METRIC_OPTION("start_time", METRIC_START_TIME),
+    PE_METRIC_OPTION("thread_wchan", METRIC_THREAD_WCHAN),
+    PE_METRIC_OPTION("thread", METRIC_THREAD)
+};
+
+/*
+ * Allocate the plugin context, load the configuration map into it, translate
+ * the 'metrics' list into ctx->enabled_flag and create the cmetrics context.
+ *
+ * Unknown keywords in the 'metrics' list only produce a warning.
+ *
+ * Returns the new context, or NULL if the allocation, the config map or the
+ * cmetrics context creation fails.
+ */
+struct flb_pe *flb_pe_config_create(struct flb_input_instance *ins,
+                                    struct flb_config *config)
+{
+    int ret;
+    size_t i;
+    size_t count;
+    struct flb_pe *ctx;
+    struct mk_list *head;
+    struct flb_slist_entry *entry;
+
+    ctx = flb_calloc(1, sizeof(struct flb_pe));
+    if (!ctx) {
+        flb_errno();
+        return NULL;
+    }
+    ctx->ins = ins;
+    ctx->process_regex_include_list = NULL;
+    ctx->process_regex_exclude_list = NULL;
+
+    /* Load the config map */
+    ret = flb_input_config_map_set(ins, (void *) ctx);
+    if (ret == -1) {
+        flb_free(ctx);
+        return NULL;
+    }
+
+    /* Check and initialize enabled metrics */
+    count = sizeof(pe_metric_options) / sizeof(pe_metric_options[0]);
+    if (ctx->metrics) {
+        mk_list_foreach(head, ctx->metrics) {
+            entry = mk_list_entry(head, struct flb_slist_entry, _head);
+
+            for (i = 0; i < count; i++) {
+                if (strncasecmp(entry->str,
+                                pe_metric_options[i].name,
+                                pe_metric_options[i].len) == 0) {
+                    ctx->enabled_flag |= pe_metric_options[i].flag;
+                    flb_plg_debug(ctx->ins, "enabled metrics %s", entry->str);
+                    break;
+                }
+            }
+
+            /* the loop ran to completion: no keyword matched */
+            if (i == count) {
+                flb_plg_warn(ctx->ins, "Unknown metrics: %s", entry->str);
+            }
+        }
+    }
+
+    /* mount points */
+    flb_plg_info(ins, "path.procfs = %s", ctx->path_procfs);
+
+    ctx->cmt = cmt_create();
+    if (!ctx->cmt) {
+        flb_plg_error(ins, "could not initialize CMetrics");
+        flb_free(ctx);
+        return NULL;
+    }
+
+    return ctx;
+}
+
+/* Release the cmetrics context and the plugin context; NULL is accepted. */
+void flb_pe_config_destroy(struct flb_pe *ctx)
+{
+    if (!ctx) {
+        return;
+    }
+
+    if (ctx->cmt) {
+        cmt_destroy(ctx->cmt);
+    }
+
+    flb_free(ctx);
+}
diff --git a/plugins/in_process_exporter_metrics/pe_config.h b/plugins/in_process_exporter_metrics/pe_config.h
new file mode 100644
index 00000000000..3326c17ebeb
--- /dev/null
+++ b/plugins/in_process_exporter_metrics/pe_config.h
@@ -0,0 +1,31 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FLB_PE_CONFIG_H
+#define FLB_PE_CONFIG_H
+
+#include
+#include "pe.h"
+
+/* Create the plugin context for 'ins'; returns NULL on failure. */
+struct flb_pe *flb_pe_config_create(struct flb_input_instance *ins,
+                                    struct flb_config *config);
+
+/* Release a context returned by flb_pe_config_create(); NULL is accepted. */
+void flb_pe_config_destroy(struct flb_pe *ctx);
+
+#endif
diff --git a/plugins/in_process_exporter_metrics/pe_process.c b/plugins/in_process_exporter_metrics/pe_process.c
new file mode 100644
index 00000000000..fdfb787eb35
--- /dev/null
+++ b/plugins/in_process_exporter_metrics/pe_process.c
@@ -0,0 +1,1192 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+#include "pe.h"
+#include "pe_utils.h"
+
+#include
+#include
+
+/*
+ * Clock ticks per second used to convert the CPU time fields of 'stat'.
+ * NOTE(review): hard-coded; sysconf(_SC_CLK_TCK) would report the real value.
+ */
+#define USER_HZ 100
+
+/*
+ * Compile the include/exclude regular expressions and register every
+ * process and thread metric in the cmetrics context.
+ *
+ * Returns 0 on success, -1 if a regex cannot be compiled or a metric cannot
+ * be created.
+ */
+static int process_configure(struct flb_pe *ctx)
+{
+    struct cmt_gauge *g;
+    struct cmt_counter *c;
+
+    /* Initialize regex for the list of including process */
+    ctx->process_regex_include_list = flb_regex_create(ctx->process_regex_include_list_text);
+    if (!ctx->process_regex_include_list) {
+        flb_plg_error(ctx->ins,
+                      "could not initialize regex pattern for the list of including "
+                      "process: '%s'",
+                      ctx->process_regex_include_list_text);
+        return -1;
+    }
+
+    /* Initialize regex for the list of excluding process */
+    if (ctx->process_regex_exclude_list_text != NULL) {
+        ctx->process_regex_exclude_list = flb_regex_create(ctx->process_regex_exclude_list_text);
+        if (!ctx->process_regex_exclude_list) {
+            flb_plg_error(ctx->ins,
+                          "could not initialize regex pattern for the list of excluding "
+                          "process: '%s'",
+                          ctx->process_regex_exclude_list_text);
+            return -1;
+        }
+    }
+
+    /* process_cpu_seconds_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "cpu_seconds_total",
+                           "CPU usage in seconds",
+                           4, (char *[]){"name", "pid", "ppid", "mode"});
+    if (!c) {
+        return -1;
+    }
+    ctx->cpu_seconds = c;
+
+    /* process_read_bytes_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "read_bytes_total",
+                           "number of bytes read",
+                           3, (char *[]){"name", "pid", "ppid"});
+    if (!c) {
+        return -1;
+    }
+    ctx->read_bytes = c;
+
+    /* process_write_bytes_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "write_bytes_total",
+                           "number of bytes write",
+                           3, (char *[]){"name", "pid", "ppid"});
+    if (!c) {
+        return -1;
+    }
+    ctx->write_bytes = c;
+
+    /* process_major_page_faults_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "major_page_faults_total",
+                           "Major page fault",
+                           3, (char *[]){"name", "pid", "ppid"});
+    if (!c) {
+        return -1;
+    }
+    ctx->major_page_faults = c;
+
+    /* process_minor_page_faults_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "minor_page_faults_total",
+                           "Minor page fault",
+                           3, (char *[]){"name", "pid", "ppid"});
+    if (!c) {
+        return -1;
+    }
+    ctx->minor_page_faults = c;
+
+    /* process_context_switches_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "context_switches_total",
+                           "Context switches",
+                           3, (char *[]){"name", "pid", "context_switch_type"});
+    if (!c) {
+        return -1;
+    }
+    ctx->context_switches = c;
+
+    /* process_memory_bytes */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "memory_bytes",
+                         "number of bytes of memory in use per type (VirtualMemory, RSS)",
+                         4, (char *[]){"name", "pid", "ppid", "type"});
+    if (!g) {
+        return -1;
+    }
+    ctx->memory_bytes = g;
+
+    /* process_open_filedesc */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "open_filedesc",
+                         "number of open file descriptors",
+                         3, (char *[]){"name", "pid", "ppid"});
+    if (!g) {
+        return -1;
+    }
+    ctx->open_fds = g;
+
+    /* process_fd_ratio */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "fd_ratio",
+                         "the ratio between open fds and max fds",
+                         3, (char *[]){"name", "pid", "ppid"});
+    if (!g) {
+        return -1;
+    }
+    ctx->fd_ratio = g;
+
+    /* process_start_time_seconds */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "start_time_seconds",
+                         "start time in seconds since 1970/01/01",
+                         3, (char *[]){"name", "pid", "ppid"});
+    if (!g) {
+        return -1;
+    }
+    ctx->start_time = g;
+
+    /* process_num_threads */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "num_threads",
+                         "Number of threads",
+                         3, (char *[]){"name", "pid", "ppid"});
+    if (!g) {
+        return -1;
+    }
+    ctx->num_threads = g;
+
+    /* process_states */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "states",
+                         "Process in states Running, Sleeping, Waiting, Zombie, or Other",
+                         4, (char *[]){"name", "pid", "ppid", "state"});
+    if (!g) {
+        return -1;
+    }
+    ctx->states = g;
+
+    /*
+     * Thread metrics
+     */
+
+    /* process_thread_wchan */
+    g = cmt_gauge_create(ctx->cmt, "process", "", "thread_wchan",
+                         "Number of threads in this process waiting on each wchan",
+                         3, (char *[]){"name", "pid", "wchan"});
+    if (!g) {
+        return -1;
+    }
+    ctx->thread_wchan = g;
+
+    /* process_thread_cpu_seconds_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "thread_cpu_seconds_total",
+                           "CPU user/system usage in seconds with the same threadname",
+                           4, (char *[]){"name", "threadname", "thread_id", "mode"});
+    if (!c) {
+        return -1;
+    }
+    ctx->thread_cpu_seconds = c;
+
+    /* process_thread_io_bytes_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "thread_io_bytes_total",
+                           "number of bytes read/written by these threads",
+                           4, (char *[]){"name", "threadname", "thread_id", "iomode"});
+    if (!c) {
+        return -1;
+    }
+    ctx->thread_io_bytes = c;
+
+    /* process_thread_major_page_faults_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "thread_major_page_faults_total",
+                           "Major page fault for these threads",
+                           3, (char *[]){"name", "threadname", "thread_id"});
+    if (!c) {
+        return -1;
+    }
+    ctx->thread_major_page_faults = c;
+
+    /* process_thread_minor_page_faults_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "thread_minor_page_faults_total",
+                           "Minor page fault for these threads",
+                           3, (char *[]){"name", "threadname", "thread_id"});
+    if (!c) {
+        return -1;
+    }
+    ctx->thread_minor_page_faults = c;
+
+    /* process_thread_context_switches_total */
+    c = cmt_counter_create(ctx->cmt, "process", "", "thread_context_switches_total",
+                           "Context switches",
+                           4, (char *[]){"name", "threadname", "thread_id", "context_switch_type"});
+    if (!c) {
+        return -1;
+    }
+    ctx->thread_context_switches = c;
+
+    return 0;
+}
+
+/*
+ * Set up the process collector.
+ *
+ * Returns 0 on success and -1 when process_configure() fails, so the caller
+ * can refuse to start with NULL metric handles.
+ */
+int pe_process_init(struct flb_pe *ctx)
+{
+    return process_configure(ctx);
+}
+
+/* Per-state process counters, one field per single-letter state code */
+struct proc_state {
+    int64_t running;
+    int64_t interruptible_sleeping;
+    int64_t uninterruptible_sleeping;
+    int64_t zombie;
+    int64_t stopped;
+    int64_t idle;
+};
+
+/*
+ * Increment the counter of 'state' selected by the state code in
+ * 'state_str' (R, S, D, Z, T or I). Other codes are ignored. Always
+ * returns 0.
+ */
+static int update_process_proc_state(struct flb_pe *ctx, struct proc_state *state, char* state_str)
+{
+    if (strcmp(state_str, "R") == 0) {
+        state->running++;
+    }
+    else if (strcmp(state_str, "S") == 0) {
+        state->interruptible_sleeping++;
+    }
+    else if (strcmp(state_str, "D") == 0) {
+        state->uninterruptible_sleeping++;
+    }
+    else if (strcmp(state_str, "Z") == 0) {
+        state->zombie++;
+    }
+    else if (strcmp(state_str, "T") == 0) {
+        state->stopped++;
+    }
+    else if (strcmp(state_str, "I") == 0) {
+        state->idle++;
+    }
+
+    return 0;
+}
+
+/* Set every counter of 'state' back to zero. */
+static void reset_proc_state(struct proc_state *state) {
+    state->running = 0;
+    state->interruptible_sleeping = 0;
+    state->uninterruptible_sleeping = 0;
+    state->zombie = 0;
+    state->stopped = 0;
+    state->idle = 0;
+}
+
+/*
+ * Check that "<prefix>/<path>" (or just "<prefix>" when path is NULL) still
+ * exists.
+ *
+ * Returns -1 when the path cannot be composed or access() reports ENOENT or
+ * ESRCH, 0 otherwise (other access() errors are not treated as missing).
+ */
+static int check_path_for_proc(struct flb_pe *ctx, const char *prefix, const char *path)
+{
+    int len;
+    flb_sds_t p;
+
+    /* Compose the proc path */
+    p = flb_sds_create(prefix);
+    if (!p) {
+        return -1;
+    }
+
+    if (path) {
+        flb_sds_cat_safe(&p, "/", 1);
+        len = strlen(path);
+        flb_sds_cat_safe(&p, path, len);
+    }
+
+    if (access(p, F_OK) == -1 &&
+        (errno == ENOENT || errno == ESRCH)) {
+        flb_sds_destroy(p);
+
+        return -1;
+    }
+
+    flb_sds_destroy(p);
+    return 0;
+}
+
+/*
+ * Extract the command name of a 'stat' line, i.e. the text between the
+ * first '(' and the first ')' that follows it.
+ *
+ * On success *out_name receives a heap copy that the caller must release
+ * with flb_free() and 0 is returned. Returns -1 (leaving *out_name
+ * untouched) when the parentheses are missing or memory is exhausted.
+ *
+ * 'id_entry' is kept for interface compatibility; the name is located by
+ * its delimiters instead of by an offset derived from the id length, and
+ * strtok() is avoided because it is not reentrant.
+ */
+static int get_name(const char *entry, char **out_name, char *id_entry)
+{
+    char *copy;
+    char *start;
+    char *end;
+    char *result;
+
+    (void) id_entry;
+
+    copy = strdup(entry);
+    if (copy == NULL) {
+        return -1;
+    }
+
+    start = strchr(copy, '(');
+    end = (start != NULL) ? strchr(start, ')') : NULL;
+    if (end == NULL) {
+        flb_free(copy);
+        return -1;
+    }
+    *end = '\0';
+
+    result = strdup(start + 1);
+    flb_free(copy);
+    if (result == NULL) {
+        return -1;
+    }
+
+    *out_name = result;
+
+    return 0;
+}
+
+/*
+ * Parse the unsigned integer that follows the first ':' of a "key: value"
+ * line, the layout of the 'io' and 'status' lines read in this file.
+ *
+ * Returns 0 and stores the number in *out on success, -1 when there is no
+ * ':', the value cannot be copied or it is not a valid number.
+ */
+static int parse_keyed_uint64(const char *line, uint64_t *out)
+{
+    int ret;
+    const char *sep;
+    flb_sds_t value;
+
+    sep = strstr(line, ":");
+    if (sep == NULL) {
+        return -1;
+    }
+
+    value = flb_sds_create_len(sep + 1, strlen(sep + 1));
+    if (value == NULL) {
+        return -1;
+    }
+    flb_sds_trim(value);
+
+    ret = pe_utils_str_to_uint64(value, out);
+    flb_sds_destroy(value);
+
+    return (ret == -1) ? -1 : 0;
+}
+
+/*
+ * Parse field 'index' of an already split 'stat' line as an unsigned
+ * integer. Returns 0 on success, -1 if the field is missing or not numeric.
+ */
+static int stat_field_uint64(struct mk_list *fields, int index, uint64_t *out)
+{
+    struct flb_slist_entry *field;
+
+    field = flb_slist_entry_get(fields, index);
+    if (field == NULL) {
+        return -1;
+    }
+
+    return (pe_utils_str_to_uint64(field->str, out) == -1) ? -1 : 0;
+}
+
+/*
+ * Export read_bytes/write_bytes of the thread's 'io' file as
+ * thread_io_bytes with the "read"/"write" label.
+ *
+ * Returns -1 when the file is gone or cannot be read, 0 otherwise.
+ */
+static int process_proc_thread_io(struct flb_pe *ctx, uint64_t ts,
+                                  flb_sds_t name, flb_sds_t thread_name, flb_sds_t thread_id,
+                                  struct flb_slist_entry *thread)
+{
+    int ret;
+    uint64_t val;
+    struct mk_list io_list;
+    struct mk_list *ihead;
+    struct flb_slist_entry *entry;
+
+    if (check_path_for_proc(ctx, thread->str, "io") != 0) {
+        return -1;
+    }
+
+    mk_list_init(&io_list);
+    ret = pe_utils_file_read_lines(thread->str, "/io", &io_list);
+    if (ret == -1) {
+        return -1;
+    }
+
+    mk_list_foreach(ihead, &io_list) {
+        entry = mk_list_entry(ihead, struct flb_slist_entry, _head);
+
+        if (strncmp("read_bytes", entry->str, 10) == 0) {
+            /* Collect the number of bytes read by this thread */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->thread_io_bytes, ts, val, 4, (char *[]){ name, thread_name, thread_id, "read" });
+            }
+        }
+        else if (strncmp("write_bytes", entry->str, 11) == 0) {
+            /* Collect the number of bytes written by this thread */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->thread_io_bytes, ts, val, 4, (char *[]){ name, thread_name, thread_id, "write" });
+            }
+        }
+    }
+    flb_slist_destroy(&io_list);
+
+    return 0;
+}
+
+/*
+ * Export the voluntary/nonvoluntary context switch counters of the thread's
+ * 'status' file.
+ *
+ * The "Name" line of the file is matched against the include/exclude
+ * patterns; when it is filtered out nothing is exported. Counters found
+ * before a "Name" line has been seen are skipped, because there is no name
+ * label to attach to them.
+ *
+ * Returns -1 when the file is gone or cannot be read, 0 otherwise.
+ */
+static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts,
+                                      flb_sds_t thread_name, flb_sds_t thread_id,
+                                      struct flb_slist_entry *thread)
+{
+    int ret;
+    flb_sds_t tmp;
+    flb_sds_t name = NULL;
+    uint64_t val;
+    struct mk_list status_list;
+    struct mk_list *shead;
+    struct flb_slist_entry *entry;
+    int include_flag = FLB_FALSE;
+
+    if (check_path_for_proc(ctx, thread->str, "status") != 0) {
+        return -1;
+    }
+
+    mk_list_init(&status_list);
+    ret = pe_utils_file_read_lines(thread->str, "/status", &status_list);
+    if (ret == -1) {
+        return -1;
+    }
+
+    mk_list_foreach(shead, &status_list) {
+        entry = mk_list_entry(shead, struct flb_slist_entry, _head);
+
+        if (strncmp("Name", entry->str, 4) == 0) {
+            tmp = strstr(entry->str, ":");
+            if (tmp == NULL) {
+                continue;
+            }
+
+            /* never keep more than one copy alive */
+            if (name != NULL) {
+                flb_sds_destroy(name);
+            }
+            name = flb_sds_create_len(tmp+1, strlen(tmp+1));
+            if (name == NULL) {
+                goto cleanup;
+            }
+            flb_sds_trim(name);
+
+            /* Check for regexes */
+            if (ctx->process_regex_include_list != NULL) {
+                include_flag = flb_regex_match(ctx->process_regex_include_list,
+                                               (unsigned char *) name,
+                                               flb_sds_len(name));
+            }
+            else {
+                include_flag = FLB_TRUE;
+            }
+
+            if (!include_flag) {
+                goto cleanup;
+            }
+
+            if (ctx->process_regex_exclude_list != NULL) {
+                include_flag = !flb_regex_match(ctx->process_regex_exclude_list,
+                                                (unsigned char *) name,
+                                                flb_sds_len(name));
+            }
+            else {
+                include_flag = FLB_TRUE;
+            }
+
+            if (!include_flag) {
+                goto cleanup;
+            }
+        }
+        else if (name == NULL) {
+            /* no label value available yet */
+            continue;
+        }
+        else if (strncmp("voluntary_ctxt_switches", entry->str, 23) == 0) {
+            /* Collect the number of voluntary context switches */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->thread_context_switches, ts, val,
+                                4, (char *[]){ name, thread_name, thread_id, "voluntary_ctxt_switches" });
+            }
+        }
+        else if (strncmp("nonvoluntary_ctxt_switches", entry->str, 26) == 0) {
+            /* Collect the number of nonvoluntary context switches */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->thread_context_switches, ts, val,
+                                4, (char *[]){ name, thread_name, thread_id, "nonvoluntary_ctxt_switches" });
+            }
+        }
+    }
+
+cleanup:
+    if (name != NULL) {
+        flb_sds_destroy(name);
+    }
+    flb_slist_destroy(&status_list);
+
+    return 0;
+}
+
+/*
+ * Export the per-thread metrics of process 'pid' by walking
+ * "<path_procfs>/<pid>/task/<tid>".
+ *
+ * The thread whose id equals 'pid' is skipped. For every other thread the
+ * CPU seconds and page fault counters are taken from its 'stat' file, then
+ * its 'io' and 'status' files are processed.
+ *
+ * Returns -1 when the task directory cannot be scanned, 0 otherwise.
+ */
+static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, flb_sds_t name)
+{
+    int ret;
+    flb_sds_t tmp;
+    flb_sds_t thread_name;
+    flb_sds_t tid_str;
+    uint64_t val;
+    const char *pattern = "/[0-9]*";
+    struct mk_list *head;
+    struct mk_list *ehead;
+    struct mk_list thread_list;
+    struct mk_list stat_list;
+    struct mk_list split_list;
+    struct flb_slist_entry *thread;
+    struct flb_slist_entry *entry;
+    char thread_procfs[PATH_MAX];
+
+    snprintf(thread_procfs, sizeof(thread_procfs) - 1, "%s/%s/task", ctx->path_procfs, pid);
+
+    /* scan thread entries */
+    ret = pe_utils_path_scan(ctx, thread_procfs, pattern, NE_SCAN_DIR, &thread_list);
+    if (ret != 0) {
+        return -1;
+    }
+
+    if (mk_list_size(&thread_list) == 0) {
+        return 0;
+    }
+
+    /* thread entries */
+    mk_list_foreach(head, &thread_list) {
+        thread = mk_list_entry(head, struct flb_slist_entry, _head);
+        tid_str = thread->str + strlen(thread_procfs) + 1;
+
+        /* When pid and tid are equal, the state of the thread should be the same
+         * for pid's. */
+        if (strcmp(tid_str, pid) == 0) {
+            continue;
+        }
+
+        if (check_path_for_proc(ctx, thread->str, "stat") != 0) {
+            continue;
+        }
+
+        mk_list_init(&stat_list);
+        ret = pe_utils_file_read_lines(thread->str, "/stat", &stat_list);
+        if (ret == -1) {
+            continue;
+        }
+
+        mk_list_foreach(ehead, &stat_list) {
+            entry = mk_list_entry(ehead, struct flb_slist_entry, _head);
+
+            /* split with the close parenthesis.
+             * The entry of processes stat will start after that.
+             * This is checked before anything is allocated, and a line that
+             * ends right after ')' is rejected so tmp+2 stays in bounds. */
+            tmp = strstr(entry->str, ")");
+            if (tmp == NULL || tmp[1] == '\0') {
+                continue;
+            }
+
+            if (get_name(entry->str, &thread_name, tid_str) != 0) {
+                continue;
+            }
+
+            mk_list_init(&split_list);
+            ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1);
+            if (ret == -1) {
+                flb_free(thread_name);
+                continue;
+            }
+
+            /* Thread CPU Seconds (user); divide as double to keep fractions */
+            if (stat_field_uint64(&split_list, 11, &val) == 0) {
+                cmt_counter_set(ctx->thread_cpu_seconds, ts, (double) val / USER_HZ, 4, (char *[]){ name, thread_name, tid_str, "user" });
+            }
+
+            /* Thread CPU Seconds (system) */
+            if (stat_field_uint64(&split_list, 12, &val) == 0) {
+                cmt_counter_set(ctx->thread_cpu_seconds, ts, (double) val / USER_HZ, 4, (char *[]){ name, thread_name, tid_str, "system" });
+            }
+
+            /* Thread Major Page Faults */
+            if (stat_field_uint64(&split_list, 9, &val) == 0) {
+                cmt_counter_set(ctx->thread_major_page_faults, ts, val, 3, (char *[]){ name, thread_name, tid_str });
+            }
+
+            /* Thread Minor Page Faults */
+            if (stat_field_uint64(&split_list, 7, &val) == 0) {
+                cmt_counter_set(ctx->thread_minor_page_faults, ts, val, 3, (char *[]){ name, thread_name, tid_str });
+            }
+
+            /* the status file is only read when the io file was readable */
+            ret = process_proc_thread_io(ctx, ts,
+                                         name, thread_name, tid_str,
+                                         thread);
+            if (ret != -1) {
+                process_proc_thread_status(ctx, ts,
+                                           thread_name, tid_str,
+                                           thread);
+            }
+
+            /* Teardown */
+            flb_free(thread_name);
+            flb_slist_destroy(&split_list);
+        }
+        flb_slist_destroy(&stat_list);
+    }
+
+    flb_slist_destroy(&thread_list);
+
+    return 0;
+}
+
+/*
+ * Export the content of the process 'wchan' file as thread_wchan = 1,
+ * labelled with the wchan text; "0" and the empty string are both reported
+ * with an empty label.
+ *
+ * Returns -1 when the file is gone or cannot be read, 0 otherwise.
+ */
+static int process_proc_wchan(struct flb_pe *ctx, uint64_t ts,
+                              flb_sds_t pid, flb_sds_t name, struct flb_slist_entry *process)
+{
+    int ret;
+    struct mk_list wchan_list;
+    struct mk_list *whead;
+    struct flb_slist_entry *entry;
+
+    if (check_path_for_proc(ctx, process->str, "wchan") != 0) {
+        return -1;
+    }
+
+    /* Collect wchan status */
+    mk_list_init(&wchan_list);
+    ret = pe_utils_file_read_lines(process->str, "/wchan", &wchan_list);
+    if (ret == -1) {
+        return -1;
+    }
+
+    mk_list_foreach(whead, &wchan_list) {
+        entry = mk_list_entry(whead, struct flb_slist_entry, _head);
+        if (strcmp("0", entry->str) == 0 ||
+            strcmp("", entry->str) == 0) {
+            cmt_gauge_set(ctx->thread_wchan, ts, 1, 3, (char *[]) { name, pid, "" });
+        }
+        else {
+            cmt_gauge_set(ctx->thread_wchan, ts, 1, 3, (char *[]) { name, pid, entry->str });
+        }
+    }
+    flb_slist_destroy(&wchan_list);
+
+    return 0;
+}
+
+/*
+ * Export read_bytes/write_bytes of the process 'io' file.
+ *
+ * Returns -1 when the file is gone or cannot be read, 0 otherwise.
+ */
+static int process_proc_io(struct flb_pe *ctx, uint64_t ts,
+                           flb_sds_t pid, flb_sds_t ppid, flb_sds_t name,
+                           struct flb_slist_entry *process)
+{
+    int ret;
+    uint64_t val;
+    struct mk_list io_list;
+    struct mk_list *ihead;
+    struct flb_slist_entry *entry;
+
+    if (check_path_for_proc(ctx, process->str, "io") != 0) {
+        return -1;
+    }
+
+    mk_list_init(&io_list);
+    ret = pe_utils_file_read_lines(process->str, "/io", &io_list);
+    if (ret == -1) {
+        return -1;
+    }
+
+    mk_list_foreach(ihead, &io_list) {
+        entry = mk_list_entry(ihead, struct flb_slist_entry, _head);
+
+        if (strncmp("read_bytes", entry->str, 10) == 0) {
+            /* Collect the number of bytes read per process */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->read_bytes, ts, val, 3, (char *[]){ name, pid, ppid });
+            }
+        }
+        else if (strncmp("write_bytes", entry->str, 11) == 0) {
+            /* Collect the number of bytes written per process */
+            if (parse_keyed_uint64(entry->str, &val) == 0) {
+                cmt_counter_set(ctx->write_bytes, ts, val, 3, (char *[]){ name, pid, ppid });
+            }
+        }
+    }
+    flb_slist_destroy(&io_list);
+
+    return 0;
+}
+
+/*
+ * Read the "Max open files" line of the process 'limits' file and store
+ * its fifth space separated token in *out_val (presumably the hard limit
+ * column -- TODO confirm against the splitter's handling of repeated
+ * spaces). *out_val is left untouched when the line or the token is missing
+ * or not numeric.
+ *
+ * Returns -1 when the file cannot be read, 0 otherwise.
+ */
+static int process_proc_limit_fd(struct flb_pe *ctx, flb_sds_t pid,
+                                 struct flb_slist_entry *process,
+                                 uint64_t *out_val)
+{
+    int ret;
+    uint64_t val;
+    flb_sds_t status;
+    struct mk_list limits_list;
+    struct mk_list split_list;
+    struct mk_list *lhead;
+    struct flb_slist_entry *entry;
+    struct flb_slist_entry *limit;
+
+    mk_list_init(&limits_list);
+    ret = pe_utils_file_read_lines(process->str, "/limits", &limits_list);
+    if (ret == -1) {
+        return -1;
+    }
+
+    mk_list_foreach(lhead, &limits_list) {
+        entry = mk_list_entry(lhead, struct flb_slist_entry, _head);
+
+        if (strncmp("Max open files", entry->str, 14) != 0) {
+            continue;
+        }
+
+        mk_list_init(&split_list);
+        ret = flb_slist_split_string(&split_list, entry->str, ' ', -1);
+        if (ret == -1) {
+            continue;
+        }
+
+        /* a short line yields no entry: skip it instead of dereferencing NULL */
+        limit = flb_slist_entry_get(&split_list, 4);
+        if (limit != NULL) {
+            status = flb_sds_create_len(limit->str, strlen(limit->str));
+            if (status != NULL) {
+                flb_sds_trim(status);
+                /* Collect the limit of max open files */
+                if (pe_utils_str_to_uint64(status, &val) != -1) {
+                    *out_val = val;
+                }
+                flb_sds_destroy(status);
+            }
+        }
+        flb_slist_destroy(&split_list);
+    }
+    flb_slist_destroy(&limits_list);
+
+    return 0;
+}
+
+static int process_proc_fds(struct flb_pe *ctx, uint64_t ts,
+                            flb_sds_t pid, flb_sds_t ppid, flb_sds_t name,
+                            struct flb_slist_entry *process)
+{
+    int ret;
+    size_t fds = 0;
+    uint64_t max_fd = 0;
+    DIR *dir;
+    struct dirent *ent;
+    char fd_procfs[PATH_MAX] = {0};
+
+    snprintf(fd_procfs, sizeof(fd_procfs) - 1, "%s/%s", process->str, "fd");
+    dir = opendir(fd_procfs);
+    if (dir == NULL && errno == EACCES) {
+        flb_plg_debug(ctx->ins, "NO read access for path: %s", fd_procfs);
+        return -1;
+    }
+
+    while ((ent
= readdir(dir)) != NULL) { + if (ent->d_type == DT_LNK) { + fds++; + } + } + closedir(dir); + + cmt_gauge_set(ctx->open_fds, ts, (double)fds, 3, (char *[]){ name, pid, ppid }); + + ret = process_proc_limit_fd(ctx, pid, process, &max_fd); + if (ret != -1) { + cmt_gauge_set(ctx->fd_ratio, ts, (double)fds/max_fd, 3, (char *[]){ name, pid, ppid }); + } + + return 0; +} + +static int process_proc_status(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, struct flb_slist_entry *process) +{ + int ret; + flb_sds_t tmp; + flb_sds_t name; + flb_sds_t status; + uint64_t val; + struct mk_list status_list; + struct mk_list *shead; + struct flb_slist_entry *entry; + int include_flag = FLB_FALSE; + + if (check_path_for_proc(ctx, process->str, "status") != 0) { + return -1; + } + + mk_list_init(&status_list); + ret = pe_utils_file_read_lines(process->str, "/status", &status_list); + if (ret == -1) { + return -1; + } + + mk_list_foreach(shead, &status_list) { + entry = mk_list_entry(shead, struct flb_slist_entry, _head); + + if (strncmp("Name", entry->str, 4) == 0) { + tmp = strstr(entry->str, ":"); + if (tmp == NULL) { + continue; + } + name = flb_sds_create_len(tmp+1, strlen(tmp+1)); + flb_sds_trim(name); + + /* Check for regexes */ + if (ctx->process_regex_include_list != NULL) { + include_flag = flb_regex_match(ctx->process_regex_include_list, + (unsigned char *) name, + flb_sds_len(name)); + } + else { + include_flag = FLB_TRUE; + } + + if (!include_flag) { + goto cleanup; + } + + if (ctx->process_regex_exclude_list != NULL) { + include_flag = !flb_regex_match(ctx->process_regex_exclude_list, + (unsigned char *) name, + flb_sds_len(name)); + } + else { + include_flag = FLB_TRUE; + } + + if (!include_flag) { + goto cleanup; + } + } + + if (strncmp("voluntary_ctxt_switches", entry->str, 23) == 0) { + tmp = strstr(entry->str, ":"); + if (tmp == NULL) { + continue; + } + status = flb_sds_create_len(tmp+1, strlen(tmp+1)); + flb_sds_trim(status); + /* Collect the number of minor page 
faults per process */ + if (pe_utils_str_to_uint64(status, &val) != -1) { + cmt_counter_set(ctx->context_switches, ts, val, 3, (char *[]){ name, pid, "voluntary_ctxt_switches" }); + } + flb_sds_destroy(status); + } + + if (strncmp("nonvoluntary_ctxt_switches", entry->str, 26) == 0) { + tmp = strstr(entry->str, ":"); + if (tmp == NULL) { + continue; + } + status = flb_sds_create_len(tmp+1, strlen(tmp+1)); + flb_sds_trim(status); + /* Collect the number of minor page faults per process */ + if (pe_utils_str_to_uint64(status, &val) != -1) { + cmt_counter_set(ctx->context_switches, ts, val, 3, (char *[]){ name, pid, "nonvoluntary_ctxt_switches" }); + } + flb_sds_destroy(status); + } + } + +cleanup: + flb_sds_destroy(name); + flb_slist_destroy(&status_list); + + return 0; +} + +static int process_proc_boot_time(struct flb_pe *ctx, uint64_t *out_boot_time) +{ + int ret; + flb_sds_t tmp; + flb_sds_t status; + uint64_t val; + struct mk_list stat_list; + struct mk_list *rshead; + struct flb_slist_entry *entry; + + if (check_path_for_proc(ctx, ctx->path_procfs, "stat") != 0) { + return -1; + } + + mk_list_init(&stat_list); + ret = pe_utils_file_read_lines(ctx->path_procfs, "/stat", &stat_list); + if (ret == -1) { + return -1; + } + + mk_list_foreach(rshead, &stat_list) { + entry = mk_list_entry(rshead, struct flb_slist_entry, _head); + + if (strncmp("btime", entry->str, 5) == 0) { + tmp = strstr(entry->str, " "); + if (tmp == NULL) { + continue; + } + status = flb_sds_create_len(tmp+1, strlen(tmp+1)); + flb_sds_trim(status); + /* Collect the number of btime */ + if (pe_utils_str_to_uint64(status, &val) != -1) { + *out_boot_time = val; + } + flb_sds_destroy(status); + } + } + flb_slist_destroy(&stat_list); + + return 0; +} + +static int process_update(struct flb_pe *ctx) +{ + int ret; + flb_sds_t tmp; + flb_sds_t name; + flb_sds_t pid_str; + flb_sds_t state_str; + flb_sds_t ppid_str; + flb_sds_t thread_str; + struct mk_list *head; + struct mk_list *ehead; + struct mk_list 
procfs_list; + struct mk_list stat_list; + struct mk_list split_list; + struct flb_slist_entry *process; + struct flb_slist_entry *entry; + uint64_t val; + uint64_t ts; + const char *pattern = "/[0-9]*"; + struct proc_state pstate; + uint64_t boot_time = 0; + int include_flag = FLB_FALSE; + + mk_list_init(&procfs_list); + + ts = cfl_time_now(); + + /* scan pid entries */ + ret = pe_utils_path_scan(ctx, ctx->path_procfs, pattern, NE_SCAN_DIR, &procfs_list); + if (ret != 0) { + return -1; + } + + if (mk_list_size(&procfs_list) == 0) { + return 0; + } + + /* Collect boot_time (btime) */ + ret = process_proc_boot_time(ctx, &boot_time); + if (ret != 0) { + boot_time = 0; + } + + /* PID entries */ + mk_list_foreach(head, &procfs_list) { + process = mk_list_entry(head, struct flb_slist_entry, _head); + pid_str = process->str + strlen(ctx->path_procfs) + 1; + + if (check_path_for_proc(ctx, process->str, "stat") != 0) { + continue; + } + + mk_list_init(&stat_list); + ret = pe_utils_file_read_lines(process->str, "/stat", &stat_list); + if (ret == -1) { + continue; + } + + mk_list_foreach(ehead, &stat_list) { + entry = mk_list_entry(ehead, struct flb_slist_entry, _head); + + if (get_name(entry->str, &name, pid_str) != 0) { + continue; + } + + /* Check for regexes */ + if (ctx->process_regex_include_list != NULL) { + include_flag = flb_regex_match(ctx->process_regex_include_list, + (unsigned char *) name, + strlen(name)); + } + else { + include_flag = FLB_TRUE; + } + + if (!include_flag) { + flb_free(name); + + continue; + } + + if (ctx->process_regex_exclude_list != NULL) { + include_flag = !flb_regex_match(ctx->process_regex_exclude_list, + (unsigned char *) name, + strlen(name)); + } + else { + include_flag = FLB_TRUE; + } + + if (!include_flag) { + flb_free(name); + + continue; + } + + mk_list_init(&split_list); + + /* split with the close parenthesis. + * The entry of processes stat will start after that. 
*/ + tmp = strstr(entry->str, ")"); + if (tmp == NULL) { + goto cleanup; + } + + ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1); + if (ret == -1) { + goto cleanup; + } + + /* State */ + reset_proc_state(&pstate); + entry = flb_slist_entry_get(&split_list, 0); + state_str = entry->str; + update_process_proc_state(ctx, &pstate, state_str); + + entry = flb_slist_entry_get(&split_list, 1); + ppid_str = entry->str; + + /* State */ + if (ctx->enabled_flag & METRIC_STATE) { + /* node_processes_state + * Note: we don't use hash table for it. Because we need to update + * every state of the processes due to architecture reasons of cmetrics. + */ + cmt_gauge_set(ctx->states, ts, pstate.running, 4, (char *[]){ name, pid_str, ppid_str, "R" }); + cmt_gauge_set(ctx->states, ts, pstate.interruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "S" }); + cmt_gauge_set(ctx->states, ts, pstate.uninterruptible_sleeping, 4, (char *[]){ name, pid_str, ppid_str, "D" }); + cmt_gauge_set(ctx->states, ts, pstate.zombie, 4, (char *[]){ name, pid_str, ppid_str, "Z" }); + cmt_gauge_set(ctx->states, ts, pstate.stopped, 4, (char *[]){ name, pid_str, ppid_str, "T" }); + cmt_gauge_set(ctx->states, ts, pstate.idle, 4, (char *[]){ name, pid_str, ppid_str, "I" }); + } + + /* CPU */ + if (ctx->enabled_flag & METRIC_CPU) { + /* CPU Seconds (user) */ + entry = flb_slist_entry_get(&split_list, 11); + tmp = entry->str; + /* Collect the number of cpu_seconds per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "user" }); + } + + /* CPU Seconds (system) */ + entry = flb_slist_entry_get(&split_list, 12); + tmp = entry->str; + /* Collect the number of cpu_seconds per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_counter_set(ctx->cpu_seconds, ts, val/USER_HZ, 4, (char *[]){ name, pid_str, ppid_str, "system" }); + } + } + + /* StartTime */ + if (ctx->enabled_flag & 
METRIC_START_TIME) { + entry = flb_slist_entry_get(&split_list, 19); + tmp = entry->str; + /* Collect the number of cpu_seconds per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_gauge_set(ctx->start_time, ts, (boot_time + val/USER_HZ), 3, (char *[]){ name, pid_str, ppid_str }); + } + } + + /* Threads */ + if (ctx->enabled_flag & METRIC_THREAD) { + entry = flb_slist_entry_get(&split_list, 17); + thread_str = entry->str; + /* Collect the number of threads per process */ + if (pe_utils_str_to_uint64(thread_str, &val) != -1) { + cmt_gauge_set(ctx->num_threads, ts, val, 3, (char *[]){ name, pid_str, ppid_str }); + } + } + + /* Memory */ + if (ctx->enabled_flag & METRIC_MEMORY) { + /* Memory Size */ + entry = flb_slist_entry_get(&split_list, 20); + tmp = entry->str; + /* Collect the number of Virtual Memory per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "virtual_memory" }); + } + + entry = flb_slist_entry_get(&split_list, 21); + tmp = entry->str; + /* Collect the number of RSS per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_gauge_set(ctx->memory_bytes, ts, val, 4, (char *[]){ name, pid_str, ppid_str, "rss" }); + } + + /* Major Page Faults */ + entry = flb_slist_entry_get(&split_list, 9); + tmp = entry->str; + /* Collect the number of major page faults per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_counter_set(ctx->major_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str }); + } + + /* Minor Page Faults */ + entry = flb_slist_entry_get(&split_list, 7); + tmp = entry->str; + /* Collect the number of minor page faults per process */ + if (pe_utils_str_to_uint64(tmp, &val) != -1) { + cmt_counter_set(ctx->minor_page_faults, ts, val, 3, (char *[]){ name, pid_str, ppid_str }); + } + } + + /* Collect fds */ + if (ctx->enabled_flag & METRIC_FD) { + ret = process_proc_fds(ctx, ts, pid_str, ppid_str, name, 
process); + if (ret == -1) { + flb_plg_debug(ctx->ins, "collect fds is failed on the pid = %s", pid_str); + } + } + + /* Collect thread_wchan */ + if (ctx->enabled_flag & METRIC_THREAD_WCHAN) { + ret = process_proc_wchan(ctx, ts, pid_str, name, process); + if (ret == -1) { + flb_plg_debug(ctx->ins, "collect thread_wchan is failed on the pid = %s", pid_str); + } + } + + /* Collect IO status */ + if (ctx->enabled_flag & METRIC_IO) { + ret = process_proc_io(ctx, ts, pid_str, ppid_str, name, process); + if (ret == -1) { + flb_plg_debug(ctx->ins, "collect process io procfs is failed on the pid = %s", pid_str); + } + } + + /* Collect the states of threads */ + if (ctx->enabled_flag & METRIC_THREAD) { + ret = process_thread_update(ctx, ts, pid_str, name); + if (ret == -1) { + flb_plg_debug(ctx->ins, "collect thread procfs is failed on the pid = %s", pid_str); + } + } + + cleanup: + /* Teardown */ + flb_slist_destroy(&split_list); + flb_free(name); + } + flb_slist_destroy(&stat_list); + + /* Context Switches */ + if (ctx->enabled_flag & METRIC_CTXT) { + process_proc_status(ctx, ts, pid_str, process); + } + } + + flb_slist_destroy(&procfs_list); + + return 0; +} + +int pe_process_update(struct flb_pe *ctx) +{ + process_update(ctx); + return 0; +} + +int pe_process_exit(struct flb_pe *ctx) +{ + if (ctx->process_regex_include_list != NULL) { + flb_regex_destroy(ctx->process_regex_include_list); + } + if (ctx->process_regex_exclude_list != NULL) { + flb_regex_destroy(ctx->process_regex_exclude_list); + } + + return 0; +} diff --git a/plugins/in_process_exporter_metrics/pe_process.h b/plugins/in_process_exporter_metrics/pe_process.h new file mode 100644 index 00000000000..c0b9e2f3aa5 --- /dev/null +++ b/plugins/in_process_exporter_metrics/pe_process.h @@ -0,0 +1,29 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 
(the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_PE_PROCESS_H +#define FLB_IN_PE_PROCESS_H + +#include "pe.h" + +int pe_process_init(struct flb_pe *ctx); +int pe_process_update(struct flb_pe *ctx); +int pe_process_exit(struct flb_pe *ctx); + +#endif diff --git a/plugins/in_process_exporter_metrics/pe_utils.c b/plugins/in_process_exporter_metrics/pe_utils.c new file mode 100644 index 00000000000..0435dd76ad6 --- /dev/null +++ b/plugins/in_process_exporter_metrics/pe_utils.c @@ -0,0 +1,261 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include
+#include
+#include
+#include "pe.h"
+
+/* required by stat(2), open(2) */
+#include
+#include
+#include
+#include
+
+#include
+
+/*
+ * Convert a string to a double.
+ *
+ * The whole string has to be consumed by the conversion: trailing characters
+ * (including a newline) make the call fail, and so does a NULL or empty input.
+ *
+ * Returns 0 and stores the value in 'out_val' on success, -1 otherwise.
+ */
+int pe_utils_str_to_double(char *str, double *out_val)
+{
+    double val;
+    char *end;
+
+    if (str == NULL || out_val == NULL) {
+        return -1;
+    }
+
+    errno = 0;
+    val = strtod(str, &end);
+    /* end == str: nothing was converted, e.g. an empty string */
+    if (errno != 0 || end == str || *end != '\0') {
+        return -1;
+    }
+    *out_val = val;
+    return 0;
+}
+
+/*
+ * Convert a base 10 string to an unsigned 64 bit integer.
+ *
+ * Leading white space is skipped and anything that follows the digits is
+ * ignored, so values read straight from a file (with a trailing newline) are
+ * accepted. The conversion is unsigned, so the full uint64_t range can be
+ * parsed; a negative number is rejected instead of being wrapped around.
+ *
+ * Returns 0 and stores the value in 'out_val' on success, -1 when the input is
+ * NULL, negative, out of range or does not start with a digit sequence.
+ */
+int pe_utils_str_to_uint64(char *str, uint64_t *out_val)
+{
+    unsigned long long val;
+    const char *p;
+    char *end;
+
+    if (str == NULL || out_val == NULL) {
+        return -1;
+    }
+
+    /* strtoull() accepts "-N" and negates it silently: refuse the sign here */
+    p = str;
+    while (*p == ' ' || *p == '\t' || *p == '\n' ||
+           *p == '\r' || *p == '\f' || *p == '\v') {
+        p++;
+    }
+    if (*p == '-') {
+        return -1;
+    }
+
+    errno = 0;
+    val = strtoull(str, &end, 10);
+    if (errno != 0) {
+        flb_errno();
+        return -1;
+    }
+
+    if (end == str) {
+        return -1;
+    }
+
+    *out_val = (uint64_t) val;
+    return 0;
+}
+
+/*
+ * Read an unsigned integer from the file "<mount><path>[/join_a][/join_b]".
+ *
+ * 'mount' is dropped when 'path' already starts with it (compared without
+ * regard to case) followed by '/'. At most 31 bytes of the file are read and
+ * parsed with pe_utils_str_to_uint64().
+ *
+ * Returns 0 and stores the value in 'out_val' on success, -1 when the path
+ * cannot be composed, the file cannot be opened or read, or its content is not
+ * a number.
+ */
+int pe_utils_file_read_uint64(const char *mount,
+                              const char *path,
+                              const char *join_a, const char *join_b,
+                              uint64_t *out_val)
+{
+    int fd;
+    int ret;
+    flb_sds_t p;
+    uint64_t val;
+    ssize_t bytes;
+    char tmp[32];
+
+    /* Check the path starts with the mount point to prevent duplication. */
+    if (strncasecmp(path, mount, strlen(mount)) == 0 &&
+        path[strlen(mount)] == '/') {
+        mount = "";
+    }
+
+    /* Compose the final path */
+    p = flb_sds_create(mount);
+    if (!p) {
+        return -1;
+    }
+
+    ret = flb_sds_cat_safe(&p, path, strlen(path));
+
+    if (ret != -1 && join_a) {
+        ret = flb_sds_cat_safe(&p, "/", 1);
+        if (ret != -1) {
+            ret = flb_sds_cat_safe(&p, join_a, strlen(join_a));
+        }
+    }
+
+    if (ret != -1 && join_b) {
+        ret = flb_sds_cat_safe(&p, "/", 1);
+        if (ret != -1) {
+            ret = flb_sds_cat_safe(&p, join_b, strlen(join_b));
+        }
+    }
+
+    /* never open a path that is only partially composed */
+    if (ret == -1) {
+        flb_sds_destroy(p);
+        return -1;
+    }
+
+    fd = open(p, O_RDONLY);
+    flb_sds_destroy(p);
+    if (fd == -1) {
+        return -1;
+    }
+
+    /* keep one byte for the terminator that the parser relies on */
+    bytes = read(fd, tmp, sizeof(tmp) - 1);
+    if (bytes == -1) {
+        flb_errno();
+        close(fd);
+        return -1;
+    }
+    close(fd);
+    tmp[bytes] = '\0';
+
+    ret = pe_utils_str_to_uint64(tmp, &val);
+    if (ret == -1) {
+        return -1;
+    }
+
+    *out_val = val;
+    return 0;
+}
+
+/*
+ * Read a file and every non-empty line is stored as a flb_slist_entry in the
+ * given list.
+ */
+/*
+ * The file is "<mount><path>"; 'mount' is dropped when 'path' already starts
+ * with it (compared without regard to case) followed by '/'. The trailing
+ * "\n" (and a "\r" right before it) is removed from each line. Lines are read
+ * through a 512 byte buffer, so a longer line is stored as several entries.
+ *
+ * 'list' is initialised here. Returns 0 on success. Returns -1 when the path
+ * does not fit the internal buffer, the file cannot be opened or a line cannot
+ * be stored; in the last case the list is destroyed before returning.
+ */
+int pe_utils_file_read_lines(const char *mount, const char *path, struct mk_list *list)
+{
+    int len;
+    int ret;
+    FILE *f;
+    char line[512];
+    char real_path[2048];
+
+    mk_list_init(list);
+
+    /* Check the path starts with the mount point to prevent duplication. */
+    if (strncasecmp(path, mount, strlen(mount)) == 0 &&
+        path[strlen(mount)] == '/') {
+        mount = "";
+    }
+
+    ret = snprintf(real_path, sizeof(real_path), "%s%s", mount, path);
+    if (ret < 0 || (size_t) ret >= sizeof(real_path)) {
+        /* a truncated path would silently open the wrong file */
+        return -1;
+    }
+
+    f = fopen(real_path, "r");
+    if (f == NULL) {
+        if (errno == EACCES) {
+            flb_debug("error reading procfs for path %s. errno = %d", real_path, errno);
+        }
+        else {
+            flb_errno();
+        }
+        return -1;
+    }
+
+    /* Read the content */
+    while (fgets(line, sizeof(line) - 1, f)) {
+        len = strlen(line);
+        /* len can be 0 when the data starts with a NUL byte */
+        if (len > 0 && line[len - 1] == '\n') {
+            line[--len] = 0;
+            if (len && line[len - 1] == '\r') {
+                line[--len] = 0;
+            }
+        }
+
+        /*
+         * Only non-empty lines are part of the contract.
+         * NOTE(review): flb_slist_add() presumably rejects empty strings,
+         * which would abort the whole read on a blank line - confirm.
+         */
+        if (len == 0) {
+            continue;
+        }
+
+        ret = flb_slist_add(list, line);
+        if (ret == -1) {
+            fclose(f);
+            flb_slist_destroy(list);
+            return -1;
+        }
+    }
+
+    fclose(f);
+    return 0;
+}
+
+/*
+ * Expand the glob pattern "<mount><path>" and store the matching paths whose
+ * type agrees with 'expected' (NE_SCAN_FILE: regular files, NE_SCAN_DIR:
+ * directories) in 'list'.
+ *
+ * 'list' is created here once the expansion succeeded; matches that cannot be
+ * stat()ed or have another type are skipped, so the list may end up empty.
+ *
+ * Returns 0 on success. Returns -1 when 'path' is NULL, the pattern does not
+ * fit the internal buffer, glob() fails or finds nothing, or an entry cannot
+ * be stored (the list is destroyed in that case). The glob buffer is released
+ * on every path taken after glob() was called.
+ */
+int pe_utils_path_scan(struct flb_pe *ctx, const char *mount, const char *path,
+                       int expected, struct mk_list *list)
+{
+    size_t i;
+    int ret;
+    glob_t globbuf;
+    struct stat st;
+    char real_path[2048];
+
+    if (!path) {
+        return -1;
+    }
+
+    /* Safe reset for globfree() */
+    globbuf.gl_pathv = NULL;
+
+    /* Scan the real path */
+    ret = snprintf(real_path, sizeof(real_path), "%s%s", mount, path);
+    if (ret < 0 || (size_t) ret >= sizeof(real_path)) {
+        return -1;
+    }
+
+    ret = glob(real_path, GLOB_TILDE | GLOB_ERR, NULL, &globbuf);
+    if (ret != 0) {
+        switch (ret) {
+        case GLOB_NOSPACE:
+            flb_plg_error(ctx->ins, "no memory space available");
+            break;
+        case GLOB_ABORTED:
+            flb_plg_error(ctx->ins, "read error, check permissions: %s", real_path);
+            break;
+        case GLOB_NOMATCH:
+            /* probe the composed path, not the bare pattern */
+            ret = stat(real_path, &st);
+            if (ret == -1) {
+                flb_plg_debug(ctx->ins, "cannot read info from: %s", real_path);
+            }
+            else {
+                ret = access(real_path, R_OK);
+                if (ret == -1 && errno == EACCES) {
+                    flb_plg_error(ctx->ins, "NO read access for path: %s", real_path);
+                }
+                else {
+                    flb_plg_debug(ctx->ins, "NO matches for path: %s", real_path);
+                }
+            }
+            break;
+        default:
+            /* unknown failure: do not trust the content of globbuf */
+            break;
+        }
+        /* glob() may leave partial results behind on failure */
+        globfree(&globbuf);
+        return -1;
+    }
+
+    if (globbuf.gl_pathc == 0) {
+        globfree(&globbuf);
+        return -1;
+    }
+
+    /* Initialize list */
+    flb_slist_create(list);
+
+    /* For every entry found, generate an output list */
+    for (i = 0; i < globbuf.gl_pathc; i++) {
+        ret = stat(globbuf.gl_pathv[i], &st);
+        if (ret != 0) {
+            continue;
+        }
+
+        if ((expected == NE_SCAN_FILE && S_ISREG(st.st_mode)) ||
+            (expected == NE_SCAN_DIR && S_ISDIR(st.st_mode))) {
+
+            /* Compose the path */
+            ret = flb_slist_add(list, globbuf.gl_pathv[i]);
+            if (ret != 0) {
+                globfree(&globbuf);
+                flb_slist_destroy(list);
+                return -1;
+            }
+        }
+    }
+
+    globfree(&globbuf);
+    return 0;
+}
diff --git a/plugins/in_process_exporter_metrics/pe_utils.h b/plugins/in_process_exporter_metrics/pe_utils.h
new file mode 100644
index 00000000000..e713fbbbf64
--- /dev/null
+++ b/plugins/in_process_exporter_metrics/pe_utils.h
@@ -0,0 +1,39 @@
+/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+
+/* Fluent Bit
+ * ==========
+ * Copyright (C) 2023 The Fluent Bit Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#ifndef FLB_PROCESS_EXPORTER_UTILS_H +#define FLB_PROCESS_EXPORTER_UTILS_H + +#include +#include +#include +#include "pe.h" + +int pe_utils_str_to_double(char *str, double *out_val); +int pe_utils_str_to_uint64(char *str, uint64_t *out_val); + +int pe_utils_file_read_uint64(const char *mount, + const char *path, + const char *join_a, const char *join_b, + uint64_t *out_val); + +int pe_utils_file_read_lines(const char *mount, const char *path, struct mk_list *list); +int pe_utils_path_scan(struct flb_pe *ctx, const char *mount, const char *path, + int expected, struct mk_list *list); +#endif