diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 92ce00692..617272606 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -33,19 +33,29 @@ #include +#ifdef WOLFMQTT_MULTITHREAD + /* Configuration */ +/* Number of publish tasks. Each will send a unique message to the broker. */ +#define NUM_PUB_TASKS 10 + /* Maximum size for network read/write callbacks. There is also a v5 define that describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */ #define MAX_BUFFER_SIZE 1024 -#define TEST_MESSAGE "test00" -/* Number of publish tasks. Each will send a unique message to the broker. */ -#define NUM_PUB_TASKS 10 - -#ifdef WOLFMQTT_MULTITHREAD +/* Total size of test message to build */ +#define TEST_MESSAGE_SIZE 4680 /* Locals */ +static const char testMessageBase[] = { + 'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z', + 'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z', + '0','1','2','3','4','5','6','7','8','9', + ' ', '!', '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', ',', '-','.', '/', + ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_','`', '{', '|', '}' +}; +static char mTestMessage[TEST_MESSAGE_SIZE]; static int mStopRead = 0; static int mNumMsgsRecvd; static int mNumMsgsDone; @@ -171,15 +181,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos); - /* for test mode: count the number of TEST_MESSAGE matches received */ + /* for test mode: count the number of messages received */ if (mqttCtx->test_mode) { - if (XSTRLEN(TEST_MESSAGE) == msg->buffer_len && - /* Only compare the "test" part */ - XSTRNCMP(TEST_MESSAGE, (char*)msg->buffer, - msg->buffer_len-2) == 0) - { - mNumMsgsRecvd++; - } + mNumMsgsRecvd++; } } @@ -507,7 +511,7 @@ static void *waitMessage_task(void *param) rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY); /* check return code */ - if (rc == MQTT_CODE_CONTINUE) { + if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) { continue; } #ifdef WOLFMQTT_ENABLE_STDIN_CAP @@ -571,7 +575,6 @@ static void *publish_task(void *param) #endif { int rc; - char buf[7]; MQTTCtx *mqttCtx = (MQTTCtx*)param; MqttPublish publish; word32 startSec = 0; @@ -583,16 +586,13 @@ static void *publish_task(void *param) publish.duplicate = 0; publish.topic_name = mqttCtx->topic_name; publish.packet_id = mqtt_get_packetid_threadsafe(); - XSTRNCPY(buf, TEST_MESSAGE, sizeof(buf)); - buf[4] = '0' + ((publish.packet_id / 10) % 10); - buf[5] = '0' + (publish.packet_id % 10); - publish.buffer = (byte*)buf; - publish.total_len = (word16)XSTRLEN(buf); + publish.buffer = (byte*)mTestMessage; + publish.total_len = sizeof(mTestMessage); do { rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL); rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH); - } while (rc == MQTT_CODE_CONTINUE); + } while (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE); if (rc != MQTT_CODE_SUCCESS) { MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish); } @@ -679,8 +679,18 @@ static int unsubscribe_do(MQTTCtx *mqttCtx) int multithread_test(MQTTCtx *mqttCtx) { int rc = 0, i, threadCount = 0; + size_t msgSz; THREAD_T threadList[NUM_PUB_TASKS+3]; + /* Build test message */ + for (msgSz=0; msgSz TEST_MESSAGE_SIZE) + x = TEST_MESSAGE_SIZE - msgSz; + XMEMCPY(&mTestMessage[msgSz], testMessageBase, x); + msgSz += x; + } + rc = multithread_test_init(mqttCtx); if (rc == 0) { if (THREAD_CREATE(&threadList[threadCount++], subscribe_task, mqttCtx)) { @@ -705,6 +715,7 @@ int multithread_test(MQTTCtx *mqttCtx) PRINTF("THREAD_CREATE failed: %d", errno); return -1; } + /* Create threads that publish unique messages */ for (i = 0; i < NUM_PUB_TASKS; i++) { if (THREAD_CREATE(&threadList[threadCount++], publish_task, mqttCtx)) { diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 0a5091bb8..23ef2c371 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1906,7 +1906,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, { rc = MqttClient_Publish_WritePayload(client, publish, pubCb); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) + if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) return rc; #endif #ifdef WOLFMQTT_MULTITHREAD