Skip to content

Commit

Permalink
Merge branch 'name' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Ernst79 authored Jan 10, 2024
2 parents 1128e2e + 384ed76 commit bfcdc2b
Show file tree
Hide file tree
Showing 47 changed files with 378 additions and 660 deletions.
28 changes: 17 additions & 11 deletions custom_components/ble_monitor/binary_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ async def async_add_binary_sensor(key, device_model, firmware, auto_sensors, man
if key not in sensors_by_key:
sensors_by_key[key] = {}
if measurement not in sensors_by_key[key]:
entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0]
sensors[measurement] = globals()[entity_description.sensor_class](
self.config, key, device_model, firmware, entity_description, manufacturer
)
self.add_entities([sensors[measurement]])
sensors_by_key[key].update(sensors)
try:
entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0]
sensors[measurement] = globals()[entity_description.sensor_class](
self.config, key, device_model, firmware, entity_description, manufacturer
)
self.add_entities([sensors[measurement]])
sensors_by_key[key].update(sensors)
except IndexError:
_LOGGER.error("Error adding measurement %s", measurement)
else:
sensors = sensors_by_key[key]
else:
Expand All @@ -102,11 +105,14 @@ async def async_add_binary_sensor(key, device_model, firmware, auto_sensors, man
sensors = {}
sensors_by_key[key] = {}
for measurement in device_sensors:
entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0]
sensors[measurement] = globals()[entity_description.sensor_class](
self.config, key, device_model, firmware, entity_description, manufacturer
)
self.add_entities([sensors[measurement]])
try:
entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0]
sensors[measurement] = globals()[entity_description.sensor_class](
self.config, key, device_model, firmware, entity_description, manufacturer
)
self.add_entities([sensors[measurement]])
except IndexError:
_LOGGER.error("Error adding measurement %s", measurement)
sensors_by_key[key] = sensors
else:
sensors = sensors_by_key[key]
Expand Down
170 changes: 95 additions & 75 deletions custom_components/ble_monitor/ble_parser/__init__.py

Large diffs are not rendered by default.

27 changes: 7 additions & 20 deletions custom_components/ble_monitor/ble_parser/acconeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
}


def parse_acconeer(self, data, source_mac, rssi):
def parse_acconeer(self, data: bytes, mac: str):
"""Acconeer parser"""
msg_length = len(data)
firmware = "Acconeer"
acconeer_mac = source_mac
device_id = data[4]
xvalue = data[5:]
result = {"firmware": firmware}
Expand All @@ -30,12 +29,7 @@ def parse_acconeer(self, data, source_mac, rssi):
# Acconeer Sensors
device_type = ACCONEER_SENSOR_IDS[device_id]
measurements = MEASUREMENTS[device_id]
(
battery_level,
temperature,
presence,
reserved2
) = unpack("<HhHQ", xvalue)
(battery_level, temperature, presence, _) = unpack("<HhHQ", xvalue)

