Failing to subscribe after creating a thread for WaitMessage #413

Open
BlimbBoat opened this issue Jan 16, 2025 · 3 comments

@BlimbBoat

Hi,

I think I need some help understanding the current issue that I am having.

Problem

In my original POC (based on the code found in the MQTT examples) I subscribed to my topics before creating a thread that would call WaitMessage and Ping, which worked fine. Publishing messages worked fine as well.

I plan to change it so that, upon request, an MQTT module will subscribe to the requested topics, which can happen at any time while the program is running.

Scenarios

While trying some ideas to solve the problem, I hit two roadblocks.

Scenario 1

With some minor adjustments to the base subscribe code,
rc = MqttClient_Subscribe(&mqttCtx->client, &mqttCtx->subscribe); returns MQTT_CODE_CONTINUE. I added a do-while loop that retries on that code, which leaves me stuck in an infinite loop.

PendResp Found: 0x2000c8d0, Type Subscribe Ack (9), ID 1, InProc 0, Done 0
MqttClient_WaitType: Type Subscribe Ack (9), ID 1, State 0-0
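A retry loop on MQTT_CODE_CONTINUE that has no exit condition will spin forever if the pending response is never processed. As a minimal sketch, assuming the retry is wrapped in a helper, a bounded variant could look like this. The SKETCH_* codes, sketch_op_fn and sketch_fake_op are hypothetical stand-ins so the snippet compiles without wolfMQTT; real code would call MqttClient_Subscribe and compare against the MQTT_CODE_* values.

```c
#include <stddef.h>

/* Stand-in return codes so the sketch compiles on its own; real code would
 * use the MQTT_CODE_* values from the wolfMQTT headers instead. */
enum {
    SKETCH_SUCCESS  = 0,
    SKETCH_CONTINUE = 1,
    SKETCH_GAVE_UP  = -1
};

/* Hypothetical operation type: one non-blocking attempt at a request. */
typedef int (*sketch_op_fn)(void *ctx);

/* Retry op while it reports "continue", but at most max_attempts times. */
static int sketch_retry_bounded(sketch_op_fn op, void *ctx, int max_attempts)
{
    for (int attempt = 0; attempt < max_attempts; attempt++) {
        int rc = op(ctx);
        if (rc != SKETCH_CONTINUE) {
            return rc;           /* finished: success or a real error */
        }
        /* Real code would yield or sleep here so that whichever thread can
         * make progress (for example the reader) gets CPU time. */
    }
    return SKETCH_GAVE_UP;       /* never completed within the budget */
}

/* Demo stub: reports "continue" until the counter in ctx reaches zero. */
static int sketch_fake_op(void *ctx)
{
    int *remaining = (int *)ctx;
    if (*remaining > 0) {
        (*remaining)--;
        return SKETCH_CONTINUE;
    }
    return SKETCH_SUCCESS;
}
```

With a bound in place, the caller gets a distinct result when the acknowledgment never arrives, which is easier to debug than a hung task.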

Scenario 2

This might be completely wrong, because I did not see it in the multithread example, but I added a wm_Sem that the code below makes use of. PUBLISH, SUBSCRIBE and UNSUBSCRIBE share the same one via mqtt_get_packetid_threadsafe().

static void mqtt_waitmsg_ping_task(void* param)
{
    MQTTCtx *mqttCtx = (MQTTCtx*)param;
    MqttPing ping;
    XMEMSET(&ping, 0, sizeof(ping));
    int rc;
    /* Read Loop */
    // Try to separate ping and read
    while(connected)
    {
        if(wm_SemLock(&mqttctx_lock) == 0) {
            /* Try and read packet */
            rc = MqttClient_WaitMessage_ex(&mqttCtx->client, &mqttCtx->client.msg,
                1000);
            if(rc != MQTT_CODE_ERROR_TIMEOUT && rc != MQTT_CODE_SUCCESS) {
                LOG_ERR("WaitMessage_ex error: %d", rc);
            }

            current_time = k_uptime_get();
            if(current_time >= mqtt_com_start_time +
                (1000 * (CONFIG_NMDI_MQTT_CLIENT_KEEP_ALIVE_SEC - 1))) {
                MqttClient_Ping_ex(&mqttCtx->client, &ping);
                mqtt_com_start_time = current_time;
            }
            wm_SemUnlock(&mqttctx_lock);
        }
        k_yield();
    }
}

The result will be that rc = client->net->write(client->net->context, buf, buf_len, timeout_ms); returns -1.
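The keep-alive check in the task above sends a ping once the elapsed time reaches one second less than the keep-alive period. As a rough sketch, that comparison can be pulled into a small pure function that is easy to test off-target. The name sketch_ping_due and its parameters are hypothetical; the timestamps are assumed to be milliseconds such as those returned by k_uptime_get().

```c
#include <stdbool.h>
#include <stdint.h>

/* Returns true once at least (keep_alive_sec - 1) seconds have elapsed
 * since last_ms. Both timestamps are in milliseconds. */
static bool sketch_ping_due(int64_t now_ms, int64_t last_ms, int keep_alive_sec)
{
    int64_t interval_ms = (int64_t)1000 * (keep_alive_sec - 1);
    if (interval_ms < 0) {
        interval_ms = 0;   /* guard against a keep-alive of 0 seconds */
    }
    return (now_ms - last_ms) >= interval_ms;
}
```

Comparing the elapsed difference rather than two absolute sums keeps the arithmetic in 64 bits regardless of how the keep-alive constant is defined.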

Some additional info

Zephyr RTOS
I am connected to an MQTT broker using TLSv1.3

Any insight is greatly appreciated!

@embhorn embhorn self-assigned this Jan 16, 2025
@embhorn
Member

embhorn commented Jan 16, 2025

Hi @BlimbBoat

You'll want to reinitialize the command structures in between successful calls. So for scenario 1, are you zeroing the structure before calling subscribe? Could you share the full context of the API call?

For scenario 2, same question. Are you clearing the ping structure in between successful calls?
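To illustrate what reinitializing between calls means, here is a minimal sketch with a stand-in structure. SketchCmd and its fields are hypothetical and only mimic a command object that keeps per-request state; with wolfMQTT the same idea would apply to objects such as MqttSubscribe or MqttPing, zeroed with XMEMSET.

```c
#include <string.h>

/* Hypothetical command object that carries state from one request. */
typedef struct {
    int            state;      /* progress marker left by the last call */
    unsigned short packet_id;  /* identifier used by the last request */
} SketchCmd;

/* Clear everything so the next request starts from a known state. */
static void sketch_cmd_reset(SketchCmd *cmd)
{
    memset(cmd, 0, sizeof(*cmd));
}
```

The reset belongs between completed requests, not inside a retry loop for a request that is still in progress, since that would discard the state the retry depends on.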

Could you tell us a bit about your project using wolfMQTT? Feel free to create a support ticket by emailing [email protected] for a more private conversation.

Thanks,
Eric - @embhorn

@BlimbBoat
Author

BlimbBoat commented Jan 17, 2025

Sure! It is almost the same as the one in the example. In this case the code below showcases my attempt with wm_Sem.

MqttSubscribe is set to 0 again before every call. Is this what you meant by command structures?

int client_subscribe(MQTTCtx *mqttCtx, struct mqtt_topic *topic, wm_Sem *s)
{
    int rc = MQTT_CODE_SUCCESS;

    /* Build list of topics */
    XMEMSET(&mqttCtx->subscribe, 0, sizeof(MqttSubscribe));

    int i = 0;
    mqttCtx->topics[i].topic_filter = topic->topic_name;
    mqttCtx->topics[i].qos = topic->qos;

    /* Subscribe Topic */
    mqttCtx->subscribe.packet_id = mqtt_get_packetid_threadsafe(s);
    mqttCtx->subscribe.topic_count =
            sizeof(mqttCtx->topics) / sizeof(MqttTopic);
    mqttCtx->subscribe.topics = mqttCtx->topics;

    do {
        rc = MqttClient_Subscribe(&mqttCtx->client, &mqttCtx->subscribe);
    } while(rc == MQTT_CODE_CONTINUE);

    return rc;
}

I haven't done the same for Ping though. I'll adjust that.
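One detail that may be worth checking in the function above: topic_count is derived from the capacity of the topics array, while only slot 0 is filled in. If the array has a single slot this makes no difference, but assuming it was enlarged, the request would also carry the empty slots. A minimal sketch of counting only the populated entries follows; SketchTopic and sketch_count_topics are hypothetical stand-ins, not wolfMQTT types.

```c
#include <stddef.h>

/* Hypothetical stand-in for a topic entry. */
typedef struct {
    const char *topic_filter;
    int         qos;
} SketchTopic;

/* Count the leading slots that actually hold a topic filter. */
static size_t sketch_count_topics(const SketchTopic *topics, size_t capacity)
{
    size_t n = 0;
    while (n < capacity && topics[n].topic_filter != NULL) {
        n++;
    }
    return n;
}
```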

@BlimbBoat
Author

BlimbBoat commented Jan 17, 2025

Scenario 1

Same issue

Scenario 2 with wm_Sem

It seems that after the call to mqtt_get_packetid_threadsafe(s), mqttCtx gets zeroed in its entirety.

I don't really understand what is going on with Scenario 2, but my guess is that resolving that issue would just lead back to Scenario 1. I am not confident about this assumption though.
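As a way to narrow down Scenario 2, the packet ID generation could be taken out of the picture by not using a semaphore for it at all. The following is only a sketch under the assumption that the toolchain provides C11 <stdatomic.h> (Zephyr also offers its own atomic API); sketch_next_packet_id is a hypothetical helper, not part of wolfMQTT. It hands out IDs from 1 to 65535 and skips 0, since MQTT packet identifiers must be nonzero.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Atomically advance *last and return the new ID, wrapping 65535 -> 1. */
static uint16_t sketch_next_packet_id(_Atomic uint16_t *last)
{
    uint16_t cur = atomic_load(last);
    uint16_t next;
    do {
        next = (cur == UINT16_MAX) ? 1 : (uint16_t)(cur + 1);
        /* On failure, cur is reloaded with the current value and we retry. */
    } while (!atomic_compare_exchange_weak(last, &cur, next));
    return next;
}
```

If mqttCtx is still overwritten with this in place, the semaphore handling is probably not the cause and the search can move on to memory layout or stack sizes.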
