Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Msgpack extended error handling #8256

Closed
wants to merge 11 commits into from
24 changes: 23 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Modified Work:
#
# Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES, ALL RIGHTS RESERVED.

# This software product is a proprietary product of NVIDIA CORPORATION &
# AFFILIATES (the "Company") and all right, title, and interest in and to the
# software product, including all associated intellectual property rights, are
# and shall remain exclusively with the Company.
#
# This software product is governed by the End User License Agreement
# provided with the software product.
#


cmake_minimum_required(VERSION 3.0)
project(fluent-bit)

Expand Down Expand Up @@ -84,6 +98,7 @@ option(FLB_PARSER "Build with Parser support" Yes)
option(FLB_TLS "Build with SSL/TLS support" Yes)
option(FLB_BINARY "Build executable binary" Yes)
option(FLB_EXAMPLES "Build examples" Yes)
option(FLB_RAW_MSGPACK_API "Build raw msgpack api" Yes)
option(FLB_SHARED_LIB "Build shared library" Yes)
option(FLB_VALGRIND "Enable Valgrind support" No)
option(FLB_TRACE "Enable trace mode" No)
Expand Down Expand Up @@ -140,6 +155,7 @@ option(FLB_IN_RANDOM "Enable random input plugin" Yes)
option(FLB_IN_SERIAL "Enable Serial input plugin" Yes)
option(FLB_IN_STDIN "Enable Standard input plugin" Yes)
option(FLB_IN_DUMMY_THREAD "Enable Threaded dummy plugin" Yes)
option(FLB_IN_RAW_MSGPACK "Enable Raw MessagePack input plugin" Yes)
option(FLB_IN_SYSLOG "Enable Syslog input plugin" Yes)
option(FLB_IN_TAIL "Enable Tail input plugin" Yes)
option(FLB_IN_TCP "Enable TCP input plugin" Yes)
Expand All @@ -166,6 +182,7 @@ option(FLB_OUT_AZURE_BLOB "Enable Azure output plugin" Yes)
option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes)
option(FLB_OUT_CALYPTIA "Enable Calyptia monitoring plugin" Yes)
option(FLB_OUT_COUNTER "Enable Counter output plugin" Yes)
option(FLB_OUT_COLLECTX "Enable Collectx output plugin" Yes)
option(FLB_OUT_DATADOG "Enable DataDog output plugin" Yes)
option(FLB_OUT_ES "Enable Elasticsearch output plugin" Yes)
option(FLB_OUT_EXIT "Enable Exit output plugin" Yes)
Expand All @@ -187,13 +204,14 @@ option(FLB_OUT_SLACK "Enable Slack output plugin" Yes)
option(FLB_OUT_SPLUNK "Enable Splunk output plugin" Yes)
option(FLB_OUT_STACKDRIVER "Enable Stackdriver output plugin" Yes)
option(FLB_OUT_STDOUT "Enable STDOUT output plugin" Yes)
option(FLB_OUT_STDOUT_RAW "Enable STDOUT RAW output plugin" Yes)
option(FLB_OUT_SYSLOG "Enable Syslog output plugin" Yes)
option(FLB_OUT_LIB "Enable library mode output plugin" Yes)
option(FLB_OUT_NULL "Enable dev null output plugin" Yes)
option(FLB_OUT_FLOWCOUNTER "Enable flowcount output plugin" Yes)
option(FLB_OUT_LOGDNA "Enable LogDNA output plugin" Yes)
option(FLB_OUT_LOKI "Enable Loki output plugin" Yes)
option(FLB_OUT_KAFKA "Enable Kafka output plugin" No)
option(FLB_OUT_KAFKA "Enable Kafka output plugin" Yes)
option(FLB_OUT_KAFKA_REST "Enable Kafka Rest output plugin" Yes)
option(FLB_OUT_CLOUDWATCH_LOGS "Enable AWS CloudWatch output plugin" Yes)
option(FLB_OUT_KINESIS_FIREHOSE "Enable AWS Firehose output plugin" Yes)
Expand Down Expand Up @@ -252,6 +270,7 @@ if(FLB_ALL)
set(FLB_IN_MQTT 1)
set(FLB_IN_SERIAL 1)
set(FLB_IN_STDIN 1)
set(FLB_IN_RAW_MSGPACK 1)
set(FLB_IN_HEAD 1)
set(FLB_IN_PROC 1)
set(FLB_IN_DISK 1)
Expand All @@ -273,7 +292,9 @@ if(FLB_ALL)
set(FLB_OUT_RETRY 1)
set(FLB_OUT_TD 1)
set(FLB_OUT_STDOUT 1)
set(FLB_OUT_COLLECTX 1)
set(FLB_OUT_S3 1)
set(FLB_OUT_STDOUT_RAW 1)
set(FLB_OUT_SYSLOG 1)
set(FLB_OUT_LIB 1)
set(FLB_OUT_FLOWCOUNTER 1)
Expand Down Expand Up @@ -969,6 +990,7 @@ if(FLB_EXAMPLES)
add_subdirectory(examples)
endif()


if(FLB_TESTS_RUNTIME)
enable_testing()
add_subdirectory(tests/runtime/)
Expand Down
110 changes: 110 additions & 0 deletions conf/flb_from_clx.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Modified Work:
#
# Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES, ALL RIGHTS RESERVED.

