From fb26f7e83f93c95fff577c52bee2aeda7cff638f Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Mon, 5 Aug 2024 15:29:02 +0200 Subject: [PATCH] in_mqtt: added buffer size setting and fixed a leak Signed-off-by: Leonardo Alminana --- plugins/in_mqtt/mqtt.c | 5 +++++ plugins/in_mqtt/mqtt.h | 3 ++- plugins/in_mqtt/mqtt_conn.c | 16 +++++++++++++++- plugins/in_mqtt/mqtt_conn.h | 5 ++++- plugins/in_mqtt/mqtt_prot.c | 4 ++++ 5 files changed, 30 insertions(+), 3 deletions(-) diff --git a/plugins/in_mqtt/mqtt.c b/plugins/in_mqtt/mqtt.c index e712c1c8772..79be08d22a3 100644 --- a/plugins/in_mqtt/mqtt.c +++ b/plugins/in_mqtt/mqtt.c @@ -144,6 +144,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, payload_key), "Key where the payload will be preserved" }, + { + FLB_CONFIG_MAP_SIZE, "buffer_size", MQTT_CONNECTION_DEFAULT_BUFFER_SIZE, + 0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, buffer_size), + "Maximum payload size" + }, /* EOF */ {0} }; diff --git a/plugins/in_mqtt/mqtt.h b/plugins/in_mqtt/mqtt.h index 28e22e071a2..2a4ec69c808 100644 --- a/plugins/in_mqtt/mqtt.h +++ b/plugins/in_mqtt/mqtt.h @@ -30,7 +30,8 @@ struct flb_in_mqtt_config { char *tcp_port; /* TCP Port */ flb_sds_t payload_key; /* payload key */ - + size_t buffer_size; /* connection buffer size */ + int msgp_len; /* msgpack data length */ char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */ struct flb_input_instance *ins; /* plugin input instance */ diff --git a/plugins/in_mqtt/mqtt_conn.c b/plugins/in_mqtt/mqtt_conn.c index 2d472f92ff3..26b8f071c47 100644 --- a/plugins/in_mqtt/mqtt_conn.c +++ b/plugins/in_mqtt/mqtt_conn.c @@ -48,7 +48,7 @@ int mqtt_conn_event(void *data) event = &connection->event; if (event->mask & MK_EVENT_READ) { - available = sizeof(conn->buf) - conn->buf_len; + available = conn->buf_size - conn->buf_len; bytes = flb_io_net_read(connection, (void *) &conn->buf[conn->buf_len], @@ -93,6 +93,16 @@ struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection, return NULL; } + conn->buf = flb_calloc(ctx->buffer_size, 1); + + if (conn->buf == NULL) { + flb_errno(); + flb_free(conn); + return NULL; + } + + conn->buf_size = ctx->buffer_size; + conn->connection = connection; /* Set data for the event-loop */ @@ -137,6 +147,10 @@ int mqtt_conn_del(struct mqtt_conn *conn) /* Release resources */ mk_list_del(&conn->_head); + if (conn->buf != NULL) { + flb_free(conn->buf); + } + flb_free(conn); return 0; diff --git a/plugins/in_mqtt/mqtt_conn.h b/plugins/in_mqtt/mqtt_conn.h index 001d4e0ce4c..acf6acd3200 100644 --- a/plugins/in_mqtt/mqtt_conn.h +++ b/plugins/in_mqtt/mqtt_conn.h @@ -22,6 +22,8 @@ #include +#define MQTT_CONNECTION_DEFAULT_BUFFER_SIZE "2048" + enum { MQTT_NEW = 1, /* it's a new connection */ MQTT_CONNECTED = 2, /* MQTT connection per protocol spec OK */ @@ -36,7 +38,8 @@ struct mqtt_conn { int buf_frame_end; /* Frame end position */ int buf_pos; /* Index position */ int buf_len; /* Buffer content length */ - unsigned char buf[1024]; /* Buffer data */ + size_t buf_size; /* Buffer size */ + unsigned char *buf; /* Buffer data */ struct flb_in_mqtt_config *ctx; /* Plugin configuration context */ struct flb_connection *connection; struct mk_list _head; /* Link to flb_in_mqtt_config->conns */ diff --git a/plugins/in_mqtt/mqtt_prot.c b/plugins/in_mqtt/mqtt_prot.c index ebfbb425eb0..43951266420 100644 --- a/plugins/in_mqtt/mqtt_prot.c +++ b/plugins/in_mqtt/mqtt_prot.c @@ -144,15 +144,19 @@ static int mqtt_data_append(char *topic, size_t topic_len, return -1; } + +printf("PAYLOAD : ||||||%.*s|||||\n\n", msg_len, msg); off = 0; msgpack_unpacked_init(&result); if (msgpack_unpack_next(&result, pack, out, &off) != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&result); + flb_free(pack); return -1; } if (result.data.type != MSGPACK_OBJECT_MAP){ msgpack_unpacked_destroy(&result); + flb_free(pack); return -1; } root = result.data;