Skip to content

Commit

Permalink
in_forward: Implement handshake protocol (#8561)
Browse files Browse the repository at this point in the history
* in_forward: Implement handshake mechanism

Handshake process on in_forward in Fluent Bit is:

1. send HELO                                    (FW_HANDSHAKE_HELO)
2. check PING                                   (none)
3. send PONG                                    (FW_HANDSHAKE_PINGPONG)
4. Mark a connect as established                (FW_HANDSHAKE_ESTABLISHED)
5. Process retrived records

On waiting the actual records, we just early return from the event
handler to wait the next read event for ingested logs.

---------

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored Mar 14, 2024
1 parent e1c6bcf commit dbfc4f6
Show file tree
Hide file tree
Showing 7 changed files with 927 additions and 3 deletions.
112 changes: 112 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_downstream.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_network.h>
#include <msgpack.h>

Expand Down Expand Up @@ -153,6 +155,92 @@ static int in_fw_collect(struct flb_input_instance *ins,
return 0;
}

static void delete_users(struct flb_in_fw_config *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_in_fw_user *user;

mk_list_foreach_safe(head, tmp, &ctx->users) {
user = mk_list_entry(head, struct flb_in_fw_user, _head);
flb_sds_destroy(user->name);
flb_sds_destroy(user->password);
mk_list_del(&user->_head);
flb_free(user);
}
}

static int setup_users(struct flb_in_fw_config *ctx,
struct flb_input_instance *ins)
{
flb_sds_t tmp;
struct mk_list *head;
struct mk_list *split;
struct flb_split_entry *sentry;
struct flb_kv *kv;
struct flb_in_fw_user *user;

/* Iterate all input properties */
mk_list_foreach(head, &ins->properties) {
kv = mk_list_entry(head, struct flb_kv, _head);

/* Create a new user */
user = flb_malloc(sizeof(struct flb_in_fw_user));
if (!user) {
flb_errno();
return -1;
}

/* Get the type */
if (strcasecmp(kv->key, "security.users") != 0) {
/* Other property. Skip */
flb_free(user);
continue;
}

/* As a value we expect a pair of a username and a passowrd */
split = flb_utils_split(kv->val, ' ', 1);
if (mk_list_size(split) != 2) {
flb_plg_error(ctx->ins,
"invalid value, expected username and password");
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}

/* Get first value (user's name) */
sentry = mk_list_entry_first(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len + 1);
if (tmp == NULL) {
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}
user->name = tmp;

/* Get remaining content (password) */
sentry = mk_list_entry_last(split, struct flb_split_entry, _head);
tmp = flb_sds_create_len(sentry->value, sentry->len);
if (tmp == NULL) {
delete_users(ctx);
flb_free(user);
flb_utils_split_free(split);
return -1;
}
user->password = tmp;

/* Release split */
flb_utils_split_free(split);

/* Link to parent list */
mk_list_add(&user->_head, &ctx->users);
}

return 0;
}

/* Initialize plugin */
static int in_fw_init(struct flb_input_instance *ins,
struct flb_config *config, void *data)
Expand All @@ -172,6 +260,7 @@ static int in_fw_init(struct flb_input_instance *ins,
ctx->coll_fd = -1;
ctx->ins = ins;
mk_list_init(&ctx->connections);
mk_list_init(&ctx->users);

/* Set the context */
flb_input_set_context(ins, ctx);
Expand Down Expand Up @@ -229,6 +318,13 @@ static int in_fw_init(struct flb_input_instance *ins,
}
}

/* Load users */
ret = setup_users(ctx, ins);
if (ret == -1) {
flb_free(ctx);
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

flb_net_socket_nonblocking(ctx->downstream->server_fd);
Expand Down Expand Up @@ -275,6 +371,7 @@ static int in_fw_exit(void *data, struct flb_config *config)
return 0;
}

delete_users(ctx);
fw_conn_del_all(ctx);
fw_config_destroy(ctx);
return 0;
Expand All @@ -287,6 +384,21 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix),
"Prefix incoming tag with the defined value."
},
{
FLB_CONFIG_MAP_STR, "shared_key", NULL,
0, FLB_FALSE, 0,
"Shared key for authentication"
},
{
FLB_CONFIG_MAP_STR, "self_hostname", NULL,
0, FLB_FALSE, 0,
"Hostname"
},
{
FLB_CONFIG_MAP_STR, "security.users", NULL,
FLB_CONFIG_MAP_MULT, FLB_FALSE, 0,
"Specify username and password pairs."
},
{
FLB_CONFIG_MAP_STR, "unix_path", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path),
Expand Down
24 changes: 24 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

enum {
FW_HANDSHAKE_HELO = 1,
FW_HANDSHAKE_PINGPONG = 2,
FW_HANDSHAKE_ESTABLISHED = 3,
};

struct flb_in_fw_helo {
flb_sds_t nonce;
int nonce_len;
flb_sds_t salt;
int salt_len;
};

