MQTT5 Support #8
Comments
Yes! The idea is to support it at some point. I'm not yet sure how integrated it should be, though. Should there be separate clients, or should it be integrated so that a single setting enables protocol v5? Since I'm also working on https://github.com/256dpi/gomqtt, I'm planning to implement it there first and port the learnings over to lwmqtt. I also have some more ideas to improve the current implementation; maybe these come first to create a stable interface. Anyhow, I'm happy if people want to contribute and push this forward, as I'm a little busy at the moment. |
I updated my Haskell MQTT library to support MQTT5 and had the same concern initially. I wasn't sure whether to make a whole new library or whether I could smash everything together. Turns out, you can model an API for v5 and have it degrade to v3.1.1 pretty easily. In practice, all the things move properties around (a couple of data types do it in a bit of a gross way, but that's deep down in the parsers). Some of the requests and responses are richer. I connect at a given protocol and then dispatch my serialization and parsers on the protocol. For 3.1.1, I synthesize bits that aren't in the protocol so clients can write code for v5 and have it work reliably against 3.1.1, even though there's a bit of feature degradation. (Also, happy anniversary to this FR. 😄) |
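A minimal sketch of the "dispatch on the protocol" idea in C, assuming a hypothetical `proto_t` enum and hypothetical helper names (none of these are lwmqtt or Haskell library APIs). The property section of a packet is a variable byte integer length followed by the property bytes in MQTT 5.0, and it is simply absent in 3.1.1, so the same call site can serve both versions:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical names for illustration only; not part of lwmqtt.
// The values match the protocol level byte sent in CONNECT.
typedef enum { PROTO_MQTT311 = 4, PROTO_MQTT5 = 5 } proto_t;

// Encodes an MQTT variable byte integer and returns the number of bytes written (1 to 4).
static size_t encode_varint(uint8_t *out, uint32_t value) {
  assert(value <= 268435455u);  // largest value representable in four bytes
  size_t n = 0;
  do {
    uint8_t byte = (uint8_t)(value % 128);
    value /= 128;
    if (value > 0) byte |= 0x80;  // continuation bit: more bytes follow
    out[n++] = byte;
  } while (value > 0);
  return n;
}

// Writes the property section of a packet and returns the number of bytes written.
// For 5.0 this is a length prefix plus the already encoded property bytes.
// For 3.1.1 nothing is written, so callers can pass properties unconditionally.
static size_t encode_properties(uint8_t *out, proto_t proto, const uint8_t *props, uint32_t props_len) {
  if (proto != PROTO_MQTT5) return 0;
  size_t n = encode_varint(out, props_len);
  if (props_len > 0) memcpy(out + n, props, props_len);
  return n + props_len;
}
```

With this shape, the caller always supplies properties and the encoder decides whether they reach the wire, which is one way to get the graceful degradation described above.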
I just started hacking something together on this. The codebase is pretty nice, so it's relatively straightforward to add stuff. |
@dustin Thanks for looking into this! |
I only got about an hour into this last night. I've got a way to go, but it's promising: |
I spent a bit of time tonight on tests. All of the tests in my client are done via QuickCheck, which allows me to describe valid input and then have it generate random scenarios and let me run test sequences against them. I can both generate and parse packets in both directions, so I generate a sampling of every possible packet and verify I can serialize it and then parse it into the exact same thing each time. This finds lots of bugs very quickly in practice. I didn't want to hand-write a bunch of packets, so I wrote a little program to generate a set of tests for the stuff I'm working on and verify that the protocol encoder generates the expected packet for a given arbitrary input. It gets... some of them right. 90% of the tests I'm generating are currently failing in the 5.0 publish encoder. I've tested a few of these interactively, but there are scenarios I didn't try. (Oops, while typing this message, I figured out a mistake I made in a few property encodings that the test picked up, so now they all work.) The tests aren't beautiful, but writing one type of this saves me massive time and hits a wide surface. https://gist.github.com/dustin/796323d5d128c5ba79533f6682c02243 (Also note that none of the 3.1.1 tests failed, so... good job 😄) |
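The round-trip idea (encode a random value, decode it, expect the same value back) can be sketched in plain C without QuickCheck. This is only an illustration on the smallest MQTT building block, the variable byte integer; all function names are hypothetical, and the xorshift generator stands in for a real property-testing framework:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define VARINT_MAX 268435455u  // largest MQTT variable byte integer

// Encodes value as a variable byte integer; returns bytes written (1 to 4).
static size_t varint_encode(uint8_t *out, uint32_t value) {
  assert(value <= VARINT_MAX);
  size_t n = 0;
  do {
    uint8_t byte = (uint8_t)(value % 128);
    value /= 128;
    if (value > 0) byte |= 0x80;
    out[n++] = byte;
  } while (value > 0);
  return n;
}

// Decodes a variable byte integer; returns bytes consumed, or 0 if the input is
// truncated or longer than four bytes.
static size_t varint_decode(const uint8_t *in, size_t len, uint32_t *value) {
  uint32_t result = 0, multiplier = 1;
  for (size_t i = 0; i < len && i < 4; i++) {
    result += (uint32_t)(in[i] & 0x7F) * multiplier;
    if ((in[i] & 0x80) == 0) {
      *value = result;
      return i + 1;
    }
    multiplier *= 128;
  }
  return 0;
}

// xorshift32 generator so that a failing run can be reproduced from its seed (seed must be non-zero).
static uint32_t next_random(uint32_t *state) {
  uint32_t x = *state;
  x ^= x << 13;
  x ^= x >> 17;
  x ^= x << 5;
  return *state = x;
}

// Runs `cases` random round trips and returns how many did not decode back to the input.
static int roundtrip_failures(uint32_t seed, int cases) {
  int failures = 0;
  for (int i = 0; i < cases; i++) {
    uint32_t input = next_random(&seed) % (VARINT_MAX + 1u);
    input >>= next_random(&seed) % 29;  // vary the magnitude so 1, 2 and 3 byte encodings also occur
    uint8_t buf[4];
    uint32_t output = 0;
    size_t written = varint_encode(buf, input);
    size_t consumed = varint_decode(buf, written, &output);
    if (consumed != written || output != input) failures++;
  }
  return failures;
}
```

Calling `roundtrip_failures(1, 10000)` exercises ten thousand generated values; the same pattern extends to whole packets once an encoder and decoder exist for them.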
I've not had a lot of time to work on this in the past few days, but I do think I've got a few things done quite thoroughly:
I'm testing a variety of encodings of the above (and decoding pub) for both 3.1.1 and 5.0. Lots of weird edge cases came up (e.g., in 3.1.1, a password can't be present if a username isn't, but it can in 5.0). Protocol compatibility is staying pretty clean. The API can't be compatible, though. Most things take a collection of properties, and return values are different (though I've not really got into that yet), e.g.,
I'm trying to avoid too many compromises, but I'm overall confident that we can have a correct and tight MQTT 3.1.1 and MQTT 5 implementation based on the great work already done here.
|
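The username/password edge case mentioned above can be sketched as a small validation step when building the CONNECT flags. This is an illustration under assumptions: `proto_t` and `connect_credential_flags` are hypothetical names, not lwmqtt's API; only the bit positions (0x80 for the user name flag, 0x40 for the password flag) come from the MQTT specifications.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical names for illustration only; not part of lwmqtt.
typedef enum { PROTO_MQTT311 = 4, PROTO_MQTT5 = 5 } proto_t;

#define CONNECT_FLAG_USERNAME 0x80  // bit 7 of the CONNECT flags byte
#define CONNECT_FLAG_PASSWORD 0x40  // bit 6 of the CONNECT flags byte

// Computes the credential bits of the CONNECT flags byte.
// Returns false when the combination is not allowed for the given protocol:
// 3.1.1 forbids a password without a username, 5.0 allows it.
static bool connect_credential_flags(proto_t proto, bool has_username, bool has_password, uint8_t *flags) {
  assert(flags != NULL);
  if (proto == PROTO_MQTT311 && has_password && !has_username) return false;
  *flags = 0;
  if (has_username) *flags |= CONNECT_FLAG_USERNAME;
  if (has_password) *flags |= CONNECT_FLAG_PASSWORD;
  return true;
}
```

Rejecting the combination up front, rather than silently dropping the password, keeps the 3.1.1 path from sending a packet the broker would treat as malformed.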
I've mainly got the new parsers for QoS > 0 exchanges to do and we're good. My repo's a little bit of a mess. I'm not sure how much you care about that. Mostly generated tests and the test generator. I could split that stuff out:
There are still a lot of things that are possible that aren't quite exposed (e.g., unsubscribe can report per-topic failures, which bubble up, but also properties which can contain a useful error message). Since pretty much every packet has properties in and out, that can be mildly annoying if you don't care, but it's the primary reason I want v5 at this point. :) |
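To make the per-topic UNSUBACK results concrete, here is a rough sketch of how an application might inspect them. The reason code values are the ones defined for UNSUBACK in MQTT 5.0; the function names are hypothetical and do not reflect how the branch discussed here exposes them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// In MQTT 5.0, reason codes of 0x80 and above indicate failure.
static bool unsuback_is_failure(uint8_t code) { return code >= 0x80; }

// Maps an UNSUBACK reason code to its name in the MQTT 5.0 specification.
static const char *unsuback_reason_name(uint8_t code) {
  switch (code) {
    case 0x00: return "Success";
    case 0x11: return "No subscription existed";
    case 0x80: return "Unspecified error";
    case 0x83: return "Implementation specific error";
    case 0x87: return "Not authorized";
    case 0x8F: return "Topic Filter invalid";
    case 0x91: return "Packet Identifier in use";
    default: return "Unknown reason code";
  }
}

// Counts how many topic filters in one UNSUBACK failed; codes[i] belongs to the
// i-th topic filter of the matching UNSUBSCRIBE.
static size_t unsuback_count_failures(const uint8_t *codes, size_t count) {
  assert(codes != NULL || count == 0);
  size_t failures = 0;
  for (size_t i = 0; i < count; i++) {
    if (unsuback_is_failure(codes[i])) failures++;
  }
  return failures;
}
```

Note that 0x11 ("No subscription existed") is below 0x80 and therefore counts as a success, which matters if callers treat any non-zero code as an error.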
For an idea of where I am right now, this is sync.c:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <lwmqtt/unix.h>
#define COMMAND_TIMEOUT 5000
#define MESSAGE_TIMEOUT 1000
lwmqtt_unix_network_t network = {0};
lwmqtt_unix_timer_t timer1, timer2, timer3;
lwmqtt_client_t client;

// property callbacks: each one prints a single property of the given type
static void prop_byte_printer(void *ref, lwmqtt_prop_t prop, uint8_t value) {
  printf("  Property %x (byte): 0x%x\n", prop, value);
}

static void prop_int16_printer(void *ref, lwmqtt_prop_t prop, int16_t value) {
  printf("  Property %x (int): %d\n", prop, value);
}

static void prop_int32_printer(void *ref, lwmqtt_prop_t prop, int32_t value) {
  printf("  Property %x (int32): %d\n", prop, value);
}

static void prop_str_printer(void *ref, lwmqtt_prop_t prop, lwmqtt_string_t value) {
  printf("  Property %x (string): %.*s\n", prop, (int)value.len, value.data);
}

static void prop_user_printer(void *ref, lwmqtt_string_t k, lwmqtt_string_t v) {
  printf("  User property: k=%.*s, v=%.*s\n", (int)k.len, k.data, (int)v.len, v.data);
}

static void message_arrived(lwmqtt_client_t *client, void *ref, lwmqtt_string_t topic, lwmqtt_message_t msg,
                            lwmqtt_serialized_properties_t props) {
  printf("message_arrived: %.*s => %.*s (%d)\n", (int)topic.len, topic.data, (int)msg.payload_len, (char *)msg.payload,
         (int)msg.payload_len);

  // visit every property attached to the message
  lwmqtt_property_callbacks_t cb = {
      .byte_prop = prop_byte_printer,
      .int16_prop = prop_int16_printer,
      .int32_prop = prop_int32_printer,
      .str_prop = prop_str_printer,
      .user_prop = prop_user_printer,
  };
  if (lwmqtt_property_visitor(NULL, props, cb) != LWMQTT_SUCCESS) {
    exit(1);
  }
}

int main(void) {
  // initialize client
  lwmqtt_init(&client, malloc(512), 512, malloc(512), 512);

  // configure client
  lwmqtt_set_network(&client, &network, lwmqtt_unix_network_read, lwmqtt_unix_network_write);
  lwmqtt_set_timers(&client, &timer1, &timer2, lwmqtt_unix_timer_set, lwmqtt_unix_timer_get);
  lwmqtt_set_callback(&client, NULL, message_arrived);
  lwmqtt_set_protocol(&client, LWMQTT_MQTT5);

  // configure message timer
  lwmqtt_unix_timer_set(&timer3, MESSAGE_TIMEOUT);

  // connect to broker
  lwmqtt_err_t err = lwmqtt_unix_network_connect(&network, "localhost", 1883);
  if (err != LWMQTT_SUCCESS) {
    printf("failed lwmqtt_unix_network_connect: %d\n", err);
    exit(1);
  }

  // prepare options
  lwmqtt_options_t options = lwmqtt_default_options;
  options.client_id = lwmqtt_string("lwmqtt");
  options.username = lwmqtt_string("try");
  options.password = lwmqtt_string("try");
  options.keep_alive = 5;

  // send connect packet
  lwmqtt_return_code_t return_code;
  err = lwmqtt_connect(&client, options, NULL, &return_code, COMMAND_TIMEOUT);
  if (err != LWMQTT_SUCCESS) {
    printf("failed lwmqtt_connect: %d (%d)\n", err, return_code);
    exit(1);
  }

  // log
  printf("connected!\n");

  // subscribe to topic
  lwmqtt_sub_options_t subopts = lwmqtt_default_sub_options;
  err = lwmqtt_subscribe_one(&client, lwmqtt_string("hello"), subopts, COMMAND_TIMEOUT);
  if (err != LWMQTT_SUCCESS) {
    printf("failed lwmqtt_subscribe: %d\n", err);
    exit(1);
  }

  // loop forever
  for (;;) {
    // check if data is available
    size_t available = 0;
    err = lwmqtt_unix_network_peek(&network, &available);
    if (err != LWMQTT_SUCCESS) {
      printf("failed lwmqtt_unix_network_peek: %d\n", err);
      exit(1);
    }

    // process data if available
    if (available > 0) {
      err = lwmqtt_yield(&client, available, COMMAND_TIMEOUT);
      if (err != LWMQTT_SUCCESS) {
        printf("failed lwmqtt_yield: %d\n", err);
        exit(1);
      }
    }

    // keep connection alive
    err = lwmqtt_keep_alive(&client, COMMAND_TIMEOUT);
    if (err != LWMQTT_SUCCESS) {
      printf("failed lwmqtt_keep_alive: %d\n", err);
      exit(1);
    }

    // check if message is due
    if (lwmqtt_unix_timer_get(&timer3) <= 0) {
      // prepare message
      lwmqtt_message_t msg = {.qos = LWMQTT_QOS0, .retained = true, .payload = (uint8_t *)("world"), .payload_len = 5};

      // prepare properties: a 30 second expiry plus one user property
      lwmqtt_property_t proplist[] = {
          {.prop = LWMQTT_PROP_MESSAGE_EXPIRY_INTERVAL, .value = {.int32 = 30}},
          {.prop = LWMQTT_PROP_USER_PROPERTY,
           .value = {.pair = {.k = lwmqtt_string("hello from"), .v = lwmqtt_string("lwmqtt")}}},
      };
      lwmqtt_properties_t props = {2, proplist};

      // publish message
      err = lwmqtt_publish(&client, lwmqtt_string("hello"), msg, props, COMMAND_TIMEOUT);
      if (err != LWMQTT_SUCCESS) {
        printf("failed lwmqtt_publish: %d\n", err);
        exit(1);
      }

      // reset timer
      lwmqtt_unix_timer_set(&timer3, MESSAGE_TIMEOUT);
    }

    // sleep for 100ms
    usleep(100 * 1000);
  }
}

output:
|
Is MQTT v5 support planned?