diff --git a/.unreleased/add_on_connect_callback_to_mqtt_listener b/.unreleased/add_on_connect_callback_to_mqtt_listener new file mode 100644 index 000000000..e69de29bb diff --git a/nat-lab/bin/mqtt-listener.py b/nat-lab/bin/mqtt-listener.py index caae04590..d4aff3b5a 100644 --- a/nat-lab/bin/mqtt-listener.py +++ b/nat-lab/bin/mqtt-listener.py @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message): sys.exit(0) +def on_connect(client, _userdata, _flags, rc, _properties): + if rc == 0: + print("Connected to MQTT Broker") + client.subscribe("meshnet", qos=0) + else: + print(f"Failed to connect with result code: {rc}") + sys.exit(1) + + def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password): mqttc = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311 ) + mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set( @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw cert_reqs=ssl.CERT_REQUIRED, ) mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1) - mqttc.subscribe("meshnet", qos=0) mqttc.loop_forever() diff --git a/nat-lab/tests/test_notification_center.py b/nat-lab/tests/test_notification_center.py index 99b1a3320..ec524a069 100644 --- a/nat-lab/tests/test_notification_center.py +++ b/nat-lab/tests/test_notification_center.py @@ -1,11 +1,12 @@ import base64 import json -from asyncio import sleep +from asyncio import Event from config import CORE_API_CA_CERTIFICATE_PATH, CORE_API_URL from contextlib import AsyncExitStack from dataclasses import dataclass from helpers import send_https_request, verify_uuid from utils.connection_util import ConnectionTag, new_connection_by_tag +from utils.output_notifier import OutputNotifier @dataclass @@ -58,22 +59,39 @@ async def run_mqtt_listener( mqtt_broker_port, mqtt_broker_user, mqtt_broker_password, + output_notifier, ): + stdout_buffer = [] + + async def stdout_stderr_callback(output): + print(f"MQTT Listener output: {output}") + stdout_buffer.append(output) + await output_notifier.handle_output(output) + mqtt_process = await exit_stack.enter_async_context( connection.create_process([ "python3", + "-u", "/opt/bin/mqtt-listener.py", mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password, - ]).run() + ]).run( + stdout_callback=stdout_stderr_callback, + stderr_callback=stdout_stderr_callback, + ) ) - while not mqtt_process.is_executing(): - await sleep(0.1) - print("Waiting for MQTT listener to start executing ...") - print("MQTT listener is executing ...") - return mqtt_process + + try: + await mqtt_process.wait_stdin_ready(timeout=10.0) + print("MQTT listener process stdin is ready") + except TimeoutError as e: + raise TimeoutError( + f"Timed out waiting for MQTT listener stdin readiness: {e}" + ) from e + + return stdout_buffer async def get_mqtt_broker_credentials(connection): @@ -98,6 +116,14 @@ async def get_mqtt_broker_credentials(connection): return host, port, user, password +def create_output_notifier(events): + output_notifier = OutputNotifier() + event_objects = {event: Event() for event in events} + for event, event_obj in event_objects.items(): + output_notifier.notify_output(event, event_obj) + return output_notifier, event_objects + + async def test_nc_register(): async with AsyncExitStack() as exit_stack: # Setup connections @@ -112,10 +138,25 @@ async def test_nc_register(): mqtt_host, mqtt_port, user, password = await get_mqtt_broker_credentials( connection ) - mqtt_process = await run_mqtt_listener( - exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password + + output_notifier, events = create_output_notifier( + ["Connected to MQTT Broker", "message"] + ) + connected_event = events["Connected to MQTT Broker"] + message_event = events["message"] + + stdout_buffer = await run_mqtt_listener( + exit_stack, + mqtt_connection, + mqtt_host, + mqtt_port, + user, + password, + output_notifier, ) + await connected_event.wait() + # Register machine - this is a minimal version which should be also supported request_json = { "public_key": "some-public-key", @@ -146,19 +187,30 @@ async def test_nc_register(): assert request_json["device_type"] == response.device_type assert response.traffic_routing_supported is False - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: - await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") + await message_event.wait() - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = stdout_buffer[ + -1 + ] # The last item in the buffer is expected to be a message from the meshnet topic assert mqtt_payload verify_mqtt_payload(mqtt_payload) - mqtt_process = await run_mqtt_listener( - exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password + output_notifier, events = create_output_notifier( + ["Connected to MQTT Broker", "message"] + ) + connected_event = events["Connected to MQTT Broker"] + message_event = events["message"] + + stdout_buffer = await run_mqtt_listener( + exit_stack, + mqtt_connection, + mqtt_host, + mqtt_port, + user, + password, + output_notifier, ) + await connected_event.wait() await send_https_request( connection, @@ -180,12 +232,10 @@ async def test_nc_register(): assert len(machines) == 0 - mqtt_payload = mqtt_process.get_stdout() - while "message_id" not in mqtt_payload: - await sleep(0.1) - mqtt_payload = mqtt_process.get_stdout() - print("Waiting for MQTT stdout ...") + await message_event.wait() - print(f"MQTT stdout: {mqtt_payload}") + mqtt_payload = stdout_buffer[ + -1 + ] # The last item in the buffer is expected to be a message from the meshnet topic assert mqtt_payload verify_mqtt_payload(mqtt_payload)