From 64eba24c0cf021f1f47e7df260e95989c1838653 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 7 Jul 2023 20:25:53 +0900 Subject: [PATCH 01/10] in_tcp: Add a capability to inject source IP Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp.c | 5 ++ plugins/in_tcp/tcp.h | 1 + plugins/in_tcp/tcp_conn.c | 130 +++++++++++++++++++++++++++++++++++++- plugins/in_tcp/tcp_conn.h | 4 ++ 4 files changed, 137 insertions(+), 3 deletions(-) diff --git a/plugins/in_tcp/tcp.c b/plugins/in_tcp/tcp.c index 084ea68879f..fcca97eb284 100644 --- a/plugins/in_tcp/tcp.c +++ b/plugins/in_tcp/tcp.c @@ -166,6 +166,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_tcp_config, buffer_size_str), "Set the buffer size" }, + { + FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL, + 0, FLB_TRUE, offsetof(struct flb_in_tcp_config, source_address_key), + "Key where the source address will be injected" + }, /* EOF */ {0} }; diff --git a/plugins/in_tcp/tcp.h b/plugins/in_tcp/tcp.h index 3ddcbed06ad..020d3a940c9 100644 --- a/plugins/in_tcp/tcp.h +++ b/plugins/in_tcp/tcp.h @@ -40,6 +40,7 @@ struct flb_in_tcp_config { char *tcp_port; /* TCP Port */ flb_sds_t raw_separator; /* Unescaped string delimiterr */ flb_sds_t separator; /* String delimiter */ + flb_sds_t source_address_key; /* Source IP address */ int collector_id; /* Listener collector id */ struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* List of active connections */ diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 28b4b32221d..b79b22d2b3d 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -32,14 +32,84 @@ static inline void consume_bytes(char *buf, int bytes, int length) memmove(buf, buf + bytes, length - bytes); } +static int append_message_to_record_data(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type) +{ + int result; + char *modified_data_buffer; + int modified_data_size; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + *result_buffer = NULL; + *result_size = 0; + modified_data_buffer = NULL; + + if (message_key_name != NULL) { + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + if (message_type == MSGPACK_OBJECT_BIN) { + message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.via.bin.size = message_size; + message_entry.val.via.bin.ptr = message_buffer; + } + else if (message_type == MSGPACK_OBJECT_STR) { + message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.via.str.size = message_size; + message_entry.val.via.str.ptr = message_buffer; + } + else { + result = FLB_MAP_NOT_MODIFIED; + + return result; + } + + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + } + } + + if (result != FLB_MAP_EXPAND_SUCCESS) { + result = FLB_MAP_EXPANSION_ERROR; + + return result; + } + + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + + return result; +} + static inline int process_pack(struct tcp_conn *conn, char *pack, size_t size) { int ret; size_t off = 0; msgpack_unpacked result; + msgpack_sbuffer sbuf; + msgpack_packer pck; msgpack_object entry; struct flb_in_tcp_config *ctx; + char *appended_address_buffer; + size_t appended_address_size; + char *source_address; + int i; + int len; ctx = conn->ctx; @@ -50,22 +120,72 @@ static inline int process_pack(struct tcp_conn *conn, while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) { entry = result.data; + appended_address_buffer = NULL; + source_address = NULL; + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); } + if (ctx->source_address_key != NULL) { + source_address = flb_connection_get_remote_address(conn->connection); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (entry.type == MSGPACK_OBJECT_MAP) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - ctx->log_encoder, &entry); + if (source_address != NULL) { + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); + + len = entry.via.map.size; + msgpack_pack_map(&pck, len); + + for (i=0; isource_address_key, + sbuf.data, + sbuf.size, + source_address, + strlen(source_address), + MSGPACK_OBJECT_STR); + msgpack_sbuffer_destroy(&sbuf); + } + + if (ret == FLB_MAP_EXPANSION_ERROR) { + flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret); + } + + if (appended_address_buffer != NULL) { + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + ctx->log_encoder, appended_address_buffer, appended_address_size); + } + else { + ret = flb_log_event_encoder_set_body_from_msgpack_object( + ctx->log_encoder, &entry); + } } else if (entry.type == MSGPACK_OBJECT_ARRAY) { - ret = flb_log_event_encoder_append_body_values( + if (source_address != NULL) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("msg"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry), + FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key), + FLB_LOG_EVENT_CSTRING_VALUE(source_address)); + } + else { + ret = flb_log_event_encoder_append_body_values( ctx->log_encoder, FLB_LOG_EVENT_CSTRING_VALUE("msg"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry)); + } } else { ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE; @@ -75,6 +195,10 @@ static inline int process_pack(struct tcp_conn *conn, ret = flb_log_event_encoder_commit_record(ctx->log_encoder); } + if (appended_address_buffer != NULL) { + flb_free(appended_address_buffer); + } + if (ret != FLB_EVENT_ENCODER_SUCCESS) { break; } diff --git a/plugins/in_tcp/tcp_conn.h b/plugins/in_tcp/tcp_conn.h index f9af869f219..597be7f4661 100644 --- a/plugins/in_tcp/tcp_conn.h +++ b/plugins/in_tcp/tcp_conn.h @@ -25,6 +25,10 @@ #define FLB_IN_TCP_CHUNK "32768" +#define FLB_MAP_EXPAND_SUCCESS 0 +#define FLB_MAP_NOT_MODIFIED -1 +#define FLB_MAP_EXPANSION_ERROR -2 + enum { TCP_NEW = 1, /* it's a new connection */ TCP_CONNECTED = 2, /* MQTT connection per protocol spec OK */ From 0553f6a069817ed8f6290df43694d63f02b2c69d Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 10 Jul 2023 13:38:32 +0900 Subject: [PATCH 02/10] in_tcp: test: Add a source IP injection test case Signed-off-by: Hiroshi Hatake --- tests/runtime/in_tcp.c | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/runtime/in_tcp.c b/tests/runtime/in_tcp.c index eee20bd6438..732af004198 100644 --- a/tests/runtime/in_tcp.c +++ b/tests/runtime/in_tcp.c @@ -255,6 +255,69 @@ void flb_test_tcp() test_ctx_destroy(ctx); } +void flb_test_tcp_with_source_address() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + char *buf = "{\"test\":\"msg\"}"; + size_t size = strlen(buf); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\",\"source_host\":\"tcp://"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "source_address_key", "source_host", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* use default host/port */ + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + + w_size = send(fd, buf, size, 0); + if (!TEST_CHECK(w_size == size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + void flb_test_tcp_with_tls() { struct flb_connection *client_connection; @@ -552,6 +615,7 @@ void flb_test_issue_5336() TEST_LIST = { {"tcp", flb_test_tcp}, + {"tcp_with_source_address", flb_test_tcp_with_source_address}, {"tcp_with_tls", flb_test_tcp_with_tls}, {"format_none", flb_test_format_none}, {"format_none_separator", flb_test_format_none_separator}, From 63cf945849b8d4d288bd97eb2a01b4c02c50bb3a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 20 Jul 2023 00:38:36 +0900 Subject: [PATCH 03/10] in_tcp: Fix an incorrect type Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp_conn.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index b79b22d2b3d..6684dbfd182 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -63,7 +63,7 @@ static int append_message_to_record_data(char **result_buffer, message_entry.val.via.bin.ptr = message_buffer; } else if (message_type == MSGPACK_OBJECT_STR) { - message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.type = MSGPACK_OBJECT_STR; message_entry.val.via.str.size = message_size; message_entry.val.via.str.ptr = message_buffer; } From 1cb14b5939e0c73a8323212f42b249154b438c63 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 20 Jul 2023 00:39:25 +0900 Subject: [PATCH 04/10] in_tcp: Organize a needless premature return Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp_conn.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 6684dbfd182..0fc003adad6 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -85,13 +85,11 @@ static int append_message_to_record_data(char **result_buffer, if (result != FLB_MAP_EXPAND_SUCCESS) { result = FLB_MAP_EXPANSION_ERROR; - - return result; + } else { + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; } - *result_buffer = modified_data_buffer; - *result_size = modified_data_size; - return result; } From ac369820d7992b36aa0781f8146e2b78ed5a0bf8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 20 Jul 2023 15:46:51 +0900 Subject: [PATCH 05/10] in_tcp: Address comments Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp_conn.c | 29 +++++++++++++++-------------- plugins/in_tcp/tcp_conn.h | 1 + 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 0fc003adad6..89a43615a9c 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -41,7 +41,7 @@ static int append_message_to_record_data(char **result_buffer, size_t message_size, int message_type) { - int result; + int result = FLB_MAP_NOT_MODIFIED; char *modified_data_buffer; int modified_data_size; msgpack_object_kv *new_map_entries[1]; @@ -68,24 +68,25 @@ static int append_message_to_record_data(char **result_buffer, message_entry.val.via.str.ptr = message_buffer; } else { - result = FLB_MAP_NOT_MODIFIED; - - return result; + result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE; } - result = flb_msgpack_expand_map(base_object_buffer, - base_object_size, - new_map_entries, 1, - &modified_data_buffer, - &modified_data_size); - if (result == 0) { - result = FLB_MAP_EXPAND_SUCCESS; + if (result == FLB_MAP_NOT_MODIFIED) { + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } } } - if (result != FLB_MAP_EXPAND_SUCCESS) { - result = FLB_MAP_EXPANSION_ERROR; - } else { + if (result == FLB_MAP_EXPAND_SUCCESS) { *result_buffer = modified_data_buffer; *result_size = modified_data_size; } diff --git a/plugins/in_tcp/tcp_conn.h b/plugins/in_tcp/tcp_conn.h index 597be7f4661..2d1e390f82c 100644 --- a/plugins/in_tcp/tcp_conn.h +++ b/plugins/in_tcp/tcp_conn.h @@ -28,6 +28,7 @@ #define FLB_MAP_EXPAND_SUCCESS 0 #define FLB_MAP_NOT_MODIFIED -1 #define FLB_MAP_EXPANSION_ERROR -2 +#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 enum { TCP_NEW = 1, /* it's a new connection */ From a107cfdea8c327466ff4b30760bbed4fbd167051 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 26 Jul 2023 17:56:14 +0900 Subject: [PATCH 06/10] in_tcp: Use more strict condition to inject src_ip Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp_conn.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 89a43615a9c..1b44239ac4e 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -134,7 +134,7 @@ static inline int process_pack(struct tcp_conn *conn, if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (entry.type == MSGPACK_OBJECT_MAP) { - if (source_address != NULL) { + if (ctx->source_address_key != NULL && source_address != NULL) { msgpack_sbuffer_init(&sbuf); msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); @@ -171,7 +171,7 @@ static inline int process_pack(struct tcp_conn *conn, } } else if (entry.type == MSGPACK_OBJECT_ARRAY) { - if (source_address != NULL) { + if (ctx->source_address_key != NULL && source_address != NULL) { ret = flb_log_event_encoder_append_body_values( ctx->log_encoder, FLB_LOG_EVENT_CSTRING_VALUE("msg"), From 3c9c9105d57cd5456ff915b5a5e92e972bb68c61 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 26 Jul 2023 18:12:41 +0900 Subject: [PATCH 07/10] in_syslog: Reduce buffer allocation for pass msgpack buffer to inject src_ip Signed-off-by: Hiroshi Hatake --- plugins/in_tcp/tcp_conn.c | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 1b44239ac4e..04ea0945a17 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -99,16 +99,13 @@ static inline int process_pack(struct tcp_conn *conn, { int ret; size_t off = 0; + size_t prev_off = 0; msgpack_unpacked result; - msgpack_sbuffer sbuf; - msgpack_packer pck; msgpack_object entry; struct flb_in_tcp_config *ctx; char *appended_address_buffer; size_t appended_address_size; char *source_address; - int i; - int len; ctx = conn->ctx; @@ -135,26 +132,14 @@ static inline int process_pack(struct tcp_conn *conn, if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (entry.type == MSGPACK_OBJECT_MAP) { if (ctx->source_address_key != NULL && source_address != NULL) { - msgpack_sbuffer_init(&sbuf); - msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); - - len = entry.via.map.size; - msgpack_pack_map(&pck, len); - - for (i=0; isource_address_key, - sbuf.data, - sbuf.size, + pack + prev_off, + size, source_address, strlen(source_address), MSGPACK_OBJECT_STR); - msgpack_sbuffer_destroy(&sbuf); } if (ret == FLB_MAP_EXPANSION_ERROR) { @@ -202,6 +187,7 @@ static inline int process_pack(struct tcp_conn *conn, break; } } + prev_off = off; } msgpack_unpacked_destroy(&result); From 223322af92d569c4d6c66fc7bc33552c4d32dbd4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 26 Jul 2023 19:04:29 +0900 Subject: [PATCH 08/10] core: Extract a function for injecting entries to msgpack map Signed-off-by: Hiroshi Hatake --- .../fluent-bit/flb_msgpack_append_message.h | 39 +++++++++ plugins/in_tcp/tcp_conn.c | 79 ++---------------- plugins/in_tcp/tcp_conn.h | 5 -- src/CMakeLists.txt | 1 + src/flb_msgpack_append_message.c | 82 +++++++++++++++++++ 5 files changed, 131 insertions(+), 75 deletions(-) create mode 100644 include/fluent-bit/flb_msgpack_append_message.h create mode 100644 src/flb_msgpack_append_message.c diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h new file mode 100644 index 00000000000..5d821f27457 --- /dev/null +++ b/include/fluent-bit/flb_msgpack_append_message.h @@ -0,0 +1,39 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MSGPACK_APPEND_MESSAGE_H +#define FLB_MSGPACK_APPEND_MESSAGE_H + +/* Error codes */ +#define FLB_MAP_EXPAND_SUCCESS 0 +#define FLB_MAP_NOT_MODIFIED -1 +#define FLB_MAP_EXPANSION_ERROR -2 +#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 + +#include + +int flb_msgpack_append_message_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type); +#endif diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 04ea0945a17..3f81ca27878 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -23,6 +23,7 @@ #include #include #include +#include #include "tcp.h" #include "tcp_conn.h" @@ -32,68 +33,6 @@ static inline void consume_bytes(char *buf, int bytes, int length) memmove(buf, buf + bytes, length - bytes); } -static int append_message_to_record_data(char **result_buffer, - size_t *result_size, - flb_sds_t message_key_name, - char *base_object_buffer, - size_t base_object_size, - char *message_buffer, - size_t message_size, - int message_type) -{ - int result = FLB_MAP_NOT_MODIFIED; - char *modified_data_buffer; - int modified_data_size; - msgpack_object_kv *new_map_entries[1]; - msgpack_object_kv message_entry; - *result_buffer = NULL; - *result_size = 0; - modified_data_buffer = NULL; - - if (message_key_name != NULL) { - new_map_entries[0] = &message_entry; - - message_entry.key.type = MSGPACK_OBJECT_STR; - message_entry.key.via.str.size = flb_sds_len(message_key_name); - message_entry.key.via.str.ptr = message_key_name; - - if (message_type == MSGPACK_OBJECT_BIN) { - message_entry.val.type = MSGPACK_OBJECT_BIN; - message_entry.val.via.bin.size = message_size; - message_entry.val.via.bin.ptr = message_buffer; - } - else if (message_type == MSGPACK_OBJECT_STR) { - message_entry.val.type = MSGPACK_OBJECT_STR; - message_entry.val.via.str.size = message_size; - message_entry.val.via.str.ptr = message_buffer; - } - else { - result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE; - } - - if (result == FLB_MAP_NOT_MODIFIED) { - result = flb_msgpack_expand_map(base_object_buffer, - base_object_size, - new_map_entries, 1, - &modified_data_buffer, - &modified_data_size); - if (result == 0) { - result = FLB_MAP_EXPAND_SUCCESS; - } - else { - result = FLB_MAP_EXPANSION_ERROR; - } - } - } - - if (result == FLB_MAP_EXPAND_SUCCESS) { - *result_buffer = modified_data_buffer; - *result_size = modified_data_size; - } - - return result; -} - static inline int process_pack(struct tcp_conn *conn, char *pack, size_t size) { @@ -132,14 +71,14 @@ static inline int process_pack(struct tcp_conn *conn, if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (entry.type == MSGPACK_OBJECT_MAP) { if (ctx->source_address_key != NULL && source_address != NULL) { - ret = append_message_to_record_data(&appended_address_buffer, - &appended_address_size, - ctx->source_address_key, - pack + prev_off, - size, - source_address, - strlen(source_address), - MSGPACK_OBJECT_STR); + ret = flb_msgpack_append_message_to_record(&appended_address_buffer, + &appended_address_size, + ctx->source_address_key, + pack + prev_off, + size, + source_address, + strlen(source_address), + MSGPACK_OBJECT_STR); } if (ret == FLB_MAP_EXPANSION_ERROR) { diff --git a/plugins/in_tcp/tcp_conn.h b/plugins/in_tcp/tcp_conn.h index 2d1e390f82c..f9af869f219 100644 --- a/plugins/in_tcp/tcp_conn.h +++ b/plugins/in_tcp/tcp_conn.h @@ -25,11 +25,6 @@ #define FLB_IN_TCP_CHUNK "32768" -#define FLB_MAP_EXPAND_SUCCESS 0 -#define FLB_MAP_NOT_MODIFIED -1 -#define FLB_MAP_EXPANSION_ERROR -2 -#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 - enum { TCP_NEW = 1, /* it's a new connection */ TCP_CONNECTED = 2, /* MQTT connection per protocol spec OK */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b6233d9f721..a2794bba93e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -78,6 +78,7 @@ set(src flb_log_event_encoder_dynamic_field.c flb_processor.c flb_reload.c + flb_msgpack_append_message.c ) # Config format diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c new file mode 100644 index 00000000000..75df33b5c7a --- /dev/null +++ b/src/flb_msgpack_append_message.c @@ -0,0 +1,82 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +int flb_msgpack_append_message_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *message_buffer, + size_t message_size, + int message_type) +{ + int result = FLB_MAP_NOT_MODIFIED; + char *modified_data_buffer; + int modified_data_size; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + *result_buffer = NULL; + *result_size = 0; + modified_data_buffer = NULL; + + if (message_key_name != NULL) { + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + if (message_type == MSGPACK_OBJECT_BIN) { + message_entry.val.type = MSGPACK_OBJECT_BIN; + message_entry.val.via.bin.size = message_size; + message_entry.val.via.bin.ptr = message_buffer; + } + else if (message_type == MSGPACK_OBJECT_STR) { + message_entry.val.type = MSGPACK_OBJECT_STR; + message_entry.val.via.str.size = message_size; + message_entry.val.via.str.ptr = message_buffer; + } + else { + result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE; + } + + if (result == FLB_MAP_NOT_MODIFIED) { + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + } + } + + if (result == FLB_MAP_EXPAND_SUCCESS) { + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + + return result; +} From b76c55bfe1b66da4a4604fd5f84e3b2fdd9f3df1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 27 Jul 2023 17:43:14 +0900 Subject: [PATCH 09/10] core: test: Add an internal test case for appending kv elements to records Signed-off-by: Hiroshi Hatake --- tests/internal/CMakeLists.txt | 1 + .../data/msgpack_append_message/map1.json | 5 + tests/internal/msgpack_append_message.c | 113 ++++++++++++++++++ 3 files changed, 119 insertions(+) create mode 100644 tests/internal/data/msgpack_append_message/map1.json create mode 100644 tests/internal/msgpack_append_message.c diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 5288004dbab..07aefb49c2d 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -41,6 +41,7 @@ set(UNIT_TESTS_FILES log_event_decoder.c processor.c uri.c + msgpack_append_message.c ) # Config format diff --git a/tests/internal/data/msgpack_append_message/map1.json b/tests/internal/data/msgpack_append_message/map1.json new file mode 100644 index 00000000000..c824feb7c1d --- /dev/null +++ b/tests/internal/data/msgpack_append_message/map1.json @@ -0,0 +1,5 @@ +{"key1": 123456789, + "key2": 0.999887766, + "key3": "abcdefghijklmnopqrstuvwxyz", + "key4": [{"a": 10, "b": 20}, {"c": 30, "d": 40}] +} diff --git a/tests/internal/msgpack_append_message.c b/tests/internal/msgpack_append_message.c new file mode 100644 index 00000000000..320e5853014 --- /dev/null +++ b/tests/internal/msgpack_append_message.c @@ -0,0 +1,113 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include /* for NAN */ + +/* JSON tests data */ +#define JSON_MAP1 FLB_TESTS_DATA_PATH "/data/msgpack_append_message/map1.json" + +#include "flb_tests_internal.h" + +struct msgpack_append_message_test { + char *msgpack; + char *json; +}; + +static inline int process_pack(char *pack, size_t size) +{ + int ret; + msgpack_unpacked result; + char *appended_buffer = NULL; + size_t appended_size; + char *inject_message = "injected"; + char *inject_key_name = "expanding"; + flb_sds_t inject_key; + size_t off = 0; + size_t prev_off = 0; + flb_sds_t out_buf; + char *p = NULL; + + inject_key = flb_sds_create_len(inject_key_name, strlen(inject_key_name)); + if (!inject_key) { + flb_errno(); + return -1; + } + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type == MSGPACK_OBJECT_MAP) { + ret = flb_msgpack_append_message_to_record(&appended_buffer, + &appended_size, + inject_key, + pack + prev_off, + size, + inject_message, + 8, + MSGPACK_OBJECT_STR); + TEST_CHECK(ret == 0); + + out_buf = flb_msgpack_raw_to_json_sds(appended_buffer, appended_size); + TEST_CHECK(out_buf != NULL); + p = strstr(out_buf, "\"expanding\":\"injected\""); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("\"expanding\":\"injected\" should be appended. out_buf=%s", out_buf); + } + if (out_buf) { + flb_sds_destroy(out_buf); + } + } + prev_off = off; + } + + msgpack_unpacked_destroy(&result); + + flb_sds_destroy(inject_key); + flb_free(appended_buffer); + + return ret; +} + +/* Append a single key-value pair into msgpack map */ +void test_append_basic() +{ + int ret; + size_t len; + char *data; + char *pack; + int out_size; + struct flb_pack_state state; + + data = mk_file_to_buffer(JSON_MAP1); + TEST_CHECK(data != NULL); + + len = strlen(data); + + ret = flb_pack_state_init(&state); + TEST_CHECK(ret == 0); + + ret = flb_pack_json_state(data, len, &pack, &out_size, &state); + TEST_CHECK(ret == 0); + + ret = process_pack(pack, out_size); + TEST_CHECK(ret == 0); + + flb_pack_state_reset(&state); + flb_free(data); + flb_free(pack); +} + +TEST_LIST = { + { "basic", test_append_basic }, + { 0 } +}; From 50077bc7d411295e0bc5afd54cc4efb3bb50b7de Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 1 Sep 2023 12:13:20 +0900 Subject: [PATCH 10/10] msgpack_append_message: test: Remove needless includes Signed-off-by: Hiroshi Hatake --- tests/internal/msgpack_append_message.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/internal/msgpack_append_message.c b/tests/internal/msgpack_append_message.c index 320e5853014..abb487e877c 100644 --- a/tests/internal/msgpack_append_message.c +++ b/tests/internal/msgpack_append_message.c @@ -10,10 +10,6 @@ #include #include -#include -#include -#include -#include /* for NAN */ /* JSON tests data */ #define JSON_MAP1 FLB_TESTS_DATA_PATH "/data/msgpack_append_message/map1.json"