Skip to content

Commit

Permalink
[Feature] Add 6i equalizer presets (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
melianmiko committed Oct 13, 2024
1 parent b1d5727 commit fbb3ba9
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 54 deletions.
Binary file modified docs/research.ods
Binary file not shown.
149 changes: 99 additions & 50 deletions openfreebuds/driver/huawei/handler/config_equalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from openfreebuds.driver.huawei.driver.generic import OfbDriverHandlerHuawei
from openfreebuds.driver.huawei.package import HuaweiSppPackage
from openfreebuds.exceptions import OfbNotSupportedError, OfbTooManyItemsError
from openfreebuds.utils import reverse_dict
from openfreebuds.utils.logger import create_logger

KNOWN_BUILT_IN_PRESETS = {
Expand All @@ -13,6 +12,14 @@
3: "equalizer_preset_treble",
9: "equalizer_preset_voices",
}
FAKE_BUILT_IN_PRESETS = [
(-56, "equalizer_preset_symphony", "0f0f0afb0f190ffb322d"),
(-55, "equalizer_preset_hi_fi_live", "fb141e0a0000e7f60a00"),
]
FAKE_BUILT_IN_PRESETS_COPY_NAME = {
"equalizer_preset_symphony": "Symphony (copy)",
"equalizer_preset_hi_fi_live": "Hi-Fi Live (Copy)",
}

log = create_logger("OfbHuaweiEqualizerPresetHandler")

Expand All @@ -34,20 +41,34 @@ def __init__(
self,
w_presets: Optional[dict[int, str]] = None,
w_custom: bool = False,
w_fake_built_in: bool = False,
w_custom_rows: int = 10,
w_custom_max_count: int = 3,
):
self.w_custom: bool = w_custom
self.w_custom_rows = w_custom_rows
self.w_custom_max_count = w_custom_max_count
self.w_fake_built_in = w_fake_built_in
self.w_options_predefined: bool = w_presets is not None

self.changes_saved: bool = True
self.custom_preset_values: dict[int, bytes] = {}
self.current_rollback_data: bytes = b""
self.options: Optional[dict[int, str]] = None
self.default_preset_data: list[tuple[Optional[int], str, Optional[str]]] = []
self.data_overrides: dict[int, str] = {}

# Predefined set of built-in modes
if w_presets:
self.options = {i: f"equalizer_preset_{name}" for i, name in w_presets.items()}
for i, name in w_presets:
self.default_preset_data.append((i, name, None))

# Load predefined presets
if w_fake_built_in:
self.default_preset_data.extend((a, b, None) for a, b, c in FAKE_BUILT_IN_PRESETS)
self.data_overrides = {i: d for i, _, d in FAKE_BUILT_IN_PRESETS}
elif w_custom:
self.default_preset_data.extend((None, b, c) for _, b, c in FAKE_BUILT_IN_PRESETS)

self.preset_data = self.default_preset_data

async def on_init(self):
resp = await self.driver.send_package(
Expand All @@ -68,14 +89,12 @@ async def set_property(self, group: str, prop: str, value):
raise OfbNotSupportedError("Impossible error")

async def _toggle_save(self, save: bool):
mode_str = await self.driver.get_property("sound", "equalizer_preset", None)
mode_id = reverse_dict(self.options).get(mode_str)
mode_id, mode_str, data = await self.find_current_mode()
if mode_id is None:
log.info(f"Skip unknown mode override {mode_str}")
return

if save:
data = await self.driver.get_property("sound", "equalizer_rows", "[]")
data = b"".join([x.to_bytes(1, byteorder="big", signed=True) for x in json.loads(data)])
log.info(f"Will save persistent preset data={data}, mode_id={mode_id}")
else:
data = self.current_rollback_data
Expand All @@ -92,15 +111,14 @@ async def _toggle_save(self, save: bool):
self.changes_saved = True

async def _change_current_mode(self, value: str):
mode_str = await self.driver.get_property("sound", "equalizer_preset", None)
mode_id = reverse_dict(self.options).get(mode_str)
mode_id, mode_str, mode_data = await self.find_current_mode()
if mode_id is None:
log.info(f"Skip unknown mode {mode_str}/{mode_id}")
log.info(f"Skip unknown mode override {mode_str}")
return

data = b"".join([x.to_bytes(1, byteorder="big", signed=True) for x in json.loads(value)])
if self.changes_saved:
self.current_rollback_data = self.custom_preset_values[mode_id]
self.current_rollback_data = mode_data
log.info(f"Will replace id={mode_id}, label={mode_str} with data={data}")

pkg = HuaweiSppPackage.change_rq(
Expand All @@ -113,53 +131,73 @@ async def _change_current_mode(self, value: str):
self.changes_saved = False

async def _delete_current_mode(self):
mode_str = await self.driver.get_property("sound", "equalizer_preset", None)
mode_id = reverse_dict(self.options).get(mode_str)
mode_id, mode_str, mode_data = await self.find_current_mode()
if mode_id is None:
log.info(f"Skip unknown mode deletion {mode_str}")
return

data = self.custom_preset_values[mode_id]
log.info(f"Delete id={mode_id}, label={mode_str}, data={data}")
log.info(f"Delete id={mode_id}, label={mode_str}, data={mode_data}")

pkg = HuaweiSppPackage.change_rq(
b"\x2b\x49",
_build_payload(mode_id, mode_str, data, 2)
_build_payload(mode_id, mode_str, mode_data, 2)
)
await self.driver.send_package(pkg)
await self.on_init()

async def find_current_mode(self):
mode_str = await self.driver.get_property("sound", "equalizer_preset", None)
candidates = [(p_id, label, data) for p_id, label, data in self.preset_data if label == mode_str]
if len(candidates) < 1:
return None, None, None
return candidates[0]

async def _set_current_mode(self, mode_str):
mode_id = reverse_dict(self.options).get(mode_str)
candidates = [(p_id, label, data) for p_id, label, data in self.preset_data if label == mode_str]
custom_modes = [p_id for p_id, _, data in self.preset_data if data is not None and p_id is not None]
mode_id = 0

# What is going to do?
if len(candidates) < 1:
# Create new mode from scratch
mode_data = b"\x00" * self.w_custom_rows
current_mode = await self.driver.get_property("sound", "equalizer_preset", None)
candidate_data = [data for _, label, data in self.preset_data if label == current_mode]
if len(candidate_data) > 0 and candidate_data[0] is not None:
mode_data = candidate_data[0]

do_create = True
elif candidates[0][0] is None:
# Load predefined mode as custom
mode_id, _, mode_data = candidates[0]
mode_str = FAKE_BUILT_IN_PRESETS_COPY_NAME.get(mode_str, mode_str)
do_create = True
else:
mode_id, mode_str_orig, mode_data = candidates[0]
if mode_id in self.data_overrides:
mode_data = self.data_overrides[mode_id]
do_create = False

# New mode creation
if mode_id is None:
# Preparation for new mode
if do_create:
if not self.w_custom:
raise OfbNotSupportedError("Device didn't support custom equalizer presets")
if len(self.custom_preset_values.keys()) >= self.w_custom_max_count:
if len(custom_modes) >= self.w_custom_max_count:
raise OfbTooManyItemsError()

mode_id = 100
while mode_id in self.custom_preset_values:
while mode_id in custom_modes:
mode_id += 1
data = b"\x00" * self.w_custom_rows
self.preset_data.append((mode_id, mode_str, mode_data))
log.info(f"Will create new preset id={mode_id}, data={mode_data}")

current_mode = await self.driver.get_property("sound", "equalizer_preset", None)
current_mode_id = reverse_dict(self.options).get(current_mode)
if current_mode_id in self.custom_preset_values:
data = self.custom_preset_values[current_mode_id]

self.custom_preset_values[mode_id] = data
log.info(f"Will create new preset id={mode_id}, data={data}")

if mode_id in self.custom_preset_values:
if mode_data is not None:
# Is custom mode, use advanced payload
data = self.custom_preset_values[mode_id]
pkg = HuaweiSppPackage.change_rq(
b"\x2b\x49",
_build_payload(mode_id, mode_str, data, 1)
_build_payload(mode_id, mode_str, mode_data, 1)
)
else:
# Is built'in mode
# Is built-in mode
pkg = HuaweiSppPackage.change_rq(
b"\x2b\x49",
[(1, mode_id)]
Expand All @@ -179,48 +217,59 @@ async def on_package(self, package: HuaweiSppPackage):
}

available_modes = package.find_param(3)
self.preset_data = []
if not self.w_options_predefined and len(available_modes) > 0:
self.options = {i: KNOWN_BUILT_IN_PRESETS.get(i, f"preset_{i}") for i in available_modes}
# log.info(f"Read built-in options {self.options}")
for p_id in available_modes:
self.preset_data.append((p_id, KNOWN_BUILT_IN_PRESETS.get(p_id, f"preset_{p_id}"), None))
self.preset_data.extend(self.default_preset_data)

custom_modes = package.find_param(8)
if self.w_custom and len(custom_modes) > 0:
offset = 0
self.custom_preset_values = {}
while offset < len(custom_modes):
mode_id, mode_label, mode_data = _parse_custom_mode(custom_modes[offset:offset + 36])
self.custom_preset_values[mode_id] = mode_data
self.options[mode_id] = mode_label
self.preset_data.append((mode_id, mode_label, mode_data))
# log.info(f"Read custom mode id={mode_id}, label={mode_label}, data={mode_data}")
offset += 36

new_props["equalizer_preset_options"] = ",".join(self.options.values())
log.info(self.preset_data)

new_props["equalizer_preset_options"] = ",".join([l for _, l, _ in self.preset_data])
if self.w_custom and not self.w_fake_built_in:
new_props["equalizer_preset_create_options"] = ",".join(l for _, l, _ in FAKE_BUILT_IN_PRESETS)

current_mode = package.find_param(2)
if len(current_mode) == 1:
current_mode = int.from_bytes(current_mode, byteorder="big", signed=True)
new_props["equalizer_preset"] = self.options.get(current_mode, f"unknown_{current_mode}")

if current_mode in self.custom_preset_values:
data = _eq_bytes_to_array(self.custom_preset_values[current_mode])
new_props["equalizer_rows"] = json.dumps(data)
value = f"unknown_{current_mode}"
for p_id, label, data in self.preset_data:
if p_id == current_mode:
value = label
if data is not None:
new_props["equalizer_rows"] = json.dumps(_eq_bytes_to_array(data))
break
new_props["equalizer_preset"] = value

await self.driver.put_property("sound", None, new_props,
extend_group=True)


def _eq_bytes_to_array(data: bytes):
def _eq_bytes_to_array(data: bytes | str):
if isinstance(data, str):
data = bytes.fromhex(data)
return [int.from_bytes((x,), byteorder="big", signed=True) for x in data]


def _parse_custom_mode(data: bytes):
count_lines = data[1]
data_lines = data[2:2 + count_lines]
data_lines = data[2:2 + count_lines].hex()
label = data[2 + count_lines:].split(b"\x00", 1)[0].decode("utf8")
return data[0], label, data_lines


def _build_payload(mode_id: int, mode_str: str, data: bytes, action: int):
def _build_payload(mode_id: int, mode_str: str, data: bytes | str, action: int):
if isinstance(data, str):
data = bytes.fromhex(data)
return [
(1, mode_id),
(2, len(data)),
Expand Down
12 changes: 9 additions & 3 deletions openfreebuds/driver/huawei/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,22 @@ def to_table_string(self):
"""
Pretty-print this pacakge contents
"""
hex_len = 40
for p_type in self.parameters:
cand_l = self.parameters[p_type].hex()
if len(cand_l) > hex_len:
hex_len = len(cand_l)

out = build_table_row(12, "COMMAND_ID")
out += build_table_row(10, "2 bytes")
out += build_table_row(40, self.command_id.hex(), []) + "\n"
out += build_table_row(hex_len, self.command_id.hex(), []) + "\n"

out += 70 * "=" + "\n"
out += (30 + hex_len) * "=" + "\n"
for p_type in self.parameters:
p_value = self.parameters[p_type]
out += build_table_row(12, f"PARAM {p_type}")
out += build_table_row(10, f"{len(p_value)} bytes")
out += build_table_row(40, p_value.hex())
out += build_table_row(hex_len, p_value.hex())

if all(c < 128 for c in p_value):
# ASCII string
Expand Down
24 changes: 23 additions & 1 deletion openfreebuds_qt/app/module/sound_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, *args, **kwargs):
self._eq_last_options: list[str] = []
self._eq_rows: list[QSlider] = []
self._last_preset_data: list[int] = []
self._last_copy_options: list[str] = []

self.setupUi(self)

Expand Down Expand Up @@ -101,7 +102,10 @@ async def update_ui(self, event: OfbCoreEvent):
if event.is_changed("sound", "equalizer_preset"):
value = sound.get("equalizer_preset")
options = sound.get("equalizer_preset_options")
copy_options = sound.get("equalizer_preset_create_options")
self.eq_root.setVisible(value is not None and options is not None)
if copy_options is not None:
self._last_copy_options = list(copy_options.split(","))
if options is not None:
self._eq_last_options = list(options.split(","))
fill_combo_box(self.eq_preset_box, self._eq_last_options, self.eq_preset_option_names, value)
Expand Down Expand Up @@ -140,7 +144,25 @@ async def on_sq_set_quality(self):
async def on_eq_preset_change(self, index: int):
async with qt_error_handler("OfbQtSoundQualityModule_ChangePreset", self.ctx):
value = self._eq_last_options[index]
await self.ofb.set_property("sound", "equalizer_preset", value)

if value in self._last_copy_options:
dialog = QMessageBox(QMessageBox.Icon.Information,
self.tr("Notice"),
self.tr("This preset isn't available in your device, it will be created as "
"custom preset."),
QMessageBox.StandardButton.Ok)
dialog.setWindowModality(Qt.WindowModality.WindowModal)
await run_dialog_async(dialog)

try:
await self.ofb.set_property("sound", "equalizer_preset", value)
except OfbTooManyItemsError:
dialog = QMessageBox(QMessageBox.Icon.Critical,
self.tr("Failed"),
self.tr("Can't create: too many custom preset created in device."),
QMessageBox.StandardButton.Ok)
dialog.setWindowModality(Qt.WindowModality.WindowModal)
await run_dialog_async(dialog)

@asyncSlot()
async def new_preset(self):
Expand Down
2 changes: 2 additions & 0 deletions openfreebuds_qt/qt_i18n.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def get_eq_preset_names():
"equalizer_preset_hardbass": QApplication.translate("EqPresetName", "Bass-boost"),
"equalizer_preset_treble": QApplication.translate("EqPresetName", "Treble-boost"),
"equalizer_preset_voices": QApplication.translate("EqPresetName", "Voices"),
"equalizer_preset_symphony": QApplication.translate("EqPresetName", "Symphony"),
"equalizer_preset_hi_fi_live": QApplication.translate("EqPresetName", "Hi-Fi Live"),
}


Expand Down
17 changes: 17 additions & 0 deletions scripts/debug/huawei_pkg_parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import sys
import traceback

from openfreebuds.driver.huawei.package import HuaweiSppPackage

while True:
try:
data = bytes.fromhex(input("HEX-stream: "))
if data[0] != b"\x5a":
data = b"\x5a\x00" + data.split(b"\x5a\x00", 1)[1]

print("")
print(f"HEX: {data.hex()}")
print("")
print(HuaweiSppPackage.from_bytes(data).to_table_string())
except Exception:
traceback.print_exc()

0 comments on commit fbb3ba9

Please sign in to comment.