Using aiomqtt in a class in 'fire-and-forget' way #243
Replies: 4 comments
-
Sorry there was a small mistake in I've also commented the Exception in callback BaseSelectorEventLoop.add_writer(<ssl.SSLSocke...pe=1, proto=6>, <function Cli...x7f6b10cc5940>)
handle: <Handle BaseSelectorEventLoop.add_writer(<ssl.SSLSocke...pe=1, proto=6>, <function Cli...x7f6b10cc5940>)>
Traceback (most recent call last):
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/selector_events.py", line 350, in add_writer
self._add_writer(fd, callback, *args)
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/selector_events.py", line 302, in _add_writer
key = self._selector.get_key(fd)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/selectors.py", line 190, in get_key
return mapping[fileobj]
~~~~~~~^^^^^^^^^
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/selectors.py", line 71, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/selectors.py", line 225, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/selectors.py", line 42, in _fileobj_to_fd
raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1
Caught exception in on_socket_unregister_write: [Errno 2] No such file or directory
MQTT error "[Errno 2] No such file or directory". Reconnecting in 5 seconds.
Exception in callback BaseSelectorEventLoop.add_reader(7, <function Cli...x7f6b10cc5800>)
handle: <Handle BaseSelectorEventLoop.add_reader(7, <function Cli...x7f6b10cc5800>)>
Traceback (most recent call last):
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/selector_events.py", line 340, in add_reader
self._add_reader(fd, callback, *args)
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/asyncio/selector_events.py", line 271, in _add_reader
self._selector.modify(fd, mask | selectors.EVENT_READ,
File "/home/es/.pyenv/versions/3.11.4/lib/python3.11/selectors.py", line 389, in modify
self._selector.modify(key.fd, selector_events)
FileNotFoundError: [Errno 2] No such file or directory Here is the updated code: import asyncio
import aiomqtt
#import paho.mqtt as mqtt
import ssl
from dotenv import load_dotenv
import os
import logging
class MessageLogger:
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
self.messages = []
load_dotenv()
self.client_id = 'dev'
self.MQTTS_BROKER = os.getenv("MQTTS_BROKER")
self.MQTTS_PORT = int(os.getenv("MQTTS_PORT"))
self.MQTTS_USERNAME = os.getenv("MQTTS_USERNAME")
self.MQTTS_PASSWORD = os.getenv("MQTTS_PASSWORD")
self.FIWARE = os.getenv("FIWARE")
self.ATTRS = os.getenv("ATTRS")
self.tls_params = aiomqtt.TLSParameters(
ca_certs=None,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None,
)
async def _maintain_connection(self) -> None:
reconnect_interval = 5
while True:
try:
self.logger.info("Connecting to {} MQTT server".format(self.MQTTS_BROKER))
await self.client.connect()
self.logger.info("Connected")
except aiomqtt.MqttError as error:
self.logger.error(f'MQTT error "{error}". Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
except Exception as error:
print(f'Error "{error}"..')
async def __aenter__(self):
self.client = aiomqtt.Client(
hostname = self.MQTTS_BROKER,
port = self.MQTTS_PORT,
username = self.MQTTS_USERNAME,
password = self.MQTTS_PASSWORD,
client_id = self.client_id,
tls_params = self.tls_params,
keepalive = 60,
)
await self.client.__aenter__()
#self._reconnect_task = asyncio.create_task(self._maintain_connection())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.logger.info("Stopping MQTT")
self._reconnect_task.cancel()
#await self._reconnect_task
await self.client.__aexit__(exc_type, exc_value, traceback)
async def listen(self):
reconnect_interval = 5
asyncio.create_task(self._maintain_connection())
while True:
try:
async with self.client.messages() as messages:
await self.client.subscribe(f"{self.FIWARE}{self.ATTRS}") #
self.logger.info("subscribed")
async for message in messages:
message_ = message.payload.decode("utf-8")
self.logger.info(message_)
self.messages.append(message_)
except aiomqtt.MqttError as error:
self.logger.error(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
except Exception as error:
print(f'Error "{error}"..')
async def main():
tasks = set()
message_logger = MessageLogger()
try:
async with message_logger:
print("...")
# Other things I've tried
# L = await asyncio.gather(message_logger.listen()) # This also stops here
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
#task = loop.create_task(message_logger.listen()) # RuntimeWarning: coroutine 'MessageLogger.listen' was never awaited
task = asyncio.create_task(message_logger.listen())
task.add_done_callback(tasks.discard)
print("add_done_callback") # never reached
tasks.add(task)
print("created")
await asyncio.gather(*tasks)
print("never reached") # never reached
print(message_logger.messages)
except Exception as error:
print(f'Error "{error}"..')
asyncio.run(main()) |
Beta Was this translation helpful? Give feedback.
-
I'm still unable to run even a more simple example without using a class almost verbatim from the doc: import asyncio
import aiomqtt
from dotenv import load_dotenv
import ssl
import os
load_dotenv()
client_id = "dev"
MQTTS_BROKER = os.getenv("MQTTS_BROKER")
MQTTS_PORT = int(os.getenv("MQTTS_PORT"))
MQTTS_USERNAME = os.getenv("MQTTS_USERNAME")
MQTTS_PASSWORD = os.getenv("MQTTS_PASSWORD")
FIWARE = os.getenv("FIWARE")
ATTRS = os.getenv("ATTRS")
tls_params = aiomqtt.TLSParameters(
ca_certs=None,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None,
)
client = aiomqtt.Client(
hostname=MQTTS_BROKER,
port=MQTTS_PORT,
username=MQTTS_USERNAME,
password=MQTTS_PASSWORD,
client_id=client_id,
tls_params=tls_params,
keepalive=60,
)
async def connect():
try:
await client.connect()
except Exception as error:
print(f'Error "{error}"..')
async def listen():
#await connect()
try:
topic = f"{FIWARE}{ATTRS}"
async with client.messages() as messages:
await client.subscribe(topic)
async for message in messages:
print(message.payload)
except Exception as error:
print(f'Error "{error}"..')
async def publish():
try:
await client.publish("temperature/outside", payload=28.4)
except Exception as error:
print(f'Error "{error}"..')
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(connect())
tg.create_task(listen())
tg.create_task(publish())
while True:
await asyncio.sleep(1)
except Exception as error:
print(f'Error "{error}"..')
asyncio.run(main()) I got:
|
Beta Was this translation helpful? Give feedback.
-
Hi Eli, Thanks for opening this discussion! Looks like Let's start as simple as possible. Did you find the section "Listening without blocking" in our docs? If I got your comments and code right, you want to (1.) receive MQTT messages, (2.) send MQTT messages, and (3.) run other This means that we need to combine the example from the "Listening without blocking" section with the example from the "Sharing the connection" section. (This is why your last example didn't work; The client only connects when it runs into the "with" context manager.) This is a minimal working example using the mosquitto test broker that does all three: import asyncio
import aiomqtt
async def sleep(seconds):
await asyncio.sleep(seconds)
print(f"Slept for {seconds} seconds!")
async def publish(client):
await client.publish("temperature/outside", 25.0)
await client.publish("temperature/inside", 21.5)
async def listen(client):
async with client.messages() as messages:
await client.subscribe("temperature/#")
async for message in messages:
print(message.payload)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
async with asyncio.TaskGroup() as tg:
tg.create_task(listen(client))
tg.create_task(sleep(1)) # Simulates some other async task
tg.create_task(publish(client))
tg.create_task(sleep(3))
asyncio.run(main()) You'll notice that the two "sleep" tasks and the "publish" task finish, but the program doesn't finish, because the "listener" task keeps waiting on messages. If this is not what you want, you can have a look at the "Stop listening" section in our docs. Let me know if that helps! 😊 I'd also be happy to know if/how we can make our docs clearer on this! |
Beta Was this translation helpful? Give feedback.
-
Hi Felix, I suggest to add some docs on how to use it a class and/or improve the code to make it easy to use. I've end up making my fork of async-paho-mqtt-client apply some fix, and this works well for me because I can use the on_message callback in my async class. |
Beta Was this translation helpful? Give feedback.
-
Greetings, I'm trying my best to make aiomqtt work inside my class where other unrelated async operation needs to happen (like writting to db, websockets server etc), but so far I can't get it to listen and do other things afterwards.
Trying to stick to the official example and your doc and #198 this is the code I've tried so far:
No new messages are printed and the code never reach the
print("never reached")
As you can see from the commented lines I've also tried with other two ways: calling directly
L = await asyncio.gather(message_logger.listen())
that also results in a blocking task,and by creating a new loop:
that returns with
RuntimeWarning: coroutine 'MessageLogger.listen' was never awaited
I'm not familiar with asyncio but all this seems a bit odd that there is no straightforward way to call another async function in the background and keep executing code afterward, am I missing something?
Beta Was this translation helpful? Give feedback.
All reactions