Skip to content

Commit

Permalink
Fix for non-blocking publish with payload larger than maximum TX buff…
Browse files Browse the repository at this point in the history
…er. ZD 16769.
  • Loading branch information
dgarske committed Oct 20, 2023
1 parent d8e9699 commit f7ddc30
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
53 changes: 32 additions & 21 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,29 @@
#include <stdint.h>


#ifdef WOLFMQTT_MULTITHREAD

/* Configuration */

/* Number of publish tasks. Each will send a unique message to the broker. */
#define NUM_PUB_TASKS 10

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
#define MAX_BUFFER_SIZE 1024
#define TEST_MESSAGE "test00"
/* Number of publish tasks. Each will send a unique message to the broker. */
#define NUM_PUB_TASKS 10


#ifdef WOLFMQTT_MULTITHREAD
/* Total size of test message to build */
#define TEST_MESSAGE_SIZE 4680

/* Locals */
static const char testMessageBase[] = {
'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z',
'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z',
'0','1','2','3','4','5','6','7','8','9',
' ', '!', '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', ',', '-','.', '/',
':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_','`', '{', '|', '}'
};
static char mTestMessage[TEST_MESSAGE_SIZE];
static int mStopRead = 0;
static int mNumMsgsRecvd;
static int mNumMsgsDone;
Expand Down Expand Up @@ -171,15 +181,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos);

/* for test mode: count the number of TEST_MESSAGE matches received */
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (XSTRLEN(TEST_MESSAGE) == msg->buffer_len &&
/* Only compare the "test" part */
XSTRNCMP(TEST_MESSAGE, (char*)msg->buffer,
msg->buffer_len-2) == 0)
{
mNumMsgsRecvd++;
}
mNumMsgsRecvd++;
}
}

Expand Down Expand Up @@ -507,7 +511,7 @@ static void *waitMessage_task(void *param)
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY);

/* check return code */
if (rc == MQTT_CODE_CONTINUE) {
if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) {
continue;
}
#ifdef WOLFMQTT_ENABLE_STDIN_CAP
Expand Down Expand Up @@ -571,7 +575,6 @@ static void *publish_task(void *param)
#endif
{
int rc;
char buf[7];
MQTTCtx *mqttCtx = (MQTTCtx*)param;
MqttPublish publish;
word32 startSec = 0;
Expand All @@ -583,16 +586,13 @@ static void *publish_task(void *param)
publish.duplicate = 0;
publish.topic_name = mqttCtx->topic_name;
publish.packet_id = mqtt_get_packetid_threadsafe();
XSTRNCPY(buf, TEST_MESSAGE, sizeof(buf));
buf[4] = '0' + ((publish.packet_id / 10) % 10);
buf[5] = '0' + (publish.packet_id % 10);
publish.buffer = (byte*)buf;
publish.total_len = (word16)XSTRLEN(buf);
publish.buffer = (byte*)mTestMessage;
publish.total_len = sizeof(mTestMessage);

do {
rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL);
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH);
} while (rc == MQTT_CODE_CONTINUE);
} while (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE);
if (rc != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish);
}
Expand Down Expand Up @@ -679,8 +679,18 @@ static int unsubscribe_do(MQTTCtx *mqttCtx)
int multithread_test(MQTTCtx *mqttCtx)
{
int rc = 0, i, threadCount = 0;
size_t msgSz;
THREAD_T threadList[NUM_PUB_TASKS+3];

/* Build test message */
for (msgSz=0; msgSz<TEST_MESSAGE_SIZE; ) {
size_t x = sizeof(testMessageBase);
if (msgSz + x > TEST_MESSAGE_SIZE)
x = TEST_MESSAGE_SIZE - msgSz;
XMEMCPY(&mTestMessage[msgSz], testMessageBase, x);
msgSz += x;
}

rc = multithread_test_init(mqttCtx);
if (rc == 0) {
if (THREAD_CREATE(&threadList[threadCount++], subscribe_task, mqttCtx)) {
Expand All @@ -705,6 +715,7 @@ int multithread_test(MQTTCtx *mqttCtx)
PRINTF("THREAD_CREATE failed: %d", errno);
return -1;
}

/* Create threads that publish unique messages */
for (i = 0; i < NUM_PUB_TASKS; i++) {
if (THREAD_CREATE(&threadList[threadCount++], publish_task, mqttCtx)) {
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish,
{
rc = MqttClient_Publish_WritePayload(client, publish, pubCb);
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE)
if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE)
return rc;
#endif
#ifdef WOLFMQTT_MULTITHREAD
Expand Down

0 comments on commit f7ddc30

Please sign in to comment.