diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h new file mode 100644 index 00000000000..5d821f27457 --- /dev/null +++ b/include/fluent-bit/flb_msgpack_append_message.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MSGPACK_APPEND_MESSAGE_H +#define FLB_MSGPACK_APPEND_MESSAGE_H + +/* Error codes */ +#define FLB_MAP_EXPAND_SUCCESS 0 +#define FLB_MAP_NOT_MODIFIED -1 +#define FLB_MAP_EXPANSION_ERROR -2 +#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 + +#include + +int flb_msgpack_append_message_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type); +#endif diff --git a/plugins/in_tcp/tcp.c b/plugins/in_tcp/tcp.c index 084ea68879f..fcca97eb284 100644 --- a/plugins/in_tcp/tcp.c +++ b/plugins/in_tcp/tcp.c @@ -166,6 +166,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_tcp_config, buffer_size_str), "Set the buffer size" }, + { + FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL, + 0, FLB_TRUE, offsetof(struct flb_in_tcp_config, source_address_key), + "Key where the source address will be injected" + }, /* EOF */ {0} }; diff --git a/plugins/in_tcp/tcp.h b/plugins/in_tcp/tcp.h index 3ddcbed06ad..020d3a940c9 100644 --- a/plugins/in_tcp/tcp.h +++ b/plugins/in_tcp/tcp.h @@ -40,6 +40,7 @@ struct flb_in_tcp_config { char *tcp_port; /* TCP Port */ flb_sds_t raw_separator; /* Unescaped string delimiterr */ flb_sds_t separator; /* String delimiter */ + flb_sds_t source_address_key; /* Source IP address */ int collector_id; /* Listener collector id */ struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* List of active connections */ diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 28b4b32221d..3f81ca27878 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -23,6 +23,7 @@ #include #include #include +#include #include "tcp.h" #include "tcp_conn.h" @@ -37,9 +38,13 @@ static inline int process_pack(struct tcp_conn *conn, { int ret; size_t off = 0; + size_t prev_off = 0; msgpack_unpacked result; msgpack_object entry; struct flb_in_tcp_config *ctx; + char *appended_address_buffer; + size_t appended_address_size; + char *source_address; ctx = conn->ctx; @@ -50,22 +55,60 @@ static inline int process_pack(struct tcp_conn *conn, while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) { entry = result.data; + appended_address_buffer = NULL; + source_address = NULL; + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); } + if (ctx->source_address_key != NULL) { + source_address = flb_connection_get_remote_address(conn->connection); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (entry.type == MSGPACK_OBJECT_MAP) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - ctx->log_encoder, &entry); + if (ctx->source_address_key != NULL && source_address != NULL) { + ret = flb_msgpack_append_message_to_record(&appended_address_buffer, + &appended_address_size, + ctx->source_address_key, + pack + prev_off, + size, + source_address, + strlen(source_address), + MSGPACK_OBJECT_STR); + } + + if (ret == FLB_MAP_EXPANSION_ERROR) { + flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret); + } + + if (appended_address_buffer != NULL) { + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + ctx->log_encoder, appended_address_buffer, appended_address_size); + } + else { + ret = flb_log_event_encoder_set_body_from_msgpack_object( + ctx->log_encoder, &entry); + } } else if (entry.type == MSGPACK_OBJECT_ARRAY) { - ret = flb_log_event_encoder_append_body_values( + if (ctx->source_address_key != NULL && source_address != NULL) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("msg"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry), + FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key), + FLB_LOG_EVENT_CSTRING_VALUE(source_address)); + } + else { + ret = flb_log_event_encoder_append_body_values( ctx->log_encoder, FLB_LOG_EVENT_CSTRING_VALUE("msg"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry)); + } } else { ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE; @@ -75,10 +118,15 @@ static inline int process_pack(struct tcp_conn *conn, ret = flb_log_event_encoder_commit_record(ctx->log_encoder); } + if (appended_address_buffer != NULL) { + flb_free(appended_address_buffer); + } + if (ret != FLB_EVENT_ENCODER_SUCCESS) { break; } } + prev_off = off; } msgpack_unpacked_destroy(&result); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b6233d9f721..a2794bba93e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -78,6 +78,7 @@ set(src flb_log_event_encoder_dynamic_field.c flb_processor.c flb_reload.c + flb_msgpack_append_message.c ) # Config format diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c new file mode 100644 index 00000000000..75df33b5c7a --- /dev/null +++ b/src/flb_msgpack_append_message.c @@ -0,0 +1,82 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +int flb_msgpack_append_message_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type) +{ + int result = FLB_MAP_NOT_MODIFIED; + char *modified_data_buffer; + int modified_data_size; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + *result_buffer = NULL; + *result_size = 0; + modified_data_buffer = NULL; + + if (message_key_name != NULL) { + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + if (message_type == MSGPACK_OBJECT_BIN) { + message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.via.bin.size = message_size; + message_entry.val.via.bin.ptr = message_buffer; + } + else if (message_type == MSGPACK_OBJECT_STR) { + message_entry.val.type = MSGPACK_OBJECT_STR; + message_entry.val.via.str.size = message_size; + message_entry.val.via.str.ptr = message_buffer; + } + else { + result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE; + } + + if (result == FLB_MAP_NOT_MODIFIED) { + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + } + } + + if (result == FLB_MAP_EXPAND_SUCCESS) { + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + + return result; +} diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 5288004dbab..07aefb49c2d 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -41,6 +41,7 @@ set(UNIT_TESTS_FILES log_event_decoder.c processor.c uri.c + msgpack_append_message.c ) # Config format diff --git a/tests/internal/data/msgpack_append_message/map1.json b/tests/internal/data/msgpack_append_message/map1.json new file mode 100644 index 00000000000..c824feb7c1d --- /dev/null +++ b/tests/internal/data/msgpack_append_message/map1.json @@ -0,0 +1,5 @@ +{"key1": 123456789, + "key2": 0.999887766, + "key3": "abcdefghijklmnopqrstuvwxyz", + "key4": [{"a": 10, "b": 20}, {"c": 30, "d": 40}] +} diff --git a/tests/internal/msgpack_append_message.c b/tests/internal/msgpack_append_message.c new file mode 100644 index 00000000000..abb487e877c --- /dev/null +++ b/tests/internal/msgpack_append_message.c @@ -0,0 +1,109 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +/* JSON tests data */ +#define JSON_MAP1 FLB_TESTS_DATA_PATH "/data/msgpack_append_message/map1.json" + +#include "flb_tests_internal.h" + +struct msgpack_append_message_test { + char *msgpack; + char *json; +}; + +static inline int process_pack(char *pack, size_t size) +{ + int ret; + msgpack_unpacked result; + char *appended_buffer = NULL; + size_t appended_size; + char *inject_message = "injected"; + char *inject_key_name = "expanding"; + flb_sds_t inject_key; + size_t off = 0; + size_t prev_off = 0; + flb_sds_t out_buf; + char *p = NULL; + + inject_key = flb_sds_create_len(inject_key_name, strlen(inject_key_name)); + if (!inject_key) { + flb_errno(); + return -1; + } + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type == MSGPACK_OBJECT_MAP) { + ret = flb_msgpack_append_message_to_record(&appended_buffer, + &appended_size, + inject_key, + pack + prev_off, + size, + inject_message, + 8, + MSGPACK_OBJECT_STR); + TEST_CHECK(ret == 0); + + out_buf = flb_msgpack_raw_to_json_sds(appended_buffer, appended_size); + TEST_CHECK(out_buf != NULL); + p = strstr(out_buf, "\"expanding\":\"injected\""); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("\"expanding\":\"injected\" should be appended. out_buf=%s", out_buf); + } + if (out_buf) { + flb_sds_destroy(out_buf); + } + } + prev_off = off; + } + + msgpack_unpacked_destroy(&result); + + flb_sds_destroy(inject_key); + flb_free(appended_buffer); + + return ret; +} + +/* Append a single key-value pair into msgpack map */ +void test_append_basic() +{ + int ret; + size_t len; + char *data; + char *pack; + int out_size; + struct flb_pack_state state; + + data = mk_file_to_buffer(JSON_MAP1); + TEST_CHECK(data != NULL); + + len = strlen(data); + + ret = flb_pack_state_init(&state); + TEST_CHECK(ret == 0); + + ret = flb_pack_json_state(data, len, &pack, &out_size, &state); + TEST_CHECK(ret == 0); + + ret = process_pack(pack, out_size); + TEST_CHECK(ret == 0); + + flb_pack_state_reset(&state); + flb_free(data); + flb_free(pack); +} + +TEST_LIST = { + { "basic", test_append_basic }, + { 0 } +}; diff --git a/tests/runtime/in_tcp.c b/tests/runtime/in_tcp.c index eee20bd6438..732af004198 100644 --- a/tests/runtime/in_tcp.c +++ b/tests/runtime/in_tcp.c @@ -255,6 +255,69 @@ void flb_test_tcp() test_ctx_destroy(ctx); } +void flb_test_tcp_with_source_address() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + char *buf = "{\"test\":\"msg\"}"; + size_t size = strlen(buf); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\",\"source_host\":\"tcp://"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "source_address_key", "source_host", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* use default host/port */ + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + + w_size = send(fd, buf, size, 0); + if (!TEST_CHECK(w_size == size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + void flb_test_tcp_with_tls() { struct flb_connection *client_connection; @@ -552,6 +615,7 @@ void flb_test_issue_5336() TEST_LIST = { {"tcp", flb_test_tcp}, + {"tcp_with_source_address", flb_test_tcp_with_source_address}, {"tcp_with_tls", flb_test_tcp_with_tls}, {"format_none", flb_test_format_none}, {"format_none_separator", flb_test_format_none_separator},