From 82017e9bccc3a4725c3db85fce5f0babea0bd99b Mon Sep 17 00:00:00 2001 From: Bogdan Kolendovskyy Date: Mon, 5 Aug 2024 11:48:41 +0200 Subject: [PATCH] feat: Include message topic in all data events for big messages. When message is larger than the buffer, and must produce several events include topic where it came from in each of those events --- mqtt_client.c | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/mqtt_client.c b/mqtt_client.c index a5afa3c..5bf3af9 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -521,7 +521,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl } else { client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS; } - + client->config->transport = config->network.transport; if (config->network.if_name) { @@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) } #endif } else { - // get topic + // get and save topic msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len); if (msg_topic == NULL) { ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__); @@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) return ESP_FAIL; } } + char *saved_msg_topic = strndup(msg_topic, msg_topic_len); + ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM); + size_t saved_msg_topic_len = msg_topic_len; + // post data event client->event.retain = mqtt_get_retain(msg_buf); if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) { @@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) client->event.qos = mqtt_get_qos(msg_buf); client->event.dup = mqtt_get_dup(msg_buf); client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len; -post_data_event: + ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); @@ -1101,24 +1105,39 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client) client->event.topic_len = msg_topic_len; esp_mqtt_dispatch_event(client); - if (msg_read_len < msg_total_len) { + client->event.topic = saved_msg_topic; + client->event.topic_len = saved_msg_topic_len; + while(msg_read_len < msg_total_len) { size_t buf_len = client->mqtt_state.in_buffer_length; - msg_data = (char *)client->mqtt_state.in_buffer; - msg_topic = NULL; - msg_topic_len = 0; msg_data_offset += msg_data_len; - int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, - msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len, - client->config->network_timeout_ms); + + size_t read_len; + if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) { + read_len = buf_len - saved_msg_topic_len; + } else { + read_len = msg_total_len - msg_read_len; + } + + int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms); if (ret <= 0) { return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL; } msg_data_len = ret; msg_read_len += msg_data_len; - goto post_data_event; + + ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT, + NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len), + client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset)); + + client->event.data_len = msg_data_len; + client->event.current_data_offset = msg_data_offset; + + esp_mqtt_dispatch_event(client); + } + free(saved_msg_topic); return ESP_OK; }