From 3f259dc01b5480c470ddf31fec35a40b206f044a Mon Sep 17 00:00:00 2001 From: Oleksandr Oliinyk Date: Tue, 17 Dec 2024 15:14:10 +0100 Subject: [PATCH] Add on_connect callback to mqtt listener --- nat-lab/bin/mqtt-listener.py | 11 +++++++- nat-lab/tests/test_notification_center.py | 32 ++++++++++++++--------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/nat-lab/bin/mqtt-listener.py b/nat-lab/bin/mqtt-listener.py index caae04590..d4aff3b5a 100644 --- a/nat-lab/bin/mqtt-listener.py +++ b/nat-lab/bin/mqtt-listener.py @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message): sys.exit(0) +def on_connect(client, _userdata, _flags, rc, _properties): + if rc == 0: + print("Connected to MQTT Broker") + client.subscribe("meshnet", qos=0) + else: + print(f"Failed to connect with result code: {rc}") + sys.exit(1) + + def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password): mqttc = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311 ) + mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set( @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw cert_reqs=ssl.CERT_REQUIRED, ) mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1) - mqttc.subscribe("meshnet", qos=0) mqttc.loop_forever() diff --git a/nat-lab/tests/test_notification_center.py b/nat-lab/tests/test_notification_center.py index 99b1a3320..c983b9b29 100644 --- a/nat-lab/tests/test_notification_center.py +++ b/nat-lab/tests/test_notification_center.py @@ -59,6 +59,15 @@ async def run_mqtt_listener( mqtt_broker_user, mqtt_broker_password, ): + stdout_buffer = [] + + async def stdout_callback(output): + print(f"MQTT Listener stdout: {output}") + stdout_buffer.append(output) + + async def stderr_callback(output): + print(f"MQTT Listener stderr: {output}") + mqtt_process = await exit_stack.enter_async_context( connection.create_process([ "python3", @@ -67,11 +76,12 @@ async def run_mqtt_listener( mqtt_broker_port, mqtt_broker_user, mqtt_broker_password, - ]).run() + ]).run(stdout_callback=stdout_callback, stderr_callback=stderr_callback) ) - while not mqtt_process.is_executing(): + while not any("Connected to MQTT Broker" in line for line in stdout_buffer): await sleep(0.1) print("Waiting for MQTT listener to start executing ...") + print(f"Stdout buffer {stdout_buffer}") print("MQTT listener is executing ...") return mqtt_process @@ -146,13 +156,12 @@ async def test_nc_register(): assert request_json["device_type"] == response.device_type assert response.traffic_routing_supported is False - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: + mqtt_stdout = mqtt_process.get_stdout() + while "message_id" not in mqtt_stdout: await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") + mqtt_stdout = mqtt_process.get_stdout() - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = mqtt_stdout.split("\n")[-1] assert mqtt_payload verify_mqtt_payload(mqtt_payload) @@ -180,12 +189,11 @@ async def test_nc_register(): assert len(machines) == 0 - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: + mqtt_stdout = mqtt_process.get_stdout() + while "message_id" not in mqtt_stdout: await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") + mqtt_stdout = mqtt_process.get_stdout() - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = mqtt_stdout.split("\n")[-1] assert mqtt_payload verify_mqtt_payload(mqtt_payload)