Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Helgrind thread errors with enable-tls, and enable-curl. #395

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 69 additions & 18 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,37 +391,66 @@ static int NetRead(void *context, byte* buf, int buf_len,
* MQTT_CODE_CONTINUE, or proceed with a smaller buffer read/write.
* Used for testing nonblocking. */
static int
mqttcurl_test_nonblock(int* buf_len, int for_recv)
mqttcurl_test_nonblock_read(int* buf_len)
{
static int testNbAlt = 0;
static int testSmallerBuf = 0;
#if !defined(WOLFMQTT_DEBUG_SOCKET)
(void)for_recv;
#endif
static int testNbReadAlt = 0;
static int testSmallerRead = 0;

if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbReadAlt++;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock_read: returning early with CONTINUE");
#endif
return MQTT_CODE_CONTINUE;
}

testNbReadAlt = 0;

if (!testSmallerRead) {
if (*buf_len > 2) {
*buf_len /= 2;
testSmallerRead = 1;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock_read: testing small buff: %d",
*buf_len);
#endif
}
}
else {
testSmallerRead = 0;
}

return MQTT_CODE_SUCCESS;
}

if (testNbAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbAlt++;
static int
mqttcurl_test_nonblock_write(int* buf_len)
{
static int testNbWriteAlt = 0;
static int testSmallerWrite = 0;

if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbWriteAlt++;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock(%d): returning early with CONTINUE",
for_recv);
PRINTF("mqttcurl_test_nonblock_write: returning early with CONTINUE");
#endif
return MQTT_CODE_CONTINUE;
}

testNbAlt = 0;
testNbWriteAlt = 0;

