Skip to content

Commit

Permalink
Add options to qos 1 subscription (#19)
Browse files Browse the repository at this point in the history
* Add options to qos 1 subscription

* Add logs and fix type issues

* Add non_retry and non_persistence in fsm

* Fix dialyzer issues

* Define new fields in vmq_msg

* Fix vmq_bridge SUITE

* Add tuple type to topics spec

* Add clauses to subtopics

* Fix bridge subscription error and tests

* Remove unwanted files

* Add clause to tracer ftopics

* Add tests for qos1 opts

* Add metrics for qos1 subscriptions

* Add metrics for dropped messages

* Apply erlfmt

* Fix PR comments

* Fix dialyzer errors

* Resolve PR comments

* Logs cleanup

---------

Co-authored-by: dhruvjain99 <[email protected]>
  • Loading branch information
dumbpreacher and dhruvjain99 authored Apr 13, 2023
1 parent 6a4b245 commit e97d375
Show file tree
Hide file tree
Showing 16 changed files with 541 additions and 57 deletions.
6 changes: 5 additions & 1 deletion apps/vmq_commons/src/packet.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
gen_unsuback/1,
gen_pingreq/0,
gen_pingresp/0,
gen_disconnect/0
gen_disconnect/0,
gen_subscribe/5
]).

expect_packet(Socket, Name, Expected) ->
Expand Down Expand Up @@ -133,6 +134,9 @@ gen_subscribe(MId, Topics) ->
gen_subscribe(MId, Topic, Qos) ->
vmq_parser:gen_subscribe(MId, Topic, Qos).

gen_subscribe(MId, Topic, Qos, NonRetry, NonPersistence) ->
vmq_parser:gen_subscribe(MId, Topic, Qos, NonRetry, NonPersistence).

gen_suback(Mid, Qos) ->
vmq_parser:gen_suback(Mid, Qos).

Expand Down
55 changes: 53 additions & 2 deletions apps/vmq_commons/src/vmq_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
gen_pubcomp/1,
gen_subscribe/2,
gen_subscribe/3,
gen_subscribe/5,
gen_suback/2,
gen_unsubscribe/2,
gen_unsuback/1,
Expand Down Expand Up @@ -261,12 +262,22 @@ parse_topics(<<>>, _, []) ->
{error, no_topic_provided};
parse_topics(<<>>, _, Topics) ->
{ok, Topics};
parse_topics(<<L:16/big, Topic:L/binary, 0:6, QoS:2, Rest/binary>>, ?SUBSCRIBE = Sub, Acc) when
parse_topics(
<<L:16/big, Topic:L/binary, 0:4, NonRetry:1, NonPersistence:1, QoS:2, Rest/binary>>,
?SUBSCRIBE = Sub,
Acc
) when
(QoS >= 0) and (QoS < 3)
->
case vmq_topic:validate_topic(subscribe, Topic) of
{ok, ParsedTopic} ->
parse_topics(Rest, Sub, [{ParsedTopic, QoS} | Acc]);
T = #mqtt_subscribe_topic{
topic = ParsedTopic,
qos = QoS,
non_retry = to_bool(NonRetry),
non_persistence = to_bool(NonPersistence)
},
parse_topics(Rest, Sub, [T | Acc]);
E ->
E
end;
Expand Down Expand Up @@ -385,6 +396,24 @@ serialise_len(N) when N =< ?LOWBITS ->
serialise_len(N) ->
<<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>.

serialise_topics(
?SUBSCRIBE = Sub,
[
#mqtt_subscribe_topic{
topic = Topic,
qos = QoS,
non_persistence = NP,
non_retry = NR
}
| Rest
],
Acc
) ->
NonRetry = to_int(NR),
NonPersistence = to_int(NP),
serialise_topics(Sub, Rest, [
utf8(vmq_topic:unword(Topic)), <<0:4, NonRetry:1, NonPersistence:1, QoS:2>> | Acc
]);
serialise_topics(?SUBSCRIBE = Sub, [{Topic, QoS} | Rest], Acc) ->
serialise_topics(Sub, Rest, [utf8(vmq_topic:unword(Topic)), <<0:6, QoS:2>> | Acc]);
serialise_topics(?UNSUBSCRIBE = Sub, [Topic | Rest], Acc) ->
Expand Down Expand Up @@ -487,9 +516,21 @@ gen_pubcomp(MId) ->

