diff --git a/README.md b/README.md index beae892..14b1b66 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ![Build with PlatformIO](https://github.com/marvinroger/async-mqtt-client/workflows/Build%20with%20Platformio/badge.svg) ![cpplint](https://github.com/marvinroger/async-mqtt-client/workflows/cpplint/badge.svg) -An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP) . +An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP). ## Features @@ -13,7 +13,8 @@ An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client im * Publish at QoS 0, 1 and 2 * SSL/TLS support * Available in the [PlatformIO registry](http://platformio.org/lib/show/346/AsyncMqttClient) +* **New:** Option to discover the MQTT server by searching for the service `_mqtt` over the `_tcp` port. ## Requirements, installation and usage -The project is documented in the [/docs folder](docs). +The project is documented in the [/docs folder](docs). \ No newline at end of file diff --git a/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino b/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino index c226569..cf8b015 100644 --- a/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino +++ b/examples/FullyFeatured-ESP32/FullyFeatured-ESP32.ino @@ -2,133 +2,132 @@ This example uses FreeRTOS softwaretimers as there is no built-in Ticker library */ - #include extern "C" { - #include "freertos/FreeRTOS.h" - #include "freertos/timers.h" +#include "freertos/FreeRTOS.h" +#include "freertos/timers.h" } #include #define WIFI_SSID "yourSSID" #define WIFI_PASSWORD "yourpass" -#define MQTT_HOST IPAddress(192, 168, 1, 10) -#define MQTT_PORT 1883 +const char* mqttServiceName = "_mqtt"; +const char* protocol = "_tcp"; AsyncMqttClient mqttClient; TimerHandle_t mqttReconnectTimer; TimerHandle_t wifiReconnectTimer; void connectToWifi() { - Serial.println("Connecting to Wi-Fi..."); - WiFi.begin(WIFI_SSID, WIFI_PASSWORD); + Serial.println("Connecting to Wi-Fi..."); + WiFi.begin(WIFI_SSID, WIFI_PASSWORD); } void connectToMqtt() { - Serial.println("Connecting to MQTT..."); - mqttClient.connect(); + Serial.println("Connecting to MQTT..."); + mqttClient.connect(); } void WiFiEvent(WiFiEvent_t event) { Serial.printf("[WiFi-event] event: %d\n", event); - switch(event) { - case SYSTEM_EVENT_STA_GOT_IP: - Serial.println("WiFi connected"); - Serial.println("IP address: "); - Serial.println(WiFi.localIP()); - connectToMqtt(); - break; - case SYSTEM_EVENT_STA_DISCONNECTED: - Serial.println("WiFi lost connection"); - xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi - xTimerStart(wifiReconnectTimer, 0); - break; + switch (event) { + case SYSTEM_EVENT_STA_GOT_IP: + Serial.println("WiFi connected"); + Serial.println("IP address: "); + Serial.println(WiFi.localIP()); + connectToMqtt(); + break; + case SYSTEM_EVENT_STA_DISCONNECTED: + Serial.println("WiFi lost connection"); + xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi + xTimerStart(wifiReconnectTimer, 0); + break; } } void onMqttConnect(bool sessionPresent) { - Serial.println("Connected to MQTT."); - Serial.print("Session present: "); - Serial.println(sessionPresent); - uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2); - Serial.print("Subscribing at QoS 2, packetId: "); - Serial.println(packetIdSub); - mqttClient.publish("test/lol", 0, true, "test 1"); - Serial.println("Publishing at QoS 0"); - uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "test 2"); - Serial.print("Publishing at QoS 1, packetId: "); - Serial.println(packetIdPub1); - uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3"); - Serial.print("Publishing at QoS 2, packetId: "); - Serial.println(packetIdPub2); + Serial.println("Connected to MQTT."); + Serial.print("Session present: "); + Serial.println(sessionPresent); + uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2); + Serial.print("Subscribing at QoS 2, packetId: "); + Serial.println(packetIdSub); + mqttClient.publish("test/lol", 0, true, "test 1"); + Serial.println("Publishing at QoS 0"); + uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "test 2"); + Serial.print("Publishing at QoS 1, packetId: "); + Serial.println(packetIdPub1); + uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "test 3"); + Serial.print("Publishing at QoS 2, packetId: "); + Serial.println(packetIdPub2); } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { - Serial.println("Disconnected from MQTT."); + Serial.println("Disconnected from MQTT."); - if (WiFi.isConnected()) { - xTimerStart(mqttReconnectTimer, 0); - } + if (WiFi.isConnected()) { + xTimerStart(mqttReconnectTimer, 0); + } } void onMqttSubscribe(uint16_t packetId, uint8_t qos) { - Serial.println("Subscribe acknowledged."); - Serial.print(" packetId: "); - Serial.println(packetId); - Serial.print(" qos: "); - Serial.println(qos); + Serial.println("Subscribe acknowledged."); + Serial.print(" packetId: "); + Serial.println(packetId); + Serial.print(" qos: "); + Serial.println(qos); } void onMqttUnsubscribe(uint16_t packetId) { - Serial.println("Unsubscribe acknowledged."); - Serial.print(" packetId: "); - Serial.println(packetId); + Serial.println("Unsubscribe acknowledged."); + Serial.print(" packetId: "); + Serial.println(packetId); } void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { - Serial.println("Publish received."); - Serial.print(" topic: "); - Serial.println(topic); - Serial.print(" qos: "); - Serial.println(properties.qos); - Serial.print(" dup: "); - Serial.println(properties.dup); - Serial.print(" retain: "); - Serial.println(properties.retain); - Serial.print(" len: "); - Serial.println(len); - Serial.print(" index: "); - Serial.println(index); - Serial.print(" total: "); - Serial.println(total); + Serial.println("Publish received."); + Serial.print(" topic: "); + Serial.println(topic); + Serial.print(" qos: "); + Serial.println(properties.qos); + Serial.print(" dup: "); + Serial.println(properties.dup); + Serial.print(" retain: "); + Serial.println(properties.retain); + Serial.print(" len: "); + Serial.println(len); + Serial.print(" index: "); + Serial.println(index); + Serial.print(" total: "); + Serial.println(total); } void onMqttPublish(uint16_t packetId) { - Serial.println("Publish acknowledged."); - Serial.print(" packetId: "); - Serial.println(packetId); + Serial.println("Publish acknowledged."); + Serial.print(" packetId: "); + Serial.println(packetId); } void setup() { - Serial.begin(115200); - Serial.println(); - Serial.println(); + Serial.begin(115200); + Serial.println(); + Serial.println(); - mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToMqtt)); - wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToWifi)); + mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToMqtt)); + wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToWifi)); - WiFi.onEvent(WiFiEvent); + WiFi.onEvent(WiFiEvent); - mqttClient.onConnect(onMqttConnect); - mqttClient.onDisconnect(onMqttDisconnect); - mqttClient.onSubscribe(onMqttSubscribe); - mqttClient.onUnsubscribe(onMqttUnsubscribe); - mqttClient.onMessage(onMqttMessage); - mqttClient.onPublish(onMqttPublish); - mqttClient.setServer(MQTT_HOST, MQTT_PORT); + mqttClient.onConnect(onMqttConnect); + mqttClient.onDisconnect(onMqttDisconnect); + mqttClient.onSubscribe(onMqttSubscribe); + mqttClient.onUnsubscribe(onMqttUnsubscribe); + mqttClient.onMessage(onMqttMessage); + mqttClient.onPublish(onMqttPublish); + mqttClient.setServer(mqttServiceName, protocol); - connectToWifi(); + connectToWifi(); } void loop() { diff --git a/src/AsyncMqttClient.cpp b/src/AsyncMqttClient.cpp index b4375cf..9ef5975 100644 --- a/src/AsyncMqttClient.cpp +++ b/src/AsyncMqttClient.cpp @@ -1,232 +1,263 @@ #include "AsyncMqttClient.hpp" AsyncMqttClient::AsyncMqttClient() -: _client() -, _head(nullptr) -, _tail(nullptr) -, _sent(0) -, _state(DISCONNECTED) -, _disconnectReason(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) -, _lastClientActivity(0) -, _lastServerActivity(0) -, _lastPingRequestTime(0) -, _generatedClientId{0} -, _ip() -, _host(nullptr) -, _useIp(false) + : _client(), + _head(nullptr), + _tail(nullptr), + _sent(0), + _state(DISCONNECTED), + _disconnectReason(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED), + _lastClientActivity(0), + _lastServerActivity(0), + _lastPingRequestTime(0), + _generatedClientId{0}, + _ip(), + _host(nullptr), + _useIp(false) #if ASYNC_TCP_SSL_ENABLED -, _secure(false) + , + _secure(false) #endif -, _port(0) -, _keepAlive(15) -, _cleanSession(true) -, _clientId(nullptr) -, _username(nullptr) -, _password(nullptr) -, _willTopic(nullptr) -, _willPayload(nullptr) -, _willPayloadLength(0) -, _willQos(0) -, _willRetain(false) + , + _hostName(), + _port(0), + _keepAlive(15), + _cleanSession(true), + _clientId(nullptr), + _username(nullptr), + _password(nullptr), + _willTopic(nullptr), + _willPayload(nullptr), + _willPayloadLength(0), + _willQos(0), + _willRetain(false) #if ASYNC_TCP_SSL_ENABLED -, _secureServerFingerprints() + , + _secureServerFingerprints() #endif -, _onConnectUserCallbacks() -, _onDisconnectUserCallbacks() -, _onSubscribeUserCallbacks() -, _onUnsubscribeUserCallbacks() -, _onMessageUserCallbacks() -, _onPublishUserCallbacks() -, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE } -, _currentParsedPacket(nullptr) -, _remainingLengthBufferPosition(0) -, _remainingLengthBuffer{0} -, _pendingPubRels() { - _client.onConnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onConnect(); }, this); - _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onDisconnect(); }, this); - // _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast(obj))->_onError(error); }, this); - // _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast(obj))->_onTimeout(); }, this); - _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(len); }, this); - _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast(obj))->_onData(static_cast(data), len); }, this); - _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(); }, this); - _client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes) + , + _onConnectUserCallbacks(), + _onDisconnectUserCallbacks(), + _onSubscribeUserCallbacks(), + _onUnsubscribeUserCallbacks(), + _onMessageUserCallbacks(), + _onPublishUserCallbacks(), + _parsingInformation{.bufferState = AsyncMqttClientInternals::BufferState::NONE}, + _currentParsedPacket(nullptr), + _remainingLengthBufferPosition(0), + _remainingLengthBuffer{0}, + _pendingPubRels() { + + _client.onConnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onConnect(); }, this); + _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onDisconnect(); }, this); + // _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast(obj))->_onError(error); }, this); + // _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast(obj))->_onTimeout(); }, this); + _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(len); }, this); + _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast(obj))->_onData(static_cast(data), len); }, this); + _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(); }, this); + _client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes) #ifdef ESP32 - sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac()); - _xSemaphore = xSemaphoreCreateMutex(); + sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac()); + _xSemaphore = xSemaphoreCreateMutex(); #elif defined(ESP8266) - sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId()); + sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId()); #endif - _clientId = _generatedClientId; + _clientId = _generatedClientId; - setMaxTopicLength(128); + setMaxTopicLength(128); } AsyncMqttClient::~AsyncMqttClient() { - delete _currentParsedPacket; - delete[] _parsingInformation.topicBuffer; - _clear(); - _pendingPubRels.clear(); - _pendingPubRels.shrink_to_fit(); - _clearQueue(false); // _clear() doesn't clear session data + delete _currentParsedPacket; + delete[] _parsingInformation.topicBuffer; + _clear(); + _pendingPubRels.clear(); + _pendingPubRels.shrink_to_fit(); + _clearQueue(false); // _clear() doesn't clear session data #ifdef ESP32 - vSemaphoreDelete(_xSemaphore); + vSemaphoreDelete(_xSemaphore); #endif } AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { - _keepAlive = keepAlive; - return *this; + _keepAlive = keepAlive; + return *this; } AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) { - _clientId = clientId; - return *this; + _clientId = clientId; + return *this; } AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) { - _cleanSession = cleanSession; - return *this; + _cleanSession = cleanSession; + return *this; } AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) { - _parsingInformation.maxTopicLength = maxTopicLength; - delete[] _parsingInformation.topicBuffer; - _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; - return *this; + _parsingInformation.maxTopicLength = maxTopicLength; + delete[] _parsingInformation.topicBuffer; + _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; + return *this; } AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) { - _username = username; - _password = password; - return *this; + _username = username; + _password = password; + return *this; } AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) { - _willTopic = topic; - _willQos = qos; - _willRetain = retain; - _willPayload = payload; - _willPayloadLength = length; - return *this; + _willTopic = topic; + _willQos = qos; + _willRetain = retain; + _willPayload = payload; + _willPayloadLength = length; + return *this; } AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) { - _useIp = true; - _ip = ip; - _port = port; - return *this; + _useIp = true; + _ip = ip; + _port = port; + return *this; } AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) { - _useIp = false; - _host = host; - _port = port; - return *this; + _useIp = false; + _host = host; + _port = port; + return *this; +} + +AsyncMqttClient& AsyncMqttClient::setServer(const char* serviceName, const char* protocol) { + _serviceName = serviceName; + _protocol = protocol; + + // std::cout << "host name: " << hostName << std::endl; + if (_hostName != "") { + if (MDNS.begin(_hostName)) { // Nome do host do ESP32 + int n = MDNS.queryService(_serviceName, _protocol); + if (n > 0) { + _ip = MDNS.IP(0); + _port = MDNS.port(0); + _useIp = true; + return *this; + } + } + } +} + +AsyncMqttClient& AsyncMqttClient::setHostName(const char* hostName) { + if (strlen(hostName) > 0) { + _hostName = hostName; + } + return *this; } #if ASYNC_TCP_SSL_ENABLED AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) { - _secure = secure; - return *this; + _secure = secure; + return *this; } AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) { - std::array newFingerprint; - memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); - _secureServerFingerprints.push_back(newFingerprint); - return *this; + std::array newFingerprint; + memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); + _secureServerFingerprints.push_back(newFingerprint); + return *this; } #endif AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) { - _onConnectUserCallbacks.push_back(callback); - return *this; + _onConnectUserCallbacks.push_back(callback); + return *this; } AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) { - _onDisconnectUserCallbacks.push_back(callback); - return *this; + _onDisconnectUserCallbacks.push_back(callback); + return *this; } AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) { - _onSubscribeUserCallbacks.push_back(callback); - return *this; + _onSubscribeUserCallbacks.push_back(callback); + return *this; } AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) { - _onUnsubscribeUserCallbacks.push_back(callback); - return *this; + _onUnsubscribeUserCallbacks.push_back(callback); + return *this; } AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) { - _onMessageUserCallbacks.push_back(callback); - return *this; + _onMessageUserCallbacks.push_back(callback); + return *this; } AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) { - _onPublishUserCallbacks.push_back(callback); - return *this; + _onPublishUserCallbacks.push_back(callback); + return *this; } void AsyncMqttClient::_freeCurrentParsedPacket() { - delete _currentParsedPacket; - _currentParsedPacket = nullptr; + delete _currentParsedPacket; + _currentParsedPacket = nullptr; } void AsyncMqttClient::_clear() { - _lastPingRequestTime = 0; - _freeCurrentParsedPacket(); - _clearQueue(true); // keep session data for now + _lastPingRequestTime = 0; + _freeCurrentParsedPacket(); + _clearQueue(true); // keep session data for now - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; - _client.setRxTimeout(0); + _client.setRxTimeout(0); } /* TCP */ void AsyncMqttClient::_onConnect() { - log_i("TCP conn, MQTT CONNECT"); + log_i("TCP conn, MQTT CONNECT"); #if ASYNC_TCP_SSL_ENABLED - if (_secure && _secureServerFingerprints.size() > 0) { - SSL* clientSsl = _client.getSSL(); - - bool sslFoundFingerprint = false; - for (std::array fingerprint : _secureServerFingerprints) { - if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { - sslFoundFingerprint = true; - break; - } - } + if (_secure && _secureServerFingerprints.size() > 0) { + SSL* clientSsl = _client.getSSL(); + + bool sslFoundFingerprint = false; + for (std::array fingerprint : _secureServerFingerprints) { + if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { + sslFoundFingerprint = true; + break; + } + } - if (!sslFoundFingerprint) { - _disconnectReason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; - _client.close(true); - return; + if (!sslFoundFingerprint) { + _disconnectReason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; + _client.close(true); + return; + } } - } #endif - AsyncMqttClientInternals::OutPacket* msg = - new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession, - _username, - _password, - _willTopic, - _willRetain, - _willQos, - _willPayload, - _willPayloadLength, - _keepAlive, - _clientId); - _addFront(msg); - _handleQueue(); + AsyncMqttClientInternals::OutPacket* msg = + new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession, + _username, + _password, + _willTopic, + _willRetain, + _willQos, + _willPayload, + _willPayloadLength, + _keepAlive, + _clientId); + _addFront(msg); + _handleQueue(); } void AsyncMqttClient::_onDisconnect() { - log_i("TCP disconn"); - _state = DISCONNECTED; + log_i("TCP disconn"); + _state = DISCONNECTED; - _clear(); + _clear(); - for (auto callback : _onDisconnectUserCallbacks) callback(_disconnectReason); + for (auto callback : _onDisconnectUserCallbacks) callback(_disconnectReason); } /* @@ -241,515 +272,515 @@ void AsyncMqttClient::_onTimeout() { */ void AsyncMqttClient::_onAck(size_t len) { - log_i("ack %u", len); - _handleQueue(); + log_i("ack %u", len); + _handleQueue(); } void AsyncMqttClient::_onData(char* data, size_t len) { - log_i("data rcv (%u)", len); - size_t currentBytePosition = 0; - char currentByte; - _lastServerActivity = millis(); - do { - switch (_parsingInformation.bufferState) { - case AsyncMqttClientInternals::BufferState::NONE: - currentByte = data[currentBytePosition++]; - _parsingInformation.packetType = currentByte >> 4; - _parsingInformation.packetFlags = (currentByte << 4) >> 4; - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; - switch (_parsingInformation.packetType) { - case AsyncMqttClientInternals::PacketType.CONNACK: - log_i("rcv CONNACK"); - _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); - _client.setRxTimeout(0); - break; - case AsyncMqttClientInternals::PacketType.PINGRESP: - log_i("rcv PINGRESP"); - _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); - break; - case AsyncMqttClientInternals::PacketType.SUBACK: - log_i("rcv SUBACK"); - _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); - break; - case AsyncMqttClientInternals::PacketType.UNSUBACK: - log_i("rcv UNSUBACK"); - _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBLISH: - log_i("rcv PUBLISH"); - _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); - break; - case AsyncMqttClientInternals::PacketType.PUBREL: - log_i("rcv PUBREL"); - _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBACK: - log_i("rcv PUBACK"); - _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBREC: - log_i("rcv PUBREC"); - _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBCOMP: - log_i("rcv PUBCOMP"); - _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); - break; - default: - log_i("rcv PROTOCOL VIOLATION"); - disconnect(true); - break; - } - break; - case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: - currentByte = data[currentBytePosition++]; - _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; - if (currentByte >> 7 == 0) { - _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); - _remainingLengthBufferPosition = 0; - if (_parsingInformation.remainingLength > 0) { - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; - } else { - // PINGRESP is a special case where it has no variable header, so the packet ends right here - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; - _onPingResp(); - } + log_i("data rcv (%u)", len); + size_t currentBytePosition = 0; + char currentByte; + _lastServerActivity = millis(); + do { + switch (_parsingInformation.bufferState) { + case AsyncMqttClientInternals::BufferState::NONE: + currentByte = data[currentBytePosition++]; + _parsingInformation.packetType = currentByte >> 4; + _parsingInformation.packetFlags = (currentByte << 4) >> 4; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; + switch (_parsingInformation.packetType) { + case AsyncMqttClientInternals::PacketType.CONNACK: + log_i("rcv CONNACK"); + _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); + _client.setRxTimeout(0); + break; + case AsyncMqttClientInternals::PacketType.PINGRESP: + log_i("rcv PINGRESP"); + _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); + break; + case AsyncMqttClientInternals::PacketType.SUBACK: + log_i("rcv SUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.UNSUBACK: + log_i("rcv UNSUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBLISH: + log_i("rcv PUBLISH"); + _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.PUBREL: + log_i("rcv PUBREL"); + _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBACK: + log_i("rcv PUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBREC: + log_i("rcv PUBREC"); + _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBCOMP: + log_i("rcv PUBCOMP"); + _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); + break; + default: + log_i("rcv PROTOCOL VIOLATION"); + disconnect(true); + break; + } + break; + case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: + currentByte = data[currentBytePosition++]; + _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; + if (currentByte >> 7 == 0) { + _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); + _remainingLengthBufferPosition = 0; + if (_parsingInformation.remainingLength > 0) { + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; + } else { + // PINGRESP is a special case where it has no variable header, so the packet ends right here + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; + _onPingResp(); + } + } + break; + case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: + _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); + break; + case AsyncMqttClientInternals::BufferState::PAYLOAD: + _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); + break; + default: + currentBytePosition = len; } - break; - case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: - _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); - break; - case AsyncMqttClientInternals::BufferState::PAYLOAD: - _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); - break; - default: - currentBytePosition = len; - } - } while (currentBytePosition != len); + } while (currentBytePosition != len); } void AsyncMqttClient::_onPoll() { - // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections - if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { - log_w("PING t/o, disconnecting"); - disconnect(true); - return; - } - // send ping to ensure the server will receive at least one message inside keepalive window - if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) { - _sendPing(); - // send ping to verify if the server is still there (ensure this is not a half connection) - } else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) { - _sendPing(); - } - _handleQueue(); + // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections + if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { + log_w("PING t/o, disconnecting"); + disconnect(true); + return; + } + // send ping to ensure the server will receive at least one message inside keepalive window + if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + // send ping to verify if the server is still there (ensure this is not a half connection) + } else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + } + _handleQueue(); } /* QUEUE */ void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) { - // We only use this for QoS2 PUBREL so there must be a PUBLISH packet present. - // The queue therefore cannot be empty and _head points to this PUBLISH packet. - SEMAPHORE_TAKE(); - log_i("new insert #%u", packet->packetType()); - packet->next = _head->next; - _head->next = packet; - if (_head == _tail) { // PUB packet is the only one in the queue - _tail = packet; - } - SEMAPHORE_GIVE(); - _handleQueue(); + // We only use this for QoS2 PUBREL so there must be a PUBLISH packet present. + // The queue therefore cannot be empty and _head points to this PUBLISH packet. + SEMAPHORE_TAKE(); + log_i("new insert #%u", packet->packetType()); + packet->next = _head->next; + _head->next = packet; + if (_head == _tail) { // PUB packet is the only one in the queue + _tail = packet; + } + SEMAPHORE_GIVE(); + _handleQueue(); } void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) { - // This is only used for the CONNECT packet, to be able to establish a connection - // before anything else. The queue can be empty or has packets from the continued session. - // In both cases, _head should always point to the CONNECT packet afterwards. - SEMAPHORE_TAKE(); - log_i("new front #%u", packet->packetType()); - if (_head == nullptr) { - _tail = packet; - } else { - packet->next = _head; - } - _head = packet; - SEMAPHORE_GIVE(); - _handleQueue(); + // This is only used for the CONNECT packet, to be able to establish a connection + // before anything else. The queue can be empty or has packets from the continued session. + // In both cases, _head should always point to the CONNECT packet afterwards. + SEMAPHORE_TAKE(); + log_i("new front #%u", packet->packetType()); + if (_head == nullptr) { + _tail = packet; + } else { + packet->next = _head; + } + _head = packet; + SEMAPHORE_GIVE(); + _handleQueue(); } void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) { - SEMAPHORE_TAKE(); - log_i("new back #%u", packet->packetType()); - if (!_tail) { - _head = packet; - } else { - _tail->next = packet; - } - _tail = packet; - _tail->next = nullptr; - SEMAPHORE_GIVE(); - _handleQueue(); + SEMAPHORE_TAKE(); + log_i("new back #%u", packet->packetType()); + if (!_tail) { + _head = packet; + } else { + _tail->next = packet; + } + _tail = packet; + _tail->next = nullptr; + SEMAPHORE_GIVE(); + _handleQueue(); } void AsyncMqttClient::_handleQueue() { - SEMAPHORE_TAKE(); - // On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock - bool disconnect = false; - - while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes - // 1. try to send - if (_head->size() > _sent) { - // On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length. - // So we calculate the amount to be written ourselves. - size_t willSend = std::min(_head->size() - _sent, _client.space()); - size_t realSent = _client.add(reinterpret_cast(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity - _sent += willSend; - (void)realSent; - _client.send(); - _lastClientActivity = millis(); - _lastPingRequestTime = 0; - #if ASYNC_TCP_SSL_ENABLED - log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size()); - #else - log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size()); - #endif - if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) { - disconnect = true; - } - } + SEMAPHORE_TAKE(); + // On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock + bool disconnect = false; + + while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes + // 1. try to send + if (_head->size() > _sent) { + // On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length. + // So we calculate the amount to be written ourselves. + size_t willSend = std::min(_head->size() - _sent, _client.space()); + size_t realSent = _client.add(reinterpret_cast(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity + _sent += willSend; + (void)realSent; + _client.send(); + _lastClientActivity = millis(); + _lastPingRequestTime = 0; +#if ASYNC_TCP_SSL_ENABLED + log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size()); +#else + log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size()); +#endif + if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) { + disconnect = true; + } + } - // 2. stop processing when we have to wait for an MQTT acknowledgment - if (_head->size() == _sent) { - if (_head->released()) { - log_i("p #%d rel", _head->packetType()); - AsyncMqttClientInternals::OutPacket* tmp = _head; - _head = _head->next; - if (!_head) _tail = nullptr; - delete tmp; - _sent = 0; - } else { - break; // sending is complete however send next only after mqtt confirmation - } + // 2. stop processing when we have to wait for an MQTT acknowledgment + if (_head->size() == _sent) { + if (_head->released()) { + log_i("p #%d rel", _head->packetType()); + AsyncMqttClientInternals::OutPacket* tmp = _head; + _head = _head->next; + if (!_head) _tail = nullptr; + delete tmp; + _sent = 0; + } else { + break; // sending is complete however send next only after mqtt confirmation + } + } } - } - SEMAPHORE_GIVE(); - if (disconnect) { - log_i("snd DISCONN, disconnecting"); - _client.close(); - } + SEMAPHORE_GIVE(); + if (disconnect) { + log_i("snd DISCONN, disconnecting"); + _client.close(); + } } void AsyncMqttClient::_clearQueue(bool keepSessionData) { - SEMAPHORE_TAKE(); - AsyncMqttClientInternals::OutPacket* packet = _head; - _head = nullptr; - _tail = nullptr; - - while (packet) { - /* MQTT spec 3.1.2.4 Clean Session: - * - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged. - * - QoS 2 messages which have been received from the Server, but have not been completely acknowledged. - * + (unsent PUB messages with QoS > 0) - * - * To be kept: - * - possibly first message (sent to server but not acked) - * - PUBREC messages (QoS 2 PUB received but not acked) - * - PUBCOMP messages (QoS 2 PUBREL received but not acked) - */ - if (keepSessionData) { - if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type - reinterpret_cast(packet)->setDup(); - AsyncMqttClientInternals::OutPacket* next = packet->next; - log_i("keep #%u", packet->packetType()); - SEMAPHORE_GIVE(); - _addBack(packet); - SEMAPHORE_TAKE(); - packet = next; - } else if (packet->qos() > 0 || - packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC || - packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) { - AsyncMqttClientInternals::OutPacket* next = packet->next; - log_i("keep #%u", packet->packetType()); - SEMAPHORE_GIVE(); - _addBack(packet); - SEMAPHORE_TAKE(); - packet = next; - } else { - AsyncMqttClientInternals::OutPacket* next = packet->next; - delete packet; - packet = next; - } - /* Delete everything when not keeping session data - */ - } else { - AsyncMqttClientInternals::OutPacket* next = packet->next; - delete packet; - packet = next; + SEMAPHORE_TAKE(); + AsyncMqttClientInternals::OutPacket* packet = _head; + _head = nullptr; + _tail = nullptr; + + while (packet) { + /* MQTT spec 3.1.2.4 Clean Session: + * - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged. + * - QoS 2 messages which have been received from the Server, but have not been completely acknowledged. + * + (unsent PUB messages with QoS > 0) + * + * To be kept: + * - possibly first message (sent to server but not acked) + * - PUBREC messages (QoS 2 PUB received but not acked) + * - PUBCOMP messages (QoS 2 PUBREL received but not acked) + */ + if (keepSessionData) { + if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type + reinterpret_cast(packet)->setDup(); + AsyncMqttClientInternals::OutPacket* next = packet->next; + log_i("keep #%u", packet->packetType()); + SEMAPHORE_GIVE(); + _addBack(packet); + SEMAPHORE_TAKE(); + packet = next; + } else if (packet->qos() > 0 || + packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC || + packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) { + AsyncMqttClientInternals::OutPacket* next = packet->next; + log_i("keep #%u", packet->packetType()); + SEMAPHORE_GIVE(); + _addBack(packet); + SEMAPHORE_TAKE(); + packet = next; + } else { + AsyncMqttClientInternals::OutPacket* next = packet->next; + delete packet; + packet = next; + } + /* Delete everything when not keeping session data + */ + } else { + AsyncMqttClientInternals::OutPacket* next = packet->next; + delete packet; + packet = next; + } } - } - _sent = 0; - SEMAPHORE_GIVE(); + _sent = 0; + SEMAPHORE_GIVE(); } /* MQTT */ void AsyncMqttClient::_onPingResp() { - log_i("PINGRESP"); - _freeCurrentParsedPacket(); - _lastPingRequestTime = 0; + log_i("PINGRESP"); + _freeCurrentParsedPacket(); + _lastPingRequestTime = 0; } void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) { - log_i("CONNACK"); - _freeCurrentParsedPacket(); + log_i("CONNACK"); + _freeCurrentParsedPacket(); - if (!sessionPresent) { - _pendingPubRels.clear(); - _pendingPubRels.shrink_to_fit(); - _clearQueue(false); // remove session data - } + if (!sessionPresent) { + _pendingPubRels.clear(); + _pendingPubRels.shrink_to_fit(); + _clearQueue(false); // remove session data + } - if (connectReturnCode == 0) { - _state = CONNECTED; - for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); - } else { - // Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib - _disconnectReason = static_cast(connectReturnCode); - return; - } - _handleQueue(); // send any remaining data from continued session + if (connectReturnCode == 0) { + _state = CONNECTED; + for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); + } else { + // Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib + _disconnectReason = static_cast(connectReturnCode); + return; + } + _handleQueue(); // send any remaining data from continued session } void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) { - log_i("SUBACK"); - _freeCurrentParsedPacket(); - SEMAPHORE_TAKE(); - if (_head && _head->packetId() == packetId) { - _head->release(); - log_i("SUB released"); - } - SEMAPHORE_GIVE(); + log_i("SUBACK"); + _freeCurrentParsedPacket(); + SEMAPHORE_TAKE(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("SUB released"); + } + SEMAPHORE_GIVE(); - for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status); + for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status); - _handleQueue(); // subscribe confirmed, ready to send next queued item + _handleQueue(); // subscribe confirmed, ready to send next queued item } void AsyncMqttClient::_onUnsubAck(uint16_t packetId) { - log_i("UNSUBACK"); - _freeCurrentParsedPacket(); - SEMAPHORE_TAKE(); - if (_head && _head->packetId() == packetId) { - _head->release(); - log_i("UNSUB released"); - } - SEMAPHORE_GIVE(); + log_i("UNSUBACK"); + _freeCurrentParsedPacket(); + SEMAPHORE_TAKE(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("UNSUB released"); + } + SEMAPHORE_GIVE(); - for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId); + for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId); - _handleQueue(); // unsubscribe confirmed, ready to send next queued item + _handleQueue(); // unsubscribe confirmed, ready to send next queued item } void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) { - bool notifyPublish = true; - - if (qos == 2) { - for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { - if (pendingPubRel.packetId == packetId) { - notifyPublish = false; - break; - } + bool notifyPublish = true; + + if (qos == 2) { + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + notifyPublish = false; + break; + } + } } - } - if (notifyPublish) { - AsyncMqttClientMessageProperties properties; - properties.qos = qos; - properties.dup = dup; - properties.retain = retain; + if (notifyPublish) { + AsyncMqttClientMessageProperties properties; + properties.qos = qos; + properties.dup = dup; + properties.retain = retain; - for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); - } + for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); + } } void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) { - AsyncMqttClientInternals::PendingAck pendingAck; - - if (qos == 1) { - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; - pendingAck.packetId = packetId; - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); - _addBack(msg); - } else if (qos == 2) { - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; - pendingAck.packetId = packetId; - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); - _addBack(msg); - - bool pubRelAwaiting = false; - for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { - if (pendingPubRel.packetId == packetId) { - pubRelAwaiting = true; - break; - } - } + AsyncMqttClientInternals::PendingAck pendingAck; + + if (qos == 1) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; + pendingAck.packetId = packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _addBack(msg); + } else if (qos == 2) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; + pendingAck.packetId = packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _addBack(msg); + + bool pubRelAwaiting = false; + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + pubRelAwaiting = true; + break; + } + } - if (!pubRelAwaiting) { - AsyncMqttClientInternals::PendingPubRel pendingPubRel; - pendingPubRel.packetId = packetId; - _pendingPubRels.push_back(pendingPubRel); + if (!pubRelAwaiting) { + AsyncMqttClientInternals::PendingPubRel pendingPubRel; + pendingPubRel.packetId = packetId; + _pendingPubRels.push_back(pendingPubRel); + } } - } - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); } void AsyncMqttClient::_onPubRel(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); - AsyncMqttClientInternals::PendingAck pendingAck; - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; - pendingAck.packetId = packetId; - if (_head && _head->packetId() == packetId) { - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); - _head->release(); - _insert(msg); - log_i("PUBREC released"); - } + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; + pendingAck.packetId = packetId; + if (_head && _head->packetId() == packetId) { + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _head->release(); + _insert(msg); + log_i("PUBREC released"); + } - for (size_t i = 0; i < _pendingPubRels.size(); i++) { - if (_pendingPubRels[i].packetId == packetId) { - _pendingPubRels.erase(_pendingPubRels.begin() + i); - _pendingPubRels.shrink_to_fit(); + for (size_t i = 0; i < _pendingPubRels.size(); i++) { + if (_pendingPubRels[i].packetId == packetId) { + _pendingPubRels.erase(_pendingPubRels.begin() + i); + _pendingPubRels.shrink_to_fit(); + } } - } } void AsyncMqttClient::_onPubAck(uint16_t packetId) { - _freeCurrentParsedPacket(); - if (_head && _head->packetId() == packetId) { - _head->release(); - log_i("PUB released"); - } + _freeCurrentParsedPacket(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUB released"); + } - for (auto callback : _onPublishUserCallbacks) callback(packetId); + for (auto callback : _onPublishUserCallbacks) callback(packetId); } void AsyncMqttClient::_onPubRec(uint16_t packetId) { - _freeCurrentParsedPacket(); - - // We will only be sending 1 QoS>0 PUB message at a time (to honor message - // ordering). So no need to store ACKS in a separate container as it will - // be stored in the outgoing queue until a PUBCOMP comes in. - AsyncMqttClientInternals::PendingAck pendingAck; - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; - pendingAck.packetId = packetId; - log_i("snd PUBREL"); + _freeCurrentParsedPacket(); + + // We will only be sending 1 QoS>0 PUB message at a time (to honor message + // ordering). So no need to store ACKS in a separate container as it will + // be stored in the outgoing queue until a PUBCOMP comes in. + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; + pendingAck.packetId = packetId; + log_i("snd PUBREL"); - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); - if (_head && _head->packetId() == packetId) { - _head->release(); - log_i("PUB released"); - } - _insert(msg); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUB released"); + } + _insert(msg); } void AsyncMqttClient::_onPubComp(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); - // _head points to the PUBREL package - if (_head && _head->packetId() == packetId) { - _head->release(); - log_i("PUBREL released"); - } + // _head points to the PUBREL package + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUBREL released"); + } - for (auto callback : _onPublishUserCallbacks) callback(packetId); + for (auto callback : _onPublishUserCallbacks) callback(packetId); } void AsyncMqttClient::_sendPing() { - log_i("PING"); - _lastPingRequestTime = millis(); - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket; - _addBack(msg); + log_i("PING"); + _lastPingRequestTime = millis(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket; + _addBack(msg); } bool AsyncMqttClient::connected() const { - return _state == CONNECTED; + return _state == CONNECTED; } void AsyncMqttClient::connect() { - if (_state != DISCONNECTED) return; - log_i("CONNECTING"); - _state = CONNECTING; - _disconnectReason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; // reset any previous + if (_state != DISCONNECTED) return; + log_i("CONNECTING"); + _state = CONNECTING; + _disconnectReason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; // reset any previous - _client.setRxTimeout(_keepAlive); + _client.setRxTimeout(_keepAlive); #if ASYNC_TCP_SSL_ENABLED - if (_useIp) { - _client.connect(_ip, _port, _secure); - } else { - _client.connect(_host, _port, _secure); - } + if (_useIp) { + _client.connect(_ip, _port, _secure); + } else { + _client.connect(_host, _port, _secure); + } #else - if (_useIp) { - _client.connect(_ip, _port); - } else { - _client.connect(_host, _port); - } + if (_useIp) { + _client.connect(_ip, _port); + } else { + _client.connect(_host, _port); + } #endif } void AsyncMqttClient::disconnect(bool force) { - if (_state == DISCONNECTED) return; - log_i("DISCONNECT (f:%d)", force); - if (force) { - _state = DISCONNECTED; - _client.close(true); - } else if (_state != DISCONNECTING) { - _state = DISCONNECTING; - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket; - _addBack(msg); - } + if (_state == DISCONNECTED) return; + log_i("DISCONNECT (f:%d)", force); + if (force) { + _state = DISCONNECTED; + _client.close(true); + } else if (_state != DISCONNECTING) { + _state = DISCONNECTING; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket; + _addBack(msg); + } } uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { - if (_state != CONNECTED) return 0; - log_i("SUBSCRIBE"); + if (_state != CONNECTED) return 0; + log_i("SUBSCRIBE"); - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos); - _addBack(msg); - return msg->packetId(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos); + _addBack(msg); + return msg->packetId(); } uint16_t AsyncMqttClient::unsubscribe(const char* topic) { - if (_state != CONNECTED) return 0; - log_i("UNSUBSCRIBE"); + if (_state != CONNECTED) return 0; + log_i("UNSUBSCRIBE"); - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic); - _addBack(msg); - return msg->packetId(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic); + _addBack(msg); + return msg->packetId(); } uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) { - if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0; - log_i("PUBLISH"); + if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0; + log_i("PUBLISH"); - AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length); - _addBack(msg); - return msg->packetId(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length); + _addBack(msg); + return msg->packetId(); } bool AsyncMqttClient::clearQueue() { - if (_state != DISCONNECTED) return false; - _clearQueue(false); - return true; + if (_state != DISCONNECTED) return false; + _clearQueue(false); + return true; } const char* AsyncMqttClient::getClientId() const { - return _clientId; + return _clientId; } diff --git a/src/AsyncMqttClient.hpp b/src/AsyncMqttClient.hpp index 1e81103..02322e7 100644 --- a/src/AsyncMqttClient.hpp +++ b/src/AsyncMqttClient.hpp @@ -1,6 +1,7 @@ #pragma once #include +// #include #include #include "Arduino.h" @@ -11,169 +12,179 @@ #ifdef ESP32 #include +#include +#include #include #elif defined(ESP8266) #include +#include #else #error Platform not supported #endif +#include #if ASYNC_TCP_SSL_ENABLED #include #define SHA1_SIZE 20 #endif -#include "AsyncMqttClient/Flags.hpp" -#include "AsyncMqttClient/ParsingInformation.hpp" -#include "AsyncMqttClient/MessageProperties.hpp" -#include "AsyncMqttClient/Helpers.hpp" #include "AsyncMqttClient/Callbacks.hpp" #include "AsyncMqttClient/DisconnectReasons.hpp" -#include "AsyncMqttClient/Storage.hpp" - -#include "AsyncMqttClient/Packets/Packet.hpp" +#include "AsyncMqttClient/Flags.hpp" +#include "AsyncMqttClient/Helpers.hpp" +#include "AsyncMqttClient/MessageProperties.hpp" #include "AsyncMqttClient/Packets/ConnAckPacket.hpp" -#include "AsyncMqttClient/Packets/PingRespPacket.hpp" -#include "AsyncMqttClient/Packets/SubAckPacket.hpp" -#include "AsyncMqttClient/Packets/UnsubAckPacket.hpp" -#include "AsyncMqttClient/Packets/PublishPacket.hpp" -#include "AsyncMqttClient/Packets/PubRelPacket.hpp" -#include "AsyncMqttClient/Packets/PubAckPacket.hpp" -#include "AsyncMqttClient/Packets/PubRecPacket.hpp" -#include "AsyncMqttClient/Packets/PubCompPacket.hpp" - #include "AsyncMqttClient/Packets/Out/Connect.hpp" +#include "AsyncMqttClient/Packets/Out/Disconn.hpp" #include "AsyncMqttClient/Packets/Out/PingReq.hpp" #include "AsyncMqttClient/Packets/Out/PubAck.hpp" -#include "AsyncMqttClient/Packets/Out/Disconn.hpp" +#include "AsyncMqttClient/Packets/Out/Publish.hpp" #include "AsyncMqttClient/Packets/Out/Subscribe.hpp" #include "AsyncMqttClient/Packets/Out/Unsubscribe.hpp" -#include "AsyncMqttClient/Packets/Out/Publish.hpp" +#include "AsyncMqttClient/Packets/Packet.hpp" +#include "AsyncMqttClient/Packets/PingRespPacket.hpp" +#include "AsyncMqttClient/Packets/PubAckPacket.hpp" +#include "AsyncMqttClient/Packets/PubCompPacket.hpp" +#include "AsyncMqttClient/Packets/PubRecPacket.hpp" +#include "AsyncMqttClient/Packets/PubRelPacket.hpp" +#include "AsyncMqttClient/Packets/PublishPacket.hpp" +#include "AsyncMqttClient/Packets/SubAckPacket.hpp" +#include "AsyncMqttClient/Packets/UnsubAckPacket.hpp" +#include "AsyncMqttClient/ParsingInformation.hpp" +#include "AsyncMqttClient/Storage.hpp" class AsyncMqttClient { - public: - AsyncMqttClient(); - ~AsyncMqttClient(); - - AsyncMqttClient& setKeepAlive(uint16_t keepAlive); - AsyncMqttClient& setClientId(const char* clientId); - AsyncMqttClient& setCleanSession(bool cleanSession); - AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength); - AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr); - AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0); - AsyncMqttClient& setServer(IPAddress ip, uint16_t port); - AsyncMqttClient& setServer(const char* host, uint16_t port); + public: + AsyncMqttClient(); + ~AsyncMqttClient(); + + AsyncMqttClient& setKeepAlive(uint16_t keepAlive); + AsyncMqttClient& setClientId(const char* clientId); + AsyncMqttClient& setCleanSession(bool cleanSession); + AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength); + AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr); + AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0); + AsyncMqttClient& setServer(IPAddress ip, uint16_t port); + AsyncMqttClient& setServer(const char* host, uint16_t port); + AsyncMqttClient& setServer(const char* serviceName, const char* protocol); + AsyncMqttClient& setHostName(const char* hostName); + #if ASYNC_TCP_SSL_ENABLED - AsyncMqttClient& setSecure(bool secure); - AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint); + AsyncMqttClient& setSecure(bool secure); + AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint); #endif - AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); - AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); - AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); - AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); - AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback); - AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); - - bool connected() const; - void connect(); - void disconnect(bool force = false); - uint16_t subscribe(const char* topic, uint8_t qos); - uint16_t unsubscribe(const char* topic); - uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); - bool clearQueue(); // Not MQTT compliant! - - const char* getClientId() const; - - private: - AsyncClient _client; - AsyncMqttClientInternals::OutPacket* _head; - AsyncMqttClientInternals::OutPacket* _tail; - size_t _sent; - enum { - CONNECTING, - CONNECTED, - DISCONNECTING, - DISCONNECTED - } _state; - AsyncMqttClientDisconnectReason _disconnectReason; - uint32_t _lastClientActivity; - uint32_t _lastServerActivity; - uint32_t _lastPingRequestTime; - - char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456 - IPAddress _ip; - const char* _host; - bool _useIp; + AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); + AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); + AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); + AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); + AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback); + AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); + + bool connected() const; + void connect(); + void disconnect(bool force = false); + uint16_t subscribe(const char* topic, uint8_t qos); + uint16_t unsubscribe(const char* topic); + uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); + bool clearQueue(); // Not MQTT compliant! + + const char* getClientId() const; + + private: + AsyncClient _client; + AsyncMqttClientInternals::OutPacket* _head; + AsyncMqttClientInternals::OutPacket* _tail; + size_t _sent; + enum { + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED + } _state; + AsyncMqttClientDisconnectReason _disconnectReason; + uint32_t _lastClientActivity; + uint32_t _lastServerActivity; + uint32_t _lastPingRequestTime; + Preferences preferences; + + char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456 + IPAddress _ip; + const char* _host; + bool _useIp; + const char* _serviceName; + const char* _protocol; + const char* _hostName; + #if ASYNC_TCP_SSL_ENABLED - bool _secure; + bool _secure; #endif - uint16_t _port; - uint16_t _keepAlive; - bool _cleanSession; - const char* _clientId; - const char* _username; - const char* _password; - const char* _willTopic; - const char* _willPayload; - uint16_t _willPayloadLength; - uint8_t _willQos; - bool _willRetain; + uint16_t _port; + uint16_t _keepAlive; + bool _cleanSession; + const char* _clientId; + const char* _username; + const char* _password; + const char* _willTopic; + const char* _willPayload; + uint16_t _willPayloadLength; + uint8_t _willQos; + bool _willRetain; #if ASYNC_TCP_SSL_ENABLED - std::vector> _secureServerFingerprints; + std::vector> _secureServerFingerprints; #endif - std::vector _onConnectUserCallbacks; - std::vector _onDisconnectUserCallbacks; - std::vector _onSubscribeUserCallbacks; - std::vector _onUnsubscribeUserCallbacks; - std::vector _onMessageUserCallbacks; - std::vector _onPublishUserCallbacks; + std::vector _onConnectUserCallbacks; + std::vector _onDisconnectUserCallbacks; + std::vector _onSubscribeUserCallbacks; + std::vector _onUnsubscribeUserCallbacks; + std::vector _onMessageUserCallbacks; + std::vector _onPublishUserCallbacks; - AsyncMqttClientInternals::ParsingInformation _parsingInformation; - AsyncMqttClientInternals::Packet* _currentParsedPacket; - uint8_t _remainingLengthBufferPosition; - char _remainingLengthBuffer[4]; + AsyncMqttClientInternals::ParsingInformation _parsingInformation; + AsyncMqttClientInternals::Packet* _currentParsedPacket; + uint8_t _remainingLengthBufferPosition; + char _remainingLengthBuffer[4]; - std::vector _pendingPubRels; + std::vector _pendingPubRels; #if defined(ESP32) - SemaphoreHandle_t _xSemaphore = nullptr; + SemaphoreHandle_t _xSemaphore = nullptr; #elif defined(ESP8266) - bool _xSemaphore = false; + bool _xSemaphore = false; #endif - void _clear(); - void _freeCurrentParsedPacket(); - - // TCP - void _onConnect(); - void _onDisconnect(); - // void _onError(int8_t error); - // void _onTimeout(); - void _onAck(size_t len); - void _onData(char* data, size_t len); - void _onPoll(); - - // QUEUE - void _insert(AsyncMqttClientInternals::OutPacket* packet); // for PUBREL - void _addFront(AsyncMqttClientInternals::OutPacket* packet); // for CONNECT - void _addBack(AsyncMqttClientInternals::OutPacket* packet); // all the rest - void _handleQueue(); - void _clearQueue(bool keepSessionData); - - // MQTT - void _onPingResp(); - void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); - void _onSubAck(uint16_t packetId, char status); - void _onUnsubAck(uint16_t packetId); - void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); - void _onPublish(uint16_t packetId, uint8_t qos); - void _onPubRel(uint16_t packetId); - void _onPubAck(uint16_t packetId); - void _onPubRec(uint16_t packetId); - void _onPubComp(uint16_t packetId); - - void _sendPing(); + void _clear(); + void _freeCurrentParsedPacket(); + + // TCP + void _onConnect(); + void _onDisconnect(); + // void _onError(int8_t error); + // void _onTimeout(); + void _onAck(size_t len); + void _onData(char* data, size_t len); + void _onPoll(); + + // QUEUE + void _insert(AsyncMqttClientInternals::OutPacket* packet); // for PUBREL + void _addFront(AsyncMqttClientInternals::OutPacket* packet); // for CONNECT + void _addBack(AsyncMqttClientInternals::OutPacket* packet); // all the rest + void _handleQueue(); + void _clearQueue(bool keepSessionData); + + // MQTT + void _onPingResp(); + void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); + void _onSubAck(uint16_t packetId, char status); + void _onUnsubAck(uint16_t packetId); + void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); + void _onPublish(uint16_t packetId, uint8_t qos); + void _onPubRel(uint16_t packetId); + void _onPubAck(uint16_t packetId); + void _onPubRec(uint16_t packetId); + void _onPubComp(uint16_t packetId); + + void _sendPing(); };