From 7ef1f810fd92ddf8c71a1830c7efb40fd982961d Mon Sep 17 00:00:00 2001 From: Oleksandr Oliinyk Date: Tue, 17 Dec 2024 15:14:10 +0100 Subject: [PATCH] Add on_connect callback to mqtt listener --- nat-lab/bin/mqtt-listener.py | 11 ++++- nat-lab/tests/test_notification_center.py | 53 +++++++++++++++-------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/nat-lab/bin/mqtt-listener.py b/nat-lab/bin/mqtt-listener.py index caae04590..d4aff3b5a 100644 --- a/nat-lab/bin/mqtt-listener.py +++ b/nat-lab/bin/mqtt-listener.py @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message): sys.exit(0) +def on_connect(client, _userdata, _flags, rc, _properties): + if rc == 0: + print("Connected to MQTT Broker") + client.subscribe("meshnet", qos=0) + else: + print(f"Failed to connect with result code: {rc}") + sys.exit(1) + + def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password): mqttc = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311 ) + mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set( @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw cert_reqs=ssl.CERT_REQUIRED, ) mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1) - mqttc.subscribe("meshnet", qos=0) mqttc.loop_forever() diff --git a/nat-lab/tests/test_notification_center.py b/nat-lab/tests/test_notification_center.py index 99b1a3320..d91cb80fd 100644 --- a/nat-lab/tests/test_notification_center.py +++ b/nat-lab/tests/test_notification_center.py @@ -59,21 +59,34 @@ async def run_mqtt_listener( mqtt_broker_user, mqtt_broker_password, ): + stdout_buffer = [] + + async def stdout_callback(output): + print(f"MQTT Listener stdout: {output}") + stdout_buffer.append(output) + + async def stderr_callback(output): + print(f"MQTT Listener stderr: {output}") + mqtt_process = await exit_stack.enter_async_context( connection.create_process([ "python3", + "-u", "/opt/bin/mqtt-listener.py", mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password, - ]).run() + ]).run(stdout_callback=stdout_callback, stderr_callback=stderr_callback) ) - while not mqtt_process.is_executing(): - await sleep(0.1) - print("Waiting for MQTT listener to start executing ...") - print("MQTT listener is executing ...") - return mqtt_process + + try: + await mqtt_process.wait_stdin_ready(timeout=10.0) + print("MQTT listener process stdin is ready") + except TimeoutError: + raise TimeoutError("Timed out waiting for MQTT listener stdin readiness") + + return mqtt_process, stdout_buffer async def get_mqtt_broker_credentials(connection): @@ -112,7 +125,7 @@ async def test_nc_register(): mqtt_host, mqtt_port, user, password = await get_mqtt_broker_credentials( connection ) - mqtt_process = await run_mqtt_listener( + mqtt_process, stdout_buffer = await run_mqtt_listener( exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password ) @@ -127,6 +140,9 @@ async def test_nc_register(): machines_endpoint = f"{CORE_API_URL}/v1/meshnet/machines" payload = json.dumps(request_json, separators=(",", ":")) + while not any("Connected to MQTT Broker" in line for line in stdout_buffer): + await sleep(0.1) + https_process_stdout = await send_https_request( connection, machines_endpoint, @@ -146,20 +162,22 @@ async def test_nc_register(): assert request_json["device_type"] == response.device_type assert response.traffic_routing_supported is False - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: + while not any("message_id" in line for line in stdout_buffer): await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = stdout_buffer[ + -1 + ] # The last item in the buffer is expected to be a message from the meshnet topic assert mqtt_payload verify_mqtt_payload(mqtt_payload) - mqtt_process = await run_mqtt_listener( + mqtt_process, stdout_buffer = await run_mqtt_listener( exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password ) + while not any("Connected to MQTT Broker" in line for line in stdout_buffer): + await sleep(0.1) + await send_https_request( connection, f"{machines_endpoint}/{identifier}", @@ -180,12 +198,11 @@ async def test_nc_register(): assert len(machines) == 0 - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: + while not any("message_id" in line for line in stdout_buffer): await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = stdout_buffer[ + -1 + ] # The last item in the buffer is expected to be a message from the meshnet topic assert mqtt_payload verify_mqtt_payload(mqtt_payload)