Skip to content

Commit

Permalink
in_mqtt: added buffer size setting and fixed a leak
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich committed Aug 5, 2024
1 parent a6aac45 commit fb26f7e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 3 deletions.
5 changes: 5 additions & 0 deletions plugins/in_mqtt/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, payload_key),
"Key where the payload will be preserved"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_size", MQTT_CONNECTION_DEFAULT_BUFFER_SIZE,
0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, buffer_size),
"Maximum payload size"
},
/* EOF */
{0}
};
Expand Down
3 changes: 2 additions & 1 deletion plugins/in_mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ struct flb_in_mqtt_config {
char *tcp_port; /* TCP Port */

flb_sds_t payload_key; /* payload key */

size_t buffer_size; /* connection buffer size */

int msgp_len; /* msgpack data length */
char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */
struct flb_input_instance *ins; /* plugin input instance */
Expand Down
16 changes: 15 additions & 1 deletion plugins/in_mqtt/mqtt_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int mqtt_conn_event(void *data)
event = &connection->event;

if (event->mask & MK_EVENT_READ) {
available = sizeof(conn->buf) - conn->buf_len;
available = conn->buf_size - conn->buf_len;

bytes = flb_io_net_read(connection,
(void *) &conn->buf[conn->buf_len],
Expand Down Expand Up @@ -93,6 +93,16 @@ struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection,
return NULL;
}

conn->buf = flb_calloc(ctx->buffer_size, 1);

if (conn->buf == NULL) {
flb_errno();
flb_free(conn);
return NULL;
}

conn->buf_size = ctx->buffer_size;

conn->connection = connection;

/* Set data for the event-loop */
Expand Down Expand Up @@ -137,6 +147,10 @@ int mqtt_conn_del(struct mqtt_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

if (conn->buf != NULL) {
flb_free(conn->buf);
}

flb_free(conn);

return 0;
Expand Down
5 changes: 4 additions & 1 deletion plugins/in_mqtt/mqtt_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fluent-bit/flb_connection.h>

#define MQTT_CONNECTION_DEFAULT_BUFFER_SIZE "2048"

enum {
MQTT_NEW = 1, /* it's a new connection */
MQTT_CONNECTED = 2, /* MQTT connection per protocol spec OK */
Expand All @@ -36,7 +38,8 @@ struct mqtt_conn {
int buf_frame_end; /* Frame end position */
int buf_pos; /* Index position */
int buf_len; /* Buffer content length */
unsigned char buf[1024]; /* Buffer data */
size_t buf_size; /* Buffer size */
unsigned char *buf; /* Buffer data */
struct flb_in_mqtt_config *ctx; /* Plugin configuration context */
struct flb_connection *connection;
struct mk_list _head; /* Link to flb_in_mqtt_config->conns */
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_mqtt/mqtt_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ static int mqtt_data_append(char *topic, size_t topic_len,
return -1;
}


printf("PAYLOAD : ||||||%.*s|||||\n\n", msg_len, msg);
off = 0;
msgpack_unpacked_init(&result);
if (msgpack_unpack_next(&result, pack, out, &off) != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
flb_free(pack);
return -1;
}

if (result.data.type != MSGPACK_OBJECT_MAP){
msgpack_unpacked_destroy(&result);
flb_free(pack);
return -1;
}
root = result.data;
Expand Down

0 comments on commit fb26f7e

Please sign in to comment.