Skip to content

Commit

Permalink
Add on_connect callback to mqtt listener
Browse files Browse the repository at this point in the history
  • Loading branch information
olekoliinyk committed Dec 18, 2024
1 parent 07f2d21 commit 7ef1f81
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 19 deletions.
11 changes: 10 additions & 1 deletion nat-lab/bin/mqtt-listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@ def on_message(_client, _userdata, message):
sys.exit(0)


def on_connect(client, _userdata, _flags, rc, _properties):
if rc == 0:
print("Connected to MQTT Broker")
client.subscribe("meshnet", qos=0)
else:
print(f"Failed to connect with result code: {rc}")
sys.exit(1)


def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_password):

mqttc = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2, client_id="receiver", protocol=mqtt.MQTTv311
)

mqttc.on_connect = on_connect
mqttc.on_message = on_message

mqttc.username_pw_set(
Expand All @@ -32,7 +42,6 @@ def main(mqtt_broker_host, mqtt_broker_port, mqtt_broker_user, mqtt_broker_passw
cert_reqs=ssl.CERT_REQUIRED,
)
mqttc.connect(mqtt_broker_host, port=mqtt_broker_port, keepalive=1)
mqttc.subscribe("meshnet", qos=0)
mqttc.loop_forever()


Expand Down
53 changes: 35 additions & 18 deletions nat-lab/tests/test_notification_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,34 @@ async def run_mqtt_listener(
mqtt_broker_user,
mqtt_broker_password,
):
stdout_buffer = []

async def stdout_callback(output):
print(f"MQTT Listener stdout: {output}")
stdout_buffer.append(output)

async def stderr_callback(output):
print(f"MQTT Listener stderr: {output}")

mqtt_process = await exit_stack.enter_async_context(
connection.create_process([
"python3",
"-u",
"/opt/bin/mqtt-listener.py",
mqtt_broker_host,
mqtt_broker_port,
mqtt_broker_user,
mqtt_broker_password,
]).run()
]).run(stdout_callback=stdout_callback, stderr_callback=stderr_callback)
)
while not mqtt_process.is_executing():
await sleep(0.1)
print("Waiting for MQTT listener to start executing ...")
print("MQTT listener is executing ...")
return mqtt_process

try:
await mqtt_process.wait_stdin_ready(timeout=10.0)
print("MQTT listener process stdin is ready")
except TimeoutError:
raise TimeoutError("Timed out waiting for MQTT listener stdin readiness")

return mqtt_process, stdout_buffer


async def get_mqtt_broker_credentials(connection):
Expand Down Expand Up @@ -112,7 +125,7 @@ async def test_nc_register():
mqtt_host, mqtt_port, user, password = await get_mqtt_broker_credentials(
connection
)
mqtt_process = await run_mqtt_listener(
mqtt_process, stdout_buffer = await run_mqtt_listener(
exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password
)

Expand All @@ -127,6 +140,9 @@ async def test_nc_register():
machines_endpoint = f"{CORE_API_URL}/v1/meshnet/machines"
payload = json.dumps(request_json, separators=(",", ":"))

while not any("Connected to MQTT Broker" in line for line in stdout_buffer):
await sleep(0.1)

https_process_stdout = await send_https_request(
connection,
machines_endpoint,
Expand All @@ -146,20 +162,22 @@ async def test_nc_register():
assert request_json["device_type"] == response.device_type
assert response.traffic_routing_supported is False

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
while not any("message_id" in line for line in stdout_buffer):
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = stdout_buffer[
-1
] # The last item in the buffer is expected to be a message from the meshnet topic
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

mqtt_process = await run_mqtt_listener(
mqtt_process, stdout_buffer = await run_mqtt_listener(
exit_stack, mqtt_connection, mqtt_host, mqtt_port, user, password
)

while not any("Connected to MQTT Broker" in line for line in stdout_buffer):
await sleep(0.1)

await send_https_request(
connection,
f"{machines_endpoint}/{identifier}",
Expand All @@ -180,12 +198,11 @@ async def test_nc_register():

assert len(machines) == 0

mqtt_payload = mqtt_process.get_stdout()
while "message_id" not in mqtt_payload:
while not any("message_id" in line for line in stdout_buffer):
await sleep(0.1)
mqtt_payload = mqtt_process.get_stdout()
print("Waiting for MQTT stdout ...")

print(f"MQTT stdout: {mqtt_payload}")
mqtt_payload = stdout_buffer[
-1
] # The last item in the buffer is expected to be a message from the meshnet topic
assert mqtt_payload
verify_mqtt_payload(mqtt_payload)

0 comments on commit 7ef1f81

Please sign in to comment.