From bc34e090339073de99aa54d577cb4370092664b5 Mon Sep 17 00:00:00 2001
From: Simon Shanks <59612559+sshanks-kx@users.noreply.github.com>
Date: Tue, 14 Mar 2023 12:18:10 +0000
Subject: [PATCH] new func subx. add QoS docs. Fixes #56 (#67)
Addition of subx to allow user to choose quality of service level
Docs on quality of service levels (for publishing and subscribing) - affects performance/quality/throughput
Prev sub method hardcoded with a QoS level.
Existing sub functionality not changed (calls subx).
---
docs/reference.md | 51 ++++++++++++++++++++++++++++++++++++++++++++---
q/mqtt.q | 3 ++-
src/mqtt.c | 18 +++++++++++++++--
3 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/docs/reference.md b/docs/reference.md
index 9356023..4e13819 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -13,6 +13,7 @@ They allow you to interact with MQTT brokers and send and receive messages.
[`pub`](#mqttpub) publish a message to a topic
[`pubx`](#mqttpubx) publish a message to a topic controlling qos and ret
[`sub`](#mqttsub) subscribe to a topic
+[`subx`](#mqttsubx) subscribe to a topic
[`unsub`](#mqttunsub) unsubscribe from a topic
**Callback functions**
@@ -209,7 +210,7 @@ q).mqtt.pub[`topic1;"This is a test message"];
>
> Where
>
-> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with confirmation required.
+> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with ack messages required.
> 2. `kret` is set to `0b`. Messages are not retained after sending.
@@ -225,7 +226,10 @@ Where
- `topic` is a symbol denoting the topic that the message is to be sent to
- `msg` is a string of the message being sent to the broker
-- `kqos` is an long denoting the quality of service to be used
+- `kqos` is an long denoting the quality of service to be used. The higher the quality of service, the lower the throughput/performance. Possible values are:
+ - 0 (QoS 0 - at most once) comparable to TCP
+ - 1 (Qos 1 - at least once) as with QoS 0 but with use of ack message for each message sent
+ - 2 (QoS 2 - only once) as with QoS 1 but with use of multiple request/response flows for each message
- `kret` is a boolean denoting if published messages are to be retained
returns a callback to the process stating that the message has been sent to the broker.
@@ -248,7 +252,7 @@ _Subscribe to a topic on a Mosquitto broker process_
.mqtt.sub topic
```
-Where `topic` is a symbol denoting the topic that the process should listen to, returns a callback to the process when a message is received on topic stating that the message was received and what that message is.
+Where `topic` is a symbol denoting the topic that the process should listen to. Received msgs on that topic can then be processed via the callback [.mqtt.msgrcvd](mqttmsgrecv)
```q
// Connect to the host broker and publish a message
@@ -259,6 +263,47 @@ q).mqtt.sub[`topic1]
(`msgrcvd;"topic1";"This is a test message")
```
+> This function is a projection of the function `.mqtt.pubx` defined below.
+>
+> Where
+>
+> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with confirmation messages sent to the broker from the subscriber for each message received.
+
+## `.mqtt.subx`
+
+_Subscribe to a topic on a Mosquitto broker process, controlling quality of service_
+
+```txt
+.mqtt.subx[topic;kqos]
+```
+
+Where `
+
+- `topic` is a symbol denoting the topic that the process should listen to. Received msgs on that topic can then be processed via the callback [.mqtt.msgrcvd](mqttmsgrecv)
+- `kqos` is an long denoting the quality of service to be used. The higher the quality of service, the lower the throughput/performance. Possible values are:
+ - 0 (QoS 0 - at most once) comparable to TCP
+ - 1 (Qos 1 - at least once) as with QoS 0 but with use of ack message for each message sent
+ - 2 (QoS 2 - only once) as with QoS 1 but with use of multiple request/response flows for each message
+
+> Note that the subscriber QoS can downgrade the intended Qos from the publisher, but not upgrade it. For example, when publisher subscriber using QoS 0 or 1 with a Mosquitto broker:
+>
+> | Pub QoS | Sub QoS | Acks From Broker to Pub | Acks from Sub to Broker |
+> | --- | --- | --- | --- |
+> |0|0|No|No|
+> |1|0|Yes|No|
+> |0|1|No|No|
+> |1|1|Yes|Yes|
+>
+> In this scenario, when the publisher uses QoS 1 and the subscriber has less requirements for QoS (ie. can set to 0), the subscriber will gain better performance.
+
+```q
+// Connect to the host broker and publish a message
+q).mqtt.conn[`$"tcp://localhost:1883";`rcv]
+
+// Subscribe to topic1 and recieve a message sent to that topic
+q).mqtt.subx[`topic1;0]
+(`msgrcvd;"topic1";"This is a test message")
+```
## `.mqtt.unsub`
diff --git a/q/mqtt.q b/q/mqtt.q
index 933d5e0..32987cc 100644
--- a/q/mqtt.q
+++ b/q/mqtt.q
@@ -3,13 +3,14 @@
connX:`mqttkdb 2:(`connX;3);
init :`mqttkdb 2:(`init;1);
pubx :`mqttkdb 2:(`pub ;4);
-sub :`mqttkdb 2:(`sub ;1);
+subx :`mqttkdb 2:(`sub ;2);
unsub:`mqttkdb 2:(`unsub;1);
isConnected:`mqttkdb 2:(`isConnected;1);
disconnect :`mqttkdb 2:(`disconnect;1);
pub:.mqtt.pubx[;;1;0];
+sub:.mqtt.subx[;1];
conn:{[tcpconn;pname;opt]connX[tcpconn;pname](enlist[`]!enlist(::)),opt};
msgsent:{0N!(`msgsent;x);};
disconn:{0N!(`disconn;x);};
diff --git a/src/mqtt.c b/src/mqtt.c
index f110a77..daf7a2c 100644
--- a/src/mqtt.c
+++ b/src/mqtt.c
@@ -279,13 +279,27 @@ EXP K pub(K topic, K msg, K kqos, K kret){
/* Subscribe to a topic
* topic = topic name as a symbol
*/
-EXP K sub(K topic){
+EXP K sub(K topic,K kqos){
int err;
+ long qos;
if(topic->t != -KS)
return krr("topic type");
if(client == 0)
return krr("not connected");
- if(MQTTCLIENT_SUCCESS != (err = MQTTClient_subscribe(client, topic->s, 1)))
+ switch(kqos->t){
+ case -KH:
+ qos = kqos->h;
+ break;
+ case -KI:
+ qos = kqos->i;
+ break;
+ case -KJ:
+ qos = kqos->j;
+ break;
+ default:
+ return krr("qos type");
+ }
+ if(MQTTCLIENT_SUCCESS != (err = MQTTClient_subscribe(client, topic->s, qos)))
return krr((S)MQTTClient_strerror(err));
return (K)0;
}