# This software product is a proprietary product of NVIDIA CORPORATION &
# AFFILIATES (the "Company") and all right, title, and interest in and to the
# software product, including all associated intellectual property rights, are
# and shall remain exclusively with the Company.
#
# This software product is governed by the End User License Agreement
# provided with the software product.
#

[SERVICE]
# Flush
# =====
# set an interval of seconds before to flush records to a destination
flush 1

# Daemon
# ======
# instruct Fluent Bit to run in foreground or background mode.
daemon Off

# Log_Level
# =========
# Set the verbosity level of the service, values can be:
#
# - error
# - warning
# - info
# - debug
# - trace
#
# by default 'info' is set, that means it includes 'error' and 'warning'.
log_level info

# Parsers File
# ============
# specify an optional 'Parsers' configuration file
parsers_file parsers.conf

# Plugins File
# ============
# specify an optional 'Plugins' configuration file to load external plugins.
plugins_file plugins.conf

# HTTP Server
# ===========
# Enable/Disable the built-in HTTP Server for metrics
http_server Off
http_listen 0.0.0.0
http_port 2020

# Storage
# =======
# Fluent Bit can use memory and filesystem buffering based mechanisms
#
# - https://docs.fluentbit.io/manual/administration/buffering-and-storage
#
# storage metrics
# ---------------
# publish storage pipeline metrics in '/api/v1/storage'. The metrics are
# exported only if the 'http_server' optoin is enabled.
#
storage.metrics on

# storage.path
# ------------
# absolute file system path to store filesystem data buffers (chunks).
#
# storage.path /tmp/storage

# storage.sync
# ------------
# configure the synchronization mode used to store the data into the
# filesystem. It can take the values normal or full.
#
# storage.sync normal

# storage.checksum
# ----------------
# enable the data integrity check when writing and reading data from the
# filesystem. The storage layer uses the CRC32 algorithm.
#
# storage.checksum off

# storage.backlog.mem_limit
# -------------------------
# if storage.path is set, Fluent Bit will look for data chunks that were
# not delivered and are still in the storage layer, these are called
# backlog data. This option configure a hint of maximum value of memory
# to use when processing these records.
#
# storage.backlog.mem_limit 5M

[INPUT]
name forward
tag forward.in

interval_sec 0.01
Match *
Host 127.0.0.1 # default host
Port 24284 # default port


[OUTPUT]
name stdout
match *

25 changes: 25 additions & 0 deletions examples/raw_msgpack_api/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES, ALL RIGHTS RESERVED.

# This software product is a proprietary product of NVIDIA CORPORATION &
# AFFILIATES (the "Company") and all right, title, and interest in and to the
# software product, including all associated intellectual property rights, are
# and shall remain exclusively with the Company.
#
# This software product is governed by the End User License Agreement
# provided with the software product.
#

set(src
${src}
raw_msgpack_api.c
)

find_package (Threads)
# add_executable(raw_msgpack_api_test ${src})
# target_link_libraries(raw_msgpack_api_test fluent-bit-shared)
# target_link_libraries(raw_msgpack_api_test ${CMAKE_THREAD_LIBS_INIT})

add_library(raw_msgpack_api SHARED ${src})
target_link_libraries(raw_msgpack_api fluent-bit-shared)
target_link_libraries(raw_msgpack_api ${CMAKE_THREAD_LIBS_INIT})

69 changes: 69 additions & 0 deletions examples/raw_msgpack_api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<!-- Modified Work:

Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES, ALL RIGHTS RESERVED.

This software product is a proprietary product of NVIDIA CORPORATION &
AFFILIATES (the "Company") and all right, title, and interest in and to the
software product, including all associated intellectual property rights, are
and shall remain exclusively with the Company.

This software product is governed by the End User License Agreement
provided with the software product. -->
# Fluent Bit / Pass Raw MessagePack data

This example implements API for passing raw MessagePack data through:
```raw_msgapck``` input plugin --> ```forward``` output plugin.



# API for Collectx
This library has simple API, consisting of 3 function:


| Component | Description |
| ------------ | ---------------------------------- |
| int init() | Initializes FluentBit instance with custom input plugin and *forward* output plugin. | |
| int add_data(void* data, int len) | Main routine to pass the data. Inputs MessagePack packed raw *data*, copies it into local buffer and signals to input plugin that buffer is ready. Signaling exploits Unix socket. |
| int finalize() | Releases FluentBit instance and Unix socket |

# Build
1. build the Fluent Bit:
- ```cd build```
- ```cmake3 ..```
- ```make```
2. run script "build_so_api.sh" to build the shared library "librawmsgpack.so" into "fluent-bit/build/examples/clx_raw_msgpack/" folder.

# Usage with python:

Type the following:
```
export LD_LIBRARY_PATH=path/to/fluent-bit/build/lib;$LD_LIBRARY_PATH
```

Then, use ctypes to load and call the library:
```
import ctypes
import msgpack

path_to_lib ="./examples/clx_raw_msgpack/librawmsgpack.so"
lib = ctypes.CDLL(path_to_lib)
print(lib)

# init
lib.init.restypes = ctypes.c_int
lib.init()

# prepare arg and res types for "add_data"
lib.add_data.argtypes = [ctypes.c_void_p, ctypes.c_int]
lib.add_data.restypes = ctypes.c_int

# generate and send data to Fluent Bit
for i in range(1000):
# pack some data with MessagePack
buf = msgpack.packb([i,[i+1,i+2]], use_bin_type=True)

y = lib.add_data(buf, len(buf))

# clear memory
lib.hw_exit()
```
Loading
Loading