struct flb_in_fw_user {
flb_sds_t name;
flb_sds_t password;
struct mk_list _head;
};

struct flb_in_fw_config {
size_t buffer_max_size; /* Max Buffer size */
size_t buffer_chunk_size; /* Chunk allocation size */
Expand All @@ -40,6 +59,11 @@ struct flb_in_fw_config {
unsigned int unix_perm; /* Permission for socket */
flb_sds_t unix_perm_str; /* Permission (config map) */

/* secure forward */
flb_sds_t shared_key; /* shared key */
flb_sds_t self_hostname; /* hostname used in certificate */
struct mk_list users; /* username and password pairs */

int coll_fd;
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
Expand Down
21 changes: 21 additions & 0 deletions plugins/in_forward/fw_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ struct flb_in_fw_config *fw_config_init(struct flb_input_instance *i_ins)
flb_debug("[in_fw] Listen='%s' TCP_Port=%s",
config->listen, config->tcp_port);
}

/* Shared Key */
p = flb_input_get_property("shared_key", i_ins);
if (p) {
config->shared_key = flb_sds_create(p);
}
else {
config->shared_key = NULL;
}

/* Self Hostname */
p = flb_input_get_property("self_hostname", i_ins);
if (p) {
config->self_hostname = flb_sds_create(p);
}
else {
config->self_hostname = flb_sds_create("localhost");
}
return config;
}

Expand Down Expand Up @@ -114,6 +132,9 @@ int fw_config_destroy(struct flb_in_fw_config *config)
flb_free(config->tcp_port);
}

flb_sds_destroy(config->shared_key);
flb_sds_destroy(config->self_hostname);

flb_free(config);

return 0;
Expand Down
48 changes: 46 additions & 2 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,24 @@ int fw_conn_event(void *data)

event = &connection->event;


if (event->mask & MK_EVENT_READ) {
if (conn->handshake_status == FW_HANDSHAKE_PINGPONG) {
flb_plg_trace(ctx->ins, "handshake status = %d", conn->handshake_status);

ret = fw_prot_secure_forward_handshake(ctx->ins, conn);
if (ret == -1) {
flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
fw_conn_del(conn);

return -1;
}

conn->handshake_status = FW_HANDSHAKE_ESTABLISHED;
return 0;
}

flb_plg_trace(ctx->ins, "handshake status = %d", conn->handshake_status);

available = (conn->buf_size - conn->buf_len);
if (available < 1) {
if (conn->buf_size >= ctx->buffer_max_size) {
Expand Down Expand Up @@ -116,6 +132,7 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
{
struct fw_conn *conn;
int ret;
struct flb_in_fw_helo *helo = NULL;

conn = flb_malloc(sizeof(struct fw_conn));
if (!conn) {
Expand All @@ -124,7 +141,25 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
return NULL;
}

conn->handshake_status = FW_HANDSHAKE_ESTABLISHED;
if (ctx->shared_key != NULL) {
conn->handshake_status = FW_HANDSHAKE_HELO;
helo = flb_malloc(sizeof(struct flb_in_fw_helo));
if (!helo) {
flb_errno();

return NULL;
}
ret = fw_prot_secure_forward_handshake_start(ctx->ins, connection, helo);
if (ret != 0) {
return NULL;
}

conn->handshake_status = FW_HANDSHAKE_PINGPONG;
}

conn->connection = connection;
conn->helo = helo;

/* Set data for the event-loop */
connection->user_data = conn;
Expand Down Expand Up @@ -178,6 +213,15 @@ int fw_conn_del(struct fw_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

if (conn->helo != NULL) {
if (conn->helo->nonce != NULL) {
flb_sds_destroy(conn->helo->nonce);
}
if (conn->helo->salt != NULL) {
flb_sds_destroy(conn->helo->salt);
}
flb_free(conn->helo);
}
flb_free(conn->buf);
flb_free(conn);

Expand All @@ -196,4 +240,4 @@ int fw_conn_del_all(struct flb_in_fw_config *ctx)
}

return 0;
}
}
7 changes: 7 additions & 0 deletions plugins/in_forward/fw_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */
#define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */
#define FLB_IN_FW_NONCE_SIZE 16
#define FLB_IN_FW_SALT_SIZE 16

enum {
FW_NEW = 1, /* it's a new connection */
Expand All @@ -33,16 +35,21 @@ struct fw_conn_stream {
size_t tag_len;
};

struct flb_in_fw_helo;

/* Respresents a connection */
struct fw_conn {
int status; /* Connection status */
int handshake_status; /* handshake status */

/* Buffer */
char *buf; /* Buffer data */
int buf_len; /* Data length */
int buf_size; /* Buffer size */
size_t rest; /* Unpacking offset */

struct flb_in_fw_helo *helo; /* secure forward HELO phase */

struct flb_input_instance *in; /* Parent plugin instance */
struct flb_in_fw_config *ctx; /* Plugin configuration context */
struct flb_connection *connection;
Expand Down
Loading

0 comments on commit dbfc4f6

Please sign in to comment.