From 25e27e6698a3dd36985745722e0a3c1a97909af4 Mon Sep 17 00:00:00 2001 From: David Garske Date: Wed, 22 Nov 2023 13:01:51 -0800 Subject: [PATCH 1/9] Fix for continue during variable part of the header. A continue in the middle of variable would not correctly process for large messages. --- src/mqtt_packet.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 1b9fd0008..5f105ab3d 100644 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -1984,7 +1984,9 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len, int i; client->packet.stat = MQTT_PK_READ_HEAD; - for (i = 0; i < MQTT_PACKET_MAX_LEN_BYTES; i++) { + for (i = (client->packet.header_len - MQTT_PACKET_HEADER_MIN_SIZE); + i < MQTT_PACKET_MAX_LEN_BYTES; + i++) { /* Check if another byte is needed */ if ((header->len[i] & MQTT_PACKET_LEN_ENCODE_MASK) == 0) { /* Variable byte length can be determined */ From 8e86413e91212017b84882c50223aa8dff994b2e Mon Sep 17 00:00:00 2001 From: David Garske Date: Wed, 22 Nov 2023 13:02:02 -0800 Subject: [PATCH 2/9] Further improve WOLFMQTT_TEST_NONBLOCK. --- src/mqtt_socket.c | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index 299577e80..c5700ca4e 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -64,6 +64,18 @@ int MqttSocket_TlsSocketReceive(WOLFSSL* ssl, char *buf, int sz, MqttClient *client = (MqttClient*)ptr; (void)ssl; /* Not used */ +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testSmallerTlsRead = 0; + if (!testSmallerTlsRead) { + if (sz > 2) + sz /= 2; + testSmallerTlsRead = 1; + } + else { + testSmallerTlsRead = 0; + } +#endif + rc = client->net->read(client->net->context, (byte*)buf, sz, client->tls.timeout_ms); @@ -87,6 +99,18 @@ int MqttSocket_TlsSocketSend(WOLFSSL* ssl, char *buf, int sz, MqttClient *client = (MqttClient*)ptr; (void)ssl; /* Not used */ +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testSmallerTlsWrite = 0; + if (!testSmallerTlsWrite) { + if (sz > 2) + sz /= 2; + testSmallerTlsWrite = 1; + } + else { + testSmallerTlsWrite = 0; + } +#endif + rc = client->net->write(client->net->context, (byte*)buf, sz, client->tls.timeout_ms); @@ -173,7 +197,7 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testSmallerWrite = 0; if (!testSmallerWrite) { - if (buf_len > 1) + if (buf_len > 2) buf_len /= 2; testSmallerWrite = 1; } @@ -299,7 +323,7 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testSmallerRead = 0; if (!testSmallerRead) { - if (buf_len > 1) + if (buf_len > 2) buf_len /= 2; testSmallerRead = 1; } From 97cb820562e5a3e189bc71c737ab365333a1797b Mon Sep 17 00:00:00 2001 From: David Garske Date: Wed, 22 Nov 2023 13:02:14 -0800 Subject: [PATCH 3/9] Fixes for using the `-f [file]` option with examples. Allow build-time override of `MAX_BUFFER_SIZE` in examples. --- examples/aws/awsiot.c | 2 ++ examples/azure/azureiothub.c | 2 ++ examples/firmware/fwclient.c | 2 ++ examples/firmware/fwpush.c | 2 ++ examples/mqttclient/mqttclient.c | 4 ++++ examples/multithread/multithread.c | 2 ++ examples/nbclient/nbclient.c | 4 ++++ examples/pub-sub/mqtt-pub.c | 6 +++++- examples/pub-sub/mqtt-sub.c | 4 +++- examples/sn-client/sn-client.c | 12 +++++++----- examples/sn-client/sn-client_qos-1.c | 2 ++ examples/sn-client/sn-multithread.c | 2 ++ examples/wiot/wiot.c | 2 ++ 13 files changed, 39 insertions(+), 7 deletions(-) diff --git a/examples/aws/awsiot.c b/examples/aws/awsiot.c index cd77c3807..09c3a05ad 100644 --- a/examples/aws/awsiot.c +++ b/examples/aws/awsiot.c @@ -60,7 +60,9 @@ static int mTestDone = 0; #define APP_HARDWARE "wolf_aws_iot_demo" #define APP_FIRMWARE_VERSION LIBWOLFMQTT_VERSION_STRING +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 512 /* Maximum size for network read/write callbacks */ +#endif #define AWSIOT_HOST "a2dujmi05ideo2-ats.iot.us-west-2.amazonaws.com" #define AWSIOT_DEVICE_ID "demoDevice" #define AWSIOT_QOS MQTT_QOS_1 diff --git a/examples/azure/azureiothub.c b/examples/azure/azureiothub.c index 463e67256..dc8205b65 100644 --- a/examples/azure/azureiothub.c +++ b/examples/azure/azureiothub.c @@ -76,7 +76,9 @@ static int mTestDone = 0; * https://azure.microsoft.com/en-us/documentation/articles/iot-hub-sas-tokens/#using-sas-tokens-as-a-device * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */ +#endif #define AZURE_API_VERSION "?api-version=2018-06-30" #define AZURE_HOST "wolfMQTT.azure-devices.net" #define AZURE_DEVICE_ID "demoDevice" diff --git a/examples/firmware/fwclient.c b/examples/firmware/fwclient.c index 351eaf4b4..22259fd8d 100644 --- a/examples/firmware/fwclient.c +++ b/examples/firmware/fwclient.c @@ -55,7 +55,9 @@ #include "examples/mqttnet.h" /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET +#endif /* Locals */ static int mStopRead = 0; diff --git a/examples/firmware/fwpush.c b/examples/firmware/fwpush.c index 7b2ee2ad6..676ee95e7 100644 --- a/examples/firmware/fwpush.c +++ b/examples/firmware/fwpush.c @@ -56,7 +56,9 @@ #include "examples/mqttnet.h" /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET +#endif /* Locals */ static int mStopRead = 0; diff --git a/examples/mqttclient/mqttclient.c b/examples/mqttclient/mqttclient.c index 8c65813a2..e390b94b7 100644 --- a/examples/mqttclient/mqttclient.c +++ b/examples/mqttclient/mqttclient.c @@ -34,7 +34,9 @@ static int mStopRead = 0; /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -504,6 +506,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } PRINTF("MQTT Publish: Topic %s, %s (%d)", diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 4114d4549..da4939802 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -42,7 +42,9 @@ /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif /* Total size of test message to build */ #define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */ diff --git a/examples/nbclient/nbclient.c b/examples/nbclient/nbclient.c index cf337310d..ed438aa89 100644 --- a/examples/nbclient/nbclient.c +++ b/examples/nbclient/nbclient.c @@ -38,7 +38,9 @@ static int mTestDone = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -444,6 +446,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } PRINTF("MQTT Publish: Topic %s, %s (%d)", diff --git a/examples/pub-sub/mqtt-pub.c b/examples/pub-sub/mqtt-pub.c index 467fccee5..b6d220d39 100644 --- a/examples/pub-sub/mqtt-pub.c +++ b/examples/pub-sub/mqtt-pub.c @@ -30,8 +30,10 @@ /* Configuration */ /* Maximum size for network read/write callbacks. There is also a v5 define that - describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ + * describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 @@ -381,6 +383,8 @@ int pub_client(MQTTCtx *mqttCtx) if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) { WOLFMQTT_FREE(mqttCtx->publish.buffer); + mqttCtx->publish.buffer = NULL; + mqttCtx->pub_file = NULL; /* don't try and send file again */ } if (mqttCtx->debug_on) { PRINTF("MQTT Publish: Topic %s, %s (%d)", diff --git a/examples/pub-sub/mqtt-sub.c b/examples/pub-sub/mqtt-sub.c index cbace383c..84d791e10 100644 --- a/examples/pub-sub/mqtt-sub.c +++ b/examples/pub-sub/mqtt-sub.c @@ -33,8 +33,10 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. There is also a v5 define that - describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ + * describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #ifdef WOLFMQTT_PROPERTY_CB #define MAX_CLIENT_ID_LEN 64 diff --git a/examples/sn-client/sn-client.c b/examples/sn-client/sn-client.c index e60c1f5af..15b5a9bf1 100644 --- a/examples/sn-client/sn-client.c +++ b/examples/sn-client/sn-client.c @@ -36,7 +36,9 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "test" #define SHORT_TOPIC_NAME "s1" @@ -215,11 +217,11 @@ int sn_test(MQTTCtx *mqttCtx) /* Send Connect and wait for Connect Ack */ rc = SN_Client_Connect(&mqttCtx->client, connect); - if (rc != MQTT_CODE_SUCCESS) { - PRINTF("MQTT-SN Connect: %s (%d)", - MqttClient_ReturnCodeToString(rc), rc); - goto disconn; - } + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT-SN Connect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + goto disconn; + } /* Validate Connect Ack info */ PRINTF("....MQTT-SN Connect Ack: Return Code %u", diff --git a/examples/sn-client/sn-client_qos-1.c b/examples/sn-client/sn-client_qos-1.c index cee87110a..ed57b09b2 100644 --- a/examples/sn-client/sn-client_qos-1.c +++ b/examples/sn-client/sn-client_qos-1.c @@ -40,7 +40,9 @@ static int mStopRead = 0; /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "QoS-1 test message" char SHORT_TOPIC_NAME[] = {1}; diff --git a/examples/sn-client/sn-multithread.c b/examples/sn-client/sn-multithread.c index 4e7d934ee..7ca310201 100644 --- a/examples/sn-client/sn-multithread.c +++ b/examples/sn-client/sn-multithread.c @@ -36,7 +36,9 @@ /* Configuration */ /* Maximum size for network read/write callbacks. */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 +#endif #define TEST_MESSAGE "test00" /* Number of publish tasks. Each will send a unique message to the broker. */ #define NUM_PUB_TASKS 10 diff --git a/examples/wiot/wiot.c b/examples/wiot/wiot.c index 9c4e59da8..96b6348cc 100644 --- a/examples/wiot/wiot.c +++ b/examples/wiot/wiot.c @@ -41,7 +41,9 @@ static int mStopRead = 0; static int mTestDone = 0; /* Configuration */ +#ifndef MAX_BUFFER_SIZE #define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */ +#endif /* Undefine if using an IBM WIOT Platform account that you created. */ #define WIOT_USE_QUICKSTART From dd9973c3d61028136af04340f8a1273d52e7e5f5 Mon Sep 17 00:00:00 2001 From: David Garske Date: Fri, 24 Nov 2023 08:24:33 -0800 Subject: [PATCH 4/9] Move the test non-block code into the network callbacks, which know if the non-blocking is allowed. --- examples/mqttnet.c | 52 ++++++++++++++++++++++++++++++++++ src/mqtt_socket.c | 70 ---------------------------------------------- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/examples/mqttnet.c b/examples/mqttnet.c index acd9d3340..0a73a2f30 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -32,6 +32,10 @@ typedef struct MulticastCtx { } MulticastCtx; #endif +#ifndef WOLFMQTT_TEST_NONBLOCK_TIMES +#define WOLFMQTT_TEST_NONBLOCK_TIMES 1 +#endif + /* Private functions */ /* -------------------------------------------------------------------------- */ @@ -643,11 +647,16 @@ static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) { SocketContext *sock = (SocketContext*)context; + MQTTCtx* mqttCtx; int rc; SOERROR_T so_error = 0; #ifndef WOLFMQTT_NO_TIMEOUT struct timeval tv; #endif +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testNbWriteAlt = 0; + static int testSmallerWrite = 0; +#endif if (context == NULL || buf == NULL || buf_len <= 0) { return MQTT_CODE_ERROR_BAD_ARG; @@ -656,6 +665,27 @@ static int NetWrite(void *context, const byte* buf, int buf_len, if (sock->fd == SOCKET_INVALID) return MQTT_CODE_ERROR_BAD_ARG; + mqttCtx = sock->mqttCtx; + (void)mqttCtx; + +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + if (mqttCtx->useNonBlockMode) { + if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbWriteAlt++; + return MQTT_CODE_CONTINUE; + } + testNbWriteAlt = 0; + if (!testSmallerWrite) { + if (buf_len > 2) + buf_len /= 2; + testSmallerWrite = 1; + } + else { + testSmallerWrite = 0; + } + } +#endif + #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); @@ -706,6 +736,10 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, fd_set errfds; struct timeval tv; #endif +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + static int testNbReadAlt = 0; + static int testSmallerRead = 0; +#endif if (context == NULL || buf == NULL || buf_len <= 0) { return MQTT_CODE_ERROR_BAD_ARG; @@ -721,6 +755,24 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, mqttCtx = sock->mqttCtx; (void)mqttCtx; +#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) + if (mqttCtx->useNonBlockMode) { + if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbReadAlt++; + return MQTT_CODE_CONTINUE; + } + testNbReadAlt = 0; + if (!testSmallerRead) { + if (buf_len > 2) + buf_len /= 2; + testSmallerRead = 1; + } + else { + testSmallerRead = 0; + } + } +#endif + #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index c5700ca4e..7fa70caf9 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -42,11 +42,6 @@ #undef WOLFMQTT_DEBUG_SOCKET #endif -/* #define WOLFMQTT_TEST_NONBLOCK */ -#ifdef WOLFMQTT_TEST_NONBLOCK - #define WOLFMQTT_TEST_NONBLOCK_TIMES 1 -#endif - /* lwip */ #ifdef WOLFSSL_LWIP #undef read @@ -64,18 +59,6 @@ int MqttSocket_TlsSocketReceive(WOLFSSL* ssl, char *buf, int sz, MqttClient *client = (MqttClient*)ptr; (void)ssl; /* Not used */ -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerTlsRead = 0; - if (!testSmallerTlsRead) { - if (sz > 2) - sz /= 2; - testSmallerTlsRead = 1; - } - else { - testSmallerTlsRead = 0; - } -#endif - rc = client->net->read(client->net->context, (byte*)buf, sz, client->tls.timeout_ms); @@ -99,18 +82,6 @@ int MqttSocket_TlsSocketSend(WOLFSSL* ssl, char *buf, int sz, MqttClient *client = (MqttClient*)ptr; (void)ssl; /* Not used */ -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerTlsWrite = 0; - if (!testSmallerTlsWrite) { - if (sz > 2) - sz /= 2; - testSmallerTlsWrite = 1; - } - else { - testSmallerTlsWrite = 0; - } -#endif - rc = client->net->write(client->net->context, (byte*)buf, sz, client->tls.timeout_ms); @@ -153,15 +124,6 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, { int rc; -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbWriteAlt = 0; - if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { - testNbWriteAlt++; - return MQTT_CODE_CONTINUE; - } - testNbWriteAlt = 0; -#endif - #ifdef ENABLE_MQTT_TLS if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) { client->tls.timeout_ms = timeout_ms; @@ -194,18 +156,6 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, else #endif /* ENABLE_MQTT_TLS */ { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerWrite = 0; - if (!testSmallerWrite) { - if (buf_len > 2) - buf_len /= 2; - testSmallerWrite = 1; - } - else { - testSmallerWrite = 0; - } - #endif - rc = client->net->write(client->net->context, buf, buf_len, timeout_ms); } @@ -276,15 +226,6 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, { int rc; -#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbReadAlt = 0; - if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { - testNbReadAlt++; - return MQTT_CODE_CONTINUE; - } - testNbReadAlt = 0; -#endif - #ifdef ENABLE_MQTT_TLS if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) { client->tls.timeout_ms = timeout_ms; @@ -320,17 +261,6 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, else #endif /* ENABLE_MQTT_TLS */ { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testSmallerRead = 0; - if (!testSmallerRead) { - if (buf_len > 2) - buf_len /= 2; - testSmallerRead = 1; - } - else { - testSmallerRead = 0; - } - #endif rc = client->net->read(client->net->context, buf, buf_len, timeout_ms); } From 072f5b89ab77c783dcdb1cf73bbb57569d3eaa2b Mon Sep 17 00:00:00 2001 From: David Garske Date: Fri, 24 Nov 2023 08:56:46 -0800 Subject: [PATCH 5/9] By default keep mutex locked if we tried to write. The wolfSSL TLS engine requires an SSL_Write that returns WANT_WRITE to be called with the same buffer/sz, not a different one, even if no data was sent. If user wants to enable the feature anyways they can use `WOLFMQTT_ALLOW_NODATA_UNLOCK`. Only the write that has this logic as the issue doesn't exist for an SSL_Read. --- src/mqtt_client.c | 80 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 12 deletions(-) diff --git a/src/mqtt_client.c b/src/mqtt_client.c index f284b0207..badcfdea4 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -26,6 +26,32 @@ #include "wolfmqtt/mqtt_client.h" +/* DOCUMENTED BUILD OPTIONS: + * + * WOLFMQTT_MULTITHREAD: Enables multi-thread support with mutex protection on + * client struct, write and read. When a pending response is needed its added + * to a linked list and if another thread reads the expected response it is + * flagged, so the other thread knows it completed. + * + * WOLFMQTT_NONBLOCK: Enabled transport support for returning WANT READ/WRITE, + * which becomes WOLFMQTT_CODE_CONTINUE. This prevents blocking if the + * transport (socket) has no data. + * + * WOLFMQTT_V5: Enables MQTT v5.0 support + * + * WOLFMQTT_ALLOW_NODATA_UNLOCK: Used with multi-threading and non-blocking to + * allow unlock if no data was sent/received. Note the TLS stack typically + * requires an attempt to write to continue with same write, not different. + * By default if we attempt a write we keep the mutex locked and return + * MQTT_CODE_CONTINUE + * + * WOLFMQTT_USER_THREADING: Allows custom mutex functions to be defined by the + * user. Example: wm_SemInit + * + * WOLFMQTT_DEBUG_CLIENT: Enables verbose PRINTF for the client code. + */ + + /* Private functions */ /* forward declarations */ @@ -221,7 +247,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) MQTT_TRACE_MSG("Warning, recv already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } - /* detect if a write is already in progress */ + /* detect if a read is already in progress */ if (wm_SemLock(&client->lockClient) == 0) { if (client->read.total > 0) { MQTT_TRACE_MSG("Partial read in progress!"); @@ -692,7 +718,8 @@ static int MqttClient_DecodePacket(MqttClient* client, byte* rx_buf, #ifdef WOLFMQTT_DISCONNECT_CB /* Call disconnect callback with reason code */ if ((packet_obj != NULL) && client->disconnect_cb) { - client->disconnect_cb(client, p_disc->reason_code, client->disconnect_ctx); + client->disconnect_cb(client, p_disc->reason_code, + client->disconnect_ctx); } #endif #else @@ -1087,8 +1114,9 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc <= 0) { #ifdef WOLFMQTT_NONBLOCK if (rc == MQTT_CODE_CONTINUE && - (client->packet.stat > MQTT_PK_BEGIN || - client->read.total > 0)) { + (client->packet.stat > MQTT_PK_BEGIN || + client->read.total > 0) + ) { /* advance state, since we received some data */ mms_stat->read = MQTT_MSG_HEADER; } @@ -1580,7 +1608,11 @@ int MqttClient_Connect(MqttClient *client, MqttConnect *mc_connect) /* Send connect packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -1986,9 +2018,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, /* Send publish packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; + } #endif client->write.len = 0; /* reset len, so publish chunk resets */ @@ -2175,7 +2212,11 @@ int MqttClient_Subscribe(MqttClient *client, MqttSubscribe *subscribe) /* Send subscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2276,7 +2317,11 @@ int MqttClient_Unsubscribe(MqttClient *client, MqttUnsubscribe *unsubscribe) /* Send unsubscribe packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2369,7 +2414,11 @@ int MqttClient_Ping_ex(MqttClient *client, MqttPing* ping) /* Send ping req packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2461,8 +2510,11 @@ int MqttClient_Disconnect_ex(MqttClient *client, MqttDisconnect *p_disconnect) rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK /* if disconnect context avail allow partial write in non-blocking mode */ - if (p_disconnect != NULL && - rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (p_disconnect != NULL && rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } @@ -2544,7 +2596,11 @@ int MqttClient_Auth(MqttClient *client, MqttAuth* auth) /* Send authentication packet */ rc = MqttPacket_Write(client, client->tx_buf, xfer); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE && client->write.total > 0) { + if (rc == MQTT_CODE_CONTINUE + #ifdef WOLFMQTT_ALLOW_NODATA_UNLOCK + && client->write.total > 0 + #endif + ) { /* keep send locked and return early */ return rc; } From 6fdcee6ab0859b4d6cc63c52bfff031a53f7e418 Mon Sep 17 00:00:00 2001 From: David Garske Date: Fri, 24 Nov 2023 09:09:26 -0800 Subject: [PATCH 6/9] Add back logic to return MQTT_CODE_CONTINUE if a write lock happens while data has partially been written. --- src/mqtt_client.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/mqtt_client.c b/src/mqtt_client.c index badcfdea4..25f3b838a 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -195,15 +195,27 @@ static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) int rc = MQTT_CODE_SUCCESS; #ifdef WOLFMQTT_MULTITHREAD + #if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) #ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isWriteActive) { MQTT_TRACE_MSG("Warning, send already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } + #endif /* WOLFMQTT_DEBUG_CLIENT */ + #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK + /* detect if a write is already in progress */ + if (wm_SemLock(&client->lockClient) == 0) { + if (client->write.total > 0) { + MQTT_TRACE_MSG("Partial write in progress!"); + rc = MQTT_CODE_CONTINUE; /* can't write yet */ + } + wm_SemUnlock(&client->lockClient); + } + #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ if (rc != 0) { return rc; } - #endif /* WOLFMQTT_DEBUG_CLIENT */ + #endif rc = wm_SemLock(&client->lockSend); #endif /* WOLFMQTT_MULTITHREAD */ From 033634773fd9ff10d0b2b44d1cda1a49b3caf5b2 Mon Sep 17 00:00:00 2001 From: David Garske Date: Mon, 27 Nov 2023 13:05:57 -0800 Subject: [PATCH 7/9] Improve logic for detection of active read or write. Add test case for trying to send multiple publishes in the same thread. --- examples/multithread/multithread.c | 72 ++++++++++++-------- src/mqtt_client.c | 106 ++++++++++++++++++++--------- wolfmqtt/mqtt_client.h | 3 + 3 files changed, 122 insertions(+), 59 deletions(-) diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index da4939802..0dc13bf79 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -38,7 +38,8 @@ /* Configuration */ /* Number of publish tasks. Each will send a unique message to the broker. */ -#define NUM_PUB_TASKS 10 +#define NUM_PUB_TASKS 5 +#define NUM_PUB_PER_TASK 2 /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ @@ -470,7 +471,8 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx) /* check if we are in test mode and done */ wm_SemLock(&mtLock); if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && - mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS + mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && + mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) #ifdef WOLFMQTT_NONBLOCK && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) #endif @@ -598,37 +600,51 @@ static DWORD WINAPI publish_task( LPVOID param ) static void *publish_task(void *param) #endif { - int rc; + int rc[NUM_PUB_PER_TASK], i; MQTTCtx *mqttCtx = (MQTTCtx*)param; - MqttPublish publish; - word32 startSec = 0; - - /* Publish Topic */ - XMEMSET(&publish, 0, sizeof(MqttPublish)); - publish.retain = 0; - publish.qos = mqttCtx->qos; - publish.duplicate = 0; - publish.topic_name = mqttCtx->topic_name; - publish.packet_id = mqtt_get_packetid_threadsafe(); - publish.buffer = (byte*)mTestMessage; - publish.total_len = sizeof(mTestMessage); + MqttPublish publish[NUM_PUB_PER_TASK]; + word32 startSec[NUM_PUB_PER_TASK]; + + /* Build publish */ + for (i=0; iqos; + publish[i].duplicate = 0; + publish[i].topic_name = mqttCtx->topic_name; + publish[i].packet_id = mqtt_get_packetid_threadsafe(); + publish[i].buffer = (byte*)mTestMessage; + publish[i].total_len = sizeof(mTestMessage); + + rc[i] = MQTT_CODE_CONTINUE; + startSec[i] = 0; + } - do { - rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); - rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH, - mqttCtx->cmd_timeout_ms); - } while (rc == MQTT_CODE_CONTINUE); - if (rc != MQTT_CODE_SUCCESS) { - MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish); + /* Send until != continue */ + for (i=0; iclient, &publish[i], + NULL); + rc[i] = check_response(mqttCtx, rc[i], &startSec[i], + MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms); + } } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - publish.topic_name, - MqttClient_ReturnCodeToString(rc), rc); + /* Report result */ + for (i=0; iclient, (MqttObject*)&publish[i]); + } - wm_SemLock(&mtLock); - mNumMsgsDone++; - wm_SemUnlock(&mtLock); + PRINTF("MQTT Publish: Topic %s, %s (%d)", + publish[i].topic_name, + MqttClient_ReturnCodeToString(rc[i]), rc[i]); + + wm_SemLock(&mtLock); + mNumMsgsDone++; + wm_SemUnlock(&mtLock); + } THREAD_EXIT(0); } diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 25f3b838a..4a4294dba 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -76,7 +76,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); #elif defined(__MACH__) /* Apple style dispatch semaphore */ - int wm_SemInit(wm_Sem *s){ + int wm_SemInit(wm_Sem *s) { /* dispatch_release() fails hard, with Trace/BPT trap signal, if the * sem's internal count is less than the value passed in with * dispatch_semaphore_create(). work around this by initializing @@ -92,7 +92,7 @@ static int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg); return 0; } - int wm_SemFree(wm_Sem *s){ + int wm_SemFree(wm_Sem *s) { if ((s == NULL) || (s->sem == NULL)) return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); @@ -194,38 +194,52 @@ static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_MULTITHREAD - #if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) - #ifdef WOLFMQTT_DEBUG_CLIENT +#if defined(WOLFMQTT_DEBUG_CLIENT) || !defined(WOLFMQTT_ALLOW_NODATA_UNLOCK) + #ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isWriteActive) { MQTT_TRACE_MSG("Warning, send already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } - #endif /* WOLFMQTT_DEBUG_CLIENT */ - #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK + #endif + #ifndef WOLFMQTT_ALLOW_NODATA_UNLOCK /* detect if a write is already in progress */ - if (wm_SemLock(&client->lockClient) == 0) { - if (client->write.total > 0) { + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + if (client->write.isActive) { MQTT_TRACE_MSG("Partial write in progress!"); rc = MQTT_CODE_CONTINUE; /* can't write yet */ } + #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockClient); + #endif } - #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ - if (rc != 0) { + #endif /* WOLFMQTT_ALLOW_NODATA_UNLOCK */ + if (rc != MQTT_CODE_SUCCESS) { return rc; } - #endif +#endif +#ifdef WOLFMQTT_MULTITHREAD rc = wm_SemLock(&client->lockSend); -#endif /* WOLFMQTT_MULTITHREAD */ - if (rc == 0) { +#endif + if (rc == MQTT_CODE_SUCCESS) { stat->isWriteActive = 1; + + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + client->write.isActive = 1; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } + MQTT_TRACE_MSG("lockSend"); } - (void)client; - return rc; } static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) @@ -237,8 +251,16 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) } #endif - /* reset write */ - XMEMSET(&client->write, 0, sizeof(client->write)); +#ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) +#endif + { + /* reset write */ + XMEMSET(&client->write, 0, sizeof(client->write)); + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } if (stat->isWriteActive) { MQTT_TRACE_MSG("unlockSend"); @@ -253,31 +275,47 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; -#ifdef WOLFMQTT_MULTITHREAD - #ifdef WOLFMQTT_DEBUG_CLIENT +#ifdef WOLFMQTT_DEBUG_CLIENT if (stat->isReadActive) { MQTT_TRACE_MSG("Warning, recv already locked!"); rc = MQTT_CODE_ERROR_SYSTEM; } /* detect if a read is already in progress */ - if (wm_SemLock(&client->lockClient) == 0) { - if (client->read.total > 0) { + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + if (client->read.isActive) { MQTT_TRACE_MSG("Partial read in progress!"); rc = MQTT_CODE_CONTINUE; /* can't read yet */ } + #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockClient); + #endif } if (rc != 0) return rc; - #endif /* WOLFMQTT_DEBUG_CLIENT */ +#endif /* WOLFMQTT_DEBUG_CLIENT */ +#ifdef WOLFMQTT_MULTITHREAD rc = wm_SemLock(&client->lockRecv); -#endif /* WOLFMQTT_MULTITHREAD */ - if (rc == 0) { +#endif + if (rc == MQTT_CODE_SUCCESS) { stat->isReadActive = 1; + + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + client->read.isActive = 1; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } + MQTT_TRACE_MSG("lockRecv"); } - (void)client; + return rc; } static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) @@ -289,12 +327,20 @@ static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) } #endif - /* reset read */ - XMEMSET(&client->read, 0, sizeof(client->read)); +#ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) +#endif + { + /* reset read */ + XMEMSET(&client->read, 0, sizeof(client->read)); + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); + #endif + } if (stat->isReadActive) { - stat->isReadActive = 0; MQTT_TRACE_MSG("unlockRecv"); + stat->isReadActive = 0; #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockRecv); #endif @@ -2738,14 +2784,12 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Read Lock"); #endif - mms_stat->isReadActive = 0; MqttReadStop(client, mms_stat); } if (mms_stat->isWriteActive) { #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Write Lock"); #endif - mms_stat->isWriteActive = 0; MqttWriteStop(client, mms_stat); } diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index e8ad0299c..a8bcd7d5f 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -131,6 +131,9 @@ typedef struct _MqttSk { int pos; /* position inside current buffer */ int len; /* length of current segment being sent */ int total; /* number bytes sent or received */ + + /* status bit for if client read or write is active */ + byte isActive:1; } MqttSk; #ifdef WOLFMQTT_DISCONNECT_CB From 8a0cafe48a23db2b210e2b8451e25a71d62911b8 Mon Sep 17 00:00:00 2001 From: David Garske Date: Mon, 27 Nov 2023 13:33:15 -0800 Subject: [PATCH 8/9] Add packet_id to the publish log message in examples. --- examples/aws/awsiot.c | 9 +++++---- examples/azure/azureiothub.c | 9 +++++---- examples/firmware/fwpush.c | 4 ++-- examples/mqttclient/mqttclient.c | 8 ++++---- examples/mqttsimple/mqttsimple.c | 5 +++-- examples/multithread/multithread.c | 8 ++++---- examples/nbclient/nbclient.c | 4 ++-- examples/pub-sub/mqtt-pub.c | 4 ++-- examples/wiot/wiot.c | 9 +++++---- 9 files changed, 32 insertions(+), 28 deletions(-) diff --git a/examples/aws/awsiot.c b/examples/aws/awsiot.c index 09c3a05ad..8ae5516c7 100644 --- a/examples/aws/awsiot.c +++ b/examples/aws/awsiot.c @@ -617,8 +617,9 @@ int awsiot_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -676,8 +677,8 @@ int awsiot_test(MQTTCtx *mqttCtx) mqttCtx->publish.buffer = (byte*)mqttCtx->app_ctx; mqttCtx->publish.total_len = (word32)XSTRLEN((char*)mqttCtx->app_ctx); rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/azure/azureiothub.c b/examples/azure/azureiothub.c index dc8205b65..80189cbc7 100644 --- a/examples/azure/azureiothub.c +++ b/examples/azure/azureiothub.c @@ -439,8 +439,9 @@ int azureiothub_test(MQTTCtx *mqttCtx) if (rc == MQTT_CODE_CONTINUE) { return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -496,8 +497,8 @@ int azureiothub_test(MQTTCtx *mqttCtx) mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/firmware/fwpush.c b/examples/firmware/fwpush.c index 676ee95e7..fbefbdc90 100644 --- a/examples/firmware/fwpush.c +++ b/examples/firmware/fwpush.c @@ -455,8 +455,8 @@ int fwpush_test(MQTTCtx *mqttCtx) return rc; } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; diff --git a/examples/mqttclient/mqttclient.c b/examples/mqttclient/mqttclient.c index e390b94b7..7869a18ff 100644 --- a/examples/mqttclient/mqttclient.c +++ b/examples/mqttclient/mqttclient.c @@ -510,8 +510,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) mqttCtx->pub_file = NULL; /* don't try and send file again */ } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); #ifdef WOLFMQTT_V5 if (mqttCtx->qos > 0) { @@ -576,8 +576,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } diff --git a/examples/mqttsimple/mqttsimple.c b/examples/mqttsimple/mqttsimple.c index 5fa115b53..814c8f370 100644 --- a/examples/mqttsimple/mqttsimple.c +++ b/examples/mqttsimple/mqttsimple.c @@ -424,8 +424,9 @@ int mqttsimple_test(void) if (rc != MQTT_CODE_SUCCESS) { goto exit; } - PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s", - mqttObj.publish.topic_name, mqttObj.publish.qos, mqttObj.publish.buffer); + PRINTF("MQTT Publish: Topic %s, ID %d, Qos %d, Message %s", + mqttObj.publish.topic_name, mqttObj.publish.packet_id, + mqttObj.publish.qos, mqttObj.publish.buffer); /* Wait for messages */ while (1) { diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 0dc13bf79..8a6ee52c9 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -559,8 +559,8 @@ static void *waitMessage_task(void *param) MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&mqttCtx->publish); } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } @@ -637,8 +637,8 @@ static void *publish_task(void *param) MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish[i]); } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - publish[i].topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + publish[i].topic_name, publish[i].packet_id, MqttClient_ReturnCodeToString(rc[i]), rc[i]); wm_SemLock(&mtLock); diff --git a/examples/nbclient/nbclient.c b/examples/nbclient/nbclient.c index ed438aa89..786c12e70 100644 --- a/examples/nbclient/nbclient.c +++ b/examples/nbclient/nbclient.c @@ -450,8 +450,8 @@ int mqttclient_test(MQTTCtx *mqttCtx) mqttCtx->pub_file = NULL; /* don't try and send file again */ } - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; diff --git a/examples/pub-sub/mqtt-pub.c b/examples/pub-sub/mqtt-pub.c index b6d220d39..40c1fc0a3 100644 --- a/examples/pub-sub/mqtt-pub.c +++ b/examples/pub-sub/mqtt-pub.c @@ -387,8 +387,8 @@ int pub_client(MQTTCtx *mqttCtx) mqttCtx->pub_file = NULL; /* don't try and send file again */ } if (mqttCtx->debug_on) { - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } if (rc != MQTT_CODE_SUCCESS) { diff --git a/examples/wiot/wiot.c b/examples/wiot/wiot.c index 96b6348cc..28a06aa41 100644 --- a/examples/wiot/wiot.c +++ b/examples/wiot/wiot.c @@ -265,8 +265,9 @@ int wiot_test(MQTTCtx *mqttCtx) rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, + MqttClient_ReturnCodeToString(rc), rc); if (rc != MQTT_CODE_SUCCESS) { goto disconn; } @@ -316,8 +317,8 @@ int wiot_test(MQTTCtx *mqttCtx) mqttCtx->publish.buffer = mqttCtx->rx_buf; mqttCtx->publish.total_len = (word16)rc; rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); - PRINTF("MQTT Publish: Topic %s, %s (%d)", - mqttCtx->publish.topic_name, + PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)", + mqttCtx->publish.topic_name, mqttCtx->publish.packet_id, MqttClient_ReturnCodeToString(rc), rc); } } From bc7ae0af80ce2053e1e0992eb5e1f51bd3344896 Mon Sep 17 00:00:00 2001 From: David Garske Date: Mon, 27 Nov 2023 14:08:44 -0800 Subject: [PATCH 9/9] Fixed issue with QoS2 on received publish ACK getting skipped if write is already locked. --- src/mqtt_client.c | 61 ++++++++++++++++++++---------------------- wolfmqtt/mqtt_client.h | 2 +- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 4a4294dba..4f1aa7fdd 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -307,7 +307,12 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) if (wm_SemLock(&client->lockClient) == 0) #endif { + /* mark read active */ client->read.isActive = 1; + + /* reset the packet state used by MqttPacket_Read */ + client->packet.stat = MQTT_PK_BEGIN; + #ifdef WOLFMQTT_MULTITHREAD wm_SemUnlock(&client->lockClient); #endif @@ -1155,9 +1160,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, return rc; } - /* reset the packet state used by MqttPacket_Read */ - client->packet.stat = MQTT_PK_BEGIN; - mms_stat->read = MQTT_MSG_WAIT; } FALL_THROUGH; @@ -1311,6 +1313,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (rc >= 0) { rc = MQTT_CODE_SUCCESS; } + else { + /* error, break */ + break; + } #ifdef WOLFMQTT_MULTITHREAD if (pendResp) { @@ -1327,39 +1333,21 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } #endif /* WOLFMQTT_MULTITHREAD */ - /* are we sending ACK or done with message? */ + /* Determine if we are sending ACK or done */ if (MqttIsPubRespPacket(resp.packet_type)) { + /* if we get here, then we are sending an ACK */ mms_stat->read = MQTT_MSG_ACK; - } - else { - mms_stat->read = MQTT_MSG_BEGIN; + mms_stat->ack = MQTT_MSG_WAIT; + + /* setup ACK in shared context */ + XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); + #ifdef WOLFMQTT_V5 + client->packetAck.protocol_level = client->protocol_level; + #endif } /* done reading */ MqttReadStop(client, mms_stat); - - /* if error, leave */ - if (rc != MQTT_CODE_SUCCESS) { - break; - } - - /* if not sending an ACK, we are done */ - if (!MqttIsPubRespPacket(resp.packet_type)) { - break; - } - - /* Flag write active / lock mutex */ - if ((rc = MqttWriteStart(client, mms_stat)) != 0) { - break; - } - - /* setup ACK in shared context */ - XMEMCPY(&client->packetAck, &resp, sizeof(MqttPublishResp)); - #ifdef WOLFMQTT_V5 - client->packetAck.protocol_level = client->protocol_level; - #endif - - mms_stat->ack = MQTT_MSG_ACK; break; } @@ -1382,10 +1370,19 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, switch (mms_stat->ack) { case MQTT_MSG_BEGIN: - case MQTT_MSG_WAIT: /* wait for read to set ack */ break; + case MQTT_MSG_WAIT: + { + /* Flag write active / lock mutex */ + if ((rc = MqttWriteStart(client, mms_stat)) != 0) { + break; + } + mms_stat->ack = MQTT_MSG_ACK; + } + FALL_THROUGH; + case MQTT_MSG_ACK: { /* send ack */ @@ -1453,7 +1450,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #endif /* no data read or ack done, then reset state */ - if (mms_stat->read == MQTT_MSG_WAIT || mms_stat->read == MQTT_MSG_ACK) { + if (mms_stat->read == MQTT_MSG_WAIT) { mms_stat->read = MQTT_MSG_BEGIN; } diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index a8bcd7d5f..ddb70afe1 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -115,7 +115,7 @@ enum MqttClientFlags { WOLFMQTT_API word32 MqttClient_Flags(struct _MqttClient *client, word32 mask, word32 flags); typedef enum _MqttPkStat { - MQTT_PK_BEGIN, + MQTT_PK_BEGIN = 0, MQTT_PK_READ_HEAD, MQTT_PK_READ } MqttPkStat;