Hello community,
I am trying to implement an aiomqtt client that listens to MQTT messages from within a FastAPI app, with controls to start and stop the client through Socket.IO events.
This is my aiomqtt-based MqttReceiver class:
import asyncio
import logging

import aiomqtt

logging.basicConfig(format='%(filename)s: %(message)s', level=logging.DEBUG)


class MqttReceiver:
    def __init__(self, hostname: str, port: int, username: str, password: str, topics: list, loop: asyncio.AbstractEventLoop):
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.topics = topics
        self.client = aiomqtt.Client(
            hostname=self.hostname,
            port=self.port,
            username=self.username,
            password=self.password
        )
        self.task = None
        self.loop = loop

    async def listen(self):
        async with self.client:
            for topic in self.topics:
                await self.client.subscribe(topic)
            async for message in self.client.messages:
                logging.info(f"Received MQTT message: {message.payload.decode()}")

    async def start(self):
        if self.task and not self.task.done():
            logging.warning("MQTT receiver is already running.")
            return
        self.task = self.loop.create_task(self.listen())
        logging.debug("MQTT receiver started.")

    async def stop(self):
        if self.task:
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                logging.info("MQTT receiver task was cancelled.")
            finally:
                self.task = None
                logging.debug("MQTT receiver stopped.")
I have seen the example where the client is run as a lifespan event of the FastAPI app, but I want to keep the MQTT receiver modular in a separate class, return the messages it receives back to the FastAPI app, combine them with other data, and emit a Socket.IO event.
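One possible way to hand messages back to the app while keeping the receiver in its own class is to give it an async callback. The sketch below uses only the standard library: `fake_source` is a hypothetical stand-in for `client.messages`, and `forward_messages`, `OnMessage`, and the temperature payloads are names and data made up for illustration, not part of aiomqtt or FastAPI.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

# Hypothetical callback type: the app supplies an async function taking a payload.
OnMessage = Callable[[str], Awaitable[None]]


async def forward_messages(source: AsyncIterator[str], on_message: OnMessage) -> None:
    """Pass every payload from `source` to the app-provided callback."""
    async for payload in source:
        await on_message(payload)


async def fake_source() -> AsyncIterator[str]:
    # Stand-in for `client.messages`: yields three payloads, then stops.
    for payload in ("21.5", "21.7", "22.0"):
        yield payload


async def demo() -> list:
    emitted = []

    async def on_message(payload: str) -> None:
        # In the app, this is where the payload could be combined with other
        # data and a Socket.IO emit awaited; here the result is only collected.
        emitted.append({"temperature": float(payload), "unit": "C"})

    await forward_messages(fake_source(), on_message)
    return emitted


result = asyncio.run(demo())
```

With this shape the receiver class never imports the app. The app decides what happens to each message by passing in `on_message` when it constructs or starts the receiver.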
I understand that the aiomqtt client must run as an asyncio task on the same event loop as the FastAPI app, but how can this be achieved when the client is designed as a separate class with controls to start and stop it as needed?
This is my first time using FastAPI, aiomqtt, and the async approach, so please bear with any rookie mistakes :)
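On the same-event-loop point, a minimal sketch of the start/stop pattern follows, assuming that `start()` and `stop()` are only ever awaited from coroutines already running on the app's loop (for example a FastAPI route or an async Socket.IO handler). In that case `asyncio.create_task` schedules the task on the running loop, so no loop needs to be passed to the constructor. The `Receiver` class and its `listen` body are hypothetical stand-ins: the real `listen` would contain the `async with client` block from the class above.

```python
import asyncio
from typing import Optional


class Receiver:
    """Start/stop wrapper around a long-running listen coroutine."""

    def __init__(self) -> None:
        self.task: Optional[asyncio.Task] = None
        self.received: list = []

    async def listen(self) -> None:
        # Stand-in for the aiomqtt message loop: produces a counter value
        # every 10 ms until the task is cancelled.
        n = 0
        while True:
            await asyncio.sleep(0.01)
            n += 1
            self.received.append(n)

    async def start(self) -> None:
        if self.task and not self.task.done():
            return  # already running
        # create_task uses the loop that is running the caller.
        self.task = asyncio.create_task(self.listen())

    async def stop(self) -> None:
        if self.task is None:
            return
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass  # expected after cancel()
        finally:
            self.task = None


async def demo():
    receiver = Receiver()
    await receiver.start()
    first = receiver.task
    await receiver.start()  # second call is a no-op
    same = receiver.task is first
    await asyncio.sleep(0.05)
    await receiver.stop()
    return same, receiver.task is None, len(receiver.received) > 0


same_task, stopped, got_messages = asyncio.run(demo())
```

In the app, a single `Receiver` instance could be created at module level or in the lifespan handler, and the Socket.IO start and stop events would simply await `receiver.start()` and `receiver.stop()`.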