diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f5743f04..8fbd2587 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: [3.7, 3.8] + python-version: [3.8] steps: - uses: actions/checkout@v2 @@ -67,14 +67,14 @@ jobs: TAG_NAME=${GITHUB_REF##*/} # Change slashes to hyphens TAG_NAME=${TAG_NAME//\//-} - docker buildx build --platform linux/arm/v7,linux/arm64/v8,linux/amd64 -t flyte/mqtt-io:${TAG_NAME} -t flyte/mqtt-io:latest --push --build-arg BUILDX_QEMU_ENV=true . + docker buildx build --platform linux/arm/v7,linux/arm64/v8,linux/amd64 -t flyte/mqtt-io:${TAG_NAME} -t flyte/mqtt-io:latest --push . - if: github.event_name == 'push' run: | # Remove prefix push TAG_NAME=${GITHUB_REF##*/} # Change slashes to hyphens TAG_NAME=${TAG_NAME//\//-} - docker buildx build --platform linux/arm/v7,linux/arm64/v8,linux/amd64 -t flyte/mqtt-io:${TAG_NAME} --push --build-arg BUILDX_QEMU_ENV=true . + docker buildx build --platform linux/arm/v7,linux/arm64/v8,linux/amd64 -t flyte/mqtt-io:${TAG_NAME} --push . generate_docs: name: Generate Documentation @@ -85,12 +85,19 @@ jobs: - uses: actions/checkout@v2 with: fetch-depth: 0 - - id: branch-name - uses: tj-actions/branch-names@v2.2 - - name: Ensure we can checkout gh-pages + - name: Get branch names + id: branch-names + uses: tj-actions/branch-names@v8 + - name: Ensure we can checkout gh-pages for release (${{ steps.branch-names.outputs.tag }}) + if: steps.branch-names.outputs.is_tag == 'true' run: | git checkout gh-pages - git checkout ${{ steps.branch-name.outputs.current_branch }} + git checkout ${{ steps.branch-names.outputs.tag }} + - name: Ensure we can checkout gh-pages for pr (${{ steps.branch-names.outputs.current_branch }}) + if: steps.branch-names.outputs.is_tag == 'false' + run: | + git checkout gh-pages + git checkout ${{ steps.branch-names.outputs.current_branch }} - name: Setup Python 3.8 uses: actions/setup-python@v2 with: @@ -98,6 +105,8 @@ jobs: - name: Install dependencies and generate docs run: | pip install poetry + ls + git status poetry install poetry run python docs_src/generate_docs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f0598a41..52b65484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,34 @@ Unreleased ========== - Nothing! +.v2.4.0 - 2024-07-20 +==================== +- Bump tj-actions/branch-names from 2.2 to 7.0.7 in /.github/workflows by @dependabot in https://github.com/flyte/mqtt-io/pull/339 +- # Fix for poetry/docutils related bug by @BenjiU in https://github.com/flyte/mqtt-io/pull/367 +- upgrade DHT11/DHT22 backing library by @pansila in https://github.com/flyte/mqtt-io/pull/297 +- Install gcc for slim docker to build rpi.gpio on demand by @BenjiU in https://github.com/flyte/mqtt-io/pull/368 +- Remove lint warnings from bmp085.py by @BenjiU in https://github.com/flyte/mqtt-io/pull/375 +- Add support for YF-S201 flow rate sensor by @linucks in https://github.com/flyte/mqtt-io/pull/370 +- Support for ENS160 digital multi-gas sensor with multiple IAQ data (TVOC, eCO2, AQI) by @linucks in https://github.com/flyte/mqtt-io/pull/371 +- feat: add MH-Z19 sensor module by @kleest in https://github.com/flyte/mqtt-io/pull/365 +- Add Support for Sunxi Linux Boards by @fabys77 in https://github.com/flyte/mqtt-io/pull/100 + +.v2.3.0 - 2024-03-01 +==================== +- 324 pinned pyyaml version incompatible with latest cython 300 by @BenjiU in #325 +- fix pipeline for tagging by @BenjiU in #323 +- pin pyyaml to v6.0.1 by @BenjiU in #326 +- Add new module for sensor adxl345 by @birdie1 in #223 +- Sensor INA219: Use optional i2c_bus_num by @mschlenstedt in #328 +- Update ads1x15.py by @maxthebuch in #329 +- repeat subscribe when reconnected to MQTT broker by @JohannesHennecke in #337 +- Fix non-unique identifiers reporting to HA by @dolai1 in #345 +- docker: use a "slim" base image by @chatziko in #342 +- Fix applying mqtt.reconnect_count by reordering except clauses by @zzeekk in #331 +- Add PMS5003 Particulate Sensor by @johnwang16 in #346 +- gpiod: enable pullup/pulldown by @chatziko in #341 +- docker: slim image, use rustup, build deps only on armv7 by @chatziko in #352 + .v2.2.9d - 2023-07-18 ==================== - new sensors diff --git a/Dockerfile b/Dockerfile index aa53a1b5..b9b7d065 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ # when building multiarch using buildx, then try this: # https://github.com/docker/buildx/issues/495#issuecomment-761562905 -FROM python:3.8-buster AS base +FROM python:3.8-slim-buster AS base ENV LANG C.UTF-8 ENV LC_ALL C.UTF-8 @@ -11,23 +11,21 @@ ENV LC_ALL C.UTF-8 FROM base AS requirements -ARG BUILDX_QEMU_ENV -RUN apt-get update && \ - apt-get install -y lsb-release rustc libssl-dev libffi-dev python3-venv && \ - apt-get clean && \ - rm -rf /var/lib/apt-/lists/* +# On linux/arm/v7 the cryptography package has no binary wheel, so we need its build +# dependencies. For rust, install the latest from rustup to avoid issues. +ARG TARGETPLATFORM +RUN if [ "${TARGETPLATFORM}" = "linux/arm/v7" ]; then \ + apt-get update && \ + apt-get install -y lsb-release curl g++ pkg-config libssl-dev libffi-dev && \ + \ + (curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | bash -s -- -y) \ + fi +ENV PATH="/root/.cargo/bin:${PATH}" -# This nonsense is required for this reason: -# https://github.com/JonasAlfredsson/docker-nginx-certbot/issues/30 - -RUN pip install --no-cache-dir wheel setuptools-rust && \ - if [ "${BUILDX_QEMU_ENV}" = "true" -a "$(getconf LONG_BIT)" = "32" ]; then \ - pip install --no-cache-dir cryptography==3.3.2; \ - fi && \ - pip install --no-cache-dir poetry COPY pyproject.toml ./ -RUN poetry export -o /requirements.txt && \ +RUN pip install --no-cache-dir poetry && \ + poetry export -o /requirements.txt && \ mkdir -p /home/mqtt_io && \ python -m venv /home/mqtt_io/venv && \ /home/mqtt_io/venv/bin/pip install wheel @@ -35,15 +33,19 @@ RUN poetry export -o /requirements.txt && \ FROM base +# Install gcc so packages installed durring runtime may be build +RUN apt-get update && apt-get install -y gcc && gcc --version + RUN useradd -m -s /bin/bash mqtt_io USER mqtt_io WORKDIR /home/mqtt_io COPY --from=requirements --chown=mqtt_io /home/mqtt_io/venv ./venv COPY --from=requirements /requirements.txt ./ -RUN venv/bin/python -m pip install --upgrade pip -RUN venv/bin/pip install -r requirements.txt +RUN venv/bin/python -m pip install --no-cache-dir --upgrade pip +RUN venv/bin/pip install --no-cache-dir -r requirements.txt COPY --chown=mqtt_io mqtt_io mqtt_io +RUN gcc --version CMD [ "venv/bin/python", "-m", "mqtt_io", "/config.yml" ] diff --git a/README.md b/README.md index 8de16985..37e72ae8 100644 --- a/README.md +++ b/README.md @@ -37,10 +37,16 @@ Hardware support is provided by specific GPIO, Sensor and Stream modules. It's e - BME680 temperature, humidity and pressure sensor (`bme680`) - DHT11/DHT22/AM2302 temperature and humidity sensors (`dht22`) - DS18S20/DS1822/DS18B20/DS1825/DS28EA00/MAX31850K temperature sensors (`ds18b`) + - ENS160 digital multi-gas sensor with multiple IAQ data (TVOC, eCO2, AQI) (`ens160`) - HCSR04 ultrasonic range sensor (connected to the Raspberry Pi on-board GPIO) (`hcsr04`) - INA219 DC current sensor (`ina219`) - LM75 temperature sensor (`lm75`) - MCP3008 analog to digital converter (`mcp3008`) + - ADXl345 3-axis accelerometer up to ±16g (`adxl345`) + - PMS5003 particulate sensor (`pms5003`) + - SHT40/SHT41/SHT45 temperature and humidity sensors (`sht4x`) + - YF-S201 flow rate sensor (`yfs201`) + - FLOWSENSOR generic flow rate sensor like YF-S201 or YF-DN50 (`flowsensor`) ### Streams diff --git a/config.example.yml b/config.example.yml index 62e8819c..b19327d1 100644 --- a/config.example.yml +++ b/config.example.yml @@ -34,6 +34,11 @@ sensor_modules: type: DS18S20 address: 000803702e49 + - name: mhz19 + module: mhz19 + device: "/dev/ttyS1" + range: 5000 + digital_inputs: - name: button module: raspberrypi @@ -89,3 +94,9 @@ stream_modules: ha_discovery: unique_id: mmwave_uart name: MMWave UART + - name: co2_mhz19 + module: mhz19 + interval: 30 + ha_discovery: + name: CO2 MH-Z19 + device_class: carbon_dioxide diff --git a/mqtt_io/__init__.py b/mqtt_io/__init__.py index 4fa6d0d2..723f9a1d 100644 --- a/mqtt_io/__init__.py +++ b/mqtt_io/__init__.py @@ -2,4 +2,4 @@ Top level of MQTT IO package. """ -VERSION = "2.2.9" +VERSION = "2.5.1" diff --git a/mqtt_io/config/__init__.py b/mqtt_io/config/__init__.py index acbc72df..876be1b6 100644 --- a/mqtt_io/config/__init__.py +++ b/mqtt_io/config/__init__.py @@ -140,11 +140,11 @@ def custom_validate_main_config(config: ConfigType) -> ConfigType: bad_configs: Dict[str, Dict[str, List[str]]] = {} # Make sure each of the IO configs refer to an existing module config - module_and_io_sections = dict( - gpio_modules=("digital_inputs", "digital_outputs"), - sensor_modules=("sensor_inputs",), - stream_modules=("stream_reads", "stream_writes"), - ) + module_and_io_sections = { + "gpio_modules": ("digital_inputs", "digital_outputs"), + "sensor_modules": ("sensor_inputs",), + "stream_modules": ("stream_reads", "stream_writes"), + } for module_section, io_sections in module_and_io_sections.items(): validate_gpio_module_names(bad_configs, config, module_section, io_sections) diff --git a/mqtt_io/config/config.schema.yml b/mqtt_io/config/config.schema.yml index 59b1d3a9..3bb138e6 100644 --- a/mqtt_io/config/config.schema.yml +++ b/mqtt_io/config/config.schema.yml @@ -129,10 +129,10 @@ mqtt: description: MQTT Client implementation module path. extra_info: | There's currently only one implementation, which uses the - [asyncio-mqtt](https://github.com/sbtinstruments/asyncio-mqtt/) client. + [aiomqtt](https://github.com/sbtinstruments/aiomqtt/) client. type: string required: no - default: mqtt_io.mqtt.asyncio_mqtt + default: mqtt_io.mqtt.aiomqtt ha_discovery: type: dict required: no diff --git a/mqtt_io/constants.py b/mqtt_io/constants.py index fedcb759..1c0e7edf 100644 --- a/mqtt_io/constants.py +++ b/mqtt_io/constants.py @@ -13,7 +13,7 @@ STREAM_TOPIC = "stream" MODULE_IMPORT_PATH = "mqtt_io.modules" -MODULE_CLASS_NAMES = dict(gpio="GPIO", sensor="Sensor", stream="Stream") +MODULE_CLASS_NAMES = {"gpio": 'GPIO', "sensor": 'Sensor', "stream": 'Stream'} MQTT_SUB_PRIORITY = 1 MQTT_ANNOUNCE_PRIORITY = 2 diff --git a/mqtt_io/events.py b/mqtt_io/events.py index e58e4729..9889d34c 100644 --- a/mqtt_io/events.py +++ b/mqtt_io/events.py @@ -74,6 +74,18 @@ class StreamDataSentEvent(Event): stream_name: str data: bytes +@dataclass +class StreamDataSubscribeEvent(Event): + """ + Trigger MQTT subscribe + """ + +@dataclass +class DigitalSubscribeEvent(Event): + """ + Trigger MQTT subscribe + """ + class EventBus: """ diff --git a/mqtt_io/home_assistant.py b/mqtt_io/home_assistant.py index 2c4adcad..1c06c085 100644 --- a/mqtt_io/home_assistant.py +++ b/mqtt_io/home_assistant.py @@ -25,21 +25,21 @@ def get_common_config( Return config that's common across all HQ discovery announcements. """ disco_conf: ConfigType = mqtt_conf["ha_discovery"] - config = dict(name=io_conf["name"]) + config = {"name": io_conf['name']} config.update( - dict( - availability_topic="/".join( + { + "availability_topic": '/'.join( (mqtt_conf["topic_prefix"], mqtt_conf["status_topic"]) ), - payload_available=mqtt_conf["status_payload_running"], - payload_not_available=mqtt_conf["status_payload_dead"], - device=dict( - manufacturer="MQTT IO", - model=f"v{VERSION}", - identifiers=["mqtt-io", mqtt_options.client_id], - name=disco_conf["name"], - ), - ) + "payload_available": mqtt_conf["status_payload_running"], + "payload_not_available": mqtt_conf["status_payload_dead"], + "device": { + "manufacturer": 'MQTT IO', + "model": f'v{VERSION}', + "identifiers": [mqtt_options.client_id], + "name": disco_conf["name"], + }, + } ) config.update(io_conf.get("ha_discovery", {})) return config @@ -56,12 +56,12 @@ def hass_announce_digital_input( disco_prefix: str = disco_conf["prefix"] sensor_config = get_common_config(in_conf, mqtt_conf, mqtt_options) sensor_config.update( - dict( - unique_id=f"{mqtt_options.client_id}_{in_conf['module']}_input_{name}", - state_topic="/".join((mqtt_conf["topic_prefix"], INPUT_TOPIC, name)), - payload_on=in_conf["on_payload"], - payload_off=in_conf["off_payload"], - ) + { + "unique_id": f'{mqtt_options.client_id}_{in_conf["module"]}_input_{name}', + "state_topic": '/'.join((mqtt_conf["topic_prefix"], INPUT_TOPIC, name)), + "payload_on": in_conf["on_payload"], + "payload_off": in_conf["off_payload"], + } ) return MQTTMessageSend( "/".join( @@ -92,13 +92,13 @@ def hass_announce_digital_output( disco_prefix: str = disco_conf["prefix"] switch_config = get_common_config(out_conf, mqtt_conf, mqtt_options) switch_config.update( - dict( - unique_id=f"{mqtt_options.client_id}_{out_conf['module']}_output_{name}", - state_topic="/".join((prefix, OUTPUT_TOPIC, name)), - command_topic="/".join((prefix, OUTPUT_TOPIC, name, SET_SUFFIX)), - payload_on=out_conf["on_payload"], - payload_off=out_conf["off_payload"], - ) + { + "unique_id": f'{mqtt_options.client_id}_{out_conf["module"]}_output_{name}', + "state_topic": '/'.join((prefix, OUTPUT_TOPIC, name)), + "command_topic": '/'.join((prefix, OUTPUT_TOPIC, name, SET_SUFFIX)), + "payload_on": out_conf["on_payload"], + "payload_off": out_conf["off_payload"], + } ) return MQTTMessageSend( "/".join( @@ -129,10 +129,10 @@ def hass_announce_sensor_input( disco_prefix: str = disco_conf["prefix"] sensor_config = get_common_config(sens_conf, mqtt_conf, mqtt_options) sensor_config.update( - dict( - unique_id=f"{mqtt_options.client_id}_{sens_conf['module']}_sensor_{name}", - state_topic="/".join((prefix, SENSOR_TOPIC, name)), - ) + { + "unique_id": f'{mqtt_options.client_id}_{sens_conf["module"]}_sensor_{name}', + "state_topic": '/'.join((prefix, SENSOR_TOPIC, name)), + } ) if "expire_after" not in sensor_config: sensor_config["expire_after"] = sens_conf["interval"] * 2 + 5 diff --git a/mqtt_io/modules/gpio/__init__.py b/mqtt_io/modules/gpio/__init__.py index af9d3c2c..b6f4540c 100644 --- a/mqtt_io/modules/gpio/__init__.py +++ b/mqtt_io/modules/gpio/__init__.py @@ -11,7 +11,7 @@ import asyncio import logging from concurrent.futures import ThreadPoolExecutor -from enum import Enum, Flag, auto +from enum import Enum, Flag, IntFlag, auto from typing import Any, Callable, Dict, Iterable, List, Optional from ...types import ConfigType, PinType @@ -48,7 +48,7 @@ class InterruptEdge(Enum): BOTH = auto() -class InterruptSupport(Flag): +class InterruptSupport(IntFlag): """ Classifies the kind of support a GPIO module has for interrupts. diff --git a/mqtt_io/modules/gpio/gpiod.py b/mqtt_io/modules/gpio/gpiod.py index bfcc986e..129bea44 100644 --- a/mqtt_io/modules/gpio/gpiod.py +++ b/mqtt_io/modules/gpio/gpiod.py @@ -20,7 +20,7 @@ } -class GPIO(GenericGPIO): +class GPIO(GenericGPIO): # pylint: disable=too-many-instance-attributes """ Implementation of GPIO class for libgpiod (linux kernel >= 4.8). """ @@ -42,6 +42,12 @@ def setup_module(self) -> None: PinDirection.OUTPUT: gpiod.line_request.DIRECTION_OUTPUT, } + self.flags_map = { + PinPUD.OFF: gpiod.line_request.FLAG_BIAS_DISABLE, + PinPUD.UP: gpiod.line_request.FLAG_BIAS_PULL_UP, + PinPUD.DOWN: gpiod.line_request.FLAG_BIAS_PULL_DOWN, + } + self.interrupt_edge_map = { InterruptEdge.RISING: gpiod.line_request.EVENT_RISING_EDGE, InterruptEdge.FALLING: gpiod.line_request.EVENT_FALLING_EDGE, @@ -63,14 +69,14 @@ def setup_pin( pullup: pullup settings are not supported """ # Pullup settings are called bias in libgpiod and are only - # available since Linux Kernel 5.5. They are as of now not - # yet part of python3-gpiod. + # available since Linux Kernel 5.5. line: "gpiod.line" = self.chip.get_line(pin) line_request = self.io.line_request() line_request.consumer = "mqtt-io" line_request.request_type = self.direction_map[direction] + line_request.flags = self.flags_map[pullup] if direction == PinDirection.OUTPUT: if initial is not None: diff --git a/mqtt_io/modules/gpio/mcp23017.py b/mqtt_io/modules/gpio/mcp23017.py index 0b678d94..c1e0570c 100644 --- a/mqtt_io/modules/gpio/mcp23017.py +++ b/mqtt_io/modules/gpio/mcp23017.py @@ -32,7 +32,7 @@ class GPIO(GenericGPIO): | InterruptSupport.SET_TRIGGERS ) PIN_SCHEMA = { - "pin": dict(type="integer", required=True, min=0, max=15), + "pin": {"type": 'integer', "required": True, "min": 0, "max": 15}, } def setup_module(self) -> None: diff --git a/mqtt_io/modules/gpio/mock.py b/mqtt_io/modules/gpio/mock.py index 501061c3..32449ca9 100644 --- a/mqtt_io/modules/gpio/mock.py +++ b/mqtt_io/modules/gpio/mock.py @@ -9,7 +9,7 @@ from . import GenericGPIO, InterruptEdge, InterruptSupport, PinDirection, PinPUD REQUIREMENTS = () -CONFIG_SCHEMA = dict(test=dict(type="boolean", required=False, default=False)) +CONFIG_SCHEMA = {"test": {"type": 'boolean', "required": False, "default": False}} # pylint: disable=useless-super-delegation,too-many-instance-attributes @@ -24,7 +24,7 @@ class GPIO(GenericGPIO): | InterruptSupport.CAPTURE_REGISTER ) PIN_SCHEMA = { - "test": dict(type="boolean", required=False, default=False), + "test": {"type": 'boolean', "required": False, "default": False}, } def __init__(self, config: ConfigType): diff --git a/mqtt_io/modules/gpio/sunxi.py b/mqtt_io/modules/gpio/sunxi.py new file mode 100644 index 00000000..a418104a --- /dev/null +++ b/mqtt_io/modules/gpio/sunxi.py @@ -0,0 +1,57 @@ +""" +Sunxi Board +""" +from typing import Optional +from ...types import ConfigType, PinType +from . import GenericGPIO, PinDirection, PinPUD + +REQUIREMENTS = ("pySUNXI",) + +class GPIO(GenericGPIO): + """ + Implementation of GPIO class for Sunxi native GPIO. + """ + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + from pySUNXI import gpio # type: ignore + + self.io: gpio = gpio + self.direction_map = {PinDirection.INPUT: gpio.INPUT, PinDirection.OUTPUT: gpio.OUTPUT} + + self.pullup_map = { + PinPUD.OFF: gpio.PULLNONE, + PinPUD.UP: gpio.PULLUP, + PinPUD.DOWN: gpio.PULLDOWN, + } + gpio.init() + + def setup_pin( + self, + pin: PinType, + direction: PinDirection, + pullup: PinPUD, + pin_config: ConfigType, + initial: Optional[str] = None, + ) -> None: + direction = self.direction_map[direction] + + if pullup is None: + pullup = self.pullup_map[PinPUD.OFF] + else: + pullup = self.pullup_map[pullup] + + initial_state = {None: -1, "low": 0, "high": 1}[initial] + #self.io.setup(pin, direction, pull_up_down=pullup, initial=initial) + self.io.setcfg(pin, direction) + self.io.pullup(pin, pullup) + if direction==self.io.OUTPUT: + self.io.output(pin, initial_state) + + def set_pin(self, pin: PinType, value: bool) -> None: + self.io.output(pin, value) + + def get_pin(self, pin: PinType) -> bool: + return bool(self.io.input(pin)) + + def cleanup(self) -> None: + pass diff --git a/mqtt_io/modules/sensor/ads1x15.py b/mqtt_io/modules/sensor/ads1x15.py index 17670a0a..f710476f 100644 --- a/mqtt_io/modules/sensor/ads1x15.py +++ b/mqtt_io/modules/sensor/ads1x15.py @@ -1,6 +1,8 @@ """ ADS1x15 analog to digital converters """ +import threading + from typing import cast from ...types import CerberusSchemaType, ConfigType, SensorValueType @@ -12,20 +14,15 @@ REQUIREMENTS = ("adafruit-circuitpython-ads1x15",) CONFIG_SCHEMA: CerberusSchemaType = { - "chip_addr": dict(type="integer", required=False, empty=False, default=0x48), - "type": dict( - type="string", - required=True, - empty=False, - allowed=SENSOR_TYPES, - ), - "pins": dict(type="list", required=True, empty=False, allowed=[0, 1, 2, 3]), - "gain": dict( - required=False, - empty=False, - allowed=[0.6666666666666666, 1, 2, 4, 8, 16], - default=1, - ), + "chip_addr": {"type": 'integer', "required": False, "empty": False, "default": 0x48}, + "type": {"type": 'string', "required": True, "empty": False, "allowed": SENSOR_TYPES}, + "pins": {"type": 'list', "required": True, "empty": False, "allowed": [0, 1, 2, 3]}, + "gain": { + "required": False, + "empty": False, + "allowed": [0.6666666666666666, 1, 2, 4, 8, 16], + "default": 1 + }, } @@ -35,20 +32,20 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA: CerberusSchemaType = { - "type": dict( - type="string", - required=False, - empty=False, - allowed=["value", "voltage"], - default="value", - ), - "pin": dict( - type="integer", - required=True, - empty=False, - allowed=[0, 1, 2, 3], - default=0, - ), + "type": { + "type": 'string', + "required": False, + "empty": False, + "allowed": ['value', 'voltage'], + "default": 'value', + }, + "pin": { + "type": 'integer', + "required": True, + "empty": False, + "allowed": [0, 1, 2, 3], + "default": 0, + }, } def setup_module(self) -> None: @@ -78,15 +75,21 @@ def setup_module(self) -> None: # Create single-ended input for each pin in config self.channels = {pin: AnalogIn(self.ads, pin) for pin in self.config["pins"]} + # initialize mutex lock + self.lock = threading.Lock() + def get_value(self, sens_conf: ConfigType) -> SensorValueType: """ Get the value or voltage from the sensor """ - sens_type = sens_conf["type"] - data = dict( - value=self.channels[sens_conf["pin"]].value, - voltage=self.channels[sens_conf["pin"]].voltage, - ) + # acquire the lock + with self.lock: + sens_type = sens_conf["type"] + data = { + "value": self.channels[sens_conf['pin']].value, + "voltage": self.channels[sens_conf['pin']].voltage, + } + return cast( float, data[sens_type], diff --git a/mqtt_io/modules/sensor/adxl345.py b/mqtt_io/modules/sensor/adxl345.py new file mode 100644 index 00000000..26e64340 --- /dev/null +++ b/mqtt_io/modules/sensor/adxl345.py @@ -0,0 +1,68 @@ +""" +ADXL345 Digital Accelerometer Sensor + +Mandatory: +- chip_addr + +Optional: +- output_g (set True if output in g). default:m*s² + +Output: +- x (in m*s²) +- y (in m*s²) +- z (in m*s²) +""" + +from json import dumps +from typing import cast + +from ...types import CerberusSchemaType, ConfigType, SensorValueType +from . import GenericSensor + +REQUIREMENTS = ("adxl345",) +CONFIG_SCHEMA: CerberusSchemaType = { + "chip_addr": {"type": 'integer', "required": True, "empty": False}, + "output_g": {"type": 'boolean', "required": False, "empty": False}, +} + + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for the ADXL345 sensor. + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'all', + "allowed": ['all', 'x', 'y', 'z'], + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,attribute-defined-outside-init + # pylint: disable=import-error,no-member + from adxl345 import ADXL345 # type: ignore + + self.i2c_addr: int = self.config["chip_addr"] + self.adxl345 = ADXL345(self.i2c_addr) + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + sens_type = sens_conf["type"] + + if "output_g" in self.config and self.config["output_g"]: + all_axes = self.adxl345.get_axes(True) + else: + all_axes = self.adxl345.get_axes() + + return cast( + float, + { + "x": all_axes['x'], + "y": all_axes['y'], + "z": all_axes['z'], + "all_axes": dumps(all_axes), + }[sens_type], + ) diff --git a/mqtt_io/modules/sensor/aht20.py b/mqtt_io/modules/sensor/aht20.py index 1f297c0d..f3385000 100644 --- a/mqtt_io/modules/sensor/aht20.py +++ b/mqtt_io/modules/sensor/aht20.py @@ -17,13 +17,13 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA = { - "type": dict( - type="string", - required=False, - empty=False, - default="temperature", - allowed=["temperature", "humidity"], - ) + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'temperature', + "allowed": ['temperature', 'humidity'], + } } def setup_module(self) -> None: diff --git a/mqtt_io/modules/sensor/bme280.py b/mqtt_io/modules/sensor/bme280.py index 9f70fd43..c3132b10 100644 --- a/mqtt_io/modules/sensor/bme280.py +++ b/mqtt_io/modules/sensor/bme280.py @@ -9,8 +9,8 @@ REQUIREMENTS = ("smbus2", "RPi.bme280") CONFIG_SCHEMA: CerberusSchemaType = { - "i2c_bus_num": dict(type="integer", required=True, empty=False), - "chip_addr": dict(type="integer", required=True, empty=False), + "i2c_bus_num": {"type": 'integer', "required": True, "empty": False}, + "chip_addr": {"type": 'integer', "required": True, "empty": False}, } @@ -20,13 +20,13 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA: CerberusSchemaType = { - "type": dict( - type="string", - required=False, - empty=False, - default="temperature", - allowed=["temperature", "humidity", "pressure"], - ) + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'temperature', + "allowed": ['temperature', 'humidity', 'pressure'], + } } def setup_module(self) -> None: @@ -48,9 +48,9 @@ def get_value(self, sens_conf: ConfigType) -> SensorValueType: data = self.bme.sample(self.bus, self.address, self.calib) return cast( float, - dict( - temperature=data.temperature, - humidity=data.humidity, - pressure=data.pressure, - )[sens_type], + { + "temperature": data.temperature, + "humidity": data.humidity, + "pressure": data.pressure, + }[sens_type], ) diff --git a/mqtt_io/modules/sensor/bme680.py b/mqtt_io/modules/sensor/bme680.py index 28c64de0..fe14b5ef 100644 --- a/mqtt_io/modules/sensor/bme680.py +++ b/mqtt_io/modules/sensor/bme680.py @@ -9,8 +9,8 @@ REQUIREMENTS = ("smbus2", "bme680") CONFIG_SCHEMA = { - "i2c_bus_num": dict(type="integer", required=False, empty=False), - "chip_addr": dict(type="integer", required=True, empty=False), + "i2c_bus_num": {"type": 'integer', "required": False, "empty": False}, + "chip_addr": {"type": 'integer', "required": True, "empty": False}, } @@ -20,17 +20,17 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA: CerberusSchemaType = { - "type": dict( - type="string", - required=False, - default="temperature", - allowed=["temperature", "humidity", "pressure"], - ), - "oversampling": dict( - type="string", - required=False, - allowed=["none", "1x", "2x", "4x", "8x", "16x"], - ), + "type": { + "type": 'string', + "required": False, + "default": 'temperature', + "allowed": ['temperature', 'humidity', 'pressure'], + }, + "oversampling": { + "type": 'string', + "required": False, + "allowed": ['none', '1x', '2x', '4x', '8x', '16x'], + }, } def setup_module(self) -> None: @@ -65,9 +65,9 @@ def get_value(self, sens_conf: ConfigType) -> SensorValueType: return None return cast( float, - dict( - temperature=self.sensor.data.temperature, - humidity=self.sensor.data.humidity, - pressure=self.sensor.data.pressure, - )[sens_type], + { + "temperature": self.sensor.data.temperature, + "humidity": self.sensor.data.humidity, + "pressure": self.sensor.data.pressure, + }[sens_type], ) diff --git a/mqtt_io/modules/sensor/bmp085.py b/mqtt_io/modules/sensor/bmp085.py new file mode 100644 index 00000000..542752dd --- /dev/null +++ b/mqtt_io/modules/sensor/bmp085.py @@ -0,0 +1,51 @@ +""" +BMP085 temperature and pressure sensor +""" + +from typing import cast + +from ...types import CerberusSchemaType, ConfigType, SensorValueType +from . import GenericSensor + +REQUIREMENTS = ("Adafruit_BMP",) +CONFIG_SCHEMA: CerberusSchemaType = { + "chip_addr": {"type": 'integer', "required": True, "empty": False}, +} +DATA_READER = { + "temperature": lambda bmp: bmp.read_temperature(), + "pressure": lambda bmp: bmp.read_pressure(), + "altitude": lambda bmp: bmp.read_altitude(), +} + + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for the BME280 sensor. + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'temperature', + "allowed": ['temperature', 'pressure', 'altitude'], + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,attribute-defined-outside-init + # pylint: disable=import-error,no-member + from Adafruit_BMP.BMP085 import BMP085 # type: ignore + + self.address: int = self.config["chip_addr"] + self.bmp = BMP085(address=self.address) + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + """ + Get the temperature, humidity or pressure value from the sensor + """ + sens_type = sens_conf["type"] + #error: Call to untyped function (unknown) in typed context + data = DATA_READER[sens_type](self.bmp) # type: ignore + return cast(float, data) diff --git a/mqtt_io/modules/sensor/dht22.py b/mqtt_io/modules/sensor/dht22.py index 73d1d985..991a3f64 100644 --- a/mqtt_io/modules/sensor/dht22.py +++ b/mqtt_io/modules/sensor/dht22.py @@ -6,16 +6,16 @@ from ...types import CerberusSchemaType, ConfigType, PinType, SensorValueType from . import GenericSensor -REQUIREMENTS = ("Adafruit_DHT",) +REQUIREMENTS = ("adafruit-circuitpython-dht",) ALLOWED_TYPES = ["dht11", "dht22", "am2302"] CONFIG_SCHEMA: CerberusSchemaType = { - "pin": dict(type="integer", required=True, empty=False), - "type": dict( - type="string", - required=True, - empty=False, - allowed=ALLOWED_TYPES + [x.upper() for x in ALLOWED_TYPES], - ), + "pin": {"type": 'integer', "required": True, "empty": False}, + "type": { + "type": 'string', + "required": True, + "empty": False, + "allowed": ALLOWED_TYPES + [x.upper() for x in ALLOWED_TYPES], + }, } @@ -25,33 +25,32 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA: CerberusSchemaType = { - "type": dict( - type="string", - required=False, - empty=False, - default="temperature", - allowed=["temperature", "humidity"], - ) + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'temperature', + "allowed": ['temperature', 'humidity'], + } } def setup_module(self) -> None: # pylint: disable=import-outside-toplevel,import-error - import Adafruit_DHT as DHTsensor # type: ignore + import adafruit_dht # type: ignore + from microcontroller import Pin # type: ignore sensor_type: str = self.config["type"].lower() - self.sensor_type: int if sensor_type == "dht22": - self.sensor_type = DHTsensor.DHT22 + self.sensor = adafruit_dht.DHT22 elif sensor_type == "dht11": - self.sensor_type = DHTsensor.DHT11 + self.sensor = adafruit_dht.DHT11 elif sensor_type == "am2302": - self.sensor_type = DHTsensor.AM2302 + self.sensor = adafruit_dht.DHT21 else: - raise RuntimeConfigError("Supported sensor types: DHT22, DHT11, AM2302") + raise RuntimeConfigError("Supported sensor types: DHT22, DHT11 and AM2302/DHT21") - self.pin: PinType = self.config["pin"] - self.sensor = DHTsensor + self.pin: PinType = Pin(self.config["pin"]) def get_value(self, sens_conf: ConfigType) -> SensorValueType: """ @@ -59,7 +58,8 @@ def get_value(self, sens_conf: ConfigType) -> SensorValueType: """ humidity: SensorValueType temperature: SensorValueType - humidity, temperature = self.sensor.read_retry(self.sensor_type, self.pin) + dht_device = self.sensor(self.pin, use_pulseio=False) + humidity, temperature = dht_device.humidity, dht_device.temperature if sens_conf["type"] == "temperature": return temperature if sens_conf["type"] == "humidity": diff --git a/mqtt_io/modules/sensor/ds18b.py b/mqtt_io/modules/sensor/ds18b.py index 6acda04c..48830938 100644 --- a/mqtt_io/modules/sensor/ds18b.py +++ b/mqtt_io/modules/sensor/ds18b.py @@ -10,13 +10,17 @@ REQUIREMENTS = ("w1thermsensor>=2.0.0",) ALLOWED_TYPES = ["DS18S20", "DS1822", "DS18B20", "DS1825", "DS28EA00", "MAX31850K"] CONFIG_SCHEMA = { - "address": dict(type="string", required=True, empty=False), - "type": dict( - type="string", - required=True, - empty=False, - allowed=ALLOWED_TYPES + [x.lower() for x in ALLOWED_TYPES], - ), + "address": { + "type": 'string', + "required": True, + "empty": False + }, + "type": { + "type": 'string', + "required": True, + "empty": False, + "allowed": ALLOWED_TYPES + [x.lower() for x in ALLOWED_TYPES], + }, } diff --git a/mqtt_io/modules/sensor/ens160.py b/mqtt_io/modules/sensor/ens160.py new file mode 100644 index 00000000..b8da64a3 --- /dev/null +++ b/mqtt_io/modules/sensor/ens160.py @@ -0,0 +1,128 @@ +""" +ENS160 Air Quality Sensor + +sensor_modules: + - name: ens160 + module: ens160 + chip_addr: 0x53 + temperature_compensation: 25 + humidity_compensation: 50 + +sensor_inputs: + - name: air_quality + module: ens160 + interval: 10 + digits: 0 + type: aqi + + - name: volatile_organic_compounds + module: ens160 + interval: 10 + digits: 0 + type: tvoc + + - name: eco2 + module: ens160 + interval: 10 + digits: 0 + type: eco2 +""" + +from typing import cast + +from ...types import CerberusSchemaType, ConfigType +from . import GenericSensor + +DEFAULT_CHIP_ADDR = 0x53 +DEFAULT_TEMPERATURE_COMPENSATION = 25 +DEFAULT_HUMIDITY_COMPENSATION = 50 + + +REQUIREMENTS = ("adafruit-circuitpython-ens160",) +CONFIG_SCHEMA: CerberusSchemaType = { + "chip_addr": { + "type": 'integer', + "required": False, + "empty": False, + "default": 'DEFAULT_CHIP_ADDR', + }, + "temperature_compensation": { + "type": 'float', + "required": False, + "empty": False, + "default": 'DEFAULT_TEMPERATURE_COMPENSATION', + }, + "humidity_compensation": { + "type": 'float', + "required": False, + "empty": False, + "default": 'DEFAULT_HUMIDITY_COMPENSATION', + }, +} + + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for the ENS160 sensor using adafruit-circuitpython-ens160. + + Mesures: + AQI: The air quality index calculated on the basis of UBA + Return value: 1-Excellent, 2-Good, 3-Moderate, 4-Poor, 5-Unhealthy + + TVOC: Total Volatile Organic Compounds concentration + Return value range: 0–65000, unit: ppb + + CO2 equivalent concentration calculated according to the detected data of VOCs and hydrogen + Return value range: 400–65000, unit: ppm + + Five levels: Excellent(400 - 600), Good(600 - 800), Moderate(800 - 1000), + Poor(1000 - 1500), Unhealthy(> 1500) + + NB: Need to think about how to handle the ambient_temp and relative_humidity values as + they are currently hard-coded defaults that can be overridden by user configuration. + Ideally these values would be read from a separate temperature/humdity sensor. + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'aqi', + "allowed": ['aqi', 'tvoc', 'eco2'], + }, + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + import adafruit_ens160 # type: ignore + import board # type: ignore + + self.adafruit_ens160_module = adafruit_ens160 + i2c = board.I2C() # uses board.SCL and board.SDA + self.ens160 = adafruit_ens160.ENS160(i2c, address=self.config["chip_addr"]) + self.ens160.temperature_compensation = self.config["temperature_compensation"] + self.ens160.humidity_compensation = self.config["humidity_compensation"] + + def get_value(self, sens_conf: ConfigType) -> float: + """Return the sensor value in the configured type.""" + + # data_validity values: + # NORMAL_OP - Normal operation, + # WARM_UP - Warm-Up phase, first 3 minutes after power-on. + # START_UP - Initial Start-Up phase, first full hour of operation after initial power-on. + # Only once in the sensor’s lifetime. + # INVALID_OUT - Invalid output + # note: Note that the status will only be stored in the non-volatile memory after an initial + # 24h of continuous operation. If unpowered before conclusion of said period, the + # ENS160 will resume "Initial Start-up" mode after re-powering. + if self.ens160.data_validity == self.adafruit_ens160_module.INVALID_OUT: + raise RuntimeError("ENS160 sensor is returning invalid output") + + sens_type = sens_conf["type"] + return cast( + int, + {"aqi": self.ens160.AQI, "tvoc": self.ens160.TVOC, "eco2": self.ens160.eCO2}[ + sens_type + ], + ) diff --git a/mqtt_io/modules/sensor/flowsensor.py b/mqtt_io/modules/sensor/flowsensor.py new file mode 100644 index 00000000..b0eaf2b6 --- /dev/null +++ b/mqtt_io/modules/sensor/flowsensor.py @@ -0,0 +1,120 @@ +""" + +Flowsensor: Generic Flow Rate Sensor + +Example configuration: + +sensor_modules: + - name: yfs201 + module: flowsensor + +sensor_inputs: + - name: flow_rate1 + module: flowsensor + pin: 0 + digits: 0 + interval: 10 + factor: 0.2 + +Factor can be calculated from Pulse characteristcs (dicumentation): + +From YF-S201 manual: + Pulse Characteristic: F = 7 * Q (L/MIN). + Pulse frequency (Hz) / 7.0 = flow rate in L/min + ==> Factor = 7.0 + +From YF-DN50 manual: + Pulse Characteristic: F = 0.2 * Q (L/MIN). + Pulse frequency (Hz) / 0.2 = flow rate in L/min + ==> Factor = 0.2 + +If you use "factor = 1", the sensor module returns the frequency in Hertz (Hz). + +""" + +from typing import Dict +from ...types import CerberusSchemaType, ConfigType, SensorValueType +from . import GenericSensor + +REQUIREMENTS = ("gpiozero",) + + +class FLOWSENSOR: + """ + Flowsensor Flow Rate Sensor class + Multiple instances support multiple sensors on different pins + """ + + def __init__(self, gpiozero, name: str, pin: int) -> None: # type: ignore[no-untyped-def] + self.name = name + self.pin = gpiozero.DigitalInputDevice(pin) + self.pin.when_activated = self.count_pulse + self.count = 0 + + def count_pulse(self) -> None: + """Increment pulse count.""" + self.count += 1 + + def reset_count(self) -> None: + """Reset pulse count.""" + self.count = 0 + + def flow_rate(self, sample_window: int, factor: float) -> float: + """Return flow rate in liters per minute. + + From YF-S201 manual: + Pluse Characteristic:F=7Q(L/MIN). + 2L/MIN=16HZ 4L/MIN=32.5HZ 6L/MIN=49.3HZ 8L/MIN=65.5HZ 10L/MIN=82HZ + + Pulse frequency (Hz) / 7.0 = flow rate in L/min + + sample_window is in seconds, so hz is pulse_count / sample_window + """ + hertz = self.count / sample_window + return hertz / factor + + def get_value(self, interval: int, factor: float) -> float: + """Return flow rate in L/min over interval seconds and reset count.""" + flow_rate = self.flow_rate(interval,factor) + self.reset_count() + return flow_rate + + +class Sensor(GenericSensor): + """ + Flowsensor: Flow Rate Sensor + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "pin": { + "type": 'integer', + "required": True, + "empty": False, + }, + "interval": { + "type": 'integer', + "required": True, + "empty": False, + }, + "factor": { + "type": 'float', + "required": True, + "empty": False, + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + import gpiozero # type: ignore + + self.gpiozero = gpiozero + self.sensors: Dict[str, FLOWSENSOR] = {} + + def setup_sensor(self, sens_conf: ConfigType) -> None: + sensor = FLOWSENSOR( + gpiozero=self.gpiozero, name=sens_conf["name"], pin=sens_conf["pin"] + ) + self.sensors[sensor.name] = sensor + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + return self.sensors[sens_conf["name"]].get_value(sens_conf["interval"],sens_conf["factor"]) diff --git a/mqtt_io/modules/sensor/ina219.py b/mqtt_io/modules/sensor/ina219.py index cf02c5d8..b97afda8 100644 --- a/mqtt_io/modules/sensor/ina219.py +++ b/mqtt_io/modules/sensor/ina219.py @@ -11,6 +11,7 @@ # - gain (40, 80, 160 or 320) -> maximum shunt voltage (milli volt) # Optional: # - low_power: send ina219 to sleep between readings +# - i2c_bus_num: if auto detection fails - like on Pi4 # Output: # - power (in watt) @@ -29,21 +30,26 @@ REQUIREMENTS = ("pi-ina219",) CONFIG_SCHEMA: CerberusSchemaType = { - "chip_addr": dict(type="integer", required=True), - "shunt_ohms": dict(type="float", required=False, empty=False, default=100), - "max_amps": dict(type="float", required=False, empty=False), - "voltage_range": dict( - type="integer", required=False, empty=False, allowed=[16, 32], default=32 - ), - "gain": dict( - type="string", - required=False, - empty=False, - coerce=lambda x: str(x).upper(), - default="AUTO", - allowed=["AUTO", "1_40MV", "2_80MV", "4_160MV", "8_320MV"], - ), - "low_power": dict(type="boolean", required=False, default=False), + "chip_addr": {"type": 'integer', "required": True}, + "i2c_bus_num": {"type": 'integer', "required": False, "default": 1}, + "shunt_ohms": {"type": 'float', "required": False, "empty": False, "default": 0.1}, + "max_amps": {"type": 'float', "required": False, "empty": False}, + "voltage_range": { + "type": 'integer', + "required": False, + "empty": False, + "allowed": [16, 32], + "default": 32 + }, + "gain": { + "type": 'string', + "required": False, + "empty": False, + "coerce": lambda x: str(x).upper(), + "default": 'AUTO', + "allowed": ['AUTO', '1_40MV', '2_80MV', '4_160MV', '8_320MV'], + }, + "low_power": {"type": 'boolean', "required": False, "default": False}, } @@ -53,13 +59,13 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA: CerberusSchemaType = { - "type": dict( - type="string", - required=False, - empty=False, - default="power", - allowed=["power", "current", "bus_voltage", "shunt_voltage"], - ) + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'power', + "allowed": ['power', 'current', 'bus_voltage', 'shunt_voltage'], + } } def setup_module(self) -> None: @@ -71,6 +77,7 @@ def setup_module(self) -> None: self.config["shunt_ohms"], max_expected_amps=self.config.get("max_amps"), address=self.config["chip_addr"], + busnum=self.config["i2c_bus_num"], ) ## Configure ina sensor with range and gain from config or default diff --git a/mqtt_io/modules/sensor/mcp3008.py b/mqtt_io/modules/sensor/mcp3008.py index df38d7c1..1e93fe6f 100644 --- a/mqtt_io/modules/sensor/mcp3008.py +++ b/mqtt_io/modules/sensor/mcp3008.py @@ -12,9 +12,9 @@ REQUIREMENTS = ("adafruit-mcp3008",) CONFIG_SCHEMA = { - "spi_port": dict(type="integer", required=False, empty=False, default=0), - "spi_device": dict(type="integer", required=False, empty=False, default=0), - "chip_addr": dict(type="integer", required=False, empty=False, default=0), + "spi_port": {"type": 'integer', "required": False, "empty": False, "default": 0}, + "spi_device": {"type": 'integer', "required": False, "empty": False, "default": 0}, + "chip_addr": {"type": 'integer', "required": False, "empty": False, "default": 0}, } _LOG = logging.getLogger(__name__) @@ -26,12 +26,7 @@ class Sensor(GenericSensor): """ SENSOR_SCHEMA = { - "channel": dict( - type="integer", - required=True, - min=0, - max=7, - ) + "channel": {"type": 'integer', "required": True, "min": 0, "max": 7} } def setup_module(self) -> None: diff --git a/mqtt_io/modules/sensor/mcp3xxx.py b/mqtt_io/modules/sensor/mcp3xxx.py index 1c0d53e8..283892ac 100644 --- a/mqtt_io/modules/sensor/mcp3xxx.py +++ b/mqtt_io/modules/sensor/mcp3xxx.py @@ -22,17 +22,17 @@ "MCP3308", ] CONFIG_SCHEMA = { - "spi_port": dict(type="integer", required=False, empty=False, default=0), - "spi_device": dict(type="integer", required=False, empty=False, default=0), - "type": dict( - type="string", - required=True, - empty=False, - allowed=ALLOWED_TYPES + [x.lower() for x in ALLOWED_TYPES], - ), - "channel": dict(type="integer", required=False, empty=False, default=0), - "differential": dict(type="boolean", required=False, empty=False, default=False), - "max_voltage": dict(type="float", required=False, empty=False, default=3.3), + "spi_port": {"type": 'integer', "required": False, "empty": False, "default": 0}, + "spi_device": {"type": 'integer', "required": False, "empty": False, "default": 0}, + "type": { + "type": 'string', + "required": True, + "empty": False, + "allowed": ALLOWED_TYPES + [x.lower() for x in ALLOWED_TYPES], + }, + "channel": {"type": 'integer', "required": False, "empty": False, "default": 0}, + "differential": {"type": 'boolean', "required": False, "empty": False, "default": False}, + "max_voltage": {"type": 'float', "required": False, "empty": False, "default": 3.3}, } diff --git a/mqtt_io/modules/sensor/mhz19.py b/mqtt_io/modules/sensor/mhz19.py new file mode 100644 index 00000000..944970d6 --- /dev/null +++ b/mqtt_io/modules/sensor/mhz19.py @@ -0,0 +1,63 @@ +""" +MH-Z19 NDIR CO2 sensor +""" + +from mqtt_io.modules.sensor import GenericSensor +from mqtt_io.types import CerberusSchemaType, ConfigType, SensorValueType + +REQUIREMENTS = ("pyserial",) +CONFIG_SCHEMA: CerberusSchemaType = { + "device": {"type": 'string', "required": True, "empty": False}, + "range": {"type": 'integer', "required": False, "empty": False, "default": 5000, + "allowed": [2000, 5000, 10000]}, +} + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for the MH-Z19 CO2 sensor using UART/serial. + """ + + @staticmethod + def _calc_checksum(data: bytes) -> bytes: + value = sum(data[1:]) % 0x100 + value = (0xff - value + 1) % 0x100 + return value.to_bytes(1, "big") + + @staticmethod + def _check_checksum(data: bytes) -> bool: + return Sensor._calc_checksum(data[:-1]) == data[-1:] + + @staticmethod + def _add_checksum(data: bytes) -> bytes: + return data + Sensor._calc_checksum(data) + + def setup_module(self) -> None: + # pylint: disable=import-error,import-outside-toplevel + import serial # type: ignore + + self.ser = serial.Serial( + port=self.config["device"], + baudrate=9600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + ) + + # setup detection range + cmd = Sensor._add_checksum(b"\xff\x01\x99\x00\x00\x00" + + self.config["range"].to_bytes(2, "big")) + self.ser.write(cmd) + # no response + + def cleanup(self) -> None: + self.ser.close() + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + self.ser.write(Sensor._add_checksum(b"\xff\x01\x86\x00\x00\x00\x00\x00")) + resp = self.ser.read(9) + + if len(resp) == 9: + if resp[0:2] == b"\xff\x86" and Sensor._check_checksum(resp): + return int.from_bytes(resp[2:4], "big") + + return None diff --git a/mqtt_io/modules/sensor/mock.py b/mqtt_io/modules/sensor/mock.py index 4834c940..fb2b8677 100644 --- a/mqtt_io/modules/sensor/mock.py +++ b/mqtt_io/modules/sensor/mock.py @@ -8,7 +8,7 @@ from . import GenericSensor REQUIREMENTS = () -CONFIG_SCHEMA = dict(test=dict(type="boolean", required=False, default=False)) +CONFIG_SCHEMA = {"test": {"type": 'boolean', "required": False, "default": False}} # pylint: disable=useless-super-delegation diff --git a/mqtt_io/modules/sensor/pms5003.py b/mqtt_io/modules/sensor/pms5003.py new file mode 100644 index 00000000..506d05a2 --- /dev/null +++ b/mqtt_io/modules/sensor/pms5003.py @@ -0,0 +1,77 @@ +""" +PMS5003 Particulate Matter Sensor +""" + +import time +from typing import cast +from ...types import CerberusSchemaType, ConfigType, SensorValueType +from . import GenericSensor + +REQUIREMENTS = ("plantower",) +CONFIG_SCHEMA: CerberusSchemaType = { + "serial_port": {"type": 'string', "required": True, "empty": False}, +} + + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for the PMS5003 sensor. + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'pm25_std', + "allowed": + ['pm10_cf1', 'pm25_cf1','pm100_cf1', + 'pm10_std','pm25_std','pm100_std', + 'gr03um','gr05um','gr10um', + 'gr25um','gr50um','gr100um'], + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + import plantower # type: ignore + + self.serial_port = self.config["serial_port"] + self.sensor = plantower.Plantower(port=self.serial_port) + self.sensor.mode_change(plantower.PMS_PASSIVE_MODE) + self.sensor.set_to_wakeup() + time.sleep(30) #give fan time to stabilize readings + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + """ + Get the particulate data from the sensor + """ + #turn sensor off if interval between readings is >= 2 minutes + sleep_sensor = sens_conf["interval"] >= 120 + if sleep_sensor: + self.sensor.set_to_wakeup() + time.sleep(30) + sens_type = sens_conf["type"] + result = self.sensor.read() + if sleep_sensor: + self.sensor.set_to_sleep() + return cast( + int, + { + "pm10_cf1": result.pm10_cf1, + "pm25_cf1": result.pm25_cf1, + "pm100_cf1": result.pm100_cf1, + "pm10_std": result.pm10_std, + "pm25_std": result.pm25_std, + "pm100_std": result.pm100_std, + "gr03um": result.gr03um, + "gr05um": result.gr05um, + "gr10um": result.gr10um, + "gr25um": result.gr25um, + "gr50um": result.gr50um, + "gr100u": result.gr100um + }[sens_type], + ) + + def cleanup(self) -> None: + self.sensor.set_to_sleep() diff --git a/mqtt_io/modules/sensor/sht4x.py b/mqtt_io/modules/sensor/sht4x.py new file mode 100644 index 00000000..f723a0d9 --- /dev/null +++ b/mqtt_io/modules/sensor/sht4x.py @@ -0,0 +1,57 @@ +""" +SHT4x temperature and humidity sensor +""" + +from typing import cast + +from ...types import ConfigType, SensorValueType +from . import GenericSensor +from ...exceptions import RuntimeConfigError + +REQUIREMENTS = ("adafruit-circuitpython-sht4x",) + + +class Sensor(GenericSensor): + """ + Implementation of Sensor class for sht4x. + """ + + SENSOR_SCHEMA = { + "type": { + "type": 'string', + "required": False, + "empty": False, + "default": 'temperature', + "allowed": ['temperature', 'humidity'], + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + import adafruit_sht4x # type: ignore + import board # type: ignore + import busio # type: ignore + + i2c = busio.I2C(board.SCL, board.SDA) + self.sensor = adafruit_sht4x.SHT4x(i2c) + + @property + def _temperature(self) -> SensorValueType: + return cast(SensorValueType, self.sensor.temperature) + + @property + def _humidity(self) -> SensorValueType: + return cast(SensorValueType, self.sensor.relative_humidity) + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + """ + Get the temperature value from the sensor + """ + if sens_conf["type"] == "temperature": + return self._temperature + if sens_conf["type"] == "humidity": + return self._humidity + raise RuntimeConfigError( + "sht4x sensor '%s' was not configured to return 'temperature' or 'humidity'" + % sens_conf["name"] + ) diff --git a/mqtt_io/modules/sensor/yfs201.py b/mqtt_io/modules/sensor/yfs201.py new file mode 100644 index 00000000..39534c66 --- /dev/null +++ b/mqtt_io/modules/sensor/yfs201.py @@ -0,0 +1,94 @@ +""" + +YF-S201 Flow Rate Sensor + +Example configuration: + +sensor_modules: + - name: yfs201 + module: yfs201 + +sensor_inputs: + - name: flow_rate1 + module: yfs201 + pin: 0 + digits: 0 + interval: 10 +""" + +from typing import Dict +from ...types import CerberusSchemaType, ConfigType, SensorValueType +from . import GenericSensor + +REQUIREMENTS = ("gpiozero",) + + +class YFS201: + """ + YF-S201 Flow Rate Sensor class + Multiple instances support multiple sensors on different pins + """ + + def __init__(self, gpiozero, name: str, pin: int) -> None: # type: ignore[no-untyped-def] + self.name = name + self.pin = gpiozero.DigitalInputDevice(pin) + self.pin.when_activated = self.count_pulse + self.count = 0 + + def count_pulse(self) -> None: + """Increment pulse count.""" + self.count += 1 + + def reset_count(self) -> None: + """Reset pulse count.""" + self.count = 0 + + def flow_rate(self, sample_window: int) -> float: + """Return flow rate in liters per minute. + + From YF-S201 manual: + Pluse Characteristic:F=7Q(L/MIN). + 2L/MIN=16HZ 4L/MIN=32.5HZ 6L/MIN=49.3HZ 8L/MIN=65.5HZ 10L/MIN=82HZ + + Pulse frequency (Hz) / 7.0 = flow rate in L/min + + sample_window is in seconds, so hz is pulse_count / sample_window + """ + hertz = self.count / sample_window + return hertz / 7.0 + + def get_value(self, interval: int) -> float: + """Return flow rate in L/min over interval seconds and reset count.""" + flow_rate = self.flow_rate(interval) + self.reset_count() + return flow_rate + + +class Sensor(GenericSensor): + """ + YF-S201 Flow Rate Sensor + """ + + SENSOR_SCHEMA: CerberusSchemaType = { + "pin": { + "type": 'integer', + "required": True, + "empty": False, + } + } + + def setup_module(self) -> None: + # pylint: disable=import-outside-toplevel,import-error + import gpiozero # type: ignore + + self.gpiozero = gpiozero + self.sensors: Dict[str, YFS201] = {} + + def setup_sensor(self, sens_conf: ConfigType) -> None: + sensor = YFS201( + gpiozero=self.gpiozero, name=sens_conf["name"], pin=sens_conf["pin"] + ) + self.sensors[sensor.name] = sensor + + def get_value(self, sens_conf: ConfigType) -> SensorValueType: + return self.sensors[sens_conf["name"]].get_value(sens_conf["interval"]) diff --git a/mqtt_io/modules/stream/serial.py b/mqtt_io/modules/stream/serial.py index a800eebf..fe301e4c 100644 --- a/mqtt_io/modules/stream/serial.py +++ b/mqtt_io/modules/stream/serial.py @@ -79,13 +79,13 @@ def setup_module(self) -> None: 7: serial.SEVENBITS, 8: serial.EIGHTBITS, }[self.config["bytesize"]], - parity=dict( - none=serial.PARITY_NONE, - odd=serial.PARITY_ODD, - even=serial.PARITY_EVEN, - mark=serial.PARITY_MARK, - space=serial.PARITY_SPACE, - )[self.config["parity"]], + parity= { + "none": serial.PARITY_NONE, + "odd": serial.PARITY_ODD, + "even": serial.PARITY_EVEN, + "mark": serial.PARITY_MARK, + "space": serial.PARITY_SPACE, + }[self.config["parity"]], stopbits={ 1: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, diff --git a/mqtt_io/mqtt/asyncio_mqtt.py b/mqtt_io/mqtt/aiomqtt.py similarity index 52% rename from mqtt_io/mqtt/asyncio_mqtt.py rename to mqtt_io/mqtt/aiomqtt.py index 47f406e6..852b4196 100644 --- a/mqtt_io/mqtt/asyncio_mqtt.py +++ b/mqtt_io/mqtt/aiomqtt.py @@ -8,7 +8,7 @@ from functools import wraps from typing import Any, Callable, List, Optional, Tuple, TypeVar, cast -from asyncio_mqtt.client import Client, MqttError, Will # type: ignore +from aiomqtt.client import Client, MqttError, Will, ProtocolVersion # type: ignore from paho.mqtt import client as paho # type: ignore from . import ( @@ -26,8 +26,27 @@ def _map_exception(func: Func) -> Func: + """ + Creates a decorator that wraps a function and maps any raised `MqttError` + exception to a `MQTTException`. + + :param func: The function to be wrapped. + :type func: Func + :return: The wrapped function. + :rtype: Func + """ @wraps(func) async def inner(*args: Any, **kwargs: Any) -> Any: + """ + Decorator for asynchronous functions that catches `MqttError` exceptions + and raises `MQTTException` instead. + + Parameters: + func (Callable): The function to be decorated. + + Returns: + Callable: The decorated function. + """ try: await func(*args, **kwargs) except MqttError as exc: @@ -42,11 +61,20 @@ class MQTTClient(AbstractMQTTClient): """ def __init__(self, options: MQTTClientOptions): + """ + Initializes a new instance of the MQTTClient class. + + Args: + options (MQTTClientOptions): The options for the MQTT client. + + Returns: + None + """ super().__init__(options) protocol_map = { - MQTTProtocol.V31: paho.MQTTv31, - MQTTProtocol.V311: paho.MQTTv311, - MQTTProtocol.V5: paho.MQTTv5, + MQTTProtocol.V31: ProtocolVersion.V31, + MQTTProtocol.V311: ProtocolVersion.V311, + MQTTProtocol.V5: ProtocolVersion.V5, } will = None if options.will is not None: @@ -65,8 +93,8 @@ def __init__(self, options: MQTTClientOptions): port=options.port, username=options.username, password=options.password, - client_id=options.client_id, - # keepalive=options.keepalive, # This isn't implemented yet on 0.8.1 + identifier=options.client_id, + #keepalive=options.keepalive, tls_context=tls_context, protocol=protocol_map[options.protocol], will=will, @@ -76,21 +104,61 @@ def __init__(self, options: MQTTClientOptions): @_map_exception async def connect(self, timeout: int = 10) -> None: - await self._client.connect(timeout=timeout) + """ + Connects to the client asynchronously. + + Args: + timeout (int): The timeout value in seconds (default: 10). + + Returns: + None: This function does not return anything. + """ + await self._client.__aenter__() # pylint: disable=unnecessary-dunder-call @_map_exception async def disconnect(self) -> None: - try: - await self._client.disconnect() - except TimeoutError: - await self._client.force_disconnect() + """ + This function is an asynchronous method that handles the disconnection of the client. + + Parameters: + self: The current instance of the class. + + Returns: + None + """ + await self._client.__aexit__(None, None, None) @_map_exception async def subscribe(self, topics: List[Tuple[str, int]]) -> None: + """ + Subscribe to the given list of topics. + + Args: + topics (List[Tuple[str, int]]): A list of tuples representing the topics + to subscribe to. + Each tuple should contain a string representing the topic name and + an integer representing the QoS level. + + Returns: + None: This function does not return anything. + + Raises: + Exception: If there is an error while subscribing to the topics. + + """ await self._client.subscribe(topics) @_map_exception async def publish(self, msg: MQTTMessageSend) -> None: + """ + Publishes an MQTT message to the specified topic. + + Args: + msg (MQTTMessageSend): The MQTT message to be published. + + Returns: + None: This function does not return anything. + """ await self._client.publish( topic=msg.topic, payload=msg.payload, qos=msg.qos, retain=msg.retain ) @@ -98,6 +166,17 @@ async def publish(self, msg: MQTTMessageSend) -> None: def _on_message( self, client: paho.Client, userdata: Any, msg: paho.MQTTMessage ) -> None: + """ + Callback function that is called when a message is received through MQTT. + + Args: + client (paho.Client): The MQTT client instance. + userdata (Any): The user data associated with the client. + msg (paho.MQTTMessage): The received MQTT message. + + Returns: + None: This function does not return anything. + """ if self._message_queue is None: _LOG.warning("Discarding MQTT message because queue is not initialised") return @@ -111,6 +190,12 @@ def _on_message( @property def message_queue(self) -> "asyncio.Queue[MQTTMessage]": + """ + Returns the message queue for receiving MQTT messages. + + :return: The message queue for receiving MQTT messages. + :rtype: asyncio.Queue[MQTTMessage] + """ if self._message_queue is None: self._message_queue = asyncio.Queue(self._options.message_queue_size) # pylint: disable=protected-access diff --git a/mqtt_io/server.py b/mqtt_io/server.py index 23535d5e..1c6cc439 100644 --- a/mqtt_io/server.py +++ b/mqtt_io/server.py @@ -18,6 +18,7 @@ from hashlib import sha1 from importlib import import_module from typing import Any, Dict, List, Optional, Tuple, Type, Union, overload +from aiomqtt.exceptions import MqttCodeError # type: ignore import backoff # type: ignore from typing_extensions import Literal @@ -51,6 +52,8 @@ SensorReadEvent, StreamDataReadEvent, StreamDataSentEvent, + StreamDataSubscribeEvent, + DigitalSubscribeEvent, ) from .home_assistant import ( hass_announce_digital_input, @@ -107,7 +110,7 @@ def _init_module( module_config: Dict[str, Dict[str, Any]], module_type: str, install_requirements: bool ) -> Union[GenericGPIO, GenericSensor, GenericStream]: """ - Initialise a GPIO module by: + Initialise a module by: - Importing it - Validating its config - Installing any missing requirements for it @@ -148,6 +151,17 @@ class MqttIo: # pylint: disable=too-many-instance-attributes def __init__( self, config: Dict[str, Any], loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: + """ + Initializes the class with the given configuration and event loop. + + Parameters: + config (Dict[str, Any]): The configuration for the class. + loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. + If not provided, the default event loop will be used. + + Returns: + None + """ self.config = config self._init_mqtt_config() @@ -198,6 +212,21 @@ async def create_loop_resources() -> None: self.loop.run_until_complete(create_loop_resources()) def _init_mqtt_config(self) -> None: + """ + Initializes the MQTT configuration. + + This function retrieves the MQTT configuration from the application's + main configuration file and performs the necessary setup tasks. + It sets the topic prefix for MQTT messages, replaces the '' placeholder + in the topic prefix with the IP address of the 'wlan0' interface, + generates a client ID if not provided, and configures TLS options if enabled. + + Parameters: + None + + Returns: + None + """ config: ConfigType = self.config["mqtt"] topic_prefix: str = config["topic_prefix"] @@ -290,7 +319,6 @@ async def publish_stream_data_callback(event: StreamDataReadEvent) -> None: self.stream_configs = {x["name"]: x for x in self.config["stream_modules"]} self.stream_modules = {} - sub_topics: List[str] = [] for stream_conf in self.config["stream_modules"]: stream_module = _init_module( stream_conf, "stream", self.config["options"]["install_requirements"] @@ -325,22 +353,27 @@ async def create_stream_output_queue( ) ) - sub_topics.append( - "/".join( - ( - self.config["mqtt"]["topic_prefix"], - STREAM_TOPIC, - stream_conf["name"], - SEND_SUFFIX, + # Subscribe call back funktion: Subscribe to stream send topics + async def subscribe_callback(event: StreamDataSubscribeEvent) -> None: + sub_topics: List[str] = [] + for stream_conf in self.config["stream_modules"]: + sub_topics.append( + "/".join( + ( + self.config["mqtt"]["topic_prefix"], + STREAM_TOPIC, + stream_conf["name"], + SEND_SUFFIX, + ) ) ) - ) - # Subscribe to stream send topics - if sub_topics: - self.mqtt_task_queue.put_nowait( - PriorityCoro(self._mqtt_subscribe(sub_topics), MQTT_SUB_PRIORITY) - ) + if sub_topics: + self.mqtt_task_queue.put_nowait( + PriorityCoro(self._mqtt_subscribe(sub_topics), MQTT_SUB_PRIORITY) + ) + + self.event_bus.subscribe(StreamDataSubscribeEvent, subscribe_callback) def _init_digital_inputs(self) -> None: """ @@ -421,8 +454,34 @@ async def publish_callback(event: DigitalInputChangedEvent) -> None: ) def _init_digital_outputs(self) -> None: + """ + Initializes the digital outputs. + + This function sets up the MQTT publish callback for the output event. + It creates a digital output queue on the right loop for each module. + It subscribes to outputs when MQTT is initialized. + It also fires DigitalOutputChangedEvents for the initial values of + outputs if required, and reads and publishes the actual pin state + if no publish_initial is requested. + + Parameters: + None + + Returns: + None + """ # Set up MQTT publish callback for output event async def publish_callback(event: DigitalOutputChangedEvent) -> None: + """ + Publishes a callback function for the given DigitalOutputChangedEvent. + + Args: + event (DigitalOutputChangedEvent): The event object containing the details + of the digital output change. + + Returns: + None + """ out_conf = self.digital_output_configs[event.output_name] val = out_conf["on_payload"] if event.to_value else out_conf["off_payload"] self.mqtt_task_queue.put_nowait( @@ -478,23 +537,6 @@ async def create_digital_output_queue( ) ) - # Add tasks to subscribe to outputs when MQTT is initialised - topics = [] - for suffix in (SET_SUFFIX, SET_ON_MS_SUFFIX, SET_OFF_MS_SUFFIX): - topics.append( - "/".join( - ( - self.config["mqtt"]["topic_prefix"], - OUTPUT_TOPIC, - out_conf["name"], - suffix, - ) - ) - ) - self.mqtt_task_queue.put_nowait( - PriorityCoro(self._mqtt_subscribe(topics), MQTT_SUB_PRIORITY) - ) - # Fire DigitalOutputChangedEvents for initial values of outputs if required if out_conf["publish_initial"]: self.event_bus.fire( @@ -516,8 +558,47 @@ async def create_digital_output_queue( ) self.event_bus.fire(DigitalOutputChangedEvent(out_conf["name"], value)) + # Subscribe call back funktion: Add tasks to subscribe to outputs when MQTT is initialised + async def subscribe_callback(event: DigitalSubscribeEvent) -> None: + for out_conf in self.config["digital_outputs"]: + topics = [] + for suffix in (SET_SUFFIX, SET_ON_MS_SUFFIX, SET_OFF_MS_SUFFIX): + topics.append( + "/".join( + ( + self.config["mqtt"]["topic_prefix"], + OUTPUT_TOPIC, + out_conf["name"], + suffix, + ) + ) + ) + self.mqtt_task_queue.put_nowait( + PriorityCoro(self._mqtt_subscribe(topics), MQTT_SUB_PRIORITY) + ) + + self.event_bus.subscribe(DigitalSubscribeEvent, subscribe_callback) + def _init_sensor_inputs(self) -> None: + """ + Initializes the sensor inputs for the class. + + Parameters: + None + + Returns: + None + """ async def publish_sensor_callback(event: SensorReadEvent) -> None: + """ + Publishes a sensor callback event to the MQTT broker. + + Args: + event (SensorReadEvent): The event object containing the sensor data. + + Returns: + None + """ sens_conf = self.sensor_input_configs[event.sensor_name] digits: int = sens_conf["digits"] self.mqtt_task_queue.put_nowait( @@ -555,6 +636,19 @@ async def poll_sensor( sensor_module: GenericSensor = sensor_module, sens_conf: ConfigType = sens_conf, ) -> None: + """ + Asynchronously polls a sensor to retrieve its value at regular intervals. + + Args: + sensor_module (Optional[GenericSensor]): The sensor module to use. + Defaults to the sensor_module provided during function call. + sens_conf (Optional[ConfigType]): The configuration for the sensor. + Defaults to the sens_conf provided during function call. + + Returns: + None + + """ @backoff.on_exception( # type: ignore backoff.expo, Exception, max_time=sens_conf["interval"] ) @@ -565,6 +659,17 @@ async def get_sensor_value( sensor_module: GenericSensor = sensor_module, sens_conf: ConfigType = sens_conf, ) -> SensorValueType: + """ + A decorator that applies exponential backoff to the function `get_sensor_value`. + + Parameters: + sensor_module (GenericSensor): The sensor module to use for getting + the sensor value. + sens_conf (ConfigType): The configuration for the sensor. + + Returns: + SensorValueType: The value retrieved from the sensor. + """ return await sensor_module.async_get_value(sens_conf) while True: @@ -587,6 +692,12 @@ async def get_sensor_value( self.transient_tasks.append(self.loop.create_task(poll_sensor())) async def _connect_mqtt(self) -> None: + """ + Connects to the MQTT broker and sets up the necessary configurations. + + Returns: + None + """ config: ConfigType = self.config["mqtt"] topic_prefix: str = config["topic_prefix"] self.mqtt = AbstractMQTTClient.get_implementation(config["client_module"])( @@ -611,6 +722,9 @@ async def _connect_mqtt(self) -> None: ) ) self.mqtt_connected.set() + self.event_bus.fire(StreamDataSubscribeEvent()) + self.event_bus.fire(DigitalSubscribeEvent()) + def _ha_discovery_announce(self) -> None: """ @@ -663,6 +777,20 @@ async def _mqtt_subscribe(self, topics: List[str]) -> None: _LOG.info("Subscribed to topic: %r", topic) async def _mqtt_publish(self, msg: MQTTMessageSend, wait: bool = True) -> None: + """ + Publishes an MQTT message. + + Args: + msg (MQTTMessageSend): The MQTT message to publish. + wait (bool, optional): Whether to wait for MQTT connection before publishing. + Defaults to True. + + Raises: + RuntimeError: If the MQTT client is None. + + Returns: + None + """ if not self.mqtt_connected.is_set(): if wait: _LOG.debug("_mqtt_publish awaiting MQTT connection") @@ -673,9 +801,9 @@ async def _mqtt_publish(self, msg: MQTTMessageSend, wait: bool = True) -> None: if msg.payload is None: _LOG.debug("Publishing MQTT message on topic %r with no payload", msg.topic) - else: + elif isinstance(msg.payload, (bytearray, bytes)): try: - payload_str = msg.payload.decode("utf8") + payload = msg.payload.decode("utf8") except UnicodeDecodeError: _LOG.debug( "Publishing MQTT message on topic %r with non-unicode payload", @@ -683,8 +811,12 @@ async def _mqtt_publish(self, msg: MQTTMessageSend, wait: bool = True) -> None: ) else: _LOG.debug( - "Publishing MQTT message on topic %r: %r", msg.topic, payload_str + "Publishing MQTT message on topic %r: %r", msg.topic, payload ) + else: + _LOG.debug( + "Publishing MQTT message on topic %r: %r", msg.topic, payload + ) await self.mqtt.publish(msg) @@ -1084,6 +1216,27 @@ async def _mqtt_keep_alive_loop(self) -> None: await asyncio.sleep(config["keepalive"]) async def _mqtt_rx_loop(self) -> None: + """ + Asynchronous function that runs a loop to receive MQTT messages. + + The function first checks if the MQTT connection is established. If not, it awaits + the connection before proceeding. Once the connection is established, the function + enters an infinite loop. + + Within the loop, the function checks if the MQTT client is initialized. If not, it + logs an error message and waits for the client to be initialized before proceeding. + If the client is initialized, the function retrieves a message from the MQTT + message queue. + + If the message payload is `None`, the function logs a warning message and continues + to the next message. Otherwise, it decodes the payload as a UTF-8 string and logs + the received message and topic. + + Finally, the function calls the `_handle_mqtt_msg` method to handle the received + message. + + This function does not take any parameters and does not return any value. + """ if not self.mqtt_connected.is_set(): _LOG.debug("_mqtt_rx_loop awaiting MQTT connection") await self.mqtt_connected.wait() @@ -1109,6 +1262,21 @@ async def _mqtt_rx_loop(self) -> None: await self._handle_mqtt_msg(msg.topic, msg.payload) async def _remove_finished_transient_tasks(self) -> None: + """ + Remove any finished transient tasks from the list of transient tasks. + + This function runs in an infinite loop, sleeping for 1 second in each iteration. + It checks for any finished tasks in the list of transient tasks and removes them. + Once the finished tasks are removed, it gathers the results of those tasks and checks + for any exceptions. If an exception is found, it raises the exception and logs an error + message with the task information. + + Parameters: + None + + Returns: + None + """ while True: await asyncio.sleep(1) finished_tasks = [x for x in self.transient_tasks if x.done()] @@ -1198,6 +1366,19 @@ async def stream_output_loop( self.event_bus.fire(StreamDataSentEvent(stream_conf["name"], data)) async def _main_loop(self) -> None: + """ + Asynchronous main loop function. + + This function is responsible for running the main event loop of the MQTT client. + It handles reconnecting to the MQTT broker if the connection is lost and + manages the execution of critical tasks. + + Parameters: + None + + Returns: + None + """ reconnect = True reconnect_delay = self.config["mqtt"]["reconnect_delay"] reconnects_remaining = None @@ -1221,19 +1402,16 @@ async def _main_loop(self) -> None: if self.config["mqtt"].get("ha_discovery", {}).get("enabled"): self._ha_discovery_announce() - try: - await asyncio.gather(*self.critical_tasks) - except asyncio.CancelledError: - break - except Exception: # pylint: disable=broad-except - _LOG.exception("Exception in critical task:") + await asyncio.gather(*self.critical_tasks) except asyncio.CancelledError: break - except MQTTException: + except (MQTTException,MqttCodeError): if reconnects_remaining is not None: reconnect = reconnects_remaining > 0 reconnects_remaining -= 1 _LOG.exception("Connection to MQTT broker failed") + except Exception: # pylint: disable=broad-except + _LOG.exception("Exception in critical task:") finally: _LOG.debug("Clearing events and cancelling 'critical_tasks'") diff --git a/pyproject.toml b/pyproject.toml index db158977..3472db81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mqtt-io" -version = "2.2.9" +version = "2.5.1" description = "Expose GPIO modules (Raspberry Pi, Beaglebone, PCF8754, PiFace2 etc.), digital sensors (LM75 etc.) and serial streams to an MQTT server for remote control and monitoring." readme = "README.md" authors = ["Ellis Percival "] @@ -8,14 +8,16 @@ license = "MIT" documentation = "https://flyte.github.io/mqtt-io/" [tool.poetry.dependencies] -python = "^3.6" +python = "^3.8" PyYAML = "^6.0.1" Cerberus = "^1.3.2" -typing-extensions = "^3.7.4" +typing-extensions = "^4.4.0" dataclasses = { version = "^0.8", python = ">=3.6,<3.7" } -asyncio-mqtt = "^0.8.1" +aiomqtt = "^2.1.0" backoff = "^1.10.0" confp = "^0.4.0" +# Fix for poetry/docutils related bug +docutils = "0.18.1" [tool.poetry.dev-dependencies] mock = { version = "^4.0.3", python = ">=3.6,<3.8" } @@ -25,14 +27,15 @@ pylint = "^2.6.2" mypy = "^0.812" behave = "^1.2.6" coverage = "^5.4" -md-toc = "^7.1.0" +md-toc = "^8.0.0" Sphinx = "^3.5.1" sphinx-autoapi = "^1.7.0" recommonmark = "^0.7.1" -Jinja2 = "^2.11.3" +Jinja2 = "^3.1.3" ast-to-xml="^0.2.3" GitPython = "^3.1.15" semver = "^2.13.0" +docutils = "0.18.1" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/setup.cfg b/setup.cfg index 62ce42d2..ec238e75 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,9 +1,10 @@ [bumpversion] -current_version = 2.2.9 +current_version = 2.5.1 commit = True tag = True tag_name = {new_version} message = Bump version to {new_version} [bumpversion:file:pyproject.toml] + [bumpversion:file:mqtt_io/__init__.py]