diff --git a/custom_components/ble_monitor/binary_sensor.py b/custom_components/ble_monitor/binary_sensor.py index e4892ff70..14a5cb788 100644 --- a/custom_components/ble_monitor/binary_sensor.py +++ b/custom_components/ble_monitor/binary_sensor.py @@ -88,12 +88,15 @@ async def async_add_binary_sensor(key, device_model, firmware, auto_sensors, man if key not in sensors_by_key: sensors_by_key[key] = {} if measurement not in sensors_by_key[key]: - entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0] - sensors[measurement] = globals()[entity_description.sensor_class]( - self.config, key, device_model, firmware, entity_description, manufacturer - ) - self.add_entities([sensors[measurement]]) - sensors_by_key[key].update(sensors) + try: + entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0] + sensors[measurement] = globals()[entity_description.sensor_class]( + self.config, key, device_model, firmware, entity_description, manufacturer + ) + self.add_entities([sensors[measurement]]) + sensors_by_key[key].update(sensors) + except IndexError: + _LOGGER.error("Error adding measurement %s", measurement) else: sensors = sensors_by_key[key] else: @@ -102,11 +105,14 @@ async def async_add_binary_sensor(key, device_model, firmware, auto_sensors, man sensors = {} sensors_by_key[key] = {} for measurement in device_sensors: - entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0] - sensors[measurement] = globals()[entity_description.sensor_class]( - self.config, key, device_model, firmware, entity_description, manufacturer - ) - self.add_entities([sensors[measurement]]) + try: + entity_description = [item for item in BINARY_SENSOR_TYPES if item.key is measurement][0] + sensors[measurement] = globals()[entity_description.sensor_class]( + self.config, key, device_model, firmware, entity_description, manufacturer + ) + self.add_entities([sensors[measurement]]) + except IndexError: + _LOGGER.error("Error adding measurement %s", measurement) sensors_by_key[key] = sensors else: sensors = sensors_by_key[key] diff --git a/custom_components/ble_monitor/ble_parser/__init__.py b/custom_components/ble_monitor/ble_parser/__init__.py index 5f74251a1..27aca8d15 100644 --- a/custom_components/ble_monitor/ble_parser/__init__.py +++ b/custom_components/ble_monitor/ble_parser/__init__.py @@ -16,7 +16,7 @@ from .const import JAALEE_TYPES, TILT_TYPES from .govee import parse_govee from .grundfos import parse_grundfos -from .helpers import to_mac, to_unformatted_mac +from .helpers import to_mac, to_unformatted_mac, to_uuid from .hhcc import parse_hhcc from .holyiot import parse_holyiot from .hormann import parse_hormann @@ -185,8 +185,9 @@ def parse_advertisement( man_spec_data_list: Optional[list] = None ): """parse BLE advertisement""" - sensor_data = None - tracker_data = None + sensor_data = {} + tracker_data = {} + uuid = None unknown_sensor = False if service_data_list is None: service_data_list = [] @@ -200,63 +201,63 @@ def parse_advertisement( uuid16 = (service_data[3] << 8) | service_data[2] if uuid16 == 0x1809: # UUID16 = Health Thermometer service (used by Relsib) - sensor_data = parse_relsib(self, service_data, mac, rssi) + sensor_data = parse_relsib(self, service_data, mac) break if uuid16 == 0x181A: # UUID16 = Environmental Sensing (used by ATC or b-parasite) if len(service_data) == 22 or len(service_data) == 20: - sensor_data = parse_bparasite(self, service_data, mac, rssi) + sensor_data = parse_bparasite(self, service_data, mac) else: - sensor_data = parse_atc(self, service_data, mac, rssi) + sensor_data = parse_atc(self, service_data, mac) break elif uuid16 in [0x181B, 0x181D]: # UUID16 = Body Composition and Weight Scale (used by Mi Scale) - sensor_data = parse_miscale(self, service_data, mac, rssi) + sensor_data = parse_miscale(self, service_data, mac) break elif uuid16 in [0x181C, 0x181E]: # UUID16 = User Data and Bond Management (used by BTHome) - sensor_data = parse_bthome(self, service_data, uuid16, mac, rssi) + sensor_data = parse_bthome(self, service_data, uuid16, mac) break elif uuid16 == 0x5242: # UUID16 = HolyIOT - sensor_data = parse_holyiot(self, service_data, mac, rssi) + sensor_data = parse_holyiot(self, service_data, mac) break elif uuid16 in [0xAA20, 0xAA21, 0xAA22] and local_name == "ECo": # UUID16 = Relsib - sensor_data = parse_relsib(self, service_data, mac, rssi) + sensor_data = parse_relsib(self, service_data, mac) break elif uuid16 == 0xF525: # UUID16 = Jaalee - sensor_data = parse_jaalee(self, service_data, mac, rssi) + sensor_data = parse_jaalee(self, service_data, mac) break elif uuid16 == 0xFCD2: # UUID16 = Allterco Robotics ltd (BTHome V2) - sensor_data = parse_bthome(self, service_data, uuid16, mac, rssi) + sensor_data = parse_bthome(self, service_data, uuid16, mac) break elif uuid16 in [0xFD3D, 0x0D00]: # UUID16 = unknown (used by Switchbot) - sensor_data = parse_switchbot(self, service_data, mac, rssi) + sensor_data = parse_switchbot(self, service_data, mac) break elif uuid16 == 0xFD50: # UUID16 = Hangzhou Tuya Information Technology Co., Ltd (HHCC) - sensor_data = parse_hhcc(self, service_data, mac, rssi) + sensor_data = parse_hhcc(self, service_data, mac) break elif uuid16 == 0xFDCD: # UUID16 = Qingping - sensor_data = parse_qingping(self, service_data, mac, rssi) + sensor_data = parse_qingping(self, service_data, mac) break elif uuid16 == 0xFE95: # UUID16 = Xiaomi - sensor_data = parse_xiaomi(self, service_data, mac, rssi) + sensor_data = parse_xiaomi(self, service_data, mac) break elif uuid16 == 0xFEAA: if len(service_data) == 19: # UUID16 = Google (used by KKM) - sensor_data = parse_kkm(self, service_data, mac, rssi) + sensor_data = parse_kkm(self, service_data, mac) break elif len(service_data) >= 23: # UUID16 = Google (used by Ruuvitag V2/V4) - sensor_data = parse_ruuvitag(self, service_data, mac, rssi) + sensor_data = parse_ruuvitag(self, service_data, mac) break elif uuid16 == 0xFEE0: # UUID16 = Anhui Huami Information Technology Co., Ltd. (Amazfit) @@ -264,16 +265,16 @@ def parse_advertisement( man_spec_data = man_spec_data_list[0] else: man_spec_data = None - sensor_data = parse_amazfit(self, service_data, man_spec_data, mac, rssi) + sensor_data = parse_amazfit(self, service_data, man_spec_data, mac) elif uuid16 == 0xFFF9: # UUID16 = FIDO (used by Cleargrass) - sensor_data = parse_qingping(self, service_data, mac, rssi) + sensor_data = parse_qingping(self, service_data, mac) break elif uuid16 == 0x2A6E or uuid16 == 0x2A6F: # UUID16 = Temperature and Humidity (used by Teltonika) if len(service_data_list) == 2: service_data = b"".join(service_data_list) - sensor_data = parse_teltonika(self, service_data, local_name, mac, rssi) + sensor_data = parse_teltonika(self, service_data, local_name, mac) break else: unknown_sensor = True @@ -285,184 +286,186 @@ def parse_advertisement( # Filter on Company Identifier if comp_id == 0x0001 and data_len in [0x09, 0x0C, 0x22, 0x25]: # Govee H5101/H5102/H5106/H5177 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id == 0x004C and man_spec_data[4] == 0x02: # iBeacon if int.from_bytes(man_spec_data[6:22], byteorder='big') in TILT_TYPES: # Tilt - sensor_data, tracker_data = parse_tilt(self, man_spec_data, mac, rssi) + sensor_data, tracker_data = parse_tilt(self, man_spec_data, mac) + uuid = man_spec_data[6:22] break elif int.from_bytes(man_spec_data[6:22], byteorder='big') in JAALEE_TYPES: # Jaalee - sensor_data = parse_jaalee(self, man_spec_data, mac, rssi) + sensor_data = parse_jaalee(self, man_spec_data, mac) break elif len(man_spec_data_list) == 2: # Teltonika Eye (can send both iBeacon and Teltonika data in one message) second_man_spec_data = man_spec_data_list[1] second_comp_id = (second_man_spec_data[3] << 8) | second_man_spec_data[2] if second_comp_id == 0x089A: - sensor_data = parse_teltonika(self, second_man_spec_data, local_name, mac, rssi) + sensor_data = parse_teltonika(self, second_man_spec_data, local_name, mac) break else: # iBeacon - sensor_data, tracker_data = parse_ibeacon(self, man_spec_data, mac, rssi) + sensor_data, tracker_data = parse_ibeacon(self, man_spec_data, mac) + uuid = man_spec_data[6:22] break elif comp_id == 0x00DC and data_len == 0x0E: # Oral-b - sensor_data = parse_oral_b(self, man_spec_data, mac, rssi) + sensor_data = parse_oral_b(self, man_spec_data, mac) break elif comp_id == 0x05CD and data_len == 0x15: # Chef iQ - sensor_data = parse_chefiq(self, man_spec_data, mac, rssi) + sensor_data = parse_chefiq(self, man_spec_data, mac) break elif comp_id == 0x0131: # Oras - sensor_data = parse_oras(self, man_spec_data, mac, rssi) + sensor_data = parse_oras(self, man_spec_data, mac) break elif comp_id == 0x0157 and data_len == 0x1B: # Miband - sensor_data = parse_amazfit(self, None, man_spec_data, mac, rssi) + sensor_data = parse_amazfit(self, None, man_spec_data, mac) break elif comp_id == 0x0194 and data_len == 0x0C: # Blustream - sensor_data = parse_blustream(self, man_spec_data, mac, rssi) + sensor_data = parse_blustream(self, man_spec_data, mac) break elif comp_id == 0x0499: # Ruuvitag V3/V5 - sensor_data = parse_ruuvitag(self, man_spec_data, mac, rssi) + sensor_data = parse_ruuvitag(self, man_spec_data, mac) break elif comp_id == 0x0757: # Teltonika (Ela rebrand) if len(man_spec_data_list) == 2: man_spec_data = b"".join(man_spec_data_list) - sensor_data = parse_teltonika(self, man_spec_data, local_name, mac, rssi) + sensor_data = parse_teltonika(self, man_spec_data, local_name, mac) break elif comp_id == 0x07B4: # Hörmann if len(man_spec_data_list) == 2: man_spec_data = b"".join(man_spec_data_list) - sensor_data = parse_hormann(self, man_spec_data, mac, rssi) + sensor_data = parse_hormann(self, man_spec_data, mac) break elif comp_id == 0x089A: # Teltonika - sensor_data = parse_teltonika(self, man_spec_data, local_name, mac, rssi) + sensor_data = parse_teltonika(self, man_spec_data, local_name, mac) break elif comp_id == 0x094F and data_len == 0x15: # Mikrotik - sensor_data = parse_mikrotik(self, man_spec_data, mac, rssi) + sensor_data = parse_mikrotik(self, man_spec_data, mac) break elif comp_id == 0x06E8: # Almendo (Blusensor) - sensor_data = parse_almendo(self, man_spec_data, mac, rssi) + sensor_data = parse_almendo(self, man_spec_data, mac) break elif comp_id == 0x1000 and data_len == 0x15: # Moat S2 - sensor_data = parse_moat(self, man_spec_data, mac, rssi) + sensor_data = parse_moat(self, man_spec_data, mac) break elif comp_id == 0x0133 and data_len in [0x11, 0x15]: # BlueMaestro - sensor_data = parse_bluemaestro(self, man_spec_data, mac, rssi) + sensor_data = parse_bluemaestro(self, man_spec_data, mac) break elif comp_id == 0x01AE and data_len == 0x0F: # SmartDry - sensor_data = parse_smartdry(self, man_spec_data, mac, rssi) + sensor_data = parse_smartdry(self, man_spec_data, mac) break elif comp_id == 0x06D5: # Sensirion - sensor_data = parse_sensirion(self, man_spec_data, local_name, mac, rssi) + sensor_data = parse_sensirion(self, man_spec_data, local_name, mac) break elif comp_id in [0x2111, 0x2112, 0x2121, 0x2122] and data_len == 0x0B: # Air Mentor - sensor_data = parse_airmentor(self, man_spec_data, mac, rssi) + sensor_data = parse_airmentor(self, man_spec_data, mac) break elif comp_id == 0x2730 and data_len in [0x14, 0x2D]: # Govee H5182 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id == 0x1B36 and data_len in [0x14, 0x2D]: # Govee H5184 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id in [0x4A32, 0x332, 0x4C32] and data_len in [0x17, 0x2D]: # Govee H5185 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id in [0x67DD, 0xE02F, 0xF79F] and data_len in [0x11, 0x2A]: # Govee H5183 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) elif comp_id in [0x5112, 0x5122, 0x6111, 0x6121] and data_len == 0x0f: # Air Mentor 2S - sensor_data = parse_airmentor(self, man_spec_data, mac, rssi) + sensor_data = parse_airmentor(self, man_spec_data, mac) break elif comp_id == 0x8801 and data_len in [0x0C, 0x25]: # Govee H5179 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id == 0xAA55 and data_len == 0x14: # Thermobeacon - sensor_data = parse_thermobeacon(self, man_spec_data, mac, rssi) + sensor_data = parse_thermobeacon(self, man_spec_data, mac) break elif comp_id == 0xEC88 and data_len in [0x09, 0x0A, 0x0C, 0x22, 0x24, 0x25]: # Govee H5051/H5071/H5072/H5075/H5074 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif comp_id == 0xF214 and data_len == 0x16: # Grundfos - sensor_data = parse_grundfos(self, man_spec_data, mac, rssi) + sensor_data = parse_grundfos(self, man_spec_data, mac) break elif comp_id == 0xFFFF and data_len == 0x1E: # Kegtron - sensor_data = parse_kegtron(self, man_spec_data, mac, rssi) + sensor_data = parse_kegtron(self, man_spec_data, mac) break elif comp_id == 0xA0AC and data_len == 0x0F and man_spec_data[14] in [0x06, 0x0D]: # Laica - sensor_data = parse_laica(self, man_spec_data, mac, rssi) + sensor_data = parse_laica(self, man_spec_data, mac) break # Filter on part of the UUID16 elif man_spec_data[2] == 0xC0 and data_len == 0x10: # Xiaogui Scale - sensor_data = parse_xiaogui(self, man_spec_data, mac, rssi) + sensor_data = parse_xiaogui(self, man_spec_data, mac) break elif man_spec_data[3] == 0x82 and data_len == 0x0E: # iNode - sensor_data = parse_inode(self, man_spec_data, mac, rssi) + sensor_data = parse_inode(self, man_spec_data, mac) break elif man_spec_data[3] in [ 0x91, 0x92, 0x93, 0x94, 0x95, 0x96, 0x9A, 0x9B, 0x9C, 0x9D ] and data_len == 0x19: # iNode Care Sensors - sensor_data = parse_inode(self, man_spec_data, mac, rssi) + sensor_data = parse_inode(self, man_spec_data, mac) break # Filter on service class uuid16 elif service_class_uuid16 == 0x20AA and data_len == 0x0E: # Jinou BEC07-5 - sensor_data = parse_jinou(self, man_spec_data, mac, rssi) + sensor_data = parse_jinou(self, man_spec_data, mac) break elif service_class_uuid16 in [0x5182, 0x5184] and data_len in [0x14, 0x2D]: # Govee H5182 and H5184 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif service_class_uuid16 == 0x5183 and data_len in [0x11, 0x2A]: # Govee H5183 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif service_class_uuid16 in [0x5185, 0x5198] and data_len in [0x17, 0x30]: # Govee H5185 and H5198 - sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac, rssi) + sensor_data = parse_govee(self, man_spec_data, service_class_uuid16, local_name, mac) break elif service_class_uuid16 == 0xF0FF: if comp_id in [0x0010, 0x0011, 0x0015, 0x0018, 0x001B] and data_len in [0x15, 0x17]: # Thermobeacon - sensor_data = parse_thermobeacon(self, man_spec_data, mac, rssi) + sensor_data = parse_thermobeacon(self, man_spec_data, mac) break elif (comp_id in [0x0000, 0x0001] or local_name in ["iBBQ", "xBBQ", "sps", "tps"]) and ( data_len in [0x0A, 0x0D, 0x0F, 0x13, 0x17] ): # Inkbird - sensor_data = parse_inkbird(self, man_spec_data, local_name, mac, rssi) + sensor_data = parse_inkbird(self, man_spec_data, local_name, mac) break else: unknown_sensor = True @@ -472,27 +475,28 @@ def parse_advertisement( b'\xb0\x0a\x09\xec\xd7\x9d\xb8\x93\xba\x42\xd6\x11\x00\x00\x09\xef' ) and data_len in [0x06, 0x08]: # Sensorpush - sensor_data = parse_sensorpush(self, man_spec_data, mac, rssi) + sensor_data = parse_sensorpush(self, man_spec_data, mac) break # Filter on complete local name elif local_name in ["sps", "tps"] and data_len == 0x0A: # Inkbird IBS-TH - sensor_data = parse_inkbird(self, man_spec_data, local_name, mac, rssi) + sensor_data = parse_inkbird(self, man_spec_data, local_name, mac) break elif local_name[0:5] in ["TP357", "TP359"] and data_len >= 0x07: # Thermopro - sensor_data = parse_thermopro(self, man_spec_data, local_name[0:5], mac, rssi) + sensor_data = parse_thermopro(self, man_spec_data, local_name[0:5], mac) break # Filter on other parts of the manufacturer specific data elif data_len == 0x1B and ((man_spec_data[4] << 8) | man_spec_data[5]) == 0xBEAC: # AltBeacon - sensor_data, tracker_data = parse_altbeacon(self, man_spec_data, comp_id, mac, rssi) + sensor_data, tracker_data = parse_altbeacon(self, man_spec_data, comp_id, mac) + uuid = man_spec_data[6:22] break elif man_spec_data[0] == 0x12 and comp_id == 0xACC0: # Acconeer - sensor_data = parse_acconeer(self, man_spec_data, mac, rssi) + sensor_data = parse_acconeer(self, man_spec_data, mac) break else: unknown_sensor = True @@ -515,17 +519,33 @@ def parse_advertisement( ) break + # Ignore sensor data for MAC addresses not in sensor whitelist, when discovery is disabled + if self.discovery is False and mac not in self.sensor_whitelist: + _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(mac)) + sensor_data = None + # Ignore sensor data for UUID not in sensor whitelist, when discovery is disabled + if self.discovery is False and uuid and uuid not in self.sensor_whitelist: + _LOGGER.debug("Discovery is disabled. UUID: %s is not whitelisted!", to_uuid(uuid)) + + sensor_data = None + # add rssi and local name to the sensor_data output + if sensor_data: + sensor_data.update({ + "rssi": rssi, + "local_name": local_name, + }) + else: + sensor_data = None + # check for monitored device trackers tracker_id = tracker_data['tracker_id'] if tracker_data and 'tracker_id' in tracker_data else mac if tracker_id in self.tracker_whitelist: - if tracker_data is not None: - tracker_data.update({"is connected": True}) - else: - tracker_data = { - "is connected": True, - "mac": to_unformatted_mac(mac), - "rssi": rssi, - } + tracker_data.update({ + "is connected": True, + "mac": to_unformatted_mac(mac), + "rssi": rssi, + "local_name": local_name, + }) else: tracker_data = None diff --git a/custom_components/ble_monitor/ble_parser/acconeer.py b/custom_components/ble_monitor/ble_parser/acconeer.py index 7b6b0627b..0b8826573 100644 --- a/custom_components/ble_monitor/ble_parser/acconeer.py +++ b/custom_components/ble_monitor/ble_parser/acconeer.py @@ -17,11 +17,10 @@ } -def parse_acconeer(self, data, source_mac, rssi): +def parse_acconeer(self, data: bytes, mac: str): """Acconeer parser""" msg_length = len(data) firmware = "Acconeer" - acconeer_mac = source_mac device_id = data[4] xvalue = data[5:] result = {"firmware": firmware} @@ -30,12 +29,7 @@ def parse_acconeer(self, data, source_mac, rssi): # Acconeer Sensors device_type = ACCONEER_SENSOR_IDS[device_id] measurements = MEASUREMENTS[device_id] - ( - battery_level, - temperature, - presence, - reserved2 - ) = unpack("= 27: uuid = data[6:22] (major, minor, power) = unpack(">HHb", data[22:27]) tracker_data = { - CONF_RSSI: rssi, - CONF_MAC: to_unformatted_mac(source_mac), + CONF_MAC: to_unformatted_mac(mac), CONF_UUID: to_uuid(uuid).replace('-', ''), CONF_TRACKER_ID: uuid, CONF_MAJOR: major, @@ -42,18 +41,11 @@ def parse_altbeacon(self, data: str, comp_id: int, source_mac: str, rssi: float) else: if self.report_unknown == DEVICE_TYPE: _LOGGER.info( - "BLE ADV from UNKNOWN %s DEVICE: RSSI: %s, MAC: %s, ADV: %s", + "BLE ADV from UNKNOWN %s DEVICE: MAC: %s, ADV: %s", DEVICE_TYPE, - rssi, - to_mac(source_mac), + to_mac(mac), data.hex() ) return None, None - # check for UUID presence in sensor whitelist, if needed - if self.discovery is False and uuid and uuid not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. UUID: %s is not whitelisted!", to_uuid(uuid)) - - return None, None - return sensor_data, tracker_data diff --git a/custom_components/ble_monitor/ble_parser/amazfit.py b/custom_components/ble_monitor/ble_parser/amazfit.py index a64288a38..4b9f59d29 100644 --- a/custom_components/ble_monitor/ble_parser/amazfit.py +++ b/custom_components/ble_monitor/ble_parser/amazfit.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_amazfit(self, service_data, man_spec_data, source_mac, rssi): +def parse_amazfit(self, service_data: str, man_spec_data: str, mac: str): """parser for Amazfit scale and Miband 4 and 5""" if service_data: service_data_length = len(service_data) @@ -65,23 +65,17 @@ def parse_amazfit(self, service_data, man_spec_data, source_mac, rssi): if self.report_unknown == "Amazfit": _LOGGER.info( "BLE ADV from UNKNOWN Amazfit Scale DEVICE: MAC: %s, service data: %s, manufacturer data: %s", - to_mac(source_mac), + to_mac(mac), service_data, man_spec_data ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and source_mac.lower() not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(source_mac)) - return None - result.update({ "type": device_type, "firmware": firmware, - "mac": to_unformatted_mac(source_mac), + "mac": to_unformatted_mac(mac), "packet": 'no packet id', - "rssi": rssi, "data": True, }) return result diff --git a/custom_components/ble_monitor/ble_parser/atc.py b/custom_components/ble_monitor/ble_parser/atc.py index ebc498eb8..7332c77c8 100644 --- a/custom_components/ble_monitor/ble_parser/atc.py +++ b/custom_components/ble_monitor/ble_parser/atc.py @@ -9,7 +9,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_atc(self, data, source_mac, rssi): +def parse_atc(self, data: bytes, mac: str): """Parse ATC BLE advertisements""" device_type = "ATC" msg_length = len(data) @@ -45,7 +45,7 @@ def parse_atc(self, data, source_mac, rssi): adv_priority = 29 elif msg_length == 15: # Parse BLE message in Custom format with encryption - atc_mac = source_mac + atc_mac = mac packet_id = data[4] firmware = "ATC (Custom encrypted)" decrypted_data = decrypt_atc(self, data, atc_mac) @@ -70,7 +70,7 @@ def parse_atc(self, data, source_mac, rssi): adv_priority = 39 elif msg_length == 12: # Parse BLE message in Atc1441 format with encryption - atc_mac = source_mac + atc_mac = mac packet_id = data[4] firmware = "ATC (Atc1441 encrypted)" decrypted_data = decrypt_atc(self, data, atc_mac) @@ -97,17 +97,12 @@ def parse_atc(self, data, source_mac, rssi): else: if self.report_unknown == "ATC": _LOGGER.info( - "BLE ADV from UNKNOWN ATC DEVICE: RSSI: %s, MAC: %s, AdStruct: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN ATC DEVICE: MAC: %s, AdStruct: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and atc_mac not in self.sensor_whitelist: - return None - try: prev_packet = self.lpacket_ids[atc_mac] except KeyError: @@ -134,7 +129,6 @@ def parse_atc(self, data, source_mac, rssi): self.lpacket_ids[atc_mac] = packet_id result.update({ - "rssi": rssi, "mac": to_unformatted_mac(atc_mac), "type": device_type, "packet": packet_id, diff --git a/custom_components/ble_monitor/ble_parser/bluemaestro.py b/custom_components/ble_monitor/ble_parser/bluemaestro.py index 9d8c57b63..1f2a1fe6b 100644 --- a/custom_components/ble_monitor/ble_parser/bluemaestro.py +++ b/custom_components/ble_monitor/ble_parser/bluemaestro.py @@ -7,12 +7,12 @@ _LOGGER = logging.getLogger(__name__) -def parse_bluemaestro(self, data, source_mac, rssi): +def parse_bluemaestro(self, data: bytes, mac: str): """Parse BlueMaestro advertisement.""" msg_length = len(data) firmware = "BlueMaestro" device_id = data[4] - bluemaestro_mac = source_mac + bluemaestro_mac = mac msg = data[5:] if msg_length == 18 and device_id in [0x16, 0x17]: # BlueMaestro Tempo Disc THD @@ -52,20 +52,13 @@ def parse_bluemaestro(self, data, source_mac, rssi): else: if self.report_unknown == "BlueMaestro": _LOGGER.info( - "BLE ADV from UNKNOWN BlueMaestro DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN BlueMaestro DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and bluemaestro_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(bluemaestro_mac)) - return None - result.update({ - "rssi": rssi, "mac": to_unformatted_mac(bluemaestro_mac), "type": device_type, "packet": log_cnt, diff --git a/custom_components/ble_monitor/ble_parser/blustream.py b/custom_components/ble_monitor/ble_parser/blustream.py index f133d8b0f..97abd87fc 100644 --- a/custom_components/ble_monitor/ble_parser/blustream.py +++ b/custom_components/ble_monitor/ble_parser/blustream.py @@ -7,11 +7,11 @@ _LOGGER = logging.getLogger(__name__) -def parse_blustream(self, data, source_mac, rssi): +def parse_blustream(self, data: bytes, mac: str): """Parse Blustream advertisement.""" msg_length = len(data) firmware = "Blustream" - blustream_mac = source_mac + blustream_mac = mac msg = data[8:] if msg_length == 13: @@ -26,20 +26,13 @@ def parse_blustream(self, data, source_mac, rssi): else: if self.report_unknown == "Blustream": _LOGGER.info( - "BLE ADV from UNKNOWN Blustream DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Blustream DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and blustream_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(blustream_mac)) - return None - result.update({ - "rssi": rssi, "mac": to_unformatted_mac(blustream_mac), "type": device_type, "packet": "no packet id", diff --git a/custom_components/ble_monitor/ble_parser/bparasite.py b/custom_components/ble_monitor/ble_parser/bparasite.py index 78ed01f00..030673b52 100644 --- a/custom_components/ble_monitor/ble_parser/bparasite.py +++ b/custom_components/ble_monitor/ble_parser/bparasite.py @@ -7,14 +7,13 @@ _LOGGER = logging.getLogger(__name__) -def parse_bparasite(self, data, source_mac, rssi): +def parse_bparasite(self, data: bytes, mac: str): """Check for adstruc length""" msg_length = len(data) if msg_length == 22: # TODO: Use protocol bits? - bpara_mac = data[14:20] device_type = "b-parasite V1.1.0" firmware = "b-parasite V1.1.0 (with illuminance)" - (protocol, packet_id, batt, temp, humi, moist, mac, light) = unpack(">BBHhHH6sH", data[4:]) + (protocol, packet_id, batt, temp, humi, moist, bpara_mac, light) = unpack(">BBHhHH6sH", data[4:]) result = { "temperature": temp / (100 if (protocol >> 4) == 2 else 1000), "humidity": (humi / 65536) * 100, @@ -24,10 +23,9 @@ def parse_bparasite(self, data, source_mac, rssi): "data": True } elif msg_length == 20: - bpara_mac = data[14:20] device_type = "b-parasite V1.0.0" firmware = "b-parasite V1.0.0 (without illuminance)" - (protocol, packet_id, batt, temp, humi, moist, mac) = unpack(">BBHhHH6s", data[4:]) + (protocol, packet_id, batt, temp, humi, moist, bpara_mac) = unpack(">BBHhHH6s", data[4:]) result = { "temperature": temp / (100 if (protocol >> 4) == 2 else 1000), "humidity": (humi / 65536) * 100, @@ -38,20 +36,15 @@ def parse_bparasite(self, data, source_mac, rssi): else: if self.report_unknown == "b-parasite": _LOGGER.info( - "BLE ADV from UNKNOWN b-parasite DEVICE: RSSI: %s, MAC: %s, AdStruct(%d): %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN b-parasite DEVICE: MAC: %s, AdStruct(%d): %s", + to_mac(mac), msg_length, data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and bpara_mac not in self.sensor_whitelist: - return None - try: - prev_packet = self.lpacket_ids[bpara_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -61,11 +54,10 @@ def parse_bparasite(self, data, source_mac, rssi): if prev_packet == packet_id: return None - self.lpacket_ids[bpara_mac] = packet_id + self.lpacket_ids[mac] = packet_id result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(bpara_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": packet_id, "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/bthome.py b/custom_components/ble_monitor/ble_parser/bthome.py index 966f40326..662cfd20f 100644 --- a/custom_components/ble_monitor/ble_parser/bthome.py +++ b/custom_components/ble_monitor/ble_parser/bthome.py @@ -87,11 +87,10 @@ def parse_event_properties( } -def parse_bthome(self, data, uuid16, source_mac, rssi): +def parse_bthome(self, data: str, uuid16: int, mac: bytes): """BTHome BLE parser""" self.uuid16 = uuid16 - self.bthome_mac = source_mac - self.rssi = rssi + self.mac = mac if self.uuid16 == 0xFCD2: # BTHome V2 format @@ -319,9 +318,8 @@ def parse_payload(self, payload, sw_version): if not result: if self.report_unknown == "BTHome": _LOGGER.info( - "BLE ADV from BTHome DEVICE: RSSI: %s, MAC: %s, ADV: %s", - self.rssi, - to_mac(self.bthome_mac), + "BLE ADV from BTHome DEVICE: MAC: %s, ADV: %s", + to_mac(self.mac), payload.hex() ) return None @@ -333,7 +331,7 @@ def parse_payload(self, payload, sw_version): # Check for duplicate messages if self.packet_id: try: - prev_packet = self.lpacket_ids[self.bthome_mac] + prev_packet = self.lpacket_ids[self.mac] except KeyError: # start with empty first packet prev_packet = None @@ -341,18 +339,12 @@ def parse_payload(self, payload, sw_version): # only process new messages if self.filter_duplicates is True: return None - self.lpacket_ids[self.bthome_mac] = self.packet_id + self.lpacket_ids[self.mac] = self.packet_id else: self.packet_id = "no packet id" - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and self.bthome_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(self.bthome_mac)) - return None - result.update({ - "rssi": self.rssi, - "mac": to_unformatted_mac(self.bthome_mac), + "mac": to_unformatted_mac(self.mac), "packet": self.packet_id, "type": self.device_type, "firmware": self.firmware, @@ -368,15 +360,15 @@ def decrypt_data(self, data: bytes, sw_version: int): _LOGGER.debug("Invalid data length (for decryption), adv: %s", data.hex()) # try to find encryption key for current device try: - key = self.aeskeys[self.bthome_mac] + key = self.aeskeys[self.mac] if len(key) != 16: _LOGGER.error("Encryption key should be 16 bytes (32 characters) long") return None, None except KeyError: # no encryption key found - if self.bthome_mac not in self.no_key_message: - _LOGGER.error("No encryption key found for device with MAC %s", to_mac(self.bthome_mac)) - self.no_key_message.append(self.bthome_mac) + if self.mac not in self.no_key_message: + _LOGGER.error("No encryption key found for device with MAC %s", to_mac(self.mac)) + self.no_key_message.append(self.mac) return None, None # prepare the data for decryption @@ -389,7 +381,7 @@ def decrypt_data(self, data: bytes, sw_version: int): mic = data[-4:] # nonce: mac [6], uuid16 [2 (v1) or 3 (v2)], count_id [4] - nonce = b"".join([self.bthome_mac, uuid, count_id]) + nonce = b"".join([self.mac, uuid, count_id]) cipher = AES.new(key, AES.MODE_CCM, nonce=nonce, mac_len=4) if sw_version == 1: cipher.update(b"\x11") @@ -405,7 +397,7 @@ def decrypt_data(self, data: bytes, sw_version: int): if decrypted_payload is None: _LOGGER.error( "Decryption failed for %s, decrypted payload is None", - to_mac(self.bthome_mac), + to_mac(self.mac), ) return None, None return decrypted_payload, count_id diff --git a/custom_components/ble_monitor/ble_parser/chefiq.py b/custom_components/ble_monitor/ble_parser/chefiq.py index ee052fa41..abe19b239 100644 --- a/custom_components/ble_monitor/ble_parser/chefiq.py +++ b/custom_components/ble_monitor/ble_parser/chefiq.py @@ -7,11 +7,10 @@ _LOGGER = logging.getLogger(__name__) -def parse_chefiq(self, data, source_mac, rssi): +def parse_chefiq(self, data: str, mac: bytes): """Parse Chef iQ advertisement.""" msg_length = len(data) firmware = "Chef iQ" - chefiq_mac = source_mac msg = data[6:] if msg_length == 22: # Chef iQ CQ60 @@ -32,24 +31,14 @@ def parse_chefiq(self, data, source_mac, rssi): else: if self.report_unknown == "Chef iQ": _LOGGER.info( - "BLE ADV from UNKNOWN Chef iQ DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Chef iQ DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and chefiq_mac not in self.sensor_whitelist: - _LOGGER.debug( - "Discovery is disabled. MAC: %s is not whitelisted!", - to_mac(chefiq_mac) - ) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(chefiq_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": log_cnt, "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/const.py b/custom_components/ble_monitor/ble_parser/const.py index 1df0a8a2b..e9146295e 100644 --- a/custom_components/ble_monitor/ble_parser/const.py +++ b/custom_components/ble_monitor/ble_parser/const.py @@ -7,7 +7,6 @@ CONF_FIRMWARE: Final = "firmware" CONF_DATA: Final = "data" CONF_MANUFACTURER: Final = "manufacturer" -CONF_RSSI: Final = "rssi" CONF_BATTERY: Final = "battery" CONF_CONDUCTIVITY: Final = "conductivity" diff --git a/custom_components/ble_monitor/ble_parser/govee.py b/custom_components/ble_monitor/ble_parser/govee.py index cf6396108..3ca9dc961 100644 --- a/custom_components/ble_monitor/ble_parser/govee.py +++ b/custom_components/ble_monitor/ble_parser/govee.py @@ -50,7 +50,7 @@ def decode_pm25_from_4_bytes(packet_value: int) -> int: return int(packet_value % 1000) -def parse_govee(self, data, service_class_uuid16, local_name, source_mac, rssi): +def parse_govee(self, data: str, service_class_uuid16: int, local_name: str, mac: bytes): """Parser for Govee sensors""" # The parser needs to handle the bug in the Govee BLE advertisement # data as INTELLI_ROCKS sometimes ends up glued on to the end of the message @@ -58,7 +58,6 @@ def parse_govee(self, data, service_class_uuid16, local_name, source_mac, rssi): data = data[:-25] msg_length = len(data) firmware = "Govee" - govee_mac = source_mac device_id = (data[3] << 8) | data[2] result = {"firmware": firmware} if msg_length == 10 and ( @@ -135,8 +134,8 @@ def parse_govee(self, data, service_class_uuid16, local_name, source_mac, rssi): device_type = "H5178" elif sensor_id == 1: device_type = "H5178-outdoor" - govee_mac_outdoor = int.from_bytes(govee_mac, 'big') + 1 - govee_mac = bytearray(govee_mac_outdoor.to_bytes(len(govee_mac), 'big')) + mac_outdoor = int.from_bytes(mac, 'big') + 1 + mac = bytearray(mac_outdoor.to_bytes(len(mac), 'big')) else: _LOGGER.debug( "Unknown sensor id for Govee H5178, please report to the developers, data: %s", @@ -235,21 +234,14 @@ def parse_govee(self, data, service_class_uuid16, local_name, source_mac, rssi): else: if self.report_unknown == "Govee": _LOGGER.info( - "BLE ADV from UNKNOWN Govee DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Govee DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and govee_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(govee_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(govee_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/grundfos.py b/custom_components/ble_monitor/ble_parser/grundfos.py index e58c4c5c7..d1b8dd1fe 100644 --- a/custom_components/ble_monitor/ble_parser/grundfos.py +++ b/custom_components/ble_monitor/ble_parser/grundfos.py @@ -21,11 +21,10 @@ } -def parse_grundfos(self, data, source_mac, rssi): +def parse_grundfos(self, data: str, mac: bytes): """Grundfos parser""" device_type = "MI401" firmware = "Grundfos" - grundfos_mac = source_mac xvalue = data[6:17] (packet, bat_status, pump_id, flow, press, pump_mode, temp) = unpack( @@ -45,20 +44,13 @@ def parse_grundfos(self, data, source_mac, rssi): if self.report_unknown == "Grundfos": _LOGGER.info( - "BLE ADV from UNKNOWN Grundfos DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(grundfos_mac), + "BLE ADV from UNKNOWN Grundfos DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and grundfos_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(grundfos_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(grundfos_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "firmware": firmware, "data": True diff --git a/custom_components/ble_monitor/ble_parser/hhcc.py b/custom_components/ble_monitor/ble_parser/hhcc.py index 35ce81f57..40b111a86 100644 --- a/custom_components/ble_monitor/ble_parser/hhcc.py +++ b/custom_components/ble_monitor/ble_parser/hhcc.py @@ -4,17 +4,16 @@ from .const import (CONF_BATTERY, CONF_CONDUCTIVITY, CONF_DATA, CONF_FIRMWARE, CONF_ILLUMINANCE, CONF_MAC, CONF_MOISTURE, CONF_PACKET, - CONF_RSSI, CONF_TEMPERATURE, CONF_TYPE) + CONF_TEMPERATURE, CONF_TYPE) from .helpers import to_mac, to_unformatted_mac _LOGGER = logging.getLogger(__name__) -def parse_hhcc(self, data: str, source_mac: bytes, rssi: float): +def parse_hhcc(self, data: str, mac: bytes): """HHCC parser""" if len(data) == 13: device_type = "HHCCJCY10" - hhcc_mac = source_mac xvalue_1 = data[4:7] xvalue_2 = data[7:10] xvalue_3 = data[10:13] @@ -32,15 +31,13 @@ def parse_hhcc(self, data: str, source_mac: bytes, rssi: float): CONF_ILLUMINANCE: illu, CONF_CONDUCTIVITY: cond, CONF_BATTERY: batt, - CONF_RSSI: rssi, - CONF_MAC: to_unformatted_mac(hhcc_mac), + CONF_MAC: to_unformatted_mac(mac), } else: if self.report_unknown == "HHCC": _LOGGER.info( - "BLE ADV from UNKNOWN HHCC DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN HHCC DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None @@ -49,7 +46,7 @@ def parse_hhcc(self, data: str, source_mac: bytes, rssi: float): if packet_id: print("packet_id is", packet_id) try: - prev_packet = self.lpacket_ids[hhcc_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -57,13 +54,8 @@ def parse_hhcc(self, data: str, source_mac: bytes, rssi: float): # only process new messages if self.filter_duplicates is True: return None - self.lpacket_ids[hhcc_mac] = packet_id + self.lpacket_ids[mac] = packet_id else: packet_id = "no packet id" - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and hhcc_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(hhcc_mac)) - return None - return sensor_data diff --git a/custom_components/ble_monitor/ble_parser/holyiot.py b/custom_components/ble_monitor/ble_parser/holyiot.py index b8d0db545..4c4b2a8ed 100644 --- a/custom_components/ble_monitor/ble_parser/holyiot.py +++ b/custom_components/ble_monitor/ble_parser/holyiot.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_holyiot(self, data, source_mac, rssi): +def parse_holyiot(self, data: str, mac: bytes): """HolyIOT parser""" msg_length = len(data) firmware = "HolyIOT" @@ -16,11 +16,11 @@ def parse_holyiot(self, data, source_mac, rssi): if msg_length == 17: device_type = "HolyIOT BLE tracker" holyiot_mac = data[6:12] - if holyiot_mac != source_mac: + if holyiot_mac != mac: _LOGGER.debug( "HolyIOT MAC address doesn't match data MAC address. Data: %s with source mac: %s and HolyIOT mac: %s", data.hex(), - source_mac, + mac, holyiot_mac, ) return None @@ -66,21 +66,14 @@ def parse_holyiot(self, data, source_mac, rssi): else: if self.report_unknown == "HolyIOT": _LOGGER.info( - "BLE ADV from UNKNOWN HolyIOT DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN HolyIOT DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and holyiot_mac.lower() not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(holyiot_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(holyiot_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/hormann.py b/custom_components/ble_monitor/ble_parser/hormann.py index e85471149..509f5c106 100644 --- a/custom_components/ble_monitor/ble_parser/hormann.py +++ b/custom_components/ble_monitor/ble_parser/hormann.py @@ -6,10 +6,9 @@ _LOGGER = logging.getLogger(__name__) -def parse_hormann(self, data, source_mac, rssi): +def parse_hormann(self, data: str, mac: bytes): """Hörmann parser""" result = {"firmware": "Hörmann"} - hormann_mac = source_mac # Hörmann adv contain two 0xFF manufacturer specific data packets packet_start = 0 @@ -48,22 +47,15 @@ def parse_hormann(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "Hormann": _LOGGER.info( - "BLE ADV from UNKNOWN Hormann DEVICE: RSSI: %s, MAC: %s, DEVICE TYPE: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Hormann DEVICE: MAC: %s, DEVICE TYPE: %s, ADV: %s", + to_mac(mac), device_type, data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and hormann_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(hormann_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(hormann_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "data": True diff --git a/custom_components/ble_monitor/ble_parser/ibeacon.py b/custom_components/ble_monitor/ble_parser/ibeacon.py index 90df76f04..994f72d2c 100644 --- a/custom_components/ble_monitor/ble_parser/ibeacon.py +++ b/custom_components/ble_monitor/ble_parser/ibeacon.py @@ -5,8 +5,8 @@ from .const import (CONF_CYPRESS_HUMIDITY, CONF_CYPRESS_TEMPERATURE, CONF_DATA, CONF_FIRMWARE, CONF_MAC, CONF_MAJOR, CONF_MEASURED_POWER, - CONF_MINOR, CONF_PACKET, CONF_RSSI, CONF_TRACKER_ID, - CONF_TYPE, CONF_UUID) + CONF_MINOR, CONF_PACKET, CONF_TRACKER_ID, CONF_TYPE, + CONF_UUID) from .helpers import to_mac, to_unformatted_mac, to_uuid _LOGGER = logging.getLogger(__name__) @@ -14,15 +14,14 @@ DEVICE_TYPE: Final = "iBeacon" -def parse_ibeacon(self, data: str, source_mac: str, rssi: float): +def parse_ibeacon(self, data: str, mac: str): """Parse iBeacon advertisements""" if data[5] == 0x15 and len(data) >= 27: uuid = data[6:22] (major, minor, power) = unpack(">HHb", data[22:27]) tracker_data = { - CONF_RSSI: rssi, - CONF_MAC: to_unformatted_mac(source_mac), + CONF_MAC: to_unformatted_mac(mac), CONF_UUID: to_uuid(uuid).replace('-', ''), CONF_TRACKER_ID: uuid, CONF_MAJOR: major, @@ -41,18 +40,11 @@ def parse_ibeacon(self, data: str, source_mac: str, rssi: float): else: if self.report_unknown == DEVICE_TYPE: _LOGGER.info( - "BLE ADV from UNKNOWN %s DEVICE: RSSI: %s, MAC: %s, ADV: %s", + "BLE ADV from UNKNOWN %s DEVICE: MAC: %s, ADV: %s", DEVICE_TYPE, - rssi, - to_mac(source_mac), + to_mac(mac), data.hex() ) return None, None - # check for UUID presence in sensor whitelist, if needed - if self.discovery is False and uuid and uuid not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. UUID: %s is not whitelisted!", to_uuid(uuid)) - - return None, None - return sensor_data, tracker_data diff --git a/custom_components/ble_monitor/ble_parser/inkbird.py b/custom_components/ble_monitor/ble_parser/inkbird.py index d86791a78..f7ed6af9f 100644 --- a/custom_components/ble_monitor/ble_parser/inkbird.py +++ b/custom_components/ble_monitor/ble_parser/inkbird.py @@ -16,13 +16,12 @@ def convert_temperature(temp): return temperature -def parse_inkbird(self, data, complete_local_name, source_mac, rssi): +def parse_inkbird(self, data: bytes, complete_local_name: str, mac: str): """Inkbird parser""" msg_length = len(data) firmware = "Inkbird" result = {"firmware": firmware} if msg_length == 11 and complete_local_name in ["sps", "tps"]: - inkbird_mac = source_mac xvalue = data[2:10] (temp, hum, probe, modbus, bat) = unpack("HHBB", data[22:]) # data follows the iBeacon temperature and humidity definition temp = round(175 * temp / 65535 - 45, 2) @@ -32,12 +31,12 @@ def parse_jaalee(self, data, source_mac, rssi): batt = data[4] jaalee_mac_reversed = data[5:11] jaalee_mac = jaalee_mac_reversed[::-1] - if jaalee_mac != source_mac: + if jaalee_mac != mac: _LOGGER.debug( "Jaalee MAC address doesn't match data MAC address. " "Data: %s with source mac: %s and jaalee mac: %s", data.hex(), - source_mac, + mac, jaalee_mac, ) return None @@ -56,21 +55,14 @@ def parse_jaalee(self, data, source_mac, rssi): else: if self.report_unknown == "Jaalee": _LOGGER.info( - "BLE ADV from UNKNOWN Jaalee DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Jaalee DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and jaalee_mac.lower() not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(jaalee_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(jaalee_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/jinou.py b/custom_components/ble_monitor/ble_parser/jinou.py index 9be67ad53..9a7e7ac60 100644 --- a/custom_components/ble_monitor/ble_parser/jinou.py +++ b/custom_components/ble_monitor/ble_parser/jinou.py @@ -6,14 +6,13 @@ _LOGGER = logging.getLogger(__name__) -def parse_jinou(self, data, source_mac, rssi): +def parse_jinou(self, data: bytes, mac: str): """Jinou parser""" msg_length = len(data) firmware = "Jinou" result = {"firmware": firmware} if msg_length == 15: device_type = "BEC07-5" - jinou_mac = source_mac temp = float(str(data[3]) + "." + str(data[4])) if data[2] == 1: temp = temp * -1 @@ -28,21 +27,14 @@ def parse_jinou(self, data, source_mac, rssi): else: if self.report_unknown == "Jinou": _LOGGER.info( - "BLE ADV from UNKNOWN Jinou DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Jinou DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and jinou_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(jinou_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(jinou_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/kegtron.py b/custom_components/ble_monitor/ble_parser/kegtron.py index f77f21223..3429457f6 100644 --- a/custom_components/ble_monitor/ble_parser/kegtron.py +++ b/custom_components/ble_monitor/ble_parser/kegtron.py @@ -21,12 +21,11 @@ } -def parse_kegtron(self, data, source_mac, rssi): +def parse_kegtron(self, data: bytes, mac: str): """Parser for Kegtron sensors""" msg_length = len(data) if msg_length == 31: firmware = "Kegtron" - kegtron_mac = source_mac (device_id,) = unpack(">B", data[10:11]) if device_id & (1 << 6): device_type = "Kegtron KT-200" @@ -75,17 +74,11 @@ def parse_kegtron(self, data, source_mac, rssi): else: return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and kegtron_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(kegtron_mac)) - return None - result.update({ "type": device_type, "firmware": firmware, - "mac": to_unformatted_mac(kegtron_mac), + "mac": to_unformatted_mac(mac), "packet": "no packet id", - "rssi": rssi, "data": True, }) return result @@ -93,7 +86,7 @@ def parse_kegtron(self, data, source_mac, rssi): if self.report_unknown == "Kegtron": _LOGGER.debug( "UNKNOWN dataobject from Kegtron DEVICE: MAC: %s, ADV: %s", - to_mac(source_mac), + to_mac(mac), data.hex() ) return None diff --git a/custom_components/ble_monitor/ble_parser/kkm.py b/custom_components/ble_monitor/ble_parser/kkm.py index 4e05c8494..1626d3066 100644 --- a/custom_components/ble_monitor/ble_parser/kkm.py +++ b/custom_components/ble_monitor/ble_parser/kkm.py @@ -8,14 +8,12 @@ _LOGGER = logging.getLogger(__name__) -def parse_kkm(self, data, source_mac, rssi): +def parse_kkm(self, data: bytes, mac: str): """Parser for KKM sensors.""" - kkm_mac = source_mac device_type = "K6 Sensor Beacon" result = { - "mac": to_unformatted_mac(kkm_mac), + "mac": to_unformatted_mac(mac), "type": device_type, - "rssi": rssi, "data": False, } if len(data) == 19: @@ -50,9 +48,8 @@ def parse_kkm(self, data, source_mac, rssi): if result is None: if self.report_unknown == "KKM": _LOGGER.info( - "BLE ADV from UNKNOWN KKM DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN KKM DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex(), ) return None @@ -71,9 +68,5 @@ def parse_kkm(self, data, source_mac, rssi): else: batt = 0 result["battery"] = round(batt, 1) - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and kkm_mac.lower() not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(kkm_mac)) - return None return result diff --git a/custom_components/ble_monitor/ble_parser/laica.py b/custom_components/ble_monitor/ble_parser/laica.py index daae6a991..7f914f6ce 100755 --- a/custom_components/ble_monitor/ble_parser/laica.py +++ b/custom_components/ble_monitor/ble_parser/laica.py @@ -30,15 +30,14 @@ def read_impedance(data): return impedance -def parse_laica(self, data, source_mac, rssi): +def parse_laica(self, data: bytes, mac: str): """Parser for Laica sensors""" xvalue = data[4:] result = { "type": "Laica Smart Scale", "firmware": "Laica", - "mac": to_unformatted_mac(source_mac), - "rssi": rssi, + "mac": to_unformatted_mac(mac), "data": False, } @@ -58,7 +57,7 @@ def parse_laica(self, data, source_mac, rssi): # Check for duplicate messages packet_id = xvalue.hex() try: - prev_packet = self.lpacket_ids[source_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -66,7 +65,7 @@ def parse_laica(self, data, source_mac, rssi): # only process new messages if self.filter_duplicates is True: return None - self.lpacket_ids[source_mac] = packet_id + self.lpacket_ids[mac] = packet_id result.update({ "packet": packet_id, diff --git a/custom_components/ble_monitor/ble_parser/mikrotik.py b/custom_components/ble_monitor/ble_parser/mikrotik.py index e86267415..ebdc6ee31 100644 --- a/custom_components/ble_monitor/ble_parser/mikrotik.py +++ b/custom_components/ble_monitor/ble_parser/mikrotik.py @@ -13,7 +13,7 @@ def convert_8_8_to_float(val_1, val_2): return val_1 + (val_2 / 256) -def parse_mikrotik(self, data, source_mac, rssi): +def parse_mikrotik(self, data: bytes, mac: str): """Inkbird parser""" msg_length = len(data) firmware = "Mikrotik" @@ -23,7 +23,7 @@ def parse_mikrotik(self, data, source_mac, rssi): ( version, user_data, - salt, + _, acc_x_frac, acc_x, acc_y_frac, acc_y, acc_z_frac, acc_z, @@ -39,7 +39,7 @@ def parse_mikrotik(self, data, source_mac, rssi): _LOGGER.info( "Mikrotik device with MAC address %s uses encryption, which is not supported (yet)" "Disable encryption if you want to use this device in Home Assistant", - to_mac(source_mac), + to_mac(mac), ) return None @@ -88,21 +88,14 @@ def parse_mikrotik(self, data, source_mac, rssi): else: if self.report_unknown == "Mikrotik": _LOGGER.info( - "BLE ADV from UNKNOWN Mikrotik DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Mikrotik DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and source_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(source_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(source_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/miscale.py b/custom_components/ble_monitor/ble_parser/miscale.py index 3d7135af9..1946b1c45 100644 --- a/custom_components/ble_monitor/ble_parser/miscale.py +++ b/custom_components/ble_monitor/ble_parser/miscale.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_miscale(self, data, source_mac, rssi): +def parse_miscale(self, data: bytes, mac: str): """Parser for Xiaomi Mi Scales.""" msg_length = len(data) uuid16 = (data[3] << 8) | data[2] @@ -62,7 +62,7 @@ def parse_miscale(self, data, source_mac, rssi): if self.report_unknown == "Mi Scale": _LOGGER.info( "BLE ADV from UNKNOWN Mi Scale DEVICE: MAC: %s, ADV: %s", - to_mac(source_mac), + to_mac(mac), data.hex() ) return None @@ -87,12 +87,11 @@ def parse_miscale(self, data, source_mac, rssi): pass firmware = device_type - miscale_mac = source_mac # Check for duplicate messages packet_id = xvalue.hex() try: - prev_packet = self.lpacket_ids[miscale_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -100,23 +99,17 @@ def parse_miscale(self, data, source_mac, rssi): # only process new messages if self.filter_duplicates is True: return None - self.lpacket_ids[miscale_mac] = packet_id + self.lpacket_ids[mac] = packet_id if prev_packet is None: if self.filter_duplicates is True: # ignore first message after a restart return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and miscale_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(miscale_mac)) - return None - result.update({ "type": device_type, "firmware": firmware, - "mac": to_unformatted_mac(miscale_mac), + "mac": to_unformatted_mac(mac), "packet": packet_id, - "rssi": rssi, "data": True, }) return result diff --git a/custom_components/ble_monitor/ble_parser/moat.py b/custom_components/ble_monitor/ble_parser/moat.py index 4357cbea2..4822c860b 100644 --- a/custom_components/ble_monitor/ble_parser/moat.py +++ b/custom_components/ble_monitor/ble_parser/moat.py @@ -7,11 +7,10 @@ _LOGGER = logging.getLogger(__name__) -def parse_moat(self, data, source_mac, rssi): +def parse_moat(self, data: bytes, mac: str): """Parser for Moat sensors""" msg_length = len(data) firmware = "Moat" - moat_mac = source_mac device_id = (data[3] << 8) | data[2] result = {"firmware": firmware} if msg_length == 22 and device_id == 0x1000: @@ -41,21 +40,15 @@ def parse_moat(self, data, source_mac, rssi): else: if self.report_unknown == "Moat": _LOGGER.info( - "BLE ADV from UNKNOWN Moat DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Moat DEVICE: MAC: %s, ADV: %s", + + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and moat_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(moat_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(moat_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/oral_b.py b/custom_components/ble_monitor/ble_parser/oral_b.py index 236803dee..e203cf346 100644 --- a/custom_components/ble_monitor/ble_parser/oral_b.py +++ b/custom_components/ble_monitor/ble_parser/oral_b.py @@ -28,11 +28,10 @@ } -def parse_oral_b(self, data, source_mac, rssi): +def parse_oral_b(self, data: bytes, mac: str): """Parser for Oral-B toothbrush.""" msg_length = len(data) firmware = "Oral-B" - oral_b_mac = source_mac result = {"firmware": firmware} if msg_length == 15: (state, pressure, counter, mode, sector, sector_timer, no_of_sectors) = unpack( @@ -93,21 +92,14 @@ def parse_oral_b(self, data, source_mac, rssi): else: if self.report_unknown == "Oral-B": _LOGGER.info( - "BLE ADV from UNKNOWN Oral-B DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Oral-B DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and oral_b_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(oral_b_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(oral_b_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/oras.py b/custom_components/ble_monitor/ble_parser/oras.py index 4997da066..702cb80bc 100644 --- a/custom_components/ble_monitor/ble_parser/oras.py +++ b/custom_components/ble_monitor/ble_parser/oras.py @@ -6,11 +6,10 @@ _LOGGER = logging.getLogger(__name__) -def parse_oras(self, data, source_mac, rssi): +def parse_oras(self, data: bytes, mac: str): """Parser for Oras toothbrush.""" msg_length = len(data) firmware = "Oras" - oras_mac = source_mac result = {"firmware": firmware} if msg_length == 22: device_type = "Electra Washbasin Faucet" @@ -19,21 +18,14 @@ def parse_oras(self, data, source_mac, rssi): else: if self.report_unknown == "Oras": _LOGGER.info( - "BLE ADV from UNKNOWN Oras DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Oras DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in whitelist, if needed - if self.discovery is False and oras_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(oras_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(oras_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/qingping.py b/custom_components/ble_monitor/ble_parser/qingping.py index 4173b0439..b40816b66 100644 --- a/custom_components/ble_monitor/ble_parser/qingping.py +++ b/custom_components/ble_monitor/ble_parser/qingping.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_qingping(self, data, source_mac, rssi): +def parse_qingping(self, data: bytes, mac: str): """Qingping parser""" msg_length = len(data) if msg_length > 12: @@ -31,13 +31,12 @@ def parse_qingping(self, data, source_mac, rssi): device_type = None if device_type == "CGDN1": - qingping_mac = source_mac + qingping_mac = mac else: qingping_mac_reversed = data[6:12] qingping_mac = qingping_mac_reversed[::-1] result = { - "rssi": rssi, "packet": "no packet id", } xdata_point = 14 @@ -90,26 +89,19 @@ def parse_qingping(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "Qingping": _LOGGER.info( - "BLE ADV from UNKNOWN Qingping DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Qingping DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None # check for MAC presence in message and in service data - if qingping_mac != source_mac: + if qingping_mac != mac: _LOGGER.debug("Invalid MAC address for Qingping device") return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and qingping_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(qingping_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(qingping_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "firmware": firmware, "data": True diff --git a/custom_components/ble_monitor/ble_parser/relsib.py b/custom_components/ble_monitor/ble_parser/relsib.py index d340ecb04..ad14845c9 100644 --- a/custom_components/ble_monitor/ble_parser/relsib.py +++ b/custom_components/ble_monitor/ble_parser/relsib.py @@ -7,15 +7,13 @@ _LOGGER = logging.getLogger(__name__) -def parse_relsib(self, data, source_mac, rssi): +def parse_relsib(self, data: bytes, mac: str): """Relsib parser""" msg_length = len(data) uuid16 = (data[3] << 8) | data[2] - relsib_mac = source_mac result = { - "rssi": rssi, "packet": "no packet id", - "mac": to_unformatted_mac(relsib_mac), + "mac": to_unformatted_mac(mac), "firmware": "Relsib", } if uuid16 in [0xAA20, 0xAA21, 0xAA22] and msg_length == 26: @@ -57,18 +55,12 @@ def parse_relsib(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "Relsib": _LOGGER.info( - "BLE ADV from UNKNOWN Relsib DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Relsib DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and relsib_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(relsib_mac)) - return None - result.update({ "type": device_type, "data": True diff --git a/custom_components/ble_monitor/ble_parser/ruuvitag.py b/custom_components/ble_monitor/ble_parser/ruuvitag.py index 26b6e5ecb..07e3f8a21 100644 --- a/custom_components/ble_monitor/ble_parser/ruuvitag.py +++ b/custom_components/ble_monitor/ble_parser/ruuvitag.py @@ -9,14 +9,12 @@ _LOGGER = logging.getLogger(__name__) -def parse_ruuvitag(self, data, source_mac, rssi): +def parse_ruuvitag(self, data: bytes, mac: str): """Ruuvitag parser""" - ruuvitag_mac = source_mac device_type = "Ruuvitag" result = { - "mac": to_unformatted_mac(ruuvitag_mac), + "mac": to_unformatted_mac(mac), "type": device_type, - "rssi": rssi, "data": False, } adstuct_type = data[1] @@ -104,7 +102,7 @@ def parse_ruuvitag(self, data, source_mac, rssi): # Check for duplicate messages try: - prev_packet = self.lpacket_ids[ruuvitag_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -112,14 +110,14 @@ def parse_ruuvitag(self, data, source_mac, rssi): if self.filter_duplicates is True: # only process new messages return None - self.lpacket_ids[ruuvitag_mac] = packet_id + self.lpacket_ids[mac] = packet_id if prev_packet is None: if self.filter_duplicates is True: # ignore first message after a restart return None # Check for an increased movement counter try: - prev_movement = self.movements_list[ruuvitag_mac] + prev_movement = self.movements_list[mac] except KeyError: # start with empty movement first prev_movement = None @@ -127,7 +125,7 @@ def parse_ruuvitag(self, data, source_mac, rssi): motion = 0 else: motion = 1 - self.movements_list[ruuvitag_mac] = move_cnt + self.movements_list[mac] = move_cnt result.update( { @@ -172,9 +170,8 @@ def parse_ruuvitag(self, data, source_mac, rssi): if result is None: if self.report_unknown == "Ruuvitag": _LOGGER.info( - "BLE ADV from UNKNOWN Ruuvitag DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Ruuvitag DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex(), ) return None @@ -193,14 +190,11 @@ def parse_ruuvitag(self, data, source_mac, rssi): else: batt = 0 result["battery"] = round(batt, 1) - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and ruuvitag_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(ruuvitag_mac)) - return None + if version < 5: _LOGGER.info( "Firmware version %i is outdated, consider updating your ruuvitag with MAC: %s to view all sensors", version, - to_mac(ruuvitag_mac), + to_mac(mac), ) return result diff --git a/custom_components/ble_monitor/ble_parser/sensirion.py b/custom_components/ble_monitor/ble_parser/sensirion.py index 727e00835..c72a0bfe0 100644 --- a/custom_components/ble_monitor/ble_parser/sensirion.py +++ b/custom_components/ble_monitor/ble_parser/sensirion.py @@ -13,29 +13,21 @@ ] -def parse_sensirion(self, data, complete_local_name, source_mac, rssi): +def parse_sensirion(self, data: bytes, complete_local_name: str, mac: str): """Sensirion parser""" result = {"firmware": "Sensirion"} - sensirion_mac = source_mac device_type = complete_local_name if device_type not in SENSIRION_DEVICES: if self.report_unknown == "Sensirion": _LOGGER.info( - "BLE ADV from UNKNOWN Sensirion DEVICE: %s RSSI: %s, MAC: %s, ADV: %s", + "BLE ADV from UNKNOWN Sensirion DEVICE: %s, MAC: %s, ADV: %s", device_type, - rssi, - to_mac(source_mac), + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and source_mac not in self.sensor_whitelist: - _LOGGER.debug( - "Discovery is disabled. MAC: %s is not whitelisted!", to_mac(source_mac)) - return None - # not all of the following values are used yet, but this explains the full protocol # bytes 1+2 (length and type) are part of the header advertisementLength = data[0] # redundant @@ -52,8 +44,7 @@ def parse_sensirion(self, data, complete_local_name, source_mac, rssi): else: result.update(samples) result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(sensirion_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "data": True diff --git a/custom_components/ble_monitor/ble_parser/sensorpush.py b/custom_components/ble_monitor/ble_parser/sensorpush.py index b1176ce9e..4a0a87f1d 100644 --- a/custom_components/ble_monitor/ble_parser/sensorpush.py +++ b/custom_components/ble_monitor/ble_parser/sensorpush.py @@ -67,10 +67,9 @@ def decode_values(mfg_data: bytes, device_type_id: int) -> dict: return values -def parse_sensorpush(self, data, source_mac, rssi): +def parse_sensorpush(self, data: bytes, mac: str): """Sensorpush parser""" result = {"firmware": "SensorPush"} - sensorpush_mac = source_mac device_type = None page_id = data[2] & 0x03 @@ -82,21 +81,14 @@ def parse_sensorpush(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "SensorPush": _LOGGER.info( - "BLE ADV from UNKNOWN SensorPush DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN SensorPush DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and sensorpush_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(sensorpush_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(sensorpush_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "data": True diff --git a/custom_components/ble_monitor/ble_parser/smartdry.py b/custom_components/ble_monitor/ble_parser/smartdry.py index 6918fa310..1bd09240b 100644 --- a/custom_components/ble_monitor/ble_parser/smartdry.py +++ b/custom_components/ble_monitor/ble_parser/smartdry.py @@ -7,10 +7,9 @@ _LOGGER = logging.getLogger(__name__) -def parse_smartdry(self, data, source_mac, rssi): +def parse_smartdry(self, data: bytes, mac: str): """Parser for SmartDry cloth dryer""" msg_length = len(data) - smartdry_mac = source_mac if msg_length == 16: device_type = "SmartDry cloth dryer" firmware = "SmartDry" @@ -56,21 +55,14 @@ def parse_smartdry(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "SmartDry": _LOGGER.info( - "BLE ADV from UNKNOWN SmartDry DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN SmartDry DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and smartdry_mac.lower() not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(smartdry_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(smartdry_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/switchbot.py b/custom_components/ble_monitor/ble_parser/switchbot.py index 663528a6a..7cc90ee17 100644 --- a/custom_components/ble_monitor/ble_parser/switchbot.py +++ b/custom_components/ble_monitor/ble_parser/switchbot.py @@ -7,10 +7,9 @@ _LOGGER = logging.getLogger(__name__) -def parse_switchbot(self, data, source_mac, rssi): +def parse_switchbot(self, data: bytes, mac: str): """Switchbot parser""" msg_length = len(data) - switchbot_mac = source_mac device_id = data[4] if msg_length == 10 and device_id in [0x54, 0x69]: @@ -43,21 +42,14 @@ def parse_switchbot(self, data, source_mac, rssi): if device_type == "unknown": if self.report_unknown == "Switchbot": _LOGGER.info( - "BLE ADV from UNKNOWN Switchbot DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Switchbot DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and switchbot_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(switchbot_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(switchbot_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/teltonika.py b/custom_components/ble_monitor/ble_parser/teltonika.py index b4c33214c..948e69da3 100644 --- a/custom_components/ble_monitor/ble_parser/teltonika.py +++ b/custom_components/ble_monitor/ble_parser/teltonika.py @@ -7,10 +7,9 @@ _LOGGER = logging.getLogger(__name__) -def parse_teltonika(self, data, complete_local_name, source_mac, rssi): +def parse_teltonika(self, data: bytes, complete_local_name: str, mac: str): """Teltonika parser""" result = {"firmware": "Teltonika"} - teltonika_mac = source_mac device_id = (data[3] << 8) | data[2] if device_id == 0x089A: @@ -114,22 +113,15 @@ def parse_teltonika(self, data, complete_local_name, source_mac, rssi): if device_type is None: if self.report_unknown == "Teltonika": _LOGGER.info( - "BLE ADV from UNKNOWN Teltonika DEVICE: RSSI: %s, MAC: %s, DEVICE TYPE: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Teltonika DEVICE: MAC: %s, DEVICE TYPE: %s, ADV: %s", + to_mac(mac), device_type, data.hex() ) return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and teltonika_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(teltonika_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(teltonika_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "data": True diff --git a/custom_components/ble_monitor/ble_parser/thermobeacon.py b/custom_components/ble_monitor/ble_parser/thermobeacon.py index a96312b2e..286847f16 100644 --- a/custom_components/ble_monitor/ble_parser/thermobeacon.py +++ b/custom_components/ble_monitor/ble_parser/thermobeacon.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_thermobeacon(self, data, source_mac, rssi): +def parse_thermobeacon(self, data: bytes, mac: str): """Thermobeacon parser""" msg_length = len(data) device_id = data[2] @@ -61,26 +61,19 @@ def parse_thermobeacon(self, data, source_mac, rssi): if device_type is None: if self.report_unknown == "Thermobeacon": _LOGGER.info( - "BLE ADV from UNKNOWN Thermobeacon DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Thermobeacon DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None # check for MAC presence in message and in service data - if thermobeacon_mac != source_mac: + if thermobeacon_mac != mac: _LOGGER.debug("Invalid MAC address for Thermobeacon device") return None - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and thermobeacon_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(thermobeacon_mac)) - return None - result.update({ - "rssi": rssi, - "mac": to_unformatted_mac(thermobeacon_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": "no packet id", "firmware": firmware, diff --git a/custom_components/ble_monitor/ble_parser/thermopro.py b/custom_components/ble_monitor/ble_parser/thermopro.py index ba900ca28..97c58507d 100644 --- a/custom_components/ble_monitor/ble_parser/thermopro.py +++ b/custom_components/ble_monitor/ble_parser/thermopro.py @@ -7,12 +7,10 @@ _LOGGER = logging.getLogger(__name__) -def parse_thermopro(self, data, device_type, source_mac, rssi): +def parse_thermopro(self, data: bytes, device_type, mac: str): """Thermopro parser""" if device_type in ["TP357", "TP359"]: firmware = "Thermopro" - thermopro_mac = source_mac - xvalue = data[3:6] (temp, humi) = unpack("hhb", data[22:27]) tracker_data = { - CONF_RSSI: rssi, - CONF_MAC: to_unformatted_mac(source_mac), + CONF_MAC: to_unformatted_mac(mac), CONF_UUID: to_uuid(uuid).replace('-', ''), CONF_TRACKER_ID: uuid, CONF_MAJOR: major, @@ -40,17 +39,10 @@ def parse_tilt(self, data: str, source_mac: str, rssi: float): else: if self.report_unknown == "Tilt": _LOGGER.info( - "BLE ADV from UNKNOWN TILT DEVICE: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN TILT DEVICE: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) return None, None - # check for UUID presence in sensor whitelist, if needed - if self.discovery is False and uuid and uuid not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. UUID: %s is not whitelisted!", to_uuid(uuid)) - - return None, None - return sensor_data, tracker_data diff --git a/custom_components/ble_monitor/ble_parser/xiaogui.py b/custom_components/ble_monitor/ble_parser/xiaogui.py index 626b6648f..7333c2bbb 100644 --- a/custom_components/ble_monitor/ble_parser/xiaogui.py +++ b/custom_components/ble_monitor/ble_parser/xiaogui.py @@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__) -def parse_xiaogui(self, data, source_mac, rssi): +def parse_xiaogui(self, data: bytes, mac: str): """Xiaogui Scales parser""" msg_length = len(data) @@ -15,14 +15,13 @@ def parse_xiaogui(self, data, source_mac, rssi): firmware = "Xiaogui" xiaogui_mac = data[11:] - if xiaogui_mac != source_mac: + if xiaogui_mac != mac: _LOGGER.error("Xiaogui MAC address doesn't match data MAC address. Data: %s", data.hex()) return None result = { "firmware": firmware, "mac": to_unformatted_mac(xiaogui_mac), - "rssi": rssi, "data": True, } @@ -70,7 +69,7 @@ def parse_xiaogui(self, data, source_mac, rssi): if self.report_unknown == "Xiaogui": _LOGGER.info( "BLE ADV from UNKNOWN Xiaogui DEVICE: MAC: %s, ADV: %s", - to_mac(source_mac), + to_mac(mac), data.hex() ) return None @@ -79,18 +78,13 @@ def parse_xiaogui(self, data, source_mac, rssi): # Check for duplicate messages try: - prev_packet = self.lpacket_ids[xiaogui_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None if prev_packet == packet_id: # only process new messages return None - self.lpacket_ids[xiaogui_mac] = packet_id - - # check for MAC presence in whitelist, if needed - if self.discovery is False and xiaogui_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(xiaogui_mac)) - return None + self.lpacket_ids[mac] = packet_id return result diff --git a/custom_components/ble_monitor/ble_parser/xiaomi.py b/custom_components/ble_monitor/ble_parser/xiaomi.py index d716d2888..4023ccb20 100755 --- a/custom_components/ble_monitor/ble_parser/xiaomi.py +++ b/custom_components/ble_monitor/ble_parser/xiaomi.py @@ -1080,7 +1080,7 @@ def obj5a16(xobj): } -def parse_xiaomi(self, data, source_mac, rssi): +def parse_xiaomi(self, data: bytes, mac: str): """Parser for Xiaomi sensors""" # check for adstruc length i = 9 # till Frame Counter @@ -1120,11 +1120,9 @@ def parse_xiaomi(self, data, source_mac, rssi): return None xiaomi_mac_reversed = data[9:15] xiaomi_mac = xiaomi_mac_reversed[::-1] - if xiaomi_mac != source_mac: + if xiaomi_mac != mac: _LOGGER.debug("Xiaomi MAC address doesn't match data MAC address. Data: %s", data.hex()) return None - else: - xiaomi_mac = source_mac # determine the device type device_id = data[6] + (data[7] << 8) @@ -1133,9 +1131,8 @@ def parse_xiaomi(self, data, source_mac, rssi): except KeyError: if self.report_unknown == "Xiaomi": _LOGGER.info( - "BLE ADV from UNKNOWN Xiaomi device: RSSI: %s, MAC: %s, ADV: %s", - rssi, - to_mac(source_mac), + "BLE ADV from UNKNOWN Xiaomi device: MAC: %s, ADV: %s", + to_mac(mac), data.hex() ) _LOGGER.debug("Unknown Xiaomi device found. Data: %s", data.hex()) @@ -1161,14 +1158,9 @@ def parse_xiaomi(self, data, source_mac, rssi): elif frctrl_auth_mode == 2: sinfo += ', Standard certification' - # check for MAC presence in sensor whitelist, if needed - if self.discovery is False and xiaomi_mac not in self.sensor_whitelist: - _LOGGER.debug("Discovery is disabled. MAC: %s is not whitelisted!", to_mac(xiaomi_mac)) - return None - # check for unique packet_id and advertisement priority try: - prev_packet = self.lpacket_ids[xiaomi_mac] + prev_packet = self.lpacket_ids[mac] except KeyError: # start with empty first packet prev_packet = None @@ -1177,13 +1169,13 @@ def parse_xiaomi(self, data, source_mac, rssi): # Check for adv priority and packet_id for devices that can also send in ATC format adv_priority = 19 try: - prev_adv_priority = self.adv_priority[xiaomi_mac] + prev_adv_priority = self.adv_priority[mac] except KeyError: # start with initial adv priority prev_adv_priority = 0 if adv_priority > prev_adv_priority: # always process advertisements with a higher priority - self.adv_priority[xiaomi_mac] = adv_priority + self.adv_priority[mac] = adv_priority elif adv_priority == prev_adv_priority: # only process messages with same priority that have a unique packet id if prev_packet == packet_id: @@ -1196,14 +1188,14 @@ def parse_xiaomi(self, data, source_mac, rssi): else: # do not process advertisements with lower priority (ATC advertisements will be used instead) prev_adv_priority -= 1 - self.adv_priority[xiaomi_mac] = prev_adv_priority + self.adv_priority[mac] = prev_adv_priority return None else: if prev_packet == packet_id: if self.filter_duplicates is True: # only process messages with highest priority and messages with unique packet id return None - self.lpacket_ids[xiaomi_mac] = packet_id + self.lpacket_ids[mac] = packet_id # check for capability byte present if frctrl_capability_include != 0: @@ -1228,9 +1220,9 @@ def parse_xiaomi(self, data, source_mac, rssi): sinfo += ', Encryption' firmware = "Xiaomi (MiBeacon V" + str(frctrl_version) + " encrypted)" if frctrl_version <= 3: - payload = decrypt_mibeacon_legacy(self, data, i, xiaomi_mac) + payload = decrypt_mibeacon_legacy(self, data, i, mac) else: - payload = decrypt_mibeacon_v4_v5(self, data, i, xiaomi_mac) + payload = decrypt_mibeacon_v4_v5(self, data, i, mac) else: # No encryption # check minimum advertisement length with data firmware = "Xiaomi (MiBeacon V" + str(frctrl_version) + ")" @@ -1245,8 +1237,7 @@ def parse_xiaomi(self, data, source_mac, rssi): return None result = { - "rssi": rssi, - "mac": to_unformatted_mac(xiaomi_mac), + "mac": to_unformatted_mac(mac), "type": device_type, "packet": packet_id, "firmware": firmware, @@ -1283,25 +1274,25 @@ def parse_xiaomi(self, data, source_mac, rssi): return result -def decrypt_mibeacon_v4_v5(self, data, i, xiaomi_mac): +def decrypt_mibeacon_v4_v5(self, data, i, mac): """decrypt MiBeacon v4/v5 encrypted advertisements""" # check for minimum length of encrypted advertisement if len(data) < i + 9: _LOGGER.debug("Invalid data length (for decryption), adv: %s", data.hex()) # try to find encryption key for current device try: - key = self.aeskeys[xiaomi_mac] + key = self.aeskeys[mac] if len(key) != 16: _LOGGER.error("Encryption key should be 16 bytes (32 characters) long") return None except KeyError: # no encryption key found - if xiaomi_mac not in self.no_key_message: - _LOGGER.error("No encryption key found for device with MAC %s", to_mac(xiaomi_mac)) - self.no_key_message.append(xiaomi_mac) + if mac not in self.no_key_message: + _LOGGER.error("No encryption key found for device with MAC %s", to_mac(mac)) + self.no_key_message.append(mac) return None - nonce = b"".join([xiaomi_mac[::-1], data[6:9], data[-7:-4]]) + nonce = b"".join([mac[::-1], data[6:9], data[-7:-4]]) aad = b"\x11" token = data[-4:] cipherpayload = data[i:-7] @@ -1319,30 +1310,30 @@ def decrypt_mibeacon_v4_v5(self, data, i, xiaomi_mac): if decrypted_payload is None: _LOGGER.error( "Decryption failed for %s, decrypted payload is None", - to_mac(xiaomi_mac), + to_mac(mac), ) return None return decrypted_payload -def decrypt_mibeacon_legacy(self, data, i, xiaomi_mac): +def decrypt_mibeacon_legacy(self, data, i, mac): """decrypt MiBeacon v2/v3 encrypted advertisements""" # check for minimum length of encrypted advertisement if len(data) < i + 7: _LOGGER.debug("Invalid data length (for decryption), adv: %s", data.hex()) # try to find encryption key for current device try: - aeskey = self.aeskeys[xiaomi_mac] + aeskey = self.aeskeys[mac] if len(aeskey) != 12: _LOGGER.error("Encryption key should be 12 bytes (24 characters) long") return None key = b"".join([aeskey[0:6], bytes.fromhex("8d3d3c97"), aeskey[6:]]) except KeyError: # no encryption key found - _LOGGER.error("No encryption key found for device with MAC %s", to_mac(xiaomi_mac)) + _LOGGER.error("No encryption key found for device with MAC %s", to_mac(mac)) return None - nonce = b"".join([data[4:9], data[-4:-1], xiaomi_mac[::-1][:-1]]) + nonce = b"".join([data[4:9], data[-4:-1], mac[::-1][:-1]]) aad = b"\x11" cipherpayload = data[i:-4] cipher = AES.new(key, AES.MODE_CCM, nonce=nonce, mac_len=4) @@ -1358,7 +1349,7 @@ def decrypt_mibeacon_legacy(self, data, i, xiaomi_mac): if decrypted_payload is None: _LOGGER.warning( "Decryption failed for %s, decrypted payload is None", - to_mac(xiaomi_mac), + to_mac(mac), ) return None return decrypted_payload diff --git a/custom_components/ble_monitor/const.py b/custom_components/ble_monitor/const.py index 4dbb4fa83..be9f64d95 100755 --- a/custom_components/ble_monitor/const.py +++ b/custom_components/ble_monitor/const.py @@ -189,7 +189,7 @@ class BLEMonitorBinarySensorEntityDescription( force_update=True, ), BLEMonitorBinarySensorEntityDescription( - key="gas_detected", + key="gas detected", sensor_class="BaseBinarySensor", update_behavior="Instantly", name="gas", @@ -1637,7 +1637,6 @@ class BLEMonitorBinarySensorEntityDescription( 'YM-K1501' : [["rssi"], ["temperature"], ["switch"]], 'YM-K1501EU' : [["rssi"], ["temperature"], ["switch"]], 'V-SK152' : [["rssi"], ["temperature"], ["switch"]], - 'SJWS01LM' : [["battery", "rssi"], ["button"], ["moisture detected"]], 'MJYD02YL' : [["battery", "rssi"], [], ["light", "motion"]], 'MUE4094RT' : [["rssi"], [], ["motion"]], 'RTCGQ02LM' : [["battery", "rssi"], ["button"], ["light", "motion"]], @@ -1667,7 +1666,6 @@ class BLEMonitorBinarySensorEntityDescription( 'MHO-C401' : [["temperature", "humidity", "battery", "voltage", "rssi"], [], []], 'MHO-C303' : [["temperature", "humidity", "battery", "rssi"], [], []], 'JQJCY01YM' : [["temperature", "humidity", "battery", "formaldehyde", "rssi"], [], []], - 'JTYJGD03MI' : [["rssi"], ["button", "battery"], ["smoke detector"]], 'K9B-1BTN' : [["rssi"], ["one btn switch"], []], 'K9B-2BTN' : [["rssi"], ["two btn switch left", "two btn switch right"], []], 'K9B-3BTN' : [["rssi"], ["three btn switch left", "three btn switch middle", "three btn switch right"], []], @@ -1772,7 +1770,6 @@ class BLEMonitorBinarySensorEntityDescription( 'YM-K1501' : 'Xiaomi', 'YM-K1501EU' : 'Xiaomi', 'V-SK152' : 'Viomi', - 'SJWS01LM' : 'Xiaomi', 'MJYD02YL' : 'Xiaomi', 'MUE4094RT' : 'Xiaomi', 'RTCGQ02LM' : 'Xiaomi', @@ -1801,7 +1798,6 @@ class BLEMonitorBinarySensorEntityDescription( 'MHO-C401' : 'Miaomiaoce', 'MHO-C303' : 'Miaomiaoce', 'JQJCY01YM' : 'Honeywell', - 'JTYJGD03MI' : 'Honeywell', 'YLAI003' : 'Yeelight', 'YLYK01YL' : 'Yeelight', 'YLYK01YL-FANCL' : 'Yeelight', @@ -1927,6 +1923,7 @@ class BLEMonitorBinarySensorEntityDescription( 'MI401' : 'Grundfos', 'HHCCJCY10' : 'HHCC', 'HolyIOT BLE tracker' : 'HolyIOT', + 'JTYJGD03MI' : 'Honeywell', 'Supramatic E4 BS' : 'Hörmann', 'IBS-TH' : 'Inkbird', 'IBS-TH2/P01B' : 'Inkbird', @@ -1952,6 +1949,7 @@ class BLEMonitorBinarySensorEntityDescription( 'Tilt Yellow' : 'Tilt', 'Tilt Pink' : 'Tilt', 'MMC-W505' : 'Xiaomi', + 'SJWS01LM' : 'Xiaomi', } diff --git a/custom_components/ble_monitor/test/test_xiaomi_parser.py b/custom_components/ble_monitor/test/test_xiaomi_parser.py index 30dd4b1ea..c2a0a3dc2 100644 --- a/custom_components/ble_monitor/test/test_xiaomi_parser.py +++ b/custom_components/ble_monitor/test/test_xiaomi_parser.py @@ -316,6 +316,29 @@ def test_Xiaomi_V_SK152(self): def test_Xiaomi_SJWS01LM(self): """Test Xiaomi parser for SJWS01LM.""" + self.aeskeys = {} + data_string = "043e2902010000bc27e044ef541d020106191695fe5859630808bc27e044ef54f58fe704000000fc69d15ca8" + data = bytes(bytearray.fromhex(data_string)) + + aeskey = "255e6cabb39b2eddd0de992b9fee2bf2" + + is_ext_packet = True if data[3] == 0x0D else False + mac = (data[8 if is_ext_packet else 7:14 if is_ext_packet else 13])[::-1] + mac_address = mac.hex() + p_mac = bytes.fromhex(mac_address.replace(":", "").lower()) + p_key = bytes.fromhex(aeskey.lower()) + self.aeskeys[p_mac] = p_key + # pylint: disable=unused-variable + ble_parser = BleParser(aeskeys=self.aeskeys) + sensor_msg, tracker_msg = ble_parser.parse_raw_data(data) + + assert sensor_msg["firmware"] == "Xiaomi (MiBeacon V5 encrypted)" + assert sensor_msg["type"] == "SJWS01LM" + assert sensor_msg["mac"] == "54EF44E027BC" + assert sensor_msg["packet"] == 8 + assert sensor_msg["data"] + assert sensor_msg["moisture detected"] + assert sensor_msg["rssi"] == -88 def test_Xiaomi_MJYD02YL(self): """Test Xiaomi parser for MJYD02YL."""