if (!testSmallerBuf) {
if (!testSmallerWrite) {
if (*buf_len > 2) {
*buf_len /= 2;
testSmallerBuf = 1;
testSmallerWrite = 1;
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("mqttcurl_test_nonblock(%d): testing small buff: %d",
for_recv, *buf_len);
PRINTF("mqttcurl_test_nonblock_write: testing small buff: %d",
*buf_len);
#endif
}
}
else {
testSmallerBuf = 0;
testSmallerWrite = 0;
}

return MQTT_CODE_SUCCESS;
Expand Down Expand Up @@ -745,7 +774,7 @@ static int NetWrite(void *context, const byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (sock->mqttCtx->useNonBlockMode) {
if (mqttcurl_test_nonblock(&buf_len, 0)) {
if (mqttcurl_test_nonblock_write(&buf_len)) {
return MQTT_CODE_CONTINUE;
}
}
Expand Down Expand Up @@ -773,8 +802,19 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
#ifdef WOLFMQTT_MULTITHREAD
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
if (rc != 0) {
return rc;
}
#endif

res = curl_easy_send(sock->curl, buf, buf_len, &sent);

#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
#endif

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_send(%d) returned: %d, %s", buf_len, res,
Expand Down Expand Up @@ -828,7 +868,7 @@ static int NetRead(void *context, byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
if (sock->mqttCtx->useNonBlockMode) {
if (mqttcurl_test_nonblock(&buf_len, 1)) {
if (mqttcurl_test_nonblock_read(&buf_len)) {
return MQTT_CODE_CONTINUE;
}
}
Expand Down Expand Up @@ -856,8 +896,19 @@ static int NetRead(void *context, byte* buf, int buf_len,
* payload will be transferred in a single shot without buffering.
* todo: add buffering? */
for (size_t i = 0; i < MQTT_CURL_NUM_RETRY; ++i) {
#ifdef WOLFMQTT_MULTITHREAD
int rc = wm_SemLock(&sock->mqttCtx->client.lockCURL);
if (rc != 0) {
return rc;
}
#endif

res = curl_easy_recv(sock->curl, buf, buf_len, &recvd);

#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&sock->mqttCtx->client.lockCURL);
#endif

if (res == CURLE_OK) {
#if defined(WOLFMQTT_DEBUG_SOCKET)
PRINTF("info: curl_easy_recv(%d) returned: %d, %s", buf_len, res,
Expand Down
16 changes: 16 additions & 0 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,16 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
if (rc == 0) {
rc = wm_SemInit(&client->lockClient);
}
#ifdef ENABLE_MQTT_TLS
if (rc == 0) {
rc = wm_SemInit(&client->lockSSL);
}
#endif
#ifdef ENABLE_MQTT_CURL
if (rc == 0) {
rc = wm_SemInit(&client->lockCURL);
}
#endif
#endif

if (rc == 0) {
Expand All @@ -1573,6 +1583,12 @@ void MqttClient_DeInit(MqttClient *client)
(void)wm_SemFree(&client->lockSend);
(void)wm_SemFree(&client->lockRecv);
(void)wm_SemFree(&client->lockClient);
#ifdef ENABLE_MQTT_TLS
(void)wm_SemFree(&client->lockSSL);
#endif
#ifdef ENABLE_MQTT_CURL
(void)wm_SemFree(&client->lockCURL);
#endif
#endif
}
#ifdef WOLFMQTT_V5
Expand Down
35 changes: 30 additions & 5 deletions src/mqtt_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int MqttSocket_TlsSocketReceive(WOLFSSL* ssl, char *buf, int sz,
(void)ssl; /* Not used */

rc = client->net->read(client->net->context, (byte*)buf, sz,
client->tls.timeout_ms);
client->tls.timeout_ms_read);

/* save network read response */
client->tls.sockRcRead = rc;
Expand All @@ -87,7 +87,7 @@ int MqttSocket_TlsSocketSend(WOLFSSL* ssl, char *buf, int sz,
(void)ssl; /* Not used */

rc = client->net->write(client->net->context, (byte*)buf, sz,
client->tls.timeout_ms);
client->tls.timeout_ms_write);

/* save network write response */
client->tls.sockRcWrite = rc;
Expand Down Expand Up @@ -116,7 +116,8 @@ int MqttSocket_Init(MqttClient *client, MqttNet *net)
#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
client->tls.ctx = NULL;
client->tls.ssl = NULL;
client->tls.timeout_ms = client->cmd_timeout_ms;
client->tls.timeout_ms_read = client->cmd_timeout_ms;
client->tls.timeout_ms_write = client->cmd_timeout_ms;
#endif

/* Validate callbacks are not null! */
Expand All @@ -134,8 +135,16 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len,

#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) {
client->tls.timeout_ms = timeout_ms;
client->tls.timeout_ms_write = timeout_ms;
client->tls.sockRcWrite = 0; /* init value */

#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK)
rc = wm_SemLock(&client->lockSSL);
if (rc != 0) {
return rc;
}
#endif

rc = wolfSSL_write(client->tls.ssl, (char*)buf, buf_len);
if (rc < 0) {
#if defined(WOLFMQTT_DEBUG_SOCKET) || defined(WOLFSSL_ASYNC_CRYPT)
Expand All @@ -160,6 +169,10 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len,
}
#endif
}

#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK)
wm_SemUnlock(&client->lockSSL);
#endif
}
else
#endif /* ENABLE_MQTT_TLS && !ENABLE_MQTT_CURL*/
Expand Down Expand Up @@ -236,8 +249,16 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,

#if defined(ENABLE_MQTT_TLS) && !defined(ENABLE_MQTT_CURL)
if (MqttClient_Flags(client,0,0) & MQTT_CLIENT_FLAG_IS_TLS) {
client->tls.timeout_ms = timeout_ms;
client->tls.timeout_ms_read = timeout_ms;
client->tls.sockRcRead = 0; /* init value */

#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK)
rc = wm_SemLock(&client->lockSSL);
if (rc != 0) {
return rc;
}
#endif

rc = wolfSSL_read(client->tls.ssl, (char*)buf, buf_len);
if (rc < 0) {
int error = wolfSSL_get_error(client->tls.ssl, 0);
Expand Down Expand Up @@ -265,6 +286,10 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,
rc = MQTT_CODE_ERROR_NETWORK;
}
}

#if defined(WOLFMQTT_MULTITHREAD) && defined(WOLFMQTT_NONBLOCK)
wm_SemUnlock(&client->lockSSL);
#endif
}
else
#endif /* ENABLE_MQTT_TLS && !ENABLE_MQTT_CURL*/
Expand Down
6 changes: 6 additions & 0 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ typedef struct _MqttClient {
wm_Sem lockSend;
wm_Sem lockRecv;
wm_Sem lockClient;
#ifdef ENABLE_MQTT_TLS
wm_Sem lockSSL;
#endif
#ifdef ENABLE_MQTT_CURL
wm_Sem lockCURL;
#endif
struct _MqttPendResp* firstPendResp; /* protected with client lock */
struct _MqttPendResp* lastPendResp; /* protected with client lock */
#endif
Expand Down
3 changes: 2 additions & 1 deletion wolfmqtt/mqtt_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ typedef struct _MqttTls {
WOLFSSL *ssl;
int sockRcRead;
int sockRcWrite;
int timeout_ms;
int timeout_ms_read;
int timeout_ms_write;
} MqttTls;
#endif

Expand Down
Loading