diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index 6560289a2..7d767a3ba 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -39,6 +39,7 @@ from .common import Alert from .common import BatteryStatus from .common import Notification +from .device import Device from .hidpp10_constants import Registers if typing.TYPE_CHECKING: @@ -56,16 +57,16 @@ notification_lock = threading.Lock() -def process(device, notification): +def process(device, hidpp_notification: HIDPPNotification): assert device - assert notification + assert hidpp_notification if not device.isDevice: - return _process_receiver_notification(device, notification) - return _process_device_notification(device, notification) + return process_receiver_notification(device, hidpp_notification) + return process_device_notification(device, hidpp_notification) -def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: +def process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification) -> bool | None: # supposedly only 0x4x notifications arrive for the receiver assert hidpp_notification.sub_id in [ Notification.CONNECT_DISCONNECT, @@ -167,25 +168,25 @@ def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPP logger.warning("%s: unhandled notification %s", receiver, hidpp_notification) -def _process_device_notification(device, n): +def process_device_notification(device: Device, hidpp_notification: HIDPPNotification): # incoming packets with SubId >= 0x80 are supposedly replies from HID++ 1.0 requests, should never get here - assert n.sub_id & 0x80 == 0 + assert hidpp_notification.sub_id & 0x80 == 0 - if n.sub_id == Notification.NO_OPERATION: + if hidpp_notification.sub_id == Notification.NO_OPERATION: # dispose it return False # Allow the device object to handle the notification using custom per-device state. - handling_ret = device.handle_notification(n) + handling_ret = device.handle_notification(hidpp_notification) if handling_ret is not None: return handling_ret # 0x40 to 0x7F appear to be HID++ 1.0 or DJ notifications - if n.sub_id >= 0x40: - if n.report_id == base.DJ_MESSAGE_ID: - return _process_dj_notification(device, n) + if hidpp_notification.sub_id >= 0x40: + if hidpp_notification.report_id == base.DJ_MESSAGE_ID: + return _process_dj_notification(device, hidpp_notification) else: - return _process_hidpp10_notification(device, n) + return _process_hidpp10_notification(device, hidpp_notification) # These notifications are from the device itself, so it must be active device.online = True @@ -194,63 +195,67 @@ def _process_device_notification(device, n): # some custom battery events for HID++ 1.0 devices if device.protocol < 2.0: - return _process_hidpp10_custom_notification(device, n) + return _process_hidpp10_custom_notification(device, hidpp_notification) # assuming 0x00 to 0x3F are feature (HID++ 2.0) notifications if not device.features: - logger.warning("%s: feature notification but features not set up: %02X %s", device, n.sub_id, n) + logger.warning( + "%s: feature notification but features not set up: %02X %s", device, hidpp_notification.sub_id, hidpp_notification + ) return False try: - feature = device.features.get_feature(n.sub_id) + feature = device.features.get_feature(hidpp_notification.sub_id) except IndexError: - logger.warning("%s: notification from invalid feature index %02X: %s", device, n.sub_id, n) + logger.warning( + "%s: notification from invalid feature index %02X: %s", device, hidpp_notification.sub_id, hidpp_notification + ) return False - return _process_feature_notification(device, n, feature) + return _process_feature_notification(device, hidpp_notification, feature) -def _process_dj_notification(device, n): +def _process_dj_notification(device, hidpp_notification: HIDPPNotification): if logger.isEnabledFor(logging.DEBUG): - logger.debug("%s (%s) DJ %s", device, device.protocol, n) + logger.debug("%s (%s) DJ %s", device, device.protocol, hidpp_notification) - if n.sub_id == Notification.CONNECT_DISCONNECT: + if hidpp_notification.sub_id == Notification.CONNECT_DISCONNECT: # do all DJ paired notifications also show up as HID++ 1.0 notifications? if logger.isEnabledFor(logging.INFO): - logger.info("%s: ignoring DJ unpaired: %s", device, n) + logger.info("%s: ignoring DJ unpaired: %s", device, hidpp_notification) return True - if n.sub_id == Notification.DJ_PAIRING: + if hidpp_notification.sub_id == Notification.DJ_PAIRING: # do all DJ paired notifications also show up as HID++ 1.0 notifications? if logger.isEnabledFor(logging.INFO): - logger.info("%s: ignoring DJ paired: %s", device, n) + logger.info("%s: ignoring DJ paired: %s", device, hidpp_notification) return True - if n.sub_id == Notification.CONNECTED: - connected = not n.address & 0x01 + if hidpp_notification.sub_id == Notification.CONNECTED: + connected = not hidpp_notification.address & 0x01 if logger.isEnabledFor(logging.INFO): - logger.info("%s: DJ connection: %s %s", device, connected, n) + logger.info("%s: DJ connection: %s %s", device, connected, hidpp_notification) device.changed(active=connected, alert=Alert.NONE, reason=_("connected") if connected else _("disconnected")) return True - logger.warning("%s: unrecognized DJ %s", device, n) + logger.warning("%s: unrecognized DJ %s", device, hidpp_notification) -def _process_hidpp10_custom_notification(device, n): +def _process_hidpp10_custom_notification(device, hidpp_notification: HIDPPNotification): if logger.isEnabledFor(logging.DEBUG): - logger.debug("%s (%s) custom notification %s", device, device.protocol, n) + logger.debug("%s (%s) custom notification %s", device, device.protocol, hidpp_notification) - if n.sub_id in (Registers.BATTERY_STATUS, Registers.BATTERY_CHARGE): - assert n.data[-1:] == b"\x00" - data = chr(n.address).encode() + n.data - device.set_battery_info(hidpp10.parse_battery_status(n.sub_id, data)) + if hidpp_notification.sub_id in (Registers.BATTERY_STATUS, Registers.BATTERY_CHARGE): + assert hidpp_notification.data[-1:] == b"\x00" + data = chr(hidpp_notification.address).encode() + hidpp_notification.data + device.set_battery_info(hidpp10.parse_battery_status(hidpp_notification.sub_id, data)) return True - logger.warning("%s: unrecognized %s", device, n) + logger.warning("%s: unrecognized %s", device, hidpp_notification) -def _process_hidpp10_notification(device, n): - if n.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing - if n.address == 0x02: +def _process_hidpp10_notification(device: Device, hidpp_notification: HIDPPNotification): + if hidpp_notification.sub_id == Notification.CONNECT_DISCONNECT: # device unpairing + if hidpp_notification.address == 0x02: # device un-paired device.wpid = None if device.number in device.receiver: @@ -258,21 +263,29 @@ def _process_hidpp10_notification(device, n): device.changed(active=False, alert=Alert.ALL, reason=_("unpaired")) ## device.status = None else: - logger.warning("%s: disconnection with unknown type %02X: %s", device, n.address, n) + logger.warning( + "%s: disconnection with unknown type %02X: %s", device, hidpp_notification.address, hidpp_notification + ) return True - if n.sub_id == Notification.DJ_PAIRING: # device connection (and disconnection) - flags = ord(n.data[:1]) & 0xF0 - if n.address == 0x02: # very old 27 MHz protocol - wpid = "00" + common.strhex(n.data[2:3]) + if hidpp_notification.sub_id == Notification.DJ_PAIRING: # device connection (and disconnection) + unicode_as_int = ord(hidpp_notification.data[:1]) + flags = unicode_as_int & 0xF0 + if hidpp_notification.address == 0x02: # very old 27 MHz protocol + wpid = "00" + common.strhex(hidpp_notification.data[2:3]) link_established = True link_encrypted = bool(flags & 0x80) - elif n.address > 0x00: # all other protocols are supposed to be almost the same - wpid = common.strhex(n.data[2:3] + n.data[1:2]) + elif hidpp_notification.address > 0x00: # all other protocols are supposed to be almost the same + wpid = common.strhex(hidpp_notification.data[2:3] + hidpp_notification.data[1:2]) link_established = not (flags & 0x40) - link_encrypted = bool(flags & 0x20) or n.address == 0x10 # Bolt protocol always encrypted + link_encrypted = bool(flags & 0x20) or hidpp_notification.address == 0x10 # Bolt protocol always encrypted else: - logger.warning("%s: connection notification with unknown protocol %02X: %s", device.number, n.address, n) + logger.warning( + "%s: connection notification with unknown protocol %02X: %s", + device.number, + hidpp_notification.address, + hidpp_notification, + ) return True if wpid != device.wpid: logger.warning("%s wpid mismatch, got %s", device, wpid) @@ -280,7 +293,7 @@ def _process_hidpp10_notification(device, n): logger.debug( "%s: protocol %s connection notification: software=%s, encrypted=%s, link=%s, payload=%s", device, - n.address, + hidpp_notification.address, bool(flags & 0x10), link_encrypted, link_established, @@ -292,75 +305,81 @@ def _process_hidpp10_notification(device, n): device.changed(active=link_established) return True - if n.sub_id == Notification.RAW_INPUT: + if hidpp_notification.sub_id == Notification.RAW_INPUT: # raw input event? just ignore it - # if n.address == 0x01, no idea what it is, but they keep on coming - # if n.address == 0x03, appears to be an actual input event, because they only come when input happents + # no idea what it is, but they keep on coming + # if hidpp_notification.address == 0x01 + # appears to be an actual input event, because they only come when input happen + # if hidpp_notification.address == 0x03 return True - if n.sub_id == Notification.POWER: - if n.address == 0x01: + if hidpp_notification.sub_id == Notification.POWER: + if hidpp_notification.address == 0x01: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: device powered on", device) reason = device.status_string() or _("powered on") device.changed(active=True, alert=Alert.NOTIFICATION, reason=reason) else: - logger.warning("%s: unknown %s", device, n) + logger.warning("%s: unknown %s", device, hidpp_notification) return True - logger.warning("%s: unrecognized %s", device, n) + logger.warning("%s: unrecognized %s", device, hidpp_notification) -def _process_feature_notification(device, n, feature): +def _process_feature_notification(device, hidpp_notification: HIDPPNotification, feature): if logger.isEnabledFor(logging.DEBUG): logger.debug( - "%s: notification for feature %s, report %s, data %s", device, feature, n.address >> 4, common.strhex(n.data) + "%s: notification for feature %s, report %s, data %s", + device, + feature, + hidpp_notification.address >> 4, + common.strhex(hidpp_notification.data), ) if feature == _F.BATTERY_STATUS: - if n.address == 0x00: - device.set_battery_info(hidpp20.decipher_battery_status(n.data)[1]) - elif n.address == 0x10: + if hidpp_notification.address == 0x00: + device.set_battery_info(hidpp20.decipher_battery_status(hidpp_notification.data)[1]) + elif hidpp_notification.address == 0x10: if logger.isEnabledFor(logging.INFO): - logger.info("%s: spurious BATTERY status %s", device, n) + logger.info("%s: spurious BATTERY status %s", device, hidpp_notification) else: - logger.warning("%s: unknown BATTERY %s", device, n) + logger.warning("%s: unknown BATTERY %s", device, hidpp_notification) elif feature == _F.BATTERY_VOLTAGE: - if n.address == 0x00: - device.set_battery_info(hidpp20.decipher_battery_voltage(n.data)[1]) + if hidpp_notification.address == 0x00: + device.set_battery_info(hidpp20.decipher_battery_voltage(hidpp_notification.data)[1]) else: - logger.warning("%s: unknown VOLTAGE %s", device, n) + logger.warning("%s: unknown VOLTAGE %s", device, hidpp_notification) elif feature == _F.UNIFIED_BATTERY: - if n.address == 0x00: - device.set_battery_info(hidpp20.decipher_battery_unified(n.data)[1]) + if hidpp_notification.address == 0x00: + device.set_battery_info(hidpp20.decipher_battery_unified(hidpp_notification.data)[1]) else: - logger.warning("%s: unknown UNIFIED BATTERY %s", device, n) + logger.warning("%s: unknown UNIFIED BATTERY %s", device, hidpp_notification) elif feature == _F.ADC_MEASUREMENT: - if n.address == 0x00: - result = hidpp20.decipher_adc_measurement(n.data) + if hidpp_notification.address == 0x00: + result = hidpp20.decipher_adc_measurement(hidpp_notification.data) if result: device.set_battery_info(result[1]) else: # this feature is used to signal device becoming inactive device.changed(active=False) else: - logger.warning("%s: unknown ADC MEASUREMENT %s", device, n) + logger.warning("%s: unknown ADC MEASUREMENT %s", device, hidpp_notification) elif feature == _F.SOLAR_DASHBOARD: - if n.data[5:9] == b"GOOD": - charge, lux, adc = struct.unpack("!BHH", n.data[:5]) + if hidpp_notification.data[5:9] == b"GOOD": + charge, lux, adc = struct.unpack("!BHH", hidpp_notification.data[:5]) # guesstimate the battery voltage, emphasis on 'guess' # status_text = '%1.2fV' % (adc * 2.67793237653 / 0x0672) status_text = BatteryStatus.DISCHARGING - if n.address == 0x00: + if hidpp_notification.address == 0x00: device.set_battery_info(common.Battery(charge, None, status_text, None)) - elif n.address == 0x10: + elif hidpp_notification.address == 0x10: if lux > 200: status_text = BatteryStatus.RECHARGING device.set_battery_info(common.Battery(charge, None, status_text, None, lux)) - elif n.address == 0x20: + elif hidpp_notification.address == 0x20: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: Light Check button pressed", device) device.changed(alert=Alert.SHOW_WINDOW) @@ -371,72 +390,72 @@ def _process_feature_notification(device, n, feature): reports_period = 2 # seconds device.feature_request(_F.SOLAR_DASHBOARD, 0x00, reports_count, reports_period) else: - logger.warning("%s: unknown SOLAR CHARGE %s", device, n) + logger.warning("%s: unknown SOLAR CHARGE %s", device, hidpp_notification) else: - logger.warning("%s: SOLAR CHARGE not GOOD? %s", device, n) + logger.warning("%s: SOLAR CHARGE not GOOD? %s", device, hidpp_notification) elif feature == _F.WIRELESS_DEVICE_STATUS: - if n.address == 0x00: + if hidpp_notification.address == 0x00: if logger.isEnabledFor(logging.DEBUG): - logger.debug("wireless status: %s", n) - reason = "powered on" if n.data[2] == 1 else None - if n.data[1] == 1: # device is asking for software reconfiguration so need to change status + logger.debug("wireless status: %s", hidpp_notification) + reason = "powered on" if hidpp_notification.data[2] == 1 else None + if hidpp_notification.data[1] == 1: # device is asking for software reconfiguration so need to change status alert = Alert.NONE device.changed(active=True, alert=alert, reason=reason, push=True) else: - logger.warning("%s: unknown WIRELESS %s", device, n) + logger.warning("%s: unknown WIRELESS %s", device, hidpp_notification) elif feature == _F.TOUCHMOUSE_RAW_POINTS: - if n.address == 0x00: + if hidpp_notification.address == 0x00: if logger.isEnabledFor(logging.INFO): - logger.info("%s: TOUCH MOUSE points %s", device, n) - elif n.address == 0x10: - touch = ord(n.data[:1]) + logger.info("%s: TOUCH MOUSE points %s", device, hidpp_notification) + elif hidpp_notification.address == 0x10: + touch = ord(hidpp_notification.data[:1]) button_down = bool(touch & 0x02) mouse_lifted = bool(touch & 0x01) if logger.isEnabledFor(logging.INFO): logger.info("%s: TOUCH MOUSE status: button_down=%s mouse_lifted=%s", device, button_down, mouse_lifted) else: - logger.warning("%s: unknown TOUCH MOUSE %s", device, n) + logger.warning("%s: unknown TOUCH MOUSE %s", device, hidpp_notification) # TODO: what are REPROG_CONTROLS_V{2,3}? elif feature == _F.REPROG_CONTROLS: - if n.address == 0x00: + if hidpp_notification.address == 0x00: if logger.isEnabledFor(logging.INFO): - logger.info("%s: reprogrammable key: %s", device, n) + logger.info("%s: reprogrammable key: %s", device, hidpp_notification) else: - logger.warning("%s: unknown REPROG_CONTROLS %s", device, n) + logger.warning("%s: unknown REPROG_CONTROLS %s", device, hidpp_notification) elif feature == _F.BACKLIGHT2: - if n.address == 0x00: - level = struct.unpack("!B", n.data[1:2])[0] + if hidpp_notification.address == 0x00: + level = struct.unpack("!B", hidpp_notification.data[1:2])[0] if device.setting_callback: device.setting_callback(device, settings_templates.Backlight2Level, [level]) elif feature == _F.REPROG_CONTROLS_V4: - if n.address == 0x00: + if hidpp_notification.address == 0x00: if logger.isEnabledFor(logging.DEBUG): - cid1, cid2, cid3, cid4 = struct.unpack("!HHHH", n.data[:8]) + cid1, cid2, cid3, cid4 = struct.unpack("!HHHH", hidpp_notification.data[:8]) logger.debug("%s: diverted controls pressed: 0x%x, 0x%x, 0x%x, 0x%x", device, cid1, cid2, cid3, cid4) - elif n.address == 0x10: + elif hidpp_notification.address == 0x10: if logger.isEnabledFor(logging.DEBUG): - dx, dy = struct.unpack("!hh", n.data[:4]) + dx, dy = struct.unpack("!hh", hidpp_notification.data[:4]) logger.debug("%s: rawXY dx=%i dy=%i", device, dx, dy) - elif n.address == 0x20: + elif hidpp_notification.address == 0x20: if logger.isEnabledFor(logging.DEBUG): logger.debug("%s: received analyticsKeyEvents", device) elif logger.isEnabledFor(logging.INFO): - logger.info("%s: unknown REPROG_CONTROLS_V4 %s", device, n) + logger.info("%s: unknown REPROG_CONTROLS_V4 %s", device, hidpp_notification) elif feature == _F.HIRES_WHEEL: - if n.address == 0x00: + if hidpp_notification.address == 0x00: if logger.isEnabledFor(logging.INFO): - flags, delta_v = struct.unpack(">bh", n.data[:3]) + flags, delta_v = struct.unpack(">bh", hidpp_notification.data[:3]) high_res = (flags & 0x10) != 0 periods = flags & 0x0F logger.info("%s: WHEEL: res: %d periods: %d delta V:%-3d", device, high_res, periods, delta_v) - elif n.address == 0x10: - ratchet = n.data[0] + elif hidpp_notification.address == 0x10: + ratchet = hidpp_notification.data[0] if logger.isEnabledFor(logging.INFO): logger.info("%s: WHEEL: ratchet: %d", device, ratchet) if ratchet < 2: # don't process messages with unusual ratchet values @@ -444,19 +463,19 @@ def _process_feature_notification(device, n, feature): device.setting_callback(device, settings_templates.ScrollRatchet, [2 if ratchet else 1]) else: if logger.isEnabledFor(logging.INFO): - logger.info("%s: unknown WHEEL %s", device, n) + logger.info("%s: unknown WHEEL %s", device, hidpp_notification) elif feature == _F.ONBOARD_PROFILES: - if n.address > 0x10: + if hidpp_notification.address > 0x10: if logger.isEnabledFor(logging.INFO): - logger.info("%s: unknown ONBOARD PROFILES %s", device, n) + logger.info("%s: unknown ONBOARD PROFILES %s", device, hidpp_notification) else: - if n.address == 0x00: - profile_sector = struct.unpack("!H", n.data[:2])[0] + if hidpp_notification.address == 0x00: + profile_sector = struct.unpack("!H", hidpp_notification.data[:2])[0] if profile_sector: settings_templates.profile_change(device, profile_sector) - elif n.address == 0x10: - resolution_index = struct.unpack("!B", n.data[:1])[0] + elif hidpp_notification.address == 0x10: + resolution_index = struct.unpack("!B", hidpp_notification.data[:1])[0] profile_sector = struct.unpack("!H", device.feature_request(_F.ONBOARD_PROFILES, 0x40)[:2])[0] if device.setting_callback: for profile in device.profiles.profiles.values() if device.profiles else []: @@ -467,18 +486,18 @@ def _process_feature_notification(device, n, feature): break elif feature == _F.BRIGHTNESS_CONTROL: - if n.address > 0x10: + if hidpp_notification.address > 0x10: if logger.isEnabledFor(logging.INFO): - logger.info("%s: unknown BRIGHTNESS CONTROL %s", device, n) + logger.info("%s: unknown BRIGHTNESS CONTROL %s", device, hidpp_notification) else: - if n.address == 0x00: - brightness = struct.unpack("!H", n.data[:2])[0] + if hidpp_notification.address == 0x00: + brightness = struct.unpack("!H", hidpp_notification.data[:2])[0] device.setting_callback(device, settings_templates.BrightnessControl, [brightness]) - elif n.address == 0x10: - brightness = n.data[0] & 0x01 + elif hidpp_notification.address == 0x10: + brightness = hidpp_notification.data[0] & 0x01 if brightness: brightness = struct.unpack("!H", device.feature_request(_F.BRIGHTNESS_CONTROL, 0x10)[:2])[0] device.setting_callback(device, settings_templates.BrightnessControl, [brightness]) - diversion.process_notification(device, n, feature) + diversion.process_notification(device, hidpp_notification, feature) return True diff --git a/tests/logitech_receiver/fake_hidpp.py b/tests/logitech_receiver/fake_hidpp.py index 5b47e48ef..e47990a3d 100644 --- a/tests/logitech_receiver/fake_hidpp.py +++ b/tests/logitech_receiver/fake_hidpp.py @@ -388,6 +388,7 @@ class Device: setting_callback: Any = None sliding = profiles = _backlight = _keys = _remap_keys = _led_effects = _gestures = None _gestures_lock = threading.Lock() + number = "d1" read_register = device.Device.read_register write_register = device.Device.write_register @@ -405,6 +406,7 @@ def __post_init__(self): self.persister = configuration._DeviceEntry() self.features = hidpp20.FeaturesArray(self) self.settings = [] + self.receiver = [] if self.feature is not None: self.features = hidpp20.FeaturesArray(self) self.responses = [ @@ -435,6 +437,18 @@ def ping(self, handle=None, devnumber=None, long_message=False): print("PING", self._protocol) return self._protocol + def handle_notification(self, handle): + pass + + def changed(self, *args, **kwargs): + pass + + def set_battery_info(self, *args, **kwargs): + pass + + def status_string(self): + pass + def match_requests(number, responses, call_args_list): for i in range(0 - number, 0): diff --git a/tests/logitech_receiver/test_notifications.py b/tests/logitech_receiver/test_notifications.py index 9cc8dd333..fd9f578e0 100644 --- a/tests/logitech_receiver/test_notifications.py +++ b/tests/logitech_receiver/test_notifications.py @@ -6,8 +6,11 @@ from logitech_receiver.hidpp10_constants import BoltPairingError from logitech_receiver.hidpp10_constants import PairingError from logitech_receiver.hidpp10_constants import Registers +from logitech_receiver.hidpp20_constants import SupportedFeature from logitech_receiver.receiver import Receiver +from . import fake_hidpp + class MockLowLevelInterface: def open_path(self, path): @@ -22,24 +25,255 @@ def ping(self, handle, number, long_message=False): def request(self, handle, devnumber, request_id, *params, **kwargs): pass + def find_paired_node(self, receiver_path: str, index: int, timeout: int): + return None + + def close(self, device_handle) -> None: + pass + @pytest.mark.parametrize( "sub_id, notification_data, expected_error, expected_new_device", [ (Registers.DISCOVERY_STATUS_NOTIFICATION, b"\x01", BoltPairingError.DEVICE_TIMEOUT, None), + ( + Registers.DEVICE_DISCOVERY_NOTIFICATION, + b"\x01\x01\x01\x01\x01\x01\x01\x01\x01", + None, + None, + ), (Registers.PAIRING_STATUS_NOTIFICATION, b"\x02", BoltPairingError.FAILED, None), (Notification.PAIRING_LOCK, b"\x01", PairingError.DEVICE_TIMEOUT, None), (Notification.PAIRING_LOCK, b"\x02", PairingError.DEVICE_NOT_SUPPORTED, None), (Notification.PAIRING_LOCK, b"\x03", PairingError.TOO_MANY_DEVICES, None), (Notification.PAIRING_LOCK, b"\x06", PairingError.SEQUENCE_TIMEOUT, None), + (Registers.PASSKEY_REQUEST_NOTIFICATION, b"\x06", None, None), + (Registers.PASSKEY_PRESSED_NOTIFICATION, b"\x06", None, None), ], ) def test_process_receiver_notification(sub_id, notification_data, expected_error, expected_new_device): receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None) - notification = HIDPPNotification(0, 0, sub_id, 0, notification_data) + notification = HIDPPNotification(0, 0, sub_id, 0x02, notification_data) - result = notifications._process_receiver_notification(receiver, notification) + result = notifications.process_receiver_notification(receiver, notification) assert result assert receiver.pairing.error == expected_error assert receiver.pairing.new_device is expected_new_device + + +@pytest.mark.parametrize( + "hidpp_notification, expected", + [ + (HIDPPNotification(0, 0, sub_id=Registers.BATTERY_STATUS, address=0, data=b"0x01"), False), + (HIDPPNotification(0, 0, sub_id=Notification.NO_OPERATION, address=0, data=b"0x01"), False), + (HIDPPNotification(0, 0, sub_id=0x40, address=0, data=b"0x01"), True), + ], +) +def test_process_device_notification(hidpp_notification, expected): + device = fake_hidpp.Device() + + result = notifications.process_device_notification(device, hidpp_notification) + + assert result == expected + + +@pytest.mark.parametrize( + "hidpp_notification, expected", + [ + (HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.CONNECTED, address=0, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0, data=b"0x01"), None), + ], +) +def test_process_dj_notification(hidpp_notification, expected): + device = fake_hidpp.Device() + + result = notifications._process_dj_notification(device, hidpp_notification) + + assert result == expected + + +@pytest.mark.parametrize( + "hidpp_notification, expected", + [ + (HIDPPNotification(0, 0, sub_id=Registers.BATTERY_STATUS, address=0, data=b"\x01\x00"), True), + (HIDPPNotification(0, 0, sub_id=Registers.BATTERY_CHARGE, address=0, data=b"0x01\x00"), True), + (HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0, data=b"0x01"), None), + ], +) +def test_process_hidpp10_custom_notification(hidpp_notification, expected): + device = fake_hidpp.Device() + + result = notifications._process_hidpp10_custom_notification(device, hidpp_notification) + + assert result == expected + + +@pytest.mark.parametrize( + "hidpp_notification, expected", + [ + (HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x00, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x02, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x03, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.DJ_PAIRING, address=0x03, data=b"0x4040"), True), + (HIDPPNotification(0, 0, sub_id=Notification.RAW_INPUT, address=0x00, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.POWER, address=0x00, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.POWER, address=0x01, data=b"0x01"), True), + (HIDPPNotification(0, 0, sub_id=Notification.PAIRING_LOCK, address=0x01, data=b"0x01"), None), + ], +) +def test_process_hidpp10_notification(hidpp_notification, expected): + fake_device = fake_hidpp.Device() + fake_device.receiver = ["rec1", "rec2"] + + result = notifications._process_hidpp10_notification(fake_device, hidpp_notification) + + assert result == expected + + +@pytest.mark.parametrize( + "hidpp_notification, feature", + [ + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.BATTERY_STATUS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.BATTERY_STATUS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.BATTERY_VOLTAGE, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x05, data=b"0x01"), + SupportedFeature.BATTERY_VOLTAGE, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.UNIFIED_BATTERY, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.UNIFIED_BATTERY, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.ADC_MEASUREMENT, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.ADC_MEASUREMENT, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"01234GOOD"), + SupportedFeature.SOLAR_DASHBOARD, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"01234GOOD"), + SupportedFeature.SOLAR_DASHBOARD, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"01234GOOD"), + SupportedFeature.SOLAR_DASHBOARD, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"01234GOOD"), + SupportedFeature.SOLAR_DASHBOARD, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"CHARGENOTGOOD"), + SupportedFeature.SOLAR_DASHBOARD, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"\x01\x01\x02"), + SupportedFeature.WIRELESS_DEVICE_STATUS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.WIRELESS_DEVICE_STATUS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.TOUCHMOUSE_RAW_POINTS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"), + SupportedFeature.TOUCHMOUSE_RAW_POINTS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x05, data=b"0x01"), + SupportedFeature.TOUCHMOUSE_RAW_POINTS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.REPROG_CONTROLS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.REPROG_CONTROLS, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.BACKLIGHT2, + ), + ( + HIDPPNotification( + 0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"\x01\x01\x01\x01\x01\x01\x01\x01" + ), + SupportedFeature.REPROG_CONTROLS_V4, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"), + SupportedFeature.REPROG_CONTROLS_V4, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"), + SupportedFeature.REPROG_CONTROLS_V4, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.HIRES_WHEEL, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"), + SupportedFeature.HIRES_WHEEL, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x02, data=b"0x01"), + SupportedFeature.HIRES_WHEEL, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.ONBOARD_PROFILES, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"), + SupportedFeature.ONBOARD_PROFILES, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x00, data=b"0x01"), + SupportedFeature.BRIGHTNESS_CONTROL, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x10, data=b"0x01"), + SupportedFeature.BRIGHTNESS_CONTROL, + ), + ( + HIDPPNotification(0, 0, sub_id=Notification.CONNECT_DISCONNECT, address=0x20, data=b"0x01"), + SupportedFeature.BRIGHTNESS_CONTROL, + ), + ], +) +def test_process_feature_notification(mocker, hidpp_notification, feature): + fake_device = fake_hidpp.Device() + fake_device.receiver = ["rec1", "rec2"] + + result = notifications._process_feature_notification(fake_device, hidpp_notification, feature) + + assert result is True