From 10afcab95b2341f35b67e3817ac00a4487d4b56a Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Fri, 20 Sep 2024 17:39:09 +0200 Subject: [PATCH] in_ebpf: initial version of the plugin This is an initial proposal of a POC of an eBPF ingestor plugin. This adds capabilities to load and attach to an existing eBPF program and consume events from a fixed-size ring buffer; those events are subsequently ingested into the log ingestion buffer. Event types are known and defined in the fluent-bit codebase, and the eBPF program has to follow them when submitting events into the ring buffer; in the future this must be serialized and become an extensible part of the project as we possibly make progress towards compatibility with other eBPF collectors. Also, I've implemented a fallback to allow strings to be passed as the payload of the event, without following a specific event type. Signed-off-by: Jorge Niedbalski --- CMakeLists.txt | 17 ++ cmake/plugins_options.cmake | 1 + cmake/windows-setup.cmake | 1 + plugins/CMakeLists.txt | 1 + plugins/in_ebpf/CMakeLists.txt | 6 + plugins/in_ebpf/in_ebpf.c | 404 +++++++++++++++++++++++++++++++++ plugins/in_ebpf/in_ebpf.h | 68 ++++++ 7 files changed, 498 insertions(+) create mode 100644 plugins/in_ebpf/CMakeLists.txt create mode 100644 plugins/in_ebpf/in_ebpf.c create mode 100644 plugins/in_ebpf/in_ebpf.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e42d4faffc..190525c9214 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1032,6 +1032,23 @@ else() set(FLB_ARROW OFF) endif() +# EBPF Support +# ============ +if (FLB_IN_EBPF) + find_package(PkgConfig) + pkg_check_modules(LIBBPF libbpf>=0.5.0) + if (LIBBPF_FOUND) + include_directories(${LIBBPF_INCLUDE_DIRS}) + list(APPEND EXTRA_LIBS ${LIBBPF_LIBRARIES}) + else() + if (FLB_SYSTEM_LINUX AND NOT(LIBBPF_FOUND)) + MESSAGE(FATAL_ERROR "Libbpf required on Linux") + endif() + message(STATUS "Libbpf is not found") + set(FLB_IN_EBPF OFF) + endif() +endif() + # Pthread Local Storage #
===================== # By default we expect the compiler already support thread local storage diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 5a63bbc7d68..7d8462de465 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -61,6 +61,7 @@ DEFINE_OPTION(FLB_IN_WINLOG "Enable Windows Log input plugin" DEFINE_OPTION(FLB_IN_WINDOWS_EXPORTER_METRICS "Enable windows exporter metrics input plugin" ON) DEFINE_OPTION(FLB_IN_WINEVTLOG "Enable Windows EvtLog input plugin" OFF) DEFINE_OPTION(FLB_IN_WINSTAT "Enable Windows Stat input plugin" OFF) +DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin" OFF) # Processors # ========== diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index b947d6cbfa0..60230408b2e 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -53,6 +53,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_IN_STORAGE_BACKLOG Yes) set(FLB_IN_EMITTER Yes) set(FLB_IN_PODMAN_METRICS No) + set(FLB_IN_EBPF No) set(FLB_IN_ELASTICSEARCH Yes) set(FLB_IN_SPLUNK Yes) set(FLB_IN_PROMETHEUS_REMOTE_WRITE Yes) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index ce8cae64d97..ae9bac57af8 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -207,6 +207,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") REGISTER_IN_PLUGIN("in_docker_events") REGISTER_IN_PLUGIN("in_podman_metrics") REGISTER_IN_PLUGIN("in_process_exporter_metrics") + REGISTER_IN_PLUGIN("in_ebpf") endif() if(${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") diff --git a/plugins/in_ebpf/CMakeLists.txt b/plugins/in_ebpf/CMakeLists.txt new file mode 100644 index 00000000000..fc2a7a4a7c1 --- /dev/null +++ b/plugins/in_ebpf/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + in_ebpf.c +) + +FLB_PLUGIN(in_ebpf "${src}" "") +target_link_libraries(flb-plugin-in_ebpf -lbpf -lelf -lz) \ No newline at end of file diff --git a/plugins/in_ebpf/in_ebpf.c b/plugins/in_ebpf/in_ebpf.c new file mode 100644 index 
00000000000..291bb7a1ce9
--- /dev/null
+++ b/plugins/in_ebpf/in_ebpf.c
@@ -0,0 +1,496 @@
+/*
+ * NOTE(review): the header names on the #include lines were missing from the
+ * reviewed copy of this patch. The list below is reconstructed from the
+ * symbols this file uses -- confirm it against the original commit.
+ */
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_config.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <fluent-bit/flb_mem.h>
+#include <bpf/libbpf.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "in_ebpf.h"
+
+/*
+ * Length of the string stored at buf, never reading more than max bytes.
+ * Returns max when no NUL terminator is found within the window.
+ */
+static size_t bounded_strlen(const char *buf, size_t max)
+{
+    const char *nul = memchr(buf, '\0', max);
+
+    return nul ? (size_t) (nul - buf) : max;
+}
+
+/*
+ * libbpf releases before 1.0 can report a failed pointer-returning call as an
+ * encoded error pointer rather than NULL, so both forms are checked.
+ */
+static int bpf_ptr_failed(const void *ptr)
+{
+    return ptr == NULL || libbpf_get_error(ptr) != 0;
+}
+
+/*
+ * Encode one eBPF event as a log record and append it to the input instance.
+ *
+ * The record gets the current timestamp and up to three body fields:
+ *   "pid"        - only when pid is non-zero
+ *   "event_type" - only when event_type_str is not NULL
+ *   "event_data" - only when data is not NULL and data_len is non-zero
+ *
+ * Returns 0 on success and -1 on any encoder or append failure. A partially
+ * built record is rolled back, and the encoder is reset after every append
+ * attempt so a record that failed to append is not sent again together with
+ * the next event.
+ */
+static int encode_log_event(struct flb_input_instance *ins,
+                            struct flb_log_event_encoder *log_encoder,
+                            const char *event_type_str,
+                            __u32 pid,
+                            const char *data, size_t data_len)
+{
+    const char *failure;
+    int ret;
+
+    flb_plg_trace(ins, "Encoding log event");
+    ret = flb_log_event_encoder_begin_record(log_encoder);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_plg_error(ins, "Failed to begin log event record");
+        return -1;
+    }
+
+    flb_plg_trace(ins, "Setting current timestamp for log event");
+    ret = flb_log_event_encoder_set_current_timestamp(log_encoder);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        failure = "Failed to set timestamp";
+        goto rollback;
+    }
+
+    /* Append the PID (if provided in the event) */
+    if (pid > 0) {
+        flb_plg_trace(ins, "Appending PID: %u", pid);
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "pid");
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append PID key";
+            goto rollback;
+        }
+        ret = flb_log_event_encoder_append_body_uint32(log_encoder, pid);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append PID value";
+            goto rollback;
+        }
+    }
+
+    if (event_type_str) {
+        flb_plg_trace(ins, "Appending event type: %s", event_type_str);
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder,
+                                                        "event_type");
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append event type key";
+            goto rollback;
+        }
+        ret = flb_log_event_encoder_append_body_string(log_encoder,
+                                                       (char *) event_type_str,
+                                                       strlen(event_type_str));
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append event type value";
+            goto rollback;
+        }
+    }
+
+    if (data != NULL && data_len > 0) {
+        flb_plg_trace(ins, "Appending event data of length: %zu", data_len);
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder,
+                                                        "event_data");
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append event data key";
+            goto rollback;
+        }
+        ret = flb_log_event_encoder_append_body_string(log_encoder,
+                                                       (char *) data,
+                                                       data_len);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+            failure = "Failed to append event data value";
+            goto rollback;
+        }
+    }
+    else {
+        flb_plg_trace(ins, "No event data to append (data_len = 0)");
+    }
+
+    /* Commit the record */
+    ret = flb_log_event_encoder_commit_record(log_encoder);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_plg_error(ins, "Failed to commit log event record");
+        flb_log_event_encoder_reset(log_encoder);
+        return -1;
+    }
+
+    /* Append the encoded log event to Fluent Bit instance */
+    ret = 0;
+    if (log_encoder->output_length > 0) {
+        flb_plg_trace(ins, "Appending log event of length: %zu",
+                      log_encoder->output_length);
+        ret = flb_input_log_append(ins, NULL, 0,
+                                   log_encoder->output_buffer,
+                                   log_encoder->output_length);
+        if (ret == -1) {
+            flb_plg_error(ins, "Failed to append log data");
+        }
+
+        /* Drop the buffered record whether or not the append succeeded */
+        flb_log_event_encoder_reset(log_encoder);
+    }
+
+    return ret == -1 ? -1 : 0;
+
+rollback:
+    flb_log_event_encoder_rollback_record(log_encoder);
+    flb_plg_error(ins, "%s", failure);
+    return -1;
+}
+
+/*
+ * Ring buffer callback: turn one sample into a log record.
+ *
+ * instance is the input instance registered with ring_buffer__new(); data and
+ * data_sz describe the sample. Two layouts are accepted:
+ *   - exactly sizeof(struct flb_in_ebpf_event): a structured event
+ *   - at most MAX_EVENT_LEN bytes: a raw string, reported as "unknown"
+ * Samples of any other size are discarded with a warning.
+ *
+ * The payload length is bounded by the sample itself, so a payload without a
+ * NUL terminator cannot cause a read past the end of the sample.
+ *
+ * Returns 0 when the sample was encoded or discarded, -1 when encoding failed.
+ */
+static int handle_ebpf_event(void *instance, void *data, size_t data_sz)
+{
+    struct flb_input_instance *ins = instance;
+    struct flb_in_ebpf_config *ctx = ins->context;
+    const struct flb_in_ebpf_event *event;
+    const char *event_type_str;
+    const char *event_data;
+    size_t event_data_len;
+    __u32 pid = 0;
+
+    /* Check if data size is zero and discard if no valid data */
+    if (data_sz == 0) {
+        flb_plg_warn(ins, "Received an event with zero data size. Discarding.");
+        return 0;
+    }
+
+    if (data_sz == sizeof(struct flb_in_ebpf_event)) {
+        /* Structured event */
+        event = data;
+        event_type_str = get_event_type_str(event->event_type);
+        pid = event->pid;
+        event_data = event->data;
+        event_data_len = bounded_strlen(event_data, sizeof(event->data));
+
+        flb_plg_trace(ins, "Processed structured event of type: %s",
+                      event_type_str);
+    }
+    else if (data_sz <= MAX_EVENT_LEN) {
+        /* Handle raw string case (e.g., command line) */
+        event_data = data;
+        event_data_len = bounded_strlen(event_data, data_sz);
+        event_type_str = FLB_IN_EBPF_EVENT_TYPE_UNKNOWN;
+
+        flb_plg_trace(ins, "Processed raw string event");
+    }
+    else {
+        flb_plg_warn(ins, "Received an event with unsupported size %zu. "
+                     "Discarding.", data_sz);
+        return 0;
+    }
+
+    /* Encode the event */
+    return encode_log_event(ins, ctx->log_encoder, event_type_str, pid,
+                            event_data, event_data_len);
+}
+
+/*
+ * Collector callback: consume the samples currently available in the ring
+ * buffer. Each sample is delivered to handle_ebpf_event().
+ *
+ * Returns 0 on success and -1 when ring_buffer__consume() reports an error.
+ */
+static int in_ebpf_collect(struct flb_input_instance *ins,
+                           struct flb_config *config, void *in_context)
+{
+    struct flb_in_ebpf_config *ctx = in_context;
+    int err;
+
+    (void) config;
+
+    flb_plg_trace(ins, "Polling on ring buffer '%s'", ctx->ringbuf_map_name);
+    err = ring_buffer__consume(ctx->rb);
+    if (err < 0) {
+        flb_plg_error(ins, "Error polling the ring buffer: %d", err);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Release everything owned by the plugin context, then the context itself.
+ * Accepts NULL and partially initialized contexts.
+ */
+static void in_ebpf_destroy(struct flb_in_ebpf_config *ctx)
+{
+    if (!ctx) {
+        return;
+    }
+
+    if (ctx->rb) {
+        ring_buffer__free(ctx->rb);
+    }
+
+    /* Detach the program before closing the object it was loaded from */
+    if (ctx->link) {
+        bpf_link__destroy(ctx->link);
+    }
+
+    if (ctx->obj) {
+        bpf_object__close(ctx->obj);
+    }
+
+    if (ctx->log_encoder) {
+        flb_log_event_encoder_destroy(ctx->log_encoder);
+    }
+
+    flb_free(ctx);
+}
+
+/*
+ * Plugin initialization: load the configured BPF object, attach the configured
+ * program, open its ring buffer map and register a timer collector that
+ * drains the ring buffer every poll_ms milliseconds.
+ *
+ * Returns 0 on success. On failure everything acquired so far is released,
+ * the instance context is cleared and -1 is returned.
+ */
+static int in_ebpf_init(struct flb_input_instance *ins,
+                        struct flb_config *config, void *data)
+{
+    struct flb_in_ebpf_config *ctx;
+    struct bpf_program *prog;
+    struct bpf_map *map;
+    int poll_seconds;
+    int poll_nanoseconds;
+    int ret;
+
+    (void) data;
+
+    /* Allocate space for the configuration context */
+    ctx = flb_calloc(1, sizeof(struct flb_in_ebpf_config));
+    if (!ctx) {
+        flb_plg_error(ins, "Could not allocate memory for the context");
+        return -1;
+    }
+    ctx->ins = ins; /* Set the input instance pointer */
+
+    /* Initialize the log encoder */
+    ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
+    if (!ctx->log_encoder) {
+        flb_plg_error(ins, "Could not create log event encoder");
+        goto error;
+    }
+
+    /* Load the config map */
+    ret = flb_input_config_map_set(ins, (void *) ctx);
+    if (ret == -1) {
+        flb_plg_error(ins, "Failed to load config map");
+        goto error;
+    }
+
+    /*
+     * Sanitize the optional parameters. This must run after the config map
+     * is loaded, otherwise the loaded values overwrite the corrections.
+     */
+    if (!ctx->ringbuf_map_name) {
+        ctx->ringbuf_map_name = FLB_IN_EBPF_DEFAULT_RINGBUF_MAP_NAME;
+    }
+    if (ctx->poll_ms <= 0) {
+        flb_plg_warn(ins, "Invalid poll_ms value %d, using default of %s ms",
+                     ctx->poll_ms, FLB_IN_EBPF_DEFAULT_POLL_MS);
+        ctx->poll_ms = atoi(FLB_IN_EBPF_DEFAULT_POLL_MS);
+    }
+
+    if (!ctx->bpf_object_file) {
+        flb_plg_error(ins, "No BPF object file specified");
+        goto error;
+    }
+
+    if (!ctx->bpf_program_name) {
+        flb_plg_error(ins, "No BPF program name specified");
+        goto error;
+    }
+
+    /* Open the BPF object file */
+    ctx->obj = bpf_object__open_file(ctx->bpf_object_file, NULL);
+    if (bpf_ptr_failed(ctx->obj)) {
+        flb_plg_error(ins, "Failed to open BPF object file: %s",
+                      ctx->bpf_object_file);
+        ctx->obj = NULL; /* never hand an error pointer to the cleanup */
+        goto error;
+    }
+
+    /* Load the BPF object into the kernel */
+    ret = bpf_object__load(ctx->obj);
+    if (ret) {
+        flb_plg_error(ins, "Failed to load BPF object: %d", ret);
+        goto error;
+    }
+
+    /* Find the BPF program by its name */
+    prog = bpf_object__find_program_by_name(ctx->obj, ctx->bpf_program_name);
+    if (bpf_ptr_failed(prog)) {
+        flb_plg_error(ins, "Failed to find BPF program: %s",
+                      ctx->bpf_program_name);
+        goto error;
+    }
+
+    /* Attach the program; the link is kept so exit can detach it again */
+    ctx->link = bpf_program__attach(prog);
+    if (bpf_ptr_failed(ctx->link)) {
+        flb_plg_error(ins, "Failed to attach BPF program");
+        ctx->link = NULL;
+        goto error;
+    }
+
+    /* Find the ring buffer map by its name */
+    map = bpf_object__find_map_by_name(ctx->obj, ctx->ringbuf_map_name);
+    if (bpf_ptr_failed(map)) {
+        flb_plg_error(ins, "Failed to find the '%s' map in BPF object",
+                      ctx->ringbuf_map_name);
+        goto error;
+    }
+
+    ctx->map_fd = bpf_map__fd(map);
+    if (ctx->map_fd < 0) {
+        flb_plg_error(ins, "Failed to get file descriptor for '%s' map",
+                      ctx->ringbuf_map_name);
+        goto error;
+    }
+
+    /* Set up the ring buffer */
+    ctx->rb = ring_buffer__new(ctx->map_fd, handle_ebpf_event, ins, NULL);
+    if (bpf_ptr_failed(ctx->rb)) {
+        flb_plg_error(ins, "Failed to create ring buffer");
+        ctx->rb = NULL;
+        goto error;
+    }
+
+    /* Set the context */
+    flb_input_set_context(ins, ctx);
+
+    /* Calculate poll time in seconds and nanoseconds */
+    poll_seconds = ctx->poll_ms / 1000;
+    poll_nanoseconds = (ctx->poll_ms % 1000) * 1000000;
+
+    /* Initialize the collector */
+    ctx->coll_fd = flb_input_set_collector_time(ins,
+                                                in_ebpf_collect,
+                                                poll_seconds,
+                                                poll_nanoseconds,
+                                                config);
+    if (ctx->coll_fd < 0) {
+        flb_plg_error(ins, "Failed to set up collector");
+        goto error;
+    }
+
+    flb_plg_info(ins, "eBPF program '%s' loaded successfully from object file "
+                 "'%s' with ring buffer '%s'",
+                 ctx->bpf_program_name, ctx->bpf_object_file,
+                 ctx->ringbuf_map_name);
+
+    return 0;
+
+error:
+    flb_input_set_context(ins, NULL);
+    in_ebpf_destroy(ctx);
+    return -1;
+}
+
+/* Pause function to stop the collector */
+static void in_ebpf_pause(void *data, struct flb_config *config)
+{
+    struct flb_in_ebpf_config *ctx = data;
+
+    (void) config;
+
+    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
+}
+
+/* Resume function to start the collector */
+static void in_ebpf_resume(void *data, struct flb_config *config)
+{
+    struct flb_in_ebpf_config *ctx = data;
+
+    (void) config;
+
+    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
+}
+
+/* Cleanup function: releases the ring buffer, link, BPF object and context */
+static int in_ebpf_exit(void *in_context, struct flb_config *config)
+{
+    (void) config;
+
+    in_ebpf_destroy(in_context);
+
+    return 0;
+}
+
+/* Configuration map for the plugin */
+static struct flb_config_map config_map[] = {
+    /* Path to the compiled eBPF object file (*.o) */
+    {
+     FLB_CONFIG_MAP_STR, "bpf_object_file", NULL,
+     0, FLB_TRUE, offsetof(struct flb_in_ebpf_config, bpf_object_file),
+     "Path to the eBPF program object file."
+    },
+
+    /* Name of the eBPF program (function) to attach from the object file */
+    {
+     FLB_CONFIG_MAP_STR, "bpf_program_name", NULL,
+     0, FLB_TRUE, offsetof(struct flb_in_ebpf_config, bpf_program_name),
+     "Name of the eBPF program to attach."
+    },
+
+    /* Name of the ring buffer map in the eBPF program to use for event collection */
+    {
+     FLB_CONFIG_MAP_STR, "ringbuf_map_name", FLB_IN_EBPF_DEFAULT_RINGBUF_MAP_NAME,
+     0, FLB_TRUE, offsetof(struct flb_in_ebpf_config, ringbuf_map_name),
+     "Name of the ring buffer map in the eBPF program."
+    },
+
+    /* Interval in milliseconds between two ring buffer polls */
+    {
+     FLB_CONFIG_MAP_INT, "poll_ms", FLB_IN_EBPF_DEFAULT_POLL_MS,
+     0, FLB_TRUE, offsetof(struct flb_in_ebpf_config, poll_ms),
+     "Interval in milliseconds between ring buffer polls (must be positive)."
+    },
+    {0}
+};
+
+/* Plugin registration */
+struct flb_input_plugin in_ebpf_plugin = {
+    .name         = "ebpf",
+    .description  = "eBPF input plugin",
+    .cb_init      = in_ebpf_init,
+    .cb_pre_run   = NULL,
+    .cb_collect   = in_ebpf_collect,
+    .cb_flush_buf = NULL,
+    .cb_pause     = in_ebpf_pause,
+    .cb_resume    = in_ebpf_resume,
+    .cb_exit      = in_ebpf_exit,
+    .config_map   = config_map,
+};
diff --git a/plugins/in_ebpf/in_ebpf.h b/plugins/in_ebpf/in_ebpf.h
new file mode 100644
index 00000000000..f7805b3fe35
--- /dev/null
+++ b/plugins/in_ebpf/in_ebpf.h
@@ -0,0 +1,76 @@
+#ifndef FLB_IN_EBPF_H
+#define FLB_IN_EBPF_H
+
+/*
+ * NOTE(review): the header names on the #include lines were missing from the
+ * reviewed copy of this patch. The list below is reconstructed from the
+ * symbols this header uses -- confirm it against the original commit.
+ */
+#include <fluent-bit/flb_input_plugin.h>
+#include <fluent-bit/flb_log_event_encoder.h>
+#include <bpf/libbpf.h>
+#include <linux/types.h>
+
+/* Define default values */
+#define FLB_IN_EBPF_DEFAULT_RINGBUF_MAP_NAME "events"
+#define FLB_IN_EBPF_DEFAULT_POLL_MS          "1000"    /* 1 second between polls */
+#define FLB_IN_EBPF_DEFAULT_ATTRIBUTE_NAME   "payload"
+#define FLB_IN_EBPF_DEFAULT_RINGBUF_SIZE     "8192"    /* Default ring buffer size in bytes */
+
+/* Size in bytes of the payload buffer carried by a structured event */
+#define MAX_EVENT_LEN 128
+
+/* Configuration structure for eBPF plugin */
+struct flb_in_ebpf_config {
+    struct ring_buffer *rb;
+    struct bpf_object *obj;
+    struct bpf_link *link;                     /* Attachment of the loaded program */
+    struct flb_log_event_encoder *log_encoder; /* Log encoder */
+    int map_fd;
+    size_t ringbuf_size;
+    size_t ringbuf_consume_count;  /* events to consume from ring buffer on each poll */
+    char *ringbuf_map_name;
+    int poll_ms;                   /* Interval between polls in milliseconds */
+    const char *bpf_object_file;   /* Path to the eBPF object file */
+    const char *bpf_program_name;  /* Name of the eBPF program to attach */
+    char *attribute_name;          /* Configurable attribute name */
+    int coll_fd;                   /* Collector file descriptor */
+    struct flb_input_instance *ins; /* Pointer to the input instance */
+};
+
+/* Event types enum in UPPERCASE */
+enum FLB_IN_EBPF_EVENT_TYPE {
+    FLB_IN_EBPF_EVENT_FILESYSTEM = 0,
+    FLB_IN_EBPF_EVENT_NETWORK = 1,
+    FLB_IN_EBPF_EVENT_PROCESS = 2
+};
+
+/* Event structure sent by eBPF */
+struct flb_in_ebpf_event {
+    __u32 pid;
+    __u32 event_type;          /* One of enum FLB_IN_EBPF_EVENT_TYPE */
+    char data[MAX_EVENT_LEN];  /* Event-specific data (filename, network info, etc.) */
+};
+
+/* Define constant strings for event types */
+#define FLB_IN_EBPF_EVENT_TYPE_FILESYSTEM "filesystem"
+#define FLB_IN_EBPF_EVENT_TYPE_NETWORK "network"
+#define FLB_IN_EBPF_EVENT_TYPE_PROCESS "process"
+#define FLB_IN_EBPF_EVENT_TYPE_UNKNOWN "unknown"
+
+/* Function to map enum values to strings; unlisted values map to "unknown" */
+static inline const char *get_event_type_str(int event_type)
+{
+    switch (event_type) {
+    case FLB_IN_EBPF_EVENT_FILESYSTEM:
+        return FLB_IN_EBPF_EVENT_TYPE_FILESYSTEM;
+    case FLB_IN_EBPF_EVENT_NETWORK:
+        return FLB_IN_EBPF_EVENT_TYPE_NETWORK;
+    case FLB_IN_EBPF_EVENT_PROCESS:
+        return FLB_IN_EBPF_EVENT_TYPE_PROCESS;
+    default:
+        return FLB_IN_EBPF_EVENT_TYPE_UNKNOWN;
+    }
+}
+
+#endif /* FLB_IN_EBPF_H */