Skip to content

Commit

Permalink
py: match sync/async code
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Nov 28, 2024
1 parent c238c5e commit 8a028ff
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
35 changes: 19 additions & 16 deletions py/miniconf-mqtt/miniconf/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import uuid
from typing import Dict, Any

from aiomqtt import Client, Message, MqttError
from paho.mqtt.properties import Properties, PacketTypes
import paho.mqtt
from paho.mqtt.properties import Properties, PacketTypes
from aiomqtt import Client, Message, MqttError

MQTTv5 = paho.mqtt.enums.MQTTProtocolVersion.MQTTv5

Expand Down Expand Up @@ -111,18 +111,17 @@ def _dispatch(self, message: Message):
LOGGER.warning("Discarding message without response code user property")
return

response = message.payload.decode("utf-8")
resp = message.payload.decode("utf-8")
if code == "Continue":
ret.append(response)
return

if code == "Ok":
if response:
ret.append(response)
ret.append(resp)
elif code == "Ok":
if resp:
ret.append(resp)
fut.set_result(ret)
del self._inflight[cd]
else:
fut.set_exception(MiniconfException(code, response))
del self._inflight[cd]
fut.set_exception(MiniconfException(code, resp))
del self._inflight[cd]

async def _do(self, topic: str, *, response=1, **kwargs):
response = int(response)
Expand Down Expand Up @@ -150,6 +149,7 @@ async def _do(self, topic: str, *, response=1, **kwargs):
return ret[0]
assert ret
return ret
return None

async def set(self, path: str, value, retain=False, response=True, **kwargs):
"""Write the provided data to the specified path.
Expand Down Expand Up @@ -206,7 +206,10 @@ async def clear(self, path: str, response=True, **kwargs):
path: The path to clear. Must be a leaf node.
"""
return await self._do(
f"{self.prefix}/settings{path}", retain=True, response=response, **kwargs
f"{self.prefix}/settings{path}",
retain=True,
response=response,
**kwargs,
)


Expand Down Expand Up @@ -234,10 +237,6 @@ async def discover(
suffix = "/alive"
topic = f"{prefix}{suffix}"

t_start = asyncio.get_running_loop().time()
await client.subscribe(topic)
t_subscribe = asyncio.get_running_loop().time() - t_start

async def listen():
async for message in client.messages:
peer = message.topic.value.removesuffix(suffix)
Expand All @@ -249,6 +248,10 @@ async def listen():
logging.info(f"Discovered {peer} alive")
discovered[peer] = payload

t_start = asyncio.get_running_loop().time()
await client.subscribe(topic)
t_subscribe = asyncio.get_running_loop().time() - t_start

try:
await asyncio.wait_for(
listen(), timeout=rel_timeout * t_subscribe + abs_timeout
Expand Down
9 changes: 7 additions & 2 deletions py/miniconf-mqtt/miniconf/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class Miniconf:
"""Miniconf over MQTT (synchronous)"""

def __init__(self, client: Client, prefix: str):
"""
Args:
client: A connected MQTT5 client.
prefix: The MQTT toptic prefix of the device to control.
"""
self.client = client
self.prefix = prefix
self.response_topic = f"{prefix}/response"
Expand Down Expand Up @@ -173,6 +178,8 @@ def get(self, path: str, **kwargs):
def clear(self, path: str, response=True, **kwargs):
"""Clear retained value from a path.
This does not change (`set()`) or reset/clear the value on the device.
Args:
path: The path to clear. Must be a leaf node.
"""
Expand Down Expand Up @@ -208,8 +215,6 @@ def discover(
suffix = "/alive"
topic = f"{prefix}{suffix}"

discovered = {}

def on_message(_client, _userdata, message):
logging.debug(f"Got message from {message.topic}: {message.payload}")
peer = message.topic.removesuffix(suffix)
Expand Down

0 comments on commit 8a028ff

Please sign in to comment.