diff --git a/pyproject.toml b/pyproject.toml index 40d2e2bea..bb3b435f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ dependencies = [ "pydantic-settings==2.2.1", "pyopenssl==24.1", "pyyaml>=3.12", - "requests<2.32,>=2.31", + "requests<2.33,>=2.32", "typing-extensions>=4.6.3", "urllib3<2,>=1.26.8", "uvicorn[standard]==0.20", diff --git a/src/otaclient/app/boot_control/_common.py b/src/otaclient/app/boot_control/_common.py index 2f935e5a4..c31a50efb 100644 --- a/src/otaclient/app/boot_control/_common.py +++ b/src/otaclient/app/boot_control/_common.py @@ -194,6 +194,30 @@ def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> s cmd = ["lsblk", "-idpno", "PKNAME", child_device] return subprocess_check_output(cmd, raise_exception=raise_exception) + @classmethod + def get_device_tree( + cls, parent_dev: str, *, raise_exception: bool = True + ) -> list[str]: + """Get the device tree of a parent device. + + For example, for sda with 3 partitions, we will get: + ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + + This function is implemented by calling: + lsblk -lnpo NAME + + Args: + parent_dev (str): The parent device to be checked. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. + + Returns: + str: _description_ + """ + cmd = ["lsblk", "-lnpo", "NAME", parent_dev] + raw_res = subprocess_check_output(cmd, raise_exception=raise_exception) + return raw_res.splitlines() + @classmethod def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = True): """Set to ext4 formatted . @@ -724,7 +748,7 @@ def prepare_standby_dev( # TODO: in the future if in-place update mode is implemented, do a # fschck over the standby slot file system. if fslabel: - CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) + CMDHelperFuncs.set_ext4_fslabel(self.standby_slot_dev, fslabel=fslabel) def umount_all(self, *, ignore_error: bool = True): logger.debug("unmount standby slot and active slot mount point...") diff --git a/src/otaclient/app/boot_control/_rpi_boot.py b/src/otaclient/app/boot_control/_rpi_boot.py index c1c4228ce..e32e9371f 100644 --- a/src/otaclient/app/boot_control/_rpi_boot.py +++ b/src/otaclient/app/boot_control/_rpi_boot.py @@ -36,7 +36,7 @@ from otaclient.app.boot_control.configs import rpi_boot_cfg as cfg from otaclient.app.boot_control.protocol import BootControllerProtocol from otaclient_api.v2 import types as api_types -from otaclient_common.common import replace_atomic, subprocess_check_output +from otaclient_common.common import replace_atomic from otaclient_common.linux import subprocess_run_wrapper from otaclient_common.typing import StrOrPath @@ -94,12 +94,13 @@ def get_sysboot_files_fpath(boot_fname: BOOTFILES, slot: SlotID) -> Path: class _RPIBootControl: """Boot control helper for rpi4 support. - Expected partition layout: - /dev/sda: - - sda1: fat32, fslabel=systemb-boot - - sda2: ext4, fslabel=slot_a - - sda3: ext4, fslabel=slot_b + Supported partition layout: + /dev/sd: + - sd1: fat32, fslabel=systemb-boot + - sd2: ext4, fslabel=slot_a + - sd3: ext4, fslabel=slot_b slot is the fslabel for each AB rootfs. + NOTE that we allow extra partitions with ID after 3. This class provides the following features: 1. AB partition detection, @@ -128,54 +129,45 @@ def _init_slots_info(self) -> None: logger.debug("checking and initializing slots info...") try: # ------ detect active slot ------ # - _active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() - assert _active_slot_dev - self.active_slot_dev = _active_slot_dev - - _active_slot = CMDHelperFuncs.get_attrs_by_dev( - "LABEL", str(self.active_slot_dev) - ) - assert _active_slot - self.active_slot = SlotID(_active_slot) - - # ------ detect standby slot ------ # - # NOTE: using the similar logic like grub, detect the silibing dev - # of the active slot as standby slot - _parent = CMDHelperFuncs.get_parent_dev(str(self.active_slot_dev)) - assert _parent - - # list children device file from parent device - # exclude parent dev(always in the front) - # expected raw result from lsblk: - # NAME="/dev/sdx" - # NAME="/dev/sdx1" # system-boot - # NAME="/dev/sdx2" # slot_a - # NAME="/dev/sdx3" # slot_b - _check_dev_family_cmd = ["lsblk", "-Ppo", "NAME", _parent] - _raw_child_partitions = subprocess_check_output( - _check_dev_family_cmd, raise_exception=True - ) - + active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert active_slot_dev + self.active_slot_dev = active_slot_dev + + # detect the parent device of boot device + # i.e., for /dev/sda2 here we get /dev/sda + parent_dev = CMDHelperFuncs.get_parent_dev(str(self.active_slot_dev)) + assert parent_dev + + # get device tree, for /dev/sda device, we will get: + # ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + _device_tree = CMDHelperFuncs.get_device_tree(parent_dev) + # remove the parent dev itself and system-boot partition + device_tree = _device_tree[2:] + + # Now we should only have two partitions in the device_tree list: + # /dev/sda2, /dev/sda3 + # NOTE that we allow extra partitions presented after sd3. + assert ( + len(device_tree) >= 2 + ), f"unexpected partition layout: {_device_tree=}" + + # get the active slot ID by its position in the disk try: - # NOTE: exclude the first 2 lines(parent and system-boot) - _child_partitions = [ - raw.split("=")[-1].strip('"') - for raw in _raw_child_partitions.splitlines()[2:] - ] - if ( - len(_child_partitions) != 2 - or self.active_slot_dev not in _child_partitions - ): - raise ValueError - _child_partitions.remove(self.active_slot_dev) - except Exception: + idx = device_tree.index(active_slot_dev) + except ValueError: raise ValueError( - f"unexpected partition layout: {_raw_child_partitions}" - ) from None - # it is OK if standby_slot dev doesn't have fslabel or fslabel != standby_slot_id - # we will always set the fslabel - self.standby_slot = AB_FLIPS[self.active_slot] - self.standby_slot_dev = _child_partitions[0] + f"active lost is not in the device tree: {active_slot_dev=}, {device_tree=}" + ) + + if idx == 0: # slot_a + self.active_slot = SLOT_A + self.standby_slot = SLOT_B + self.standby_slot_dev = device_tree[1] + elif idx == 1: # slot_b + self.active_slot = SLOT_B + self.standby_slot = SLOT_A + self.standby_slot_dev = device_tree[0] + logger.info( f"rpi_boot: active_slot: {self.active_slot}({self.active_slot_dev}), " f"standby_slot: {self.standby_slot}({self.standby_slot_dev})"