Skip to content

Commit

Permalink
Fixes for EMQ X
Browse files Browse the repository at this point in the history
Fix for EMQ X which does not allow subscribing to `#` and occasionally publishes to $SYS.

Other fixes include waiting for the proper client connection to finish subscribing before validating.
  • Loading branch information
MikeDombo committed Feb 10, 2022
1 parent 4654466 commit 35ee775
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions interoperability/client_test5.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def connectionLost(self, cause):
logging.info("connectionLost %s" % str(cause))

def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None):
if topicName.startswith("$SYS/"):
return True
logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
self.messages.append((topicName, payload, qos, retained, msgid, properties))
self.messagedicts.append({"topicname" : topicName, "payload" : payload,
Expand All @@ -69,10 +71,12 @@ def cleanRetained():
curclient = mqtt_client.Client("clean retained".encode("utf-8"))
curclient.registerCallback(callback)
curclient.connect(host=host, port=port, cleanstart=True)
curclient.subscribe(["#"], [MQTTV5.SubscribeOptions(0)])
# Not all brokers (EMQ X) allow us to subscribe to #, so subscribe to + and +/# to accomplish the same
curclient.subscribe(["+"], [MQTTV5.SubscribeOptions(0)])
curclient.subscribe(["+/#"], [MQTTV5.SubscribeOptions(0)])
time.sleep(2) # wait for all retained messages to arrive
for message in callback.messages:
logging.info("deleting retained message for topic", message[0])
logging.info("deleting retained message for topic %s", message[0])
curclient.publish(message[0], b"", 0, retained=True)
curclient.disconnect()
time.sleep(.1)
Expand Down Expand Up @@ -339,7 +343,8 @@ def test_subscribe_failure(self):
time.sleep(1)
# subscribeds is a list of (msgid, [qos])
logging.info(callback.subscribeds)
assert callback.subscribeds[0][1][0].value == 0x80, "return code should be 0x80 %s" % callback.subscribeds
self.assertEqual(callback.subscribeds[0][1][0].value, 0x80,
"return code should be 0x80 %s" % callback.subscribeds)
except:
traceback.print_exc()
succeeded = False
Expand Down Expand Up @@ -551,7 +556,7 @@ def test_subscribe_options(self):
aclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
self.waitfor(callback.subscribeds, 1, 3)
bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
self.waitfor(callback.subscribeds, 1, 3)
self.waitfor(callback2.subscribeds, 1, 3)
aclient.publish(topics[0], b"noLocal test", 1, retained=False)

self.waitfor(callback2.messages, 1, 3)
Expand Down Expand Up @@ -651,6 +656,7 @@ def test_subscribe_identifiers(self):
sub_properties.clear()
sub_properties.SubscriptionIdentifier = 3
bclient.subscribe([topics[0]+"/#"], [MQTTV5.SubscribeOptions(2)], properties=sub_properties)
self.waitfor(callback2.subscribeds, 2, 3)

bclient.publish(topics[0], b"sub identifier test", 1, retained=False)

Expand All @@ -661,7 +667,7 @@ def test_subscribe_identifiers(self):

self.waitfor(callback2.messages, 1, 3)
self.assertEqual(len(callback2.messages), 1, callback2.messages)
expected_subsids = set([2, 3])
expected_subsids = {2, 3}
received_subsids = set(callback2.messages[0][5].SubscriptionIdentifier)
self.assertEqual(received_subsids, expected_subsids, received_subsids)
bclient.disconnect()
Expand All @@ -679,7 +685,7 @@ def test_request_response(self):
self.waitfor(callback.subscribeds, 1, 3)

bclient.subscribe([topics[0]], [MQTTV5.SubscribeOptions(2, noLocal=True)])
self.waitfor(callback.subscribeds, 1, 3)
self.waitfor(callback2.subscribeds, 1, 3)

publish_properties = MQTTV5.Properties(MQTTV5.PacketTypes.PUBLISH)
publish_properties.ResponseTopic = topics[0]
Expand Down

0 comments on commit 35ee775

Please sign in to comment.