Skip to content

Commit

Permalink
Feature: Add device trigger support (#25)
Browse files Browse the repository at this point in the history
Adds support for device triggers

Currently tested with two different variants of `WPH-01`, a double side
and a single sided variant. The change introduces a different concept of
creating an MQTT device, where it returns a list of entities instead of
one entity, as some devices can have multiple entities.
  • Loading branch information
ha-enthus1ast authored Dec 18, 2023
1 parent 507cdbd commit 6d83198
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 114 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ This project connects bluetooth devices in a plejd mesh to an MQTT broker. Devic

The project currently supports:
- `Plejd Light`
- `Plejd Device Trigger`

Not supported currently:
- `Plejd Switch`
- `Plejd Device Trigger`
- `Plejd Sensor`
- `Plejd Scenes`

Expand Down
1 change: 0 additions & 1 deletion plejd_mqtt_ha/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

def main():
"""Entry point of the application."""

# Load environment variables
log_level = os.getenv("LOG_LEVEL", "ERROR").upper()
config = os.getenv("CONFIG", "/config/settings.yaml")
Expand Down
17 changes: 13 additions & 4 deletions plejd_mqtt_ha/bt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,21 @@ def _proxy_callback(_, data: bytearray) -> None:
decrypted_data = self._encrypt_decrypt_data(self._crypto_key, encoded_address, data)
# Since it is a mesh one client handles all subscriptions and we need to dispatch to
# the correct device
sender_addr = decrypted_data[0] # Address of the device that sent data in the mesh
command = int.from_bytes(bytes=decrypted_data[3:5], byteorder="big")
if command == constants.PlejdCommand.BLE_CMD_REMOTE_CLICK.value:
sender_addr = decrypted_data[5] # Address of the device that sent data in the mesh
else:
sender_addr = decrypted_data[0]

logging.debug(
f"Recevied data from device address {sender_addr} using command {command}"
)
logging.debug(f"Raw decrypted data received: {decrypted_data.hex()}")

if sender_addr not in self._callbacks:
logging.warning(
f"Received data from device address {sender_addr} but found no registered"
"callback"
f"Received data from device address {sender_addr} using command {command} "
"but found no registered callback"
)
return
self._callbacks[sender_addr](bytearray(decrypted_data)) # Sender callback
Expand Down Expand Up @@ -345,7 +355,6 @@ async def get_plejd_time(self, ble_address: int) -> datetime:

# Check that last_data is long enough
if len(last_data) < 9:
logging.error("Buffer is too short to unpack time")
raise PlejdBluetoothError("Buffer is too short to unpack time")

# Convert from unix timestamp
Expand Down
3 changes: 3 additions & 0 deletions plejd_mqtt_ha/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class PlejdCommand(int, Enum):
BLE_CMD_TIME_UPDATE = 0x001B
BLE_CMD_REMOTE_CLICK = 0x0016

BLE_CMD_UNKNOWN_1 = 0x0 # TODO: Seems to be sent when button down is pressed
BLE_CMD_UNKNOWN_2 = 0x1011 # TODO: Seems to be sent sometimes(?) when a button is pressed


class PlejdLightAction(str, Enum):
"""BLE payload for possible actions on a light."""
Expand Down
8 changes: 8 additions & 0 deletions plejd_mqtt_ha/mdl/bt_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ class BTLightData(BTData):
brightness: int
"""Brightness of the Plejd light
"""


class BTDeviceTriggerData(BTData):
"""Parsed data type coming from a plejd light."""

input: int
"""Which input is triggered
"""
60 changes: 55 additions & 5 deletions plejd_mqtt_ha/mdl/bt_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""

import logging
from typing import Callable, Generic, TypeVar
from typing import Callable, Generic, Optional, TypeVar

from plejd_mqtt_ha import constants
from plejd_mqtt_ha.bt_client import (
Expand All @@ -27,8 +27,12 @@
PlejdNotConnectedError,
PlejdTimeoutError,
)
from plejd_mqtt_ha.mdl.bt_data_type import BTData, BTLightData
from plejd_mqtt_ha.mdl.bt_device_info import BTDeviceInfo, BTLightInfo
from plejd_mqtt_ha.mdl.bt_data_type import BTData, BTDeviceTriggerData, BTLightData
from plejd_mqtt_ha.mdl.bt_device_info import (
BTDeviceInfo,
BTDeviceTriggerInfo,
BTLightInfo,
)

PlejdDeviceTypeT = TypeVar("PlejdDeviceTypeT", bound=BTDeviceInfo)

Expand Down Expand Up @@ -70,7 +74,7 @@ def _proxy_callback(decrypted_data: bytearray) -> None:
self._device_info.ble_address, _proxy_callback
)

