Skip to content

Commit

Permalink
core: Extract a function for injecting entries to msgpack map
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Sep 27, 2023
1 parent 3c9c910 commit 223322a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 75 deletions.
39 changes: 39 additions & 0 deletions include/fluent-bit/flb_msgpack_append_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2023 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_MSGPACK_APPEND_MESSAGE_H
#define FLB_MSGPACK_APPEND_MESSAGE_H

/* Error codes */
#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

#include <fluent-bit/flb_pack.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type);
#endif
79 changes: 9 additions & 70 deletions plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_msgpack_append_message.h>

#include "tcp.h"
#include "tcp_conn.h"
Expand All @@ -32,68 +33,6 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

return result;
}

static inline int process_pack(struct tcp_conn *conn,
char *pack, size_t size)
{
Expand Down Expand Up @@ -132,14 +71,14 @@ static inline int process_pack(struct tcp_conn *conn,
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (entry.type == MSGPACK_OBJECT_MAP) {
if (ctx->source_address_key != NULL && source_address != NULL) {
ret = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
pack + prev_off,
size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
ret = flb_msgpack_append_message_to_record(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
pack + prev_off,
size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}

if (ret == FLB_MAP_EXPANSION_ERROR) {
Expand Down
5 changes: 0 additions & 5 deletions plugins/in_tcp/tcp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@

#define FLB_IN_TCP_CHUNK "32768"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

enum {
TCP_NEW = 1, /* it's a new connection */
TCP_CONNECTED = 2, /* MQTT connection per protocol spec OK */
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ set(src
flb_log_event_encoder_dynamic_field.c
flb_processor.c
flb_reload.c
flb_msgpack_append_message.c
)

# Config format
Expand Down
82 changes: 82 additions & 0 deletions src/flb_msgpack_append_message.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2023 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_msgpack_append_message.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

return result;
}

0 comments on commit 223322a

Please sign in to comment.