Skip to content

Commit

Permalink
Merge pull request #373 from dgarske/test_pubfile
Browse files Browse the repository at this point in the history
Fixes for non-blocking with larger payload and improvements to the test and examples
  • Loading branch information
embhorn authored Nov 28, 2023
2 parents fef107f + bc7ae0a commit 3b93bdb
Show file tree
Hide file tree
Showing 19 changed files with 350 additions and 178 deletions.
11 changes: 7 additions & 4 deletions examples/aws/awsiot.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ static int mTestDone = 0;
#define APP_HARDWARE "wolf_aws_iot_demo"
#define APP_FIRMWARE_VERSION LIBWOLFMQTT_VERSION_STRING

#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 512 /* Maximum size for network read/write callbacks */
#endif
#define AWSIOT_HOST "a2dujmi05ideo2-ats.iot.us-west-2.amazonaws.com"
#define AWSIOT_DEVICE_ID "demoDevice"
#define AWSIOT_QOS MQTT_QOS_1
Expand Down Expand Up @@ -615,8 +617,9 @@ int awsiot_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}
Expand Down Expand Up @@ -674,8 +677,8 @@ int awsiot_test(MQTTCtx *mqttCtx)
mqttCtx->publish.buffer = (byte*)mqttCtx->app_ctx;
mqttCtx->publish.total_len = (word32)XSTRLEN((char*)mqttCtx->app_ctx);
rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
11 changes: 7 additions & 4 deletions examples/azure/azureiothub.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ static int mTestDone = 0;
* https://azure.microsoft.com/en-us/documentation/articles/iot-hub-sas-tokens/#using-sas-tokens-as-a-device
* https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support
*/
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024 /* Maximum size for network read/write callbacks */
#endif
#define AZURE_API_VERSION "?api-version=2018-06-30"
#define AZURE_HOST "wolfMQTT.azure-devices.net"
#define AZURE_DEVICE_ID "demoDevice"
Expand Down Expand Up @@ -437,8 +439,9 @@ int azureiothub_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}
Expand Down Expand Up @@ -494,8 +497,8 @@ int azureiothub_test(MQTTCtx *mqttCtx)
mqttCtx->publish.total_len = (word16)rc;

rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
2 changes: 2 additions & 0 deletions examples/firmware/fwclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
#include "examples/mqttnet.h"

/* Configuration */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET
#endif

/* Locals */
static int mStopRead = 0;
Expand Down
6 changes: 4 additions & 2 deletions examples/firmware/fwpush.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
#include "examples/mqttnet.h"

/* Configuration */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE FIRMWARE_MAX_PACKET
#endif

/* Locals */
static int mStopRead = 0;
Expand Down Expand Up @@ -453,8 +455,8 @@ int fwpush_test(MQTTCtx *mqttCtx)
return rc;
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
Expand Down
12 changes: 8 additions & 4 deletions examples/mqttclient/mqttclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ static int mStopRead = 0;

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

#ifdef WOLFMQTT_PROPERTY_CB
#define MAX_CLIENT_ID_LEN 64
Expand Down Expand Up @@ -504,10 +506,12 @@ int mqttclient_test(MQTTCtx *mqttCtx)

