Skip to content

Commit

Permalink
new func subx. add QoS docs. Fixes #56 (#67)
Browse files Browse the repository at this point in the history
Addition of subx to allow user to choose quality of service level
Docs on quality of service levels (for publishing and subscribing) - affects performance/quality/throughput
Prev sub method hardcoded with a QoS level.
Existing sub functionality not changed (calls subx).
  • Loading branch information
sshanks-kx authored Mar 14, 2023
1 parent c4a8ed8 commit bc34e09
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
51 changes: 48 additions & 3 deletions docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ They allow you to interact with MQTT brokers and send and receive messages.
[`pub`](#mqttpub) publish a message to a topic<br>
[`pubx`](#mqttpubx) publish a message to a topic controlling qos and ret<br>
[`sub`](#mqttsub) subscribe to a topic<br>
[`subx`](#mqttsubx) subscribe to a topic<br>
[`unsub`](#mqttunsub) unsubscribe from a topic

**Callback functions**<br>
Expand Down Expand Up @@ -209,7 +210,7 @@ q).mqtt.pub[`topic1;"This is a test message"];
>
> Where
>
> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with confirmation required.
> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with ack messages required.
> 2. `kret` is set to `0b`. Messages are not retained after sending.

Expand All @@ -225,7 +226,10 @@ Where

- `topic` is a symbol denoting the topic that the message is to be sent to
- `msg` is a string of the message being sent to the broker
- `kqos` is an long denoting the quality of service to be used
- `kqos` is an long denoting the quality of service to be used. The higher the quality of service, the lower the throughput/performance. Possible values are:
- 0 (QoS 0 - at most once) comparable to TCP
- 1 (Qos 1 - at least once) as with QoS 0 but with use of ack message for each message sent
- 2 (QoS 2 - only once) as with QoS 1 but with use of multiple request/response flows for each message
- `kret` is a boolean denoting if published messages are to be retained

returns a callback to the process stating that the message has been sent to the broker.
Expand All @@ -248,7 +252,7 @@ _Subscribe to a topic on a Mosquitto broker process_
.mqtt.sub topic
```

Where `topic` is a symbol denoting the topic that the process should listen to, returns a callback to the process when a message is received on topic stating that the message was received and what that message is.
Where `topic` is a symbol denoting the topic that the process should listen to. Received msgs on that topic can then be processed via the callback [.mqtt.msgrcvd](mqttmsgrecv)

```q
// Connect to the host broker and publish a message
Expand All @@ -259,6 +263,47 @@ q).mqtt.sub[`topic1]
(`msgrcvd;"topic1";"This is a test message")
```

> This function is a projection of the function `.mqtt.pubx` defined below.
>
> Where
>
> 1. `kqos` is set to 1. The broker/client will deliver the message at least once, with confirmation messages sent to the broker from the subscriber for each message received.
## `.mqtt.subx`

_Subscribe to a topic on a Mosquitto broker process, controlling quality of service_

```txt
.mqtt.subx[topic;kqos]
```

Where `

- `topic` is a symbol denoting the topic that the process should listen to. Received msgs on that topic can then be processed via the callback [.mqtt.msgrcvd](mqttmsgrecv)
- `kqos` is an long denoting the quality of service to be used. The higher the quality of service, the lower the throughput/performance. Possible values are:
- 0 (QoS 0 - at most once) comparable to TCP
- 1 (Qos 1 - at least once) as with QoS 0 but with use of ack message for each message sent
- 2 (QoS 2 - only once) as with QoS 1 but with use of multiple request/response flows for each message

> Note that the subscriber QoS can downgrade the intended Qos from the publisher, but not upgrade it. For example, when publisher subscriber using QoS 0 or 1 with a Mosquitto broker:
>
> | Pub QoS | Sub QoS | Acks From Broker to Pub | Acks from Sub to Broker |
> | --- | --- | --- | --- |
> |0|0|No|No|
> |1|0|Yes|No|
> |0|1|No|No|
> |1|1|Yes|Yes|
>
> In this scenario, when the publisher uses QoS 1 and the subscriber has less requirements for QoS (ie. can set to 0), the subscriber will gain better performance.
```q
// Connect to the host broker and publish a message
q).mqtt.conn[`$"tcp://localhost:1883";`rcv]
// Subscribe to topic1 and recieve a message sent to that topic
q).mqtt.subx[`topic1;0]
(`msgrcvd;"topic1";"This is a test message")
```

## `.mqtt.unsub`

Expand Down
3 changes: 2 additions & 1 deletion q/mqtt.q
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
connX:`mqttkdb 2:(`connX;3);
init :`mqttkdb 2:(`init;1);
pubx :`mqttkdb 2:(`pub ;4);
sub :`mqttkdb 2:(`sub ;1);
subx :`mqttkdb 2:(`sub ;2);
unsub:`mqttkdb 2:(`unsub;1);
isConnected:`mqttkdb 2:(`isConnected;1);
disconnect :`mqttkdb 2:(`disconnect;1);


pub:.mqtt.pubx[;;1;0];
sub:.mqtt.subx[;1];
conn:{[tcpconn;pname;opt]connX[tcpconn;pname](enlist[`]!enlist(::)),opt};
msgsent:{0N!(`msgsent;x);};
disconn:{0N!(`disconn;x);};
Expand Down
18 changes: 16 additions & 2 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,27 @@ EXP K pub(K topic, K msg, K kqos, K kret){
/* Subscribe to a topic
* topic = topic name as a symbol
*/
EXP K sub(K topic){
EXP K sub(K topic,K kqos){
int err;
long qos;
if(topic->t != -KS)
return krr("topic type");
if(client == 0)
return krr("not connected");
if(MQTTCLIENT_SUCCESS != (err = MQTTClient_subscribe(client, topic->s, 1)))
switch(kqos->t){
case -KH:
qos = kqos->h;
break;
case -KI:
qos = kqos->i;
break;
case -KJ:
qos = kqos->j;
break;
default:
return krr("qos type");
}
if(MQTTCLIENT_SUCCESS != (err = MQTTClient_subscribe(client, topic->s, qos)))
return krr((S)MQTTClient_strerror(err));
return (K)0;
}
Expand Down

0 comments on commit bc34e09

Please sign in to comment.