if "presence" in measurements:
result.update({
Expand All @@ -56,34 +50,27 @@ def parse_acconeer(self, data, source_mac, rssi):
if device_type is None:
if self.report_unknown == "Acconeer":
_LOGGER.info(
"BLE ADV from UNKNOWN Acconeer DEVICE: RSSI: %s, MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
"BLE ADV from UNKNOWN Acconeer DEVICE: MAC: %s, ADV: %s",
to_mac(mac),
data.hex()
)
return None

# Check for duplicate messages
packet_id = xvalue.hex()
try:
prev_packet = self.lpacket_ids[acconeer_mac]
prev_packet = self.lpacket_ids[mac]
except KeyError:
# start with empty first packet
prev_packet = None
if prev_packet == packet_id:
# only process new messages
if self.filter_duplicates is True:
return None
self.lpacket_ids[acconeer_mac] = packet_id

# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and acconeer_mac not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(acconeer_mac))
return None
self.lpacket_ids[mac] = packet_id

result.update({
"rssi": rssi,
"mac": to_unformatted_mac(acconeer_mac),
"mac": to_unformatted_mac(mac),
"type": device_type,
"packet": packet_id,
"firmware": firmware,
Expand Down
16 changes: 4 additions & 12 deletions custom_components/ble_monitor/ble_parser/airmentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,9 @@ def parse_2s(msg_type, xvalue):
return None


def parse_airmentor(self, data: bytes, source_mac: str, rssi):
def parse_airmentor(self, data: bytes, mac: str):
"""Parser for Air Mentor"""
data_length = len(data)
airmentor_mac = source_mac

device_type = None
result = None
Expand All @@ -127,21 +126,14 @@ def parse_airmentor(self, data: bytes, source_mac: str, rssi):
if device_type is None or result is None:
if self.report_unknown == "Air Mentor":
_LOGGER.info(
"BLE ADV from UNKNOWN Air Mentor DEVICE: RSSI: %s, MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
"BLE ADV from UNKNOWN Air Mentor DEVICE: MAC: %s, ADV: %s",
to_mac(mac),
data.hex()
)
return None

# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and airmentor_mac not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(airmentor_mac))
return None

result.update({
"rssi": rssi,
"mac": to_unformatted_mac(airmentor_mac),
"mac": to_unformatted_mac(mac),
"type": device_type,
"packet": "no packet id",
"firmware": firmware,
Expand Down
20 changes: 6 additions & 14 deletions custom_components/ble_monitor/ble_parser/almendo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
_LOGGER = logging.getLogger(__name__)


def parse_almendo(self, data, source_mac, rssi):
def parse_almendo(self, data: bytes, mac: str):
"""Almendo parser"""
result = {
"mac": to_unformatted_mac(source_mac),
"rssi": rssi,
"mac": to_unformatted_mac(mac),
"data": False,
"packet": "no packet id"
}
Expand Down Expand Up @@ -48,25 +47,18 @@ def parse_almendo(self, data, source_mac, rssi):
if result is None:
if self.report_unknown == "Almendo":
_LOGGER.info(
"BLE ADV from UNKNOWN Almendo DEVICE: RSSI: %s, "
"BLE ADV from UNKNOWN Almendo DEVICE: "
"MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
to_mac(mac),
data.hex(),
)
return None
# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and source_mac not in self.sensor_whitelist:
_LOGGER.debug(
"Discovery is disabled. MAC: %s is not whitelisted!",
to_mac(source_mac),
)
return None

if version != 1:
_LOGGER.info(
"Protocol version %i on device %s not yet known "
"by the Almendo parser",
version,
to_mac(source_mac),
to_mac(mac),
)
return result
20 changes: 6 additions & 14 deletions custom_components/ble_monitor/ble_parser/altbeacon.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@

from .const import (CONF_DATA, CONF_FIRMWARE, CONF_MAC, CONF_MAJOR,
CONF_MANUFACTURER, CONF_MEASURED_POWER, CONF_MINOR,
CONF_PACKET, CONF_RSSI, CONF_TRACKER_ID, CONF_TYPE,
CONF_UUID, DEFAULT_MANUFACTURER, MANUFACTURER_DICT)
CONF_PACKET, CONF_TRACKER_ID, CONF_TYPE, CONF_UUID,
DEFAULT_MANUFACTURER, MANUFACTURER_DICT)
from .helpers import to_mac, to_unformatted_mac, to_uuid

_LOGGER = logging.getLogger(__name__)

DEVICE_TYPE: Final = "AltBeacon"


def parse_altbeacon(self, data: str, comp_id: int, source_mac: str, rssi: float):
def parse_altbeacon(self, data: str, comp_id: int, mac: str):
"""parser for Alt Beacon"""
if len(data) >= 27:
uuid = data[6:22]
(major, minor, power) = unpack(">HHb", data[22:27])

tracker_data = {
CONF_RSSI: rssi,
CONF_MAC: to_unformatted_mac(source_mac),
CONF_MAC: to_unformatted_mac(mac),
CONF_UUID: to_uuid(uuid).replace('-', ''),
CONF_TRACKER_ID: uuid,
CONF_MAJOR: major,
Expand All @@ -42,18 +41,11 @@ def parse_altbeacon(self, data: str, comp_id: int, source_mac: str, rssi: float)
else:
if self.report_unknown == DEVICE_TYPE:
_LOGGER.info(
"BLE ADV from UNKNOWN %s DEVICE: RSSI: %s, MAC: %s, ADV: %s",
"BLE ADV from UNKNOWN %s DEVICE: MAC: %s, ADV: %s",
DEVICE_TYPE,
rssi,
to_mac(source_mac),
to_mac(mac),
data.hex()
)
return None, None

# check for UUID presence in sensor whitelist, if needed
if self.discovery is False and uuid and uuid not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. UUID: %s is not whitelisted!", to_uuid(uuid))

return None, None

return sensor_data, tracker_data
12 changes: 3 additions & 9 deletions custom_components/ble_monitor/ble_parser/amazfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
_LOGGER = logging.getLogger(__name__)


def parse_amazfit(self, service_data, man_spec_data, source_mac, rssi):
def parse_amazfit(self, service_data: str, man_spec_data: str, mac: str):
"""parser for Amazfit scale and Miband 4 and 5"""
if service_data:
service_data_length = len(service_data)
Expand Down Expand Up @@ -65,23 +65,17 @@ def parse_amazfit(self, service_data, man_spec_data, source_mac, rssi):
if self.report_unknown == "Amazfit":
_LOGGER.info(
"BLE ADV from UNKNOWN Amazfit Scale DEVICE: MAC: %s, service data: %s, manufacturer data: %s",
to_mac(source_mac),
to_mac(mac),
service_data,
man_spec_data
)
return None

# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and source_mac.lower() not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(source_mac))
return None

result.update({
"type": device_type,
"firmware": firmware,
"mac": to_unformatted_mac(source_mac),
"mac": to_unformatted_mac(mac),
"packet": 'no packet id',
"rssi": rssi,
"data": True,
})
return result
16 changes: 5 additions & 11 deletions custom_components/ble_monitor/ble_parser/atc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
_LOGGER = logging.getLogger(__name__)


def parse_atc(self, data, source_mac, rssi):
def parse_atc(self, data: bytes, mac: str):
"""Parse ATC BLE advertisements"""
device_type = "ATC"
msg_length = len(data)
Expand Down Expand Up @@ -45,7 +45,7 @@ def parse_atc(self, data, source_mac, rssi):
adv_priority = 29
elif msg_length == 15:
# Parse BLE message in Custom format with encryption
atc_mac = source_mac
atc_mac = mac
packet_id = data[4]
firmware = "ATC (Custom encrypted)"
decrypted_data = decrypt_atc(self, data, atc_mac)
Expand All @@ -70,7 +70,7 @@ def parse_atc(self, data, source_mac, rssi):
adv_priority = 39
elif msg_length == 12:
# Parse BLE message in Atc1441 format with encryption
atc_mac = source_mac
atc_mac = mac
packet_id = data[4]
firmware = "ATC (Atc1441 encrypted)"
decrypted_data = decrypt_atc(self, data, atc_mac)
Expand All @@ -97,17 +97,12 @@ def parse_atc(self, data, source_mac, rssi):
else:
if self.report_unknown == "ATC":
_LOGGER.info(
"BLE ADV from UNKNOWN ATC DEVICE: RSSI: %s, MAC: %s, AdStruct: %s",
rssi,
to_mac(source_mac),
"BLE ADV from UNKNOWN ATC DEVICE: MAC: %s, AdStruct: %s",
to_mac(mac),
data.hex()
)
return None

# check for MAC presence in sensor whitelist, if needed
if self.discovery is False and atc_mac not in self.sensor_whitelist:
return None

try:
prev_packet = self.lpacket_ids[atc_mac]
except KeyError:
Expand All @@ -134,7 +129,6 @@ def parse_atc(self, data, source_mac, rssi):
self.lpacket_ids[atc_mac] = packet_id

result.update({
"rssi": rssi,
"mac": to_unformatted_mac(atc_mac),
"type": device_type,
"packet": packet_id,
Expand Down
15 changes: 4 additions & 11 deletions custom_components/ble_monitor/ble_parser/bluemaestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
_LOGGER = logging.getLogger(__name__)


def parse_bluemaestro(self, data, source_mac, rssi):
def parse_bluemaestro(self, data: bytes, mac: str):
"""Parse BlueMaestro advertisement."""
msg_length = len(data)
firmware = "BlueMaestro"
device_id = data[4]
bluemaestro_mac = source_mac
bluemaestro_mac = mac
msg = data[5:]
if msg_length == 18 and device_id in [0x16, 0x17]:
# BlueMaestro Tempo Disc THD
Expand Down Expand Up @@ -52,20 +52,13 @@ def parse_bluemaestro(self, data, source_mac, rssi):
else:
if self.report_unknown == "BlueMaestro":
_LOGGER.info(
"BLE ADV from UNKNOWN BlueMaestro DEVICE: RSSI: %s, MAC: %s, ADV: %s",
rssi,
to_mac(source_mac),
"BLE ADV from UNKNOWN BlueMaestro DEVICE: MAC: %s, ADV: %s",
to_mac(mac),
data.hex()
)
return None

# check for MAC presence in whitelist, if needed
if self.discovery is False and bluemaestro_mac not in self.sensor_whitelist:
_LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(bluemaestro_mac))
return None

result.update({
"rssi": rssi,
"mac": to_unformatted_mac(bluemaestro_mac),
"type": device_type,
"packet": log_cnt,
Expand Down
Loading

0 comments on commit bfcdc2b

Please sign in to comment.