if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) {
WOLFMQTT_FREE(mqttCtx->publish.buffer);
mqttCtx->publish.buffer = NULL;
mqttCtx->pub_file = NULL; /* don't try and send file again */
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
#ifdef WOLFMQTT_V5
if (mqttCtx->qos > 0) {
Expand Down Expand Up @@ -572,8 +576,8 @@ int mqttclient_test(MQTTCtx *mqttCtx)
mqttCtx->publish.total_len = (word16)rc;
rc = MqttClient_Publish(&mqttCtx->client,
&mqttCtx->publish);
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down
52 changes: 52 additions & 0 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ typedef struct MulticastCtx {
} MulticastCtx;
#endif

#ifndef WOLFMQTT_TEST_NONBLOCK_TIMES
#define WOLFMQTT_TEST_NONBLOCK_TIMES 1
#endif

/* Private functions */

/* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -658,11 +662,16 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
int timeout_ms)
{
SocketContext *sock = (SocketContext*)context;
MQTTCtx* mqttCtx;
int rc;
SOERROR_T so_error = 0;
#ifndef WOLFMQTT_NO_TIMEOUT
struct timeval tv;
#endif
#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbWriteAlt = 0;
static int testSmallerWrite = 0;
#endif

if (context == NULL || buf == NULL || buf_len <= 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -671,6 +680,27 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
if (sock->fd == SOCKET_INVALID)
return MQTT_CODE_ERROR_BAD_ARG;

mqttCtx = sock->mqttCtx;
(void)mqttCtx;

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (mqttCtx->useNonBlockMode) {
if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbWriteAlt++;
return MQTT_CODE_CONTINUE;
}
testNbWriteAlt = 0;
if (!testSmallerWrite) {
if (buf_len > 2)
buf_len /= 2;
testSmallerWrite = 1;
}
else {
testSmallerWrite = 0;
}
}
#endif

#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
Expand Down Expand Up @@ -721,6 +751,10 @@ static int NetRead_ex(void *context, byte* buf, int buf_len,
fd_set errfds;
struct timeval tv;
#endif
#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbReadAlt = 0;
static int testSmallerRead = 0;
#endif

if (context == NULL || buf == NULL || buf_len <= 0) {
return MQTT_CODE_ERROR_BAD_ARG;
Expand All @@ -736,6 +770,24 @@ static int NetRead_ex(void *context, byte* buf, int buf_len,
mqttCtx = sock->mqttCtx;
(void)mqttCtx;

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (mqttCtx->useNonBlockMode) {
if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbReadAlt++;
return MQTT_CODE_CONTINUE;
}
testNbReadAlt = 0;
if (!testSmallerRead) {
if (buf_len > 2)
buf_len /= 2;
testSmallerRead = 1;
}
else {
testSmallerRead = 0;
}
}
#endif

#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
Expand Down
5 changes: 3 additions & 2 deletions examples/mqttsimple/mqttsimple.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,9 @@ int mqttsimple_test(void)
if (rc != MQTT_CODE_SUCCESS) {
goto exit;
}
PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s",
mqttObj.publish.topic_name, mqttObj.publish.qos, mqttObj.publish.buffer);
PRINTF("MQTT Publish: Topic %s, ID %d, Qos %d, Message %s",
mqttObj.publish.topic_name, mqttObj.publish.packet_id,
mqttObj.publish.qos, mqttObj.publish.buffer);

/* Wait for messages */
while (1) {
Expand Down
78 changes: 48 additions & 30 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
/* Configuration */

/* Number of publish tasks. Each will send a unique message to the broker. */
#define NUM_PUB_TASKS 10
#define NUM_PUB_TASKS 5
#define NUM_PUB_PER_TASK 2

/* Maximum size for network read/write callbacks. There is also a v5 define that
describes the max MQTT control packet size, DEFAULT_MAX_PKT_SZ. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

/* Total size of test message to build */
#define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */
Expand Down Expand Up @@ -468,7 +471,8 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
Expand Down Expand Up @@ -555,8 +559,8 @@ static void *waitMessage_task(void *param)
MqttClient_CancelMessage(&mqttCtx->client,
(MqttObject*)&mqttCtx->publish);
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
}
}
Expand Down Expand Up @@ -596,37 +600,51 @@ static DWORD WINAPI publish_task( LPVOID param )
static void *publish_task(void *param)
#endif
{
int rc;
int rc[NUM_PUB_PER_TASK], i;
MQTTCtx *mqttCtx = (MQTTCtx*)param;
MqttPublish publish;
word32 startSec = 0;

/* Publish Topic */
XMEMSET(&publish, 0, sizeof(MqttPublish));
publish.retain = 0;
publish.qos = mqttCtx->qos;
publish.duplicate = 0;
publish.topic_name = mqttCtx->topic_name;
publish.packet_id = mqtt_get_packetid_threadsafe();
publish.buffer = (byte*)mTestMessage;
publish.total_len = sizeof(mTestMessage);
MqttPublish publish[NUM_PUB_PER_TASK];
word32 startSec[NUM_PUB_PER_TASK];

/* Build publish */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
/* Publish Topic */
XMEMSET(&publish[i], 0, sizeof(MqttPublish));
publish[i].retain = 0;
publish[i].qos = mqttCtx->qos;
publish[i].duplicate = 0;
publish[i].topic_name = mqttCtx->topic_name;
publish[i].packet_id = mqtt_get_packetid_threadsafe();
publish[i].buffer = (byte*)mTestMessage;
publish[i].total_len = sizeof(mTestMessage);

rc[i] = MQTT_CODE_CONTINUE;
startSec[i] = 0;
}

do {
rc = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish, NULL);
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_PUBLISH,
mqttCtx->cmd_timeout_ms);
} while (rc == MQTT_CODE_CONTINUE);
if (rc != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish);
/* Send until != continue */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
while (rc[i] == MQTT_CODE_CONTINUE) {
rc[i] = MqttClient_Publish_WriteOnly(&mqttCtx->client, &publish[i],
NULL);
rc[i] = check_response(mqttCtx, rc[i], &startSec[i],
MQTT_PACKET_TYPE_PUBLISH, mqttCtx->cmd_timeout_ms);
}
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
publish.topic_name,
MqttClient_ReturnCodeToString(rc), rc);
/* Report result */
for (i=0; i<NUM_PUB_PER_TASK; i++) {
if (rc[i] != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client, (MqttObject*)&publish[i]);
}

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
publish[i].topic_name, publish[i].packet_id,
MqttClient_ReturnCodeToString(rc[i]), rc[i]);

wm_SemLock(&mtLock);
mNumMsgsDone++;
wm_SemUnlock(&mtLock);
}

THREAD_EXIT(0);
}
Expand Down
8 changes: 6 additions & 2 deletions examples/nbclient/nbclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ static int mTestDone = 0;
/* Configuration */

/* Maximum size for network read/write callbacks. */
#ifndef MAX_BUFFER_SIZE
#define MAX_BUFFER_SIZE 1024
#endif

#ifdef WOLFMQTT_PROPERTY_CB
#define MAX_CLIENT_ID_LEN 64
Expand Down Expand Up @@ -444,10 +446,12 @@ int mqttclient_test(MQTTCtx *mqttCtx)

if ((mqttCtx->pub_file) && (mqttCtx->publish.buffer)) {
WOLFMQTT_FREE(mqttCtx->publish.buffer);
mqttCtx->publish.buffer = NULL;
mqttCtx->pub_file = NULL; /* don't try and send file again */
}

PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
PRINTF("MQTT Publish: Topic %s, ID %d, %s (%d)",
mqttCtx->publish.topic_name, mqttCtx->publish.packet_id,
MqttClient_ReturnCodeToString(rc), rc);
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
Expand Down
Loading

0 comments on commit 3b93bdb

Please sign in to comment.