gen_subscribe(MId, [{_, _} | _] = Topics) ->
BinTopics = [{ensure_binary(Topic), QoS} || {Topic, QoS} <- Topics],
iolist_to_binary(serialise(#mqtt_subscribe{topics = BinTopics, message_id = MId}));
gen_subscribe(MId, [#mqtt_subscribe_topic{} | _] = Topics) ->
BinTopics = [
E#mqtt_subscribe_topic{topic = ensure_binary(T)}
|| #mqtt_subscribe_topic{topic = T} = E <- Topics
],
iolist_to_binary(serialise(#mqtt_subscribe{topics = BinTopics, message_id = MId})).
gen_subscribe(MId, Topic, QoS) ->
gen_subscribe(MId, [{Topic, QoS}]).
gen_subscribe(MId, Topic, QoS, NonRetry, NonPersistence) ->
gen_subscribe(MId, [
#mqtt_subscribe_topic{
topic = Topic, qos = QoS, non_retry = NonRetry, non_persistence = NonPersistence
}
]).

gen_suback(MId, QoSs) when is_list(QoSs) ->
iolist_to_binary(serialise(#mqtt_suback{qos_table = QoSs, message_id = MId}));
Expand All @@ -512,3 +553,13 @@ gen_pingresp() ->

gen_disconnect() ->
iolist_to_binary(serialise(#mqtt_disconnect{})).

to_bool(1) ->
true;
to_bool(0) ->
false.

to_int(true) ->
1;
to_int(false) ->
0.
4 changes: 3 additions & 1 deletion apps/vmq_commons/src/vmq_types_common.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
properties = #{} :: properties(),
expiry_ts ::
undefined
| msg_expiry_ts()
| msg_expiry_ts(),
non_retry = false :: flag(),
non_persistence = false :: flag()
}).
-type msg() :: #vmq_msg{}.
-endif.
11 changes: 10 additions & 1 deletion apps/vmq_commons/src/vmq_types_mqtt.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,18 @@
}).
-type mqtt_pubcomp() :: #mqtt_pubcomp{}.

-record(mqtt_subscribe_topic, {
topic :: topic(),
qos :: qos(),
non_retry = false :: flag(),
non_persistence = false :: flag()
}).

-type mqtt_subscribe_topic() :: #mqtt_subscribe_topic{}.

-record(mqtt_subscribe, {
message_id :: msg_id(),
topics = [] :: [{topic(), qos()}]
topics = [] :: [mqtt_subscribe_topic() | {topic(), qos()}]
}).
-type mqtt_subscribe() :: #mqtt_subscribe{}.

Expand Down
11 changes: 8 additions & 3 deletions apps/vmq_server/src/vmq_cluster_com.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ to_vmq_msgs(Msgs) ->
to_vmq_msg(#vmq_msg{} = Msg) ->
Msg;
to_vmq_msg(
{vmq_msg, MsgRef, RoutingKey, Payload, Retain, Dup, QoS, Mountpoint, Persisted, SGPolicy}
{vmq_msg, MsgRef, RoutingKey, Payload, Retain, Dup, QoS, Mountpoint, Persisted, SGPolicy,
NonRetry, NonPersistence}
) ->
%% Pre-MQTT5 msg record. Fill in the missing ones.
#vmq_msg{
Expand All @@ -235,7 +236,9 @@ to_vmq_msg(
persisted = Persisted,
sg_policy = SGPolicy,
properties = #{},
expiry_ts = undefined
expiry_ts = undefined,
non_retry = NonRetry,
non_persistence = NonPersistence
};
to_vmq_msg(InMsg) when
is_tuple(InMsg),
Expand All @@ -255,5 +258,7 @@ to_vmq_msg(InMsg) when
persisted = element(9, InMsg),
sg_policy = element(10, InMsg),
properties = element(11, InMsg),
expiry_ts = element(12, InMsg)
expiry_ts = element(12, InMsg),
non_retry = element(13, InMsg),
non_persistence = element(14, InMsg)
}.
79 changes: 78 additions & 1 deletion apps/vmq_server/src/vmq_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
incr_mqtt_unsubscribe_received/0,
incr_mqtt_pingreq_received/0,
incr_mqtt_disconnect_received/0,
incr_qos1_opts/1,

incr_mqtt_publish_sent/0,
incr_mqtt_publishes_sent/1,
Expand All @@ -57,6 +58,9 @@
incr_mqtt_error_invalid_pubrec/0,
incr_mqtt_error_invalid_pubcomp/0,

incr_qos1_non_retry_message_dropped/0,
incr_qos1_non_persistence_message_dropped/0,

incr_mqtt_error_publish/0,
incr_mqtt_error_subscribe/0,
incr_mqtt_error_unsubscribe/0,
Expand Down Expand Up @@ -237,6 +241,12 @@ incr_mqtt_error_subscribe() ->
incr_mqtt_error_unsubscribe() ->
incr_item(?MQTT4_UNSUBSCRIBE_ERROR, 1).

incr_qos1_non_retry_message_dropped() ->
incr_item(?QOS1_NON_RETRY_DROPPED, 1).

incr_qos1_non_persistence_message_dropped() ->
incr_item(?QOS1_NON_PERSISTENCE_DROPPED, 1).

incr_sidecar_events(HookName) ->
incr_item({?SIDECAR_EVENTS, HookName}, 1).

Expand Down Expand Up @@ -305,6 +315,9 @@ incr_redis_stale_cmd({CMD, OPERATION}) ->
incr_unauth_redis_cmd({CMD, OPERATION}) ->
incr_item({?UNAUTH_REDIS_CMD, CMD, OPERATION}, 1).

incr_qos1_opts({NON_RETRY, NON_PERSISTENCE}) ->
incr_item({?QOS1_SUBSCRIPTION_OPTS, NON_RETRY, NON_PERSISTENCE}, 1).

incr(Entry) ->
incr_item(Entry, 1).

Expand Down Expand Up @@ -1465,6 +1478,64 @@ counter_entries_def() ->
client_keepalive_expired,
<<"The number of clients which failed to communicate within the keepalive time period.">>
),
m(
counter,
[
{mqtt_version, "4"},
{non_persistence, rcn_to_str(?NON_PERSISTENCE)},
{non_retry, rcn_to_str(?NON_RETRY)}
],
{?QOS1_SUBSCRIPTION_OPTS, ?NON_PERSISTENCE, ?NON_RETRY},
qos1_subscription_opts,
<<"QoS 1 opts in subscription.">>
),
m(
counter,
[
{mqtt_version, "4"},
{non_persistence, rcn_to_str(?NON_PERSISTENCE)},
{non_retry, rcn_to_str(?RETRY)}
],
{?QOS1_SUBSCRIPTION_OPTS, ?NON_PERSISTENCE, ?RETRY},
qos1_subscription_opts,
<<"QoS 1 opts in subscription.">>
),
m(
counter,
[
{mqtt_version, "4"},
{non_persistence, rcn_to_str(?PERSISTENCE)},
{non_retry, rcn_to_str(?NON_RETRY)}
],
{?QOS1_SUBSCRIPTION_OPTS, ?PERSISTENCE, ?NON_RETRY},
qos1_subscription_opts,
<<"QoS 1 opts in subscription.">>
),
m(
counter,
[
{mqtt_version, "4"},
{non_persistence, rcn_to_str(?PERSISTENCE)},
{non_retry, rcn_to_str(?RETRY)}
],
{?QOS1_SUBSCRIPTION_OPTS, ?PERSISTENCE, ?RETRY},
qos1_subscription_opts,
<<"QoS 1 opts in subscription.">>
),
m(
counter,
[{mqtt_version, "4"}],
qos1_non_retry_dropped,
qos1_non_retry_dropped,
<<"QoS 1 non_retry messages dropped.">>
),
m(
counter,
[{mqtt_version, "4"}],
qos1_non_persistence_dropped,
qos1_non_persistence_dropped,
<<"QoS 1 non_persistence messages dropped.">>
),

m(
counter,
Expand Down Expand Up @@ -2677,7 +2748,13 @@ met2idx({?REDIS_CMD_MISS, ?DEL, ?MSG_STORE_DELETE}) -> 303;
met2idx({?REDIS_CMD_MISS, ?FIND, ?MSG_STORE_FIND}) -> 304;
met2idx({?REDIS_CMD, ?LPOP, ?MSG_STORE_DELETE}) -> 305;
met2idx({?REDIS_CMD_ERROR, ?LPOP, ?MSG_STORE_DELETE}) -> 306;
met2idx({?REDIS_CMD_MISS, ?LPOP, ?MSG_STORE_DELETE}) -> 307.
met2idx({?REDIS_CMD_MISS, ?LPOP, ?MSG_STORE_DELETE}) -> 307;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?NON_PERSISTENCE}) -> 308;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?RETRY, ?NON_PERSISTENCE}) -> 309;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?PERSISTENCE}) -> 310;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?RETRY, ?PERSISTENCE}) -> 311;
met2idx(?QOS1_NON_RETRY_DROPPED) -> 312;
met2idx(?QOS1_NON_PERSISTENCE_DROPPED) -> 313.

-ifdef(TEST).
clear_stored_rates() ->
Expand Down
7 changes: 7 additions & 0 deletions apps/vmq_server/src/vmq_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,10 @@
-define(READ, read).
-define(DELETE_ALL, delete_all).
-define(DELETE, delete).
-define(QOS1_SUBSCRIPTION_OPTS, qos1_subscription_opts).
-define(NON_PERSISTENCE, true).
-define(NON_RETRY, true).
-define(PERSISTENCE, false).
-define(RETRY, false).
-define(QOS1_NON_RETRY_DROPPED, qos1_non_retry_dropped).
-define(QOS1_NON_PERSISTENCE_DROPPED, qos1_non_persistence_dropped).
Loading

0 comments on commit e97d375

Please sign in to comment.