def _decode_response(self, decrypted_data: bytearray) -> BTLightData:
def _decode_response(self, decrypted_data: bytearray) -> Optional[BTData]:
"""Device specific decoding to be implemented by subclass, ie device class.
Parameters
Expand Down Expand Up @@ -197,8 +201,18 @@ async def brightness(self, brightness: int) -> bool:

return True

def _decode_response(self, decrypted_data: bytearray) -> BTLightData:
def _decode_response(self, decrypted_data: bytearray) -> Optional[BTLightData]:
# Overriden
command = int.from_bytes(bytes=decrypted_data[3:5], byteorder="big")

if command == constants.PlejdCommand.BLE_CMD_TIME_UPDATE:
logging.debug("Ignoring time update in light")
return None

if command not in self._device_info.supported_commands:
logging.error(
f"Command {command} not supported for device category {self._device_info.category}"
)

state = decrypted_data[5] if len(decrypted_data) > 5 else 0
brightness = decrypted_data[7] if len(decrypted_data) > 7 else 0
Expand All @@ -211,3 +225,39 @@ def _decode_response(self, decrypted_data: bytearray) -> BTLightData:
brightness=brightness,
)
return response


class BTDeviceTrigger(BTDevice[BTDeviceTriggerInfo]):
"""Plejd bluetooth device trigger.
This device only listens to button presses, and does not support any commands.
"""

def _decode_response(self, decrypted_data: bytearray) -> Optional[BTDeviceTriggerData]:
# Overriden
command = int.from_bytes(bytes=decrypted_data[3:5], byteorder="big")

if command == constants.PlejdCommand.BLE_CMD_TIME_UPDATE:
logging.debug("Ignoring time update in device trigger")
return None

if command not in self._device_info.supported_commands:
logging.debug(
f"Command {command} not supported for device category {self._device_info.category}"
)
return None

if len(decrypted_data) < 8:
logging.debug(
f"Device trigger {self._device_info.name} received too short data, ignoring"
)
return None

input = decrypted_data[6] # Which input is triggered

response = BTDeviceTriggerData(
raw_data=decrypted_data,
input=int(input),
)

return response
21 changes: 20 additions & 1 deletion plejd_mqtt_ha/mdl/bt_device_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BTDeviceInfo(BaseModel):
"""Name of the device"""
hardware_id: str # TODO rename to device address?
"""Adress of the device within the Plejd mesh"""
index: int # TODO this is the index of the entity actually
index: int
"""Index of the entity belonging to the device"""
ble_address: int
"""BLE address of the device"""
Expand All @@ -61,3 +61,22 @@ class BTLightInfo(BTDeviceInfo):

brightness: bool = False
"""Whether or not the light supports setting brightness"""


class BTDeviceTriggerInfo(BTDeviceInfo):
"""Information specific to trigger devices."""

category = "device_trigger"
supported_commands = [
constants.PlejdCommand.BLE_CMD_REMOTE_CLICK,
constants.PlejdCommand.BLE_CMD_STATE_CHANGE,
constants.PlejdCommand.BLE_CMD_DIM_CHANGE,
constants.PlejdCommand.BLE_CMD_DIM2_CHANGE,
]

buttons: list[dict]
"""List of buttons on the device, each button is a dict with the following keys:
- type: Type of button, ie DirectionUp, DirectionDown etc
- double_sided: Whether or not the button is double sided
- input: Input number of the button
"""
Loading

0 comments on commit 6d83198

Please sign in to comment.