Skip to content

Commit

Permalink
Add on_connect callback to mqtt listener
Browse files Browse the repository at this point in the history
  • Loading branch information
olekoliinyk committed Dec 17, 2024
1 parent 84b3469 commit 3f259dc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
11 changes: 10 additions & 1 deletion nat-lab/bin/mqtt-listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message):
sys.exit(0)


def on_connect(client, _userdata, _flags, rc, _properties):
if rc == 0:
print("Connected to MQTT Broker")
client.subscribe("meshnet", qos=0)
else:
print(f"Failed to connect with result code: {rc}")
sys.exit(1)


def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password):

mqttc = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311
)

mqttc.on_connect = on_connect
mqttc.on_message = on_message

mqttc.username_pw_set(
Expand All @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw
cert_reqs=ssl.CERT_REQUIRED,
)
mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1)
mqttc.subscribe("meshnet", qos=0)
mqttc.loop_forever()


Expand Down
32 changes: 20 additions & 12 deletions nat-lab/tests/test_notification_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ async def run_mqtt_listener(
mqtt_broker_user,
mqtt_broker_password,
):
stdout_buffer = []

async def stdout_callback(output):
print(f"MQTT Listener stdout: {output}")
stdout_buffer.append(output)

async def stderr_callback(output):
print(f"MQTT Listener stderr: {output}")

mqtt_process = await exit_stack.enter_async_context(
connection.create_process([
"python3",
Expand All @@ -67,11 +76,12 @@ async def run_mqtt_listener(
mqtt_broker_port,
mqtt_broker_user,
mqtt_broker_password,
]).run()
]).run(stdout_callback=stdout_callback, stderr_callback=stderr_callback)
)
while not mqtt_process.is_executing():
while not any("Connected to MQTT Broker" in line for line in stdout_buffer):
await sleep(0.1)
print("Waiting for MQTT listener to start executing ...")
print(f"Stdout buffer {stdout_buffer}")
print("MQTT listener is executing ...")
return mqtt_process

Expand Down Expand Up @@ -146,13 +156,12 @@ async def test_nc_register():
assert request_json["device_type"] == response.device_type
assert response.traffic_routing_supported is False

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
mqtt_stdout = mqtt_process.get_stdout()
while "message_id" not in mqtt_stdout:
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")
mqtt_stdout = mqtt_process.get_stdout()

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = mqtt_stdout.split("\n")[-1]
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

Expand Down Expand Up @@ -180,12 +189,11 @@ async def test_nc_register():

assert len(machines) == 0

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
mqtt_stdout = mqtt_process.get_stdout()
while "message_id" not in mqtt_stdout:
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")
mqtt_stdout = mqtt_process.get_stdout()

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = mqtt_stdout.split("\n")[-1]
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

0 comments on commit 3f259dc

Please sign in to comment.