diff --git a/docs/research.ods b/docs/research.ods index b9bfcf8..d976dc8 100644 Binary files a/docs/research.ods and b/docs/research.ods differ diff --git a/openfreebuds/driver/huawei/handler/config_equalizer.py b/openfreebuds/driver/huawei/handler/config_equalizer.py index 0414619..bc1b390 100644 --- a/openfreebuds/driver/huawei/handler/config_equalizer.py +++ b/openfreebuds/driver/huawei/handler/config_equalizer.py @@ -4,7 +4,6 @@ from openfreebuds.driver.huawei.driver.generic import OfbDriverHandlerHuawei from openfreebuds.driver.huawei.package import HuaweiSppPackage from openfreebuds.exceptions import OfbNotSupportedError, OfbTooManyItemsError -from openfreebuds.utils import reverse_dict from openfreebuds.utils.logger import create_logger KNOWN_BUILT_IN_PRESETS = { @@ -13,6 +12,14 @@ 3: "equalizer_preset_treble", 9: "equalizer_preset_voices", } +FAKE_BUILT_IN_PRESETS = [ + (-56, "equalizer_preset_symphony", "0f0f0afb0f190ffb322d"), + (-55, "equalizer_preset_hi_fi_live", "fb141e0a0000e7f60a00"), +] +FAKE_BUILT_IN_PRESETS_COPY_NAME = { + "equalizer_preset_symphony": "Symphony (copy)", + "equalizer_preset_hi_fi_live": "Hi-Fi Live (Copy)", +} log = create_logger("OfbHuaweiEqualizerPresetHandler") @@ -34,20 +41,34 @@ def __init__( self, w_presets: Optional[dict[int, str]] = None, w_custom: bool = False, + w_fake_built_in: bool = False, w_custom_rows: int = 10, w_custom_max_count: int = 3, ): self.w_custom: bool = w_custom self.w_custom_rows = w_custom_rows self.w_custom_max_count = w_custom_max_count + self.w_fake_built_in = w_fake_built_in self.w_options_predefined: bool = w_presets is not None self.changes_saved: bool = True - self.custom_preset_values: dict[int, bytes] = {} self.current_rollback_data: bytes = b"" - self.options: Optional[dict[int, str]] = None + self.default_preset_data: list[tuple[Optional[int], str, Optional[str]]] = [] + self.data_overrides: dict[int, str] = {} + + # Predefined set of built-in modes if w_presets: - self.options = {i: f"equalizer_preset_{name}" for i, name in w_presets.items()} + for i, name in w_presets: + self.default_preset_data.append((i, name, None)) + + # Load predefined presets + if w_fake_built_in: + self.default_preset_data.extend((a, b, None) for a, b, c in FAKE_BUILT_IN_PRESETS) + self.data_overrides = {i: d for i, _, d in FAKE_BUILT_IN_PRESETS} + elif w_custom: + self.default_preset_data.extend((None, b, c) for _, b, c in FAKE_BUILT_IN_PRESETS) + + self.preset_data = self.default_preset_data async def on_init(self): resp = await self.driver.send_package( @@ -68,14 +89,12 @@ async def set_property(self, group: str, prop: str, value): raise OfbNotSupportedError("Impossible error") async def _toggle_save(self, save: bool): - mode_str = await self.driver.get_property("sound", "equalizer_preset", None) - mode_id = reverse_dict(self.options).get(mode_str) + mode_id, mode_str, data = await self.find_current_mode() if mode_id is None: + log.info(f"Skip unknown mode override {mode_str}") return if save: - data = await self.driver.get_property("sound", "equalizer_rows", "[]") - data = b"".join([x.to_bytes(1, byteorder="big", signed=True) for x in json.loads(data)]) log.info(f"Will save persistent preset data={data}, mode_id={mode_id}") else: data = self.current_rollback_data @@ -92,15 +111,14 @@ async def _toggle_save(self, save: bool): self.changes_saved = True async def _change_current_mode(self, value: str): - mode_str = await self.driver.get_property("sound", "equalizer_preset", None) - mode_id = reverse_dict(self.options).get(mode_str) + mode_id, mode_str, mode_data = await self.find_current_mode() if mode_id is None: - log.info(f"Skip unknown mode {mode_str}/{mode_id}") + log.info(f"Skip unknown mode override {mode_str}") return data = b"".join([x.to_bytes(1, byteorder="big", signed=True) for x in json.loads(value)]) if self.changes_saved: - self.current_rollback_data = self.custom_preset_values[mode_id] + self.current_rollback_data = mode_data log.info(f"Will replace id={mode_id}, label={mode_str} with data={data}") pkg = HuaweiSppPackage.change_rq( @@ -113,53 +131,73 @@ async def _change_current_mode(self, value: str): self.changes_saved = False async def _delete_current_mode(self): - mode_str = await self.driver.get_property("sound", "equalizer_preset", None) - mode_id = reverse_dict(self.options).get(mode_str) + mode_id, mode_str, mode_data = await self.find_current_mode() if mode_id is None: + log.info(f"Skip unknown mode deletion {mode_str}") return - data = self.custom_preset_values[mode_id] - log.info(f"Delete id={mode_id}, label={mode_str}, data={data}") + log.info(f"Delete id={mode_id}, label={mode_str}, data={mode_data}") pkg = HuaweiSppPackage.change_rq( b"\x2b\x49", - _build_payload(mode_id, mode_str, data, 2) + _build_payload(mode_id, mode_str, mode_data, 2) ) await self.driver.send_package(pkg) await self.on_init() + async def find_current_mode(self): + mode_str = await self.driver.get_property("sound", "equalizer_preset", None) + candidates = [(p_id, label, data) for p_id, label, data in self.preset_data if label == mode_str] + if len(candidates) < 1: + return None, None, None + return candidates[0] + async def _set_current_mode(self, mode_str): - mode_id = reverse_dict(self.options).get(mode_str) + candidates = [(p_id, label, data) for p_id, label, data in self.preset_data if label == mode_str] + custom_modes = [p_id for p_id, _, data in self.preset_data if data is not None and p_id is not None] + mode_id = 0 + + # What is going to do? + if len(candidates) < 1: + # Create new mode from scratch + mode_data = b"\x00" * self.w_custom_rows + current_mode = await self.driver.get_property("sound", "equalizer_preset", None) + candidate_data = [data for _, label, data in self.preset_data if label == current_mode] + if len(candidate_data) > 0 and candidate_data[0] is not None: + mode_data = candidate_data[0] + + do_create = True + elif candidates[0][0] is None: + # Load predefined mode as custom + mode_id, _, mode_data = candidates[0] + mode_str = FAKE_BUILT_IN_PRESETS_COPY_NAME.get(mode_str, mode_str) + do_create = True + else: + mode_id, mode_str_orig, mode_data = candidates[0] + if mode_id in self.data_overrides: + mode_data = self.data_overrides[mode_id] + do_create = False - # New mode creation - if mode_id is None: + # Preparation for new mode + if do_create: if not self.w_custom: raise OfbNotSupportedError("Device didn't support custom equalizer presets") - if len(self.custom_preset_values.keys()) >= self.w_custom_max_count: + if len(custom_modes) >= self.w_custom_max_count: raise OfbTooManyItemsError() - mode_id = 100 - while mode_id in self.custom_preset_values: + while mode_id in custom_modes: mode_id += 1 - data = b"\x00" * self.w_custom_rows + self.preset_data.append((mode_id, mode_str, mode_data)) + log.info(f"Will create new preset id={mode_id}, data={mode_data}") - current_mode = await self.driver.get_property("sound", "equalizer_preset", None) - current_mode_id = reverse_dict(self.options).get(current_mode) - if current_mode_id in self.custom_preset_values: - data = self.custom_preset_values[current_mode_id] - - self.custom_preset_values[mode_id] = data - log.info(f"Will create new preset id={mode_id}, data={data}") - - if mode_id in self.custom_preset_values: + if mode_data is not None: # Is custom mode, use advanced payload - data = self.custom_preset_values[mode_id] pkg = HuaweiSppPackage.change_rq( b"\x2b\x49", - _build_payload(mode_id, mode_str, data, 1) + _build_payload(mode_id, mode_str, mode_data, 1) ) else: - # Is built'in mode + # Is built-in mode pkg = HuaweiSppPackage.change_rq( b"\x2b\x49", [(1, mode_id)] @@ -179,48 +217,59 @@ async def on_package(self, package: HuaweiSppPackage): } available_modes = package.find_param(3) + self.preset_data = [] if not self.w_options_predefined and len(available_modes) > 0: - self.options = {i: KNOWN_BUILT_IN_PRESETS.get(i, f"preset_{i}") for i in available_modes} - # log.info(f"Read built-in options {self.options}") + for p_id in available_modes: + self.preset_data.append((p_id, KNOWN_BUILT_IN_PRESETS.get(p_id, f"preset_{p_id}"), None)) + self.preset_data.extend(self.default_preset_data) custom_modes = package.find_param(8) if self.w_custom and len(custom_modes) > 0: offset = 0 - self.custom_preset_values = {} while offset < len(custom_modes): mode_id, mode_label, mode_data = _parse_custom_mode(custom_modes[offset:offset + 36]) - self.custom_preset_values[mode_id] = mode_data - self.options[mode_id] = mode_label + self.preset_data.append((mode_id, mode_label, mode_data)) # log.info(f"Read custom mode id={mode_id}, label={mode_label}, data={mode_data}") offset += 36 - new_props["equalizer_preset_options"] = ",".join(self.options.values()) + log.info(self.preset_data) + + new_props["equalizer_preset_options"] = ",".join([l for _, l, _ in self.preset_data]) + if self.w_custom and not self.w_fake_built_in: + new_props["equalizer_preset_create_options"] = ",".join(l for _, l, _ in FAKE_BUILT_IN_PRESETS) current_mode = package.find_param(2) if len(current_mode) == 1: current_mode = int.from_bytes(current_mode, byteorder="big", signed=True) - new_props["equalizer_preset"] = self.options.get(current_mode, f"unknown_{current_mode}") - - if current_mode in self.custom_preset_values: - data = _eq_bytes_to_array(self.custom_preset_values[current_mode]) - new_props["equalizer_rows"] = json.dumps(data) + value = f"unknown_{current_mode}" + for p_id, label, data in self.preset_data: + if p_id == current_mode: + value = label + if data is not None: + new_props["equalizer_rows"] = json.dumps(_eq_bytes_to_array(data)) + break + new_props["equalizer_preset"] = value await self.driver.put_property("sound", None, new_props, extend_group=True) -def _eq_bytes_to_array(data: bytes): +def _eq_bytes_to_array(data: bytes | str): + if isinstance(data, str): + data = bytes.fromhex(data) return [int.from_bytes((x,), byteorder="big", signed=True) for x in data] def _parse_custom_mode(data: bytes): count_lines = data[1] - data_lines = data[2:2 + count_lines] + data_lines = data[2:2 + count_lines].hex() label = data[2 + count_lines:].split(b"\x00", 1)[0].decode("utf8") return data[0], label, data_lines -def _build_payload(mode_id: int, mode_str: str, data: bytes, action: int): +def _build_payload(mode_id: int, mode_str: str, data: bytes | str, action: int): + if isinstance(data, str): + data = bytes.fromhex(data) return [ (1, mode_id), (2, len(data)), diff --git a/openfreebuds/driver/huawei/package.py b/openfreebuds/driver/huawei/package.py index 91d8e32..159fc4c 100644 --- a/openfreebuds/driver/huawei/package.py +++ b/openfreebuds/driver/huawei/package.py @@ -43,16 +43,22 @@ def to_table_string(self): """ Pretty-print this pacakge contents """ + hex_len = 40 + for p_type in self.parameters: + cand_l = self.parameters[p_type].hex() + if len(cand_l) > hex_len: + hex_len = len(cand_l) + out = build_table_row(12, "COMMAND_ID") out += build_table_row(10, "2 bytes") - out += build_table_row(40, self.command_id.hex(), []) + "\n" + out += build_table_row(hex_len, self.command_id.hex(), []) + "\n" - out += 70 * "=" + "\n" + out += (30 + hex_len) * "=" + "\n" for p_type in self.parameters: p_value = self.parameters[p_type] out += build_table_row(12, f"PARAM {p_type}") out += build_table_row(10, f"{len(p_value)} bytes") - out += build_table_row(40, p_value.hex()) + out += build_table_row(hex_len, p_value.hex()) if all(c < 128 for c in p_value): # ASCII string diff --git a/openfreebuds_qt/app/module/sound_quality.py b/openfreebuds_qt/app/module/sound_quality.py index 03fd504..09289c7 100644 --- a/openfreebuds_qt/app/module/sound_quality.py +++ b/openfreebuds_qt/app/module/sound_quality.py @@ -36,6 +36,7 @@ def __init__(self, *args, **kwargs): self._eq_last_options: list[str] = [] self._eq_rows: list[QSlider] = [] self._last_preset_data: list[int] = [] + self._last_copy_options: list[str] = [] self.setupUi(self) @@ -101,7 +102,10 @@ async def update_ui(self, event: OfbCoreEvent): if event.is_changed("sound", "equalizer_preset"): value = sound.get("equalizer_preset") options = sound.get("equalizer_preset_options") + copy_options = sound.get("equalizer_preset_create_options") self.eq_root.setVisible(value is not None and options is not None) + if copy_options is not None: + self._last_copy_options = list(copy_options.split(",")) if options is not None: self._eq_last_options = list(options.split(",")) fill_combo_box(self.eq_preset_box, self._eq_last_options, self.eq_preset_option_names, value) @@ -140,7 +144,25 @@ async def on_sq_set_quality(self): async def on_eq_preset_change(self, index: int): async with qt_error_handler("OfbQtSoundQualityModule_ChangePreset", self.ctx): value = self._eq_last_options[index] - await self.ofb.set_property("sound", "equalizer_preset", value) + + if value in self._last_copy_options: + dialog = QMessageBox(QMessageBox.Icon.Information, + self.tr("Notice"), + self.tr("This preset isn't available in your device, it will be created as " + "custom preset."), + QMessageBox.StandardButton.Ok) + dialog.setWindowModality(Qt.WindowModality.WindowModal) + await run_dialog_async(dialog) + + try: + await self.ofb.set_property("sound", "equalizer_preset", value) + except OfbTooManyItemsError: + dialog = QMessageBox(QMessageBox.Icon.Critical, + self.tr("Failed"), + self.tr("Can't create: too many custom preset created in device."), + QMessageBox.StandardButton.Ok) + dialog.setWindowModality(Qt.WindowModality.WindowModal) + await run_dialog_async(dialog) @asyncSlot() async def new_preset(self): diff --git a/openfreebuds_qt/qt_i18n.py b/openfreebuds_qt/qt_i18n.py index 25da1cf..0ac80f8 100644 --- a/openfreebuds_qt/qt_i18n.py +++ b/openfreebuds_qt/qt_i18n.py @@ -17,6 +17,8 @@ def get_eq_preset_names(): "equalizer_preset_hardbass": QApplication.translate("EqPresetName", "Bass-boost"), "equalizer_preset_treble": QApplication.translate("EqPresetName", "Treble-boost"), "equalizer_preset_voices": QApplication.translate("EqPresetName", "Voices"), + "equalizer_preset_symphony": QApplication.translate("EqPresetName", "Symphony"), + "equalizer_preset_hi_fi_live": QApplication.translate("EqPresetName", "Hi-Fi Live"), } diff --git a/scripts/debug/huawei_pkg_parse.py b/scripts/debug/huawei_pkg_parse.py new file mode 100644 index 0000000..a1f046c --- /dev/null +++ b/scripts/debug/huawei_pkg_parse.py @@ -0,0 +1,17 @@ +import sys +import traceback + +from openfreebuds.driver.huawei.package import HuaweiSppPackage + +while True: + try: + data = bytes.fromhex(input("HEX-stream: ")) + if data[0] != b"\x5a": + data = b"\x5a\x00" + data.split(b"\x5a\x00", 1)[1] + + print("") + print(f"HEX: {data.hex()}") + print("") + print(HuaweiSppPackage.from_bytes(data).to_table_string()) + except Exception: + traceback.print_exc()