From 07731082ebe9ba9d2c5ae6edd94fdb4cd102da5c Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Wed, 19 Jun 2024 10:35:26 +0900 Subject: [PATCH 1/2] refactor: rpi_boot: detects slot by partition tables, not by checking slot fslabel (#321) This PR refines the rpi_boot module to detect slot by examining the partition layout, not relying on reading the fslabel, and explicitly requires the expected partition tables(but support extra partitions after partition ID 3). Also this PR implements the feature that rpi_boot will correct the active slot's fslabel with correct slot_id after slot detection. --- src/otaclient/app/boot_control/_common.py | 26 +++- src/otaclient/app/boot_control/_rpi_boot.py | 131 +++++++++++--------- 2 files changed, 99 insertions(+), 58 deletions(-) diff --git a/src/otaclient/app/boot_control/_common.py b/src/otaclient/app/boot_control/_common.py index 2f935e5a4..c31a50efb 100644 --- a/src/otaclient/app/boot_control/_common.py +++ b/src/otaclient/app/boot_control/_common.py @@ -194,6 +194,30 @@ def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> s cmd = ["lsblk", "-idpno", "PKNAME", child_device] return subprocess_check_output(cmd, raise_exception=raise_exception) + @classmethod + def get_device_tree( + cls, parent_dev: str, *, raise_exception: bool = True + ) -> list[str]: + """Get the device tree of a parent device. + + For example, for sda with 3 partitions, we will get: + ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + + This function is implemented by calling: + lsblk -lnpo NAME + + Args: + parent_dev (str): The parent device to be checked. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. + + Returns: + str: _description_ + """ + cmd = ["lsblk", "-lnpo", "NAME", parent_dev] + raw_res = subprocess_check_output(cmd, raise_exception=raise_exception) + return raw_res.splitlines() + @classmethod def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = True): """Set to ext4 formatted . @@ -724,7 +748,7 @@ def prepare_standby_dev( # TODO: in the future if in-place update mode is implemented, do a # fschck over the standby slot file system. if fslabel: - CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) + CMDHelperFuncs.set_ext4_fslabel(self.standby_slot_dev, fslabel=fslabel) def umount_all(self, *, ignore_error: bool = True): logger.debug("unmount standby slot and active slot mount point...") diff --git a/src/otaclient/app/boot_control/_rpi_boot.py b/src/otaclient/app/boot_control/_rpi_boot.py index 330d7022b..d51b1ae7a 100644 --- a/src/otaclient/app/boot_control/_rpi_boot.py +++ b/src/otaclient/app/boot_control/_rpi_boot.py @@ -19,6 +19,7 @@ import logging import os import re +import subprocess from pathlib import Path from string import Template from typing import Generator @@ -33,11 +34,7 @@ from otaclient.app.boot_control.configs import rpi_boot_cfg as cfg from otaclient.app.boot_control.protocol import BootControllerProtocol from otaclient_api.v2 import types as api_types -from otaclient_common.common import ( - replace_atomic, - subprocess_call, - subprocess_check_output, -) +from otaclient_common.common import replace_atomic, subprocess_call logger = logging.getLogger(__name__) @@ -54,12 +51,13 @@ class _RPIBootControllerError(Exception): class _RPIBootControl: """Boot control helper for rpi4 support. - Expected partition layout: - /dev/sda: - - sda1: fat32, fslabel=systemb-boot - - sda2: ext4, fslabel=slot_a - - sda3: ext4, fslabel=slot_b + Supported partition layout: + /dev/sd: + - sd1: fat32, fslabel=systemb-boot + - sd2: ext4, fslabel=slot_a + - sd3: ext4, fslabel=slot_b slot is the fslabel for each AB rootfs. + NOTE that we allow extra partitions with ID after 3. This class provides the following features: 1. AB partition detection, @@ -90,60 +88,79 @@ def __init__(self) -> None: raise ValueError(_err_msg) self._init_slots_info() self._init_boot_files() + self._check_active_slot_id() - def _init_slots_info(self): - """Get current/standby slots info.""" - logger.debug("checking and initializing slots info...") - try: - # detect active slot - _active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() - assert _active_slot_dev - self._active_slot_dev = _active_slot_dev + def _check_active_slot_id(self): + """Check whether the active slot fslabel is matching the slot id. - _active_slot = CMDHelperFuncs.get_attrs_by_dev( - "LABEL", str(self._active_slot_dev) + If mismatched, try to correct the problem. + """ + fslabel = self.active_slot + actual_fslabel = CMDHelperFuncs.get_attrs_by_dev( + "LABEL", self.active_slot_dev, raise_exception=False + ) + if actual_fslabel == fslabel: + return + + logger.warning( + ( + f"current active slot is {fslabel}, but its fslabel is {actual_fslabel}, " + f"try to correct the fslabel with slot id {fslabel}..." ) - assert _active_slot - self._active_slot = _active_slot - - # detect standby slot - # NOTE: using the similar logic like grub, detect the silibing dev - # of the active slot as standby slot - _parent = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) - assert _parent - - # list children device file from parent device - # exclude parent dev(always in the front) - # expected raw result from lsblk: - # NAME="/dev/sdx" - # NAME="/dev/sdx1" # system-boot - # NAME="/dev/sdx2" # slot_a - # NAME="/dev/sdx3" # slot_b - _check_dev_family_cmd = ["lsblk", "-Ppo", "NAME", _parent] - _raw_child_partitions = subprocess_check_output( - _check_dev_family_cmd, raise_exception=True + ) + try: + CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel) + os.sync() + except subprocess.CalledProcessError as e: + logger.error( + f"failed to correct the fslabel mismatched: {e!r}, {e.stderr.decode()}" ) + logger.error("this might cause problem on future OTA update!") + def _init_slots_info(self): + """Get current/standby slots info.""" + logger.debug("checking and initializing slots info...") + try: + # ------ detect active slot ------ # + active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert active_slot_dev + self._active_slot_dev = active_slot_dev + + # detect the parent device of boot device + # i.e., for /dev/sda2 here we get /dev/sda + parent_dev = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) + assert parent_dev + + # get device tree, for /dev/sda device, we will get: + # ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + _device_tree = CMDHelperFuncs.get_device_tree(parent_dev) + # remove the parent dev itself and system-boot partition + device_tree = _device_tree[2:] + + # Now we should only have two partitions in the device_tree list: + # /dev/sda2, /dev/sda3 + # NOTE that we allow extra partitions presented after sd3. + assert ( + len(device_tree) >= 2 + ), f"unexpected partition layout: {_device_tree=}" + + # get the active slot ID by its position in the disk try: - # NOTE: exclude the first 2 lines(parent and system-boot) - _child_partitions = [ - raw.split("=")[-1].strip('"') - for raw in _raw_child_partitions.splitlines()[2:] - ] - if ( - len(_child_partitions) != 2 - or self._active_slot_dev not in _child_partitions - ): - raise ValueError - _child_partitions.remove(self._active_slot_dev) - except Exception: + idx = device_tree.index(active_slot_dev) + except ValueError: raise ValueError( - f"unexpected partition layout: {_raw_child_partitions}" - ) from None - # it is OK if standby_slot dev doesn't have fslabel or fslabel != standby_slot_id - # we will always set the fslabel - self._standby_slot = self.AB_FLIPS[self._active_slot] - self._standby_slot_dev = _child_partitions[0] + f"active lost is not in the device tree: {active_slot_dev=}, {device_tree=}" + ) + + if idx == 0: # slot_a + self._active_slot = self.SLOT_A + self._standby_slot = self.SLOT_B + self._standby_slot_dev = device_tree[1] + elif idx == 1: # slot_b + self._active_slot = self.SLOT_B + self._standby_slot = self.SLOT_A + self._standby_slot_dev = device_tree[0] + logger.info( f"rpi_boot: active_slot: {self._active_slot}({self._active_slot_dev}), " f"standby_slot: {self._standby_slot}({self._standby_slot_dev})" From deeb27d562ca63748aead78adf8c6d8a0824270e Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Wed, 19 Jun 2024 19:01:29 +0900 Subject: [PATCH 2/2] refactor: rpi_boot: use chroot to run flash-kernel, ditch double reboot strategy (#318) This PR introduces one-step firmware install strategy to rpi_boot to deprecate the previously used double steps firmware update strategy. It also includes some internal refinements in rpi_boot. * New features 1. support flash-kernel directly in post-update phase with chroot to standby slot. 2. now if system-boot partition is not mounted, rpi_boot will try to mount it. 3. rpi_boot now allows extra partitions after partition ID 3. * Other changes 1. otaclient_common.linux: implement new subprocess_run_wrapper, which supports chroot. 2. otaclient.boot_control.common: implement a plain version of mount. --- src/otaclient/app/boot_control/_common.py | 31 + src/otaclient/app/boot_control/_rpi_boot.py | 542 +++++++++--------- src/otaclient/app/boot_control/configs.py | 12 - src/otaclient_common/common.py | 36 +- src/otaclient_common/linux.py | 56 ++ .../test_boot_control/test_rpi_boot.py | 371 ++++++------ 6 files changed, 542 insertions(+), 506 deletions(-) diff --git a/src/otaclient/app/boot_control/_common.py b/src/otaclient/app/boot_control/_common.py index c31a50efb..52b9ca2f2 100644 --- a/src/otaclient/app/boot_control/_common.py +++ b/src/otaclient/app/boot_control/_common.py @@ -32,6 +32,7 @@ subprocess_check_output, write_str_to_file_sync, ) +from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) @@ -234,6 +235,36 @@ def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = Tru cmd = ["e2label", dev, fslabel] subprocess_call(cmd, raise_exception=raise_exception) + @classmethod + def mount( + cls, + target: StrOrPath, + mount_point: StrOrPath, + *, + options: list[str] | None = None, + params: list[str] | None = None, + raise_exception: bool = True, + ) -> None: + """Thin wrapper to call mount using subprocess. + + This will call the following: + mount [-o ,[[,...]] [ [[...]]] + + Args: + target (StrOrPath): The target device to mount. + mount_point (StrOrPath): The mount point to mount to. + options (list[str] | None, optional): A list of options, append after -o. Defaults to None. + params (list[str] | None, optional): A list of params. Defaults to None. + raise_exception (bool, optional): Whether to raise exception on failed call. Defaults to True. + """ + cmd = ["mount"] + if options: + cmd.extend(["-o", ",".join(options)]) + if params: + cmd.extend(params) + cmd = [*cmd, str(target), str(mount_point)] + subprocess_call(cmd, raise_exception=raise_exception) + @classmethod def mount_rw( cls, target: str, mount_point: Path | str, *, raise_exception: bool = True diff --git a/src/otaclient/app/boot_control/_rpi_boot.py b/src/otaclient/app/boot_control/_rpi_boot.py index d51b1ae7a..19b980b46 100644 --- a/src/otaclient/app/boot_control/_rpi_boot.py +++ b/src/otaclient/app/boot_control/_rpi_boot.py @@ -16,13 +16,15 @@ from __future__ import annotations +import contextlib import logging import os -import re import subprocess from pathlib import Path from string import Template -from typing import Generator +from typing import Any, Generator, Literal + +from typing_extensions import Self import otaclient.app.errors as ota_errors from otaclient.app.boot_control._common import ( @@ -34,18 +36,62 @@ from otaclient.app.boot_control.configs import rpi_boot_cfg as cfg from otaclient.app.boot_control.protocol import BootControllerProtocol from otaclient_api.v2 import types as api_types -from otaclient_common.common import replace_atomic, subprocess_call +from otaclient_common.common import replace_atomic +from otaclient_common.linux import subprocess_run_wrapper +from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) + +# ------ types ------ # +class SlotID(str): + """slot_id for A/B slots.""" + + VALID_SLOTS = ["slot_a", "slot_b"] + + def __new__(cls, _in: str | Self) -> Self: + if isinstance(_in, cls): + return _in + if _in in cls.VALID_SLOTS: + return str.__new__(cls, _in) + raise ValueError(f"{_in=} is not valid slot num, should be {cls.VALID_SLOTS=}") + + +class _RPIBootControllerError(Exception): + """rpi_boot module internal used exception.""" + + +# ------ consts ------ # +CONFIG_TXT = "config.txt" # primary boot cfg +TRYBOOT_TXT = "tryboot.txt" # tryboot boot cfg +VMLINUZ = "vmlinuz" +INITRD_IMG = "initrd.img" +CMDLINE_TXT = "cmdline.txt" + +SYSTEM_BOOT_FSLABEL = "system-boot" +SLOT_A = SlotID("slot_a") +SLOT_B = SlotID("slot_b") +AB_FLIPS = {SLOT_A: SLOT_B, SLOT_B: SLOT_A} +SEP_CHAR = "_" +"""separator between boot files name and slot suffix.""" + _FSTAB_TEMPLATE_STR = ( "LABEL=${rootfs_fslabel}\t/\text4\tdiscard,x-systemd.growfs\t0\t1\n" "LABEL=system-boot\t/boot/firmware\tvfat\tdefaults\t0\t1\n" ) -class _RPIBootControllerError(Exception): - """rpi_boot module internal used exception.""" +# ------ helper functions ------ # +BOOTFILES = Literal["vmlinuz", "initrd.img", "config.txt", "tryboot.txt", "cmdline.txt"] + + +def get_sysboot_files_fpath(boot_fname: BOOTFILES, slot: SlotID) -> Path: + """Get the boot files fpath for specific slot from /boot/firmware. + + For example, for vmlinuz for slot_a, we get /boot/firmware/vmlinuz_slot_a + """ + fname = f"{boot_fname}{SEP_CHAR}{slot}" + return Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / fname class _RPIBootControl: @@ -56,40 +102,112 @@ class _RPIBootControl: - sd1: fat32, fslabel=systemb-boot - sd2: ext4, fslabel=slot_a - sd3: ext4, fslabel=slot_b - slot is the fslabel for each AB rootfs. + slot_id is also the fslabel for each AB rootfs. NOTE that we allow extra partitions with ID after 3. - This class provides the following features: - 1. AB partition detection, - 2. boot files checking, - 3. switch boot(tryboot.txt setup and tryboot reboot), - 4. finalize switching boot. - Boot files for each slot have the following naming format: - _ - i.e., config.txt for slot_a will be config.txt_slot_a + + For example, config.txt for slot_a will be config.txt_slot_a """ - SLOT_A = cfg.SLOT_A_FSLABEL - SLOT_B = cfg.SLOT_B_FSLABEL - AB_FLIPS = { - SLOT_A: SLOT_B, - SLOT_B: SLOT_A, - } - SEP_CHAR = "_" - def __init__(self) -> None: - self.system_boot_path = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) + self.system_boot_mp = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) + self.system_boot_mp.mkdir(exist_ok=True) + + # sanity check, ensure we are running at raspberry pi device + model_fpath = Path(cfg.RPI_MODEL_FILE) + err_not_rpi_device = f"{cfg.RPI_MODEL_FILE} doesn't exist! Are we running at raspberry pi device?" + if not model_fpath.is_file(): + logger.error(err_not_rpi_device) + raise _RPIBootControllerError(err_not_rpi_device) + + model_info = model_fpath.read_text() + if model_info.find("Pi") == -1: + raise _RPIBootControllerError(err_not_rpi_device) + logger.info(f"{model_info=}") + + try: + # ------ detect active slot ------ # + active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert active_slot_dev + self.active_slot_dev = active_slot_dev + except Exception as e: + _err_msg = f"failed to detect current rootfs device: {e!r}" + logger.error(_err_msg) + raise _RPIBootControllerError(_err_msg) from e + + try: + # detect the parent device of boot device + # i.e., for /dev/sda2 here we get /dev/sda + parent_dev = CMDHelperFuncs.get_parent_dev(str(self.active_slot_dev)) + + # get device tree, for /dev/sda device, we will get: + # ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + device_tree = CMDHelperFuncs.get_device_tree(parent_dev) + logger.info(device_tree) + + # NOTE that we allow extra partitions presented after sd3. + assert len(device_tree) >= 4, "need at least 3 partitions" + except Exception as e: + _err_msg = f"failed to detect partition layout: {e!r}" + logger.error(_err_msg) + raise _RPIBootControllerError(_err_msg) + + # check system-boot partition mount + system_boot_partition = device_tree[1] if not CMDHelperFuncs.is_target_mounted( - self.system_boot_path, raise_exception=False + self.system_boot_mp, raise_exception=False ): - _err_msg = "system-boot is not presented or not mounted!" - logger.error(_err_msg) - raise ValueError(_err_msg) - self._init_slots_info() - self._init_boot_files() + _err_msg = f"system-boot is not mounted at {self.system_boot_mp}, try to mount it..." + logger.warning(_err_msg) + + try: + CMDHelperFuncs.mount( + system_boot_partition, + self.system_boot_mp, + options=["defaults"], + ) + except subprocess.CalledProcessError as e: + _err_msg = ( + f"failed to mount system-boot partition: {e!r}, {e.stderr.decode()}" + ) + logger.error(_err_msg) + raise _RPIBootControllerError(_err_msg) from e + + # check slots + # sd2 and sd3 + rootfs_partitions = device_tree[2:4] + + # get the active slot ID by its position in the disk + try: + idx = rootfs_partitions.index(active_slot_dev) + except ValueError: + raise _RPIBootControllerError( + f"active slot dev not found: {active_slot_dev=}, {rootfs_partitions=}" + ) + + if idx == 0: # slot_a + self.active_slot = SLOT_A + self.standby_slot = SLOT_B + self.standby_slot_dev = rootfs_partitions[1] + elif idx == 1: # slot_b + self.active_slot = SLOT_B + self.standby_slot = SLOT_A + self.standby_slot_dev = rootfs_partitions[0] + + logger.info( + f"rpi_boot: active_slot: {self.active_slot}({self.active_slot_dev}), " + f"standby_slot: {self.standby_slot}({self.standby_slot_dev})" + ) + + # ------ continue rpi_boot starts up ------ # + self._check_boot_files() self._check_active_slot_id() + # NOTE(20240604): for backward compatibility, always remove flag file + flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE + flag_file.unlink(missing_ok=True) + def _check_active_slot_id(self): """Check whether the active slot fslabel is matching the slot id. @@ -117,229 +235,162 @@ def _check_active_slot_id(self): ) logger.error("this might cause problem on future OTA update!") - def _init_slots_info(self): - """Get current/standby slots info.""" - logger.debug("checking and initializing slots info...") - try: - # ------ detect active slot ------ # - active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() - assert active_slot_dev - self._active_slot_dev = active_slot_dev - - # detect the parent device of boot device - # i.e., for /dev/sda2 here we get /dev/sda - parent_dev = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) - assert parent_dev - - # get device tree, for /dev/sda device, we will get: - # ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] - _device_tree = CMDHelperFuncs.get_device_tree(parent_dev) - # remove the parent dev itself and system-boot partition - device_tree = _device_tree[2:] - - # Now we should only have two partitions in the device_tree list: - # /dev/sda2, /dev/sda3 - # NOTE that we allow extra partitions presented after sd3. - assert ( - len(device_tree) >= 2 - ), f"unexpected partition layout: {_device_tree=}" - - # get the active slot ID by its position in the disk - try: - idx = device_tree.index(active_slot_dev) - except ValueError: - raise ValueError( - f"active lost is not in the device tree: {active_slot_dev=}, {device_tree=}" - ) - - if idx == 0: # slot_a - self._active_slot = self.SLOT_A - self._standby_slot = self.SLOT_B - self._standby_slot_dev = device_tree[1] - elif idx == 1: # slot_b - self._active_slot = self.SLOT_B - self._standby_slot = self.SLOT_A - self._standby_slot_dev = device_tree[0] - - logger.info( - f"rpi_boot: active_slot: {self._active_slot}({self._active_slot_dev}), " - f"standby_slot: {self._standby_slot}({self._standby_slot_dev})" - ) - except Exception as e: - _err_msg = f"failed to detect AB partition: {e!r}" - logger.error(_err_msg) - raise _RPIBootControllerError(_err_msg) from e - - def _init_boot_files(self): + def _check_boot_files(self): """Check the availability of boot files. The following boot files will be checked: 1. config.txt_ (required) 2. cmdline.txt_ (required) - 3. vmlinuz_ - 4. initrd.img_ If any of the required files are missing, BootControlInitError will be raised. In such case, a reinstall/setup of AB partition boot files is required. """ logger.debug("checking boot files...") - # boot file - self.config_txt = self.system_boot_path / cfg.CONFIG_TXT - self.tryboot_txt = self.system_boot_path / cfg.TRYBOOT_TXT + active_slot, standby_slot = self.active_slot, self.standby_slot - # active slot - self.config_txt_active_slot = ( - self.system_boot_path / f"{cfg.CONFIG_TXT}{self.SEP_CHAR}{self.active_slot}" - ) - if not self.config_txt_active_slot.is_file(): - _err_msg = f"missing {self.config_txt_active_slot=}" + # ------ check active slot boot files ------ # + config_txt_active_slot = get_sysboot_files_fpath(CONFIG_TXT, active_slot) + if not config_txt_active_slot.is_file(): + _err_msg = f"missing {config_txt_active_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.cmdline_txt_active_slot = ( - self.system_boot_path - / f"{cfg.CMDLINE_TXT}{self.SEP_CHAR}{self.active_slot}" - ) - if not self.cmdline_txt_active_slot.is_file(): - _err_msg = f"missing {self.cmdline_txt_active_slot=}" + + cmdline_txt_active_slot = get_sysboot_files_fpath(CMDLINE_TXT, active_slot) + if not cmdline_txt_active_slot.is_file(): + _err_msg = f"missing {cmdline_txt_active_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.vmlinuz_active_slot = ( - self.system_boot_path / f"{cfg.VMLINUZ}{self.SEP_CHAR}{self.active_slot}" - ) - self.initrd_img_active_slot = ( - self.system_boot_path / f"{cfg.INITRD_IMG}{self.SEP_CHAR}{self.active_slot}" - ) - # standby slot - self.config_txt_standby_slot = ( - self.system_boot_path - / f"{cfg.CONFIG_TXT}{self.SEP_CHAR}{self.standby_slot}" - ) - if not self.config_txt_standby_slot.is_file(): - _err_msg = f"missing {self.config_txt_standby_slot=}" + + # ------ check standby slot boot files ------ # + config_txt_standby_slot = get_sysboot_files_fpath(CONFIG_TXT, standby_slot) + if not config_txt_standby_slot.is_file(): + _err_msg = f"missing {config_txt_standby_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.cmdline_txt_standby_slot = ( - self.system_boot_path - / f"{cfg.CMDLINE_TXT}{self.SEP_CHAR}{self.standby_slot}" - ) - if not self.cmdline_txt_standby_slot.is_file(): - _err_msg = f"missing {self.cmdline_txt_standby_slot=}" + + cmdline_txt_standby_slot = get_sysboot_files_fpath(CMDLINE_TXT, standby_slot) + if not cmdline_txt_standby_slot.is_file(): + _err_msg = f"missing {cmdline_txt_standby_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.vmlinuz_standby_slot = ( - self.system_boot_path / f"{cfg.VMLINUZ}{self.SEP_CHAR}{self.standby_slot}" - ) - self.initrd_img_standby_slot = ( - self.system_boot_path - / f"{cfg.INITRD_IMG}{self.SEP_CHAR}{self.standby_slot}" + + @staticmethod + @contextlib.contextmanager + def _prepare_flash_kernel(target_slot_mp: StrOrPath) -> Generator[None, Any, None]: + """Do a bind mount of /boot/firmware, /proc and /sys to the standby slot, + preparing for calling flash-kernel with chroot. + + flash-kernel requires at least these mounts to work properly. + """ + target_slot_mp = Path(target_slot_mp) + mounts: dict[str, str] = {} + + # we need to mount /proc, /sys and /boot/firmware to make flash-kernel works + system_boot_mp = target_slot_mp / Path(cfg.SYSTEM_BOOT_MOUNT_POINT).relative_to( + "/" ) + mounts[str(system_boot_mp)] = cfg.SYSTEM_BOOT_MOUNT_POINT - def _update_firmware(self): + proc_mp = target_slot_mp / "proc" + mounts[str(proc_mp)] = "/proc" + + sys_mp = target_slot_mp / "sys" + mounts[str(sys_mp)] = "/sys" + + try: + for _mp, _src in mounts.items(): + CMDHelperFuncs.mount( + _src, + _mp, + options=["bind"], + params=["--make-unbindable"], + ) + yield + # NOTE: passthrough the mount failure to caller + finally: + for _mp in mounts: + CMDHelperFuncs.umount(_mp, raise_exception=False) + + def update_firmware(self, *, target_slot: SlotID, target_slot_mp: StrOrPath): """Call flash-kernel to install new dtb files, boot firmwares and kernel, initrd.img - from current rootfs to system-boot partition. + from target slot. + + The following things will be done: + 1. bind mount the /boot/firmware and /proc into the target slot. + 2. chroot into the target slot's rootfs, execute flash-kernel """ - logger.info("update firmware with flash-kernel...") + logger.info(f"try to flash-kernel from {target_slot}...") try: - subprocess_call("flash-kernel", raise_exception=True) - os.sync() - except Exception as e: - _err_msg = f"flash-kernel failed: {e!r}" + with self._prepare_flash_kernel(target_slot_mp): + subprocess_run_wrapper( + ["/usr/sbin/flash-kernel"], + check=True, + check_output=True, + chroot=target_slot_mp, + # must set this env variable to make flash-kernel work under chroot + env={"FK_FORCE": "yes"}, + ) + os.sync() + except subprocess.CalledProcessError as e: + _err_msg = f"flash-kernel failed: {e!r}\nstderr: {e.stderr.decode()}\nstdout: {e.stdout.decode()}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) try: - # check if the vmlinuz and initrd.img presented in /boot/firmware(system-boot), - # if so, it means that flash-kernel works and copies the kernel, inird.img from /boot, - # then we rename vmlinuz and initrd.img to vmlinuz_ and initrd.img_ - if (_vmlinuz := Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.VMLINUZ).is_file(): - os.replace(_vmlinuz, self.vmlinuz_active_slot) - if ( - _initrd_img := Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.INITRD_IMG - ).is_file(): - os.replace(_initrd_img, self.initrd_img_active_slot) + # flash-kernel will install the kernel and initrd.img files from /boot to /boot/firmware + if (vmlinuz := self.system_boot_mp / VMLINUZ).is_file(): + os.replace( + vmlinuz, + get_sysboot_files_fpath(VMLINUZ, target_slot), + ) + + if (initrd_img := self.system_boot_mp / INITRD_IMG).is_file(): + os.replace( + initrd_img, + get_sysboot_files_fpath(INITRD_IMG, target_slot), + ) + + # NOTE(20240603): for backward compatibility(downgrade), still create the flag file. + # The present of flag files means the firmware is updated. + flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE + flag_file.write_text("") os.sync() except Exception as e: - _err_msg = ( - f"apply new kernel,initrd.img for {self.active_slot} failed: {e!r}" - ) + _err_msg = f"failed to apply new kernel,initrd.img for {target_slot}: {e!r}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) # exposed API methods/properties - @property - def active_slot(self) -> str: - return self._active_slot - - @property - def standby_slot(self) -> str: - return self._standby_slot - - @property - def standby_slot_dev(self) -> str: - return self._standby_slot_dev - - @property - def active_slot_dev(self) -> str: - return self._active_slot_dev def finalize_switching_boot(self) -> bool: """Finalize switching boot by swapping config.txt and tryboot.txt if we should. Finalize switch boot: - 1. atomically replace tryboot.txt with tryboot.txt_standby_slot - 2. atomically replace config.txt with config.txt_active_slot - - Two-stage reboot: - In the first reboot after ota update applied, finalize_switching_boot will try - to update the firmware by calling external flash-kernel system script, finalize - the switch boot and then reboot again to apply the new firmware. - In the second reboot, finalize_switching_boot will do nothing but just return True. - - NOTE: we use a SWITCH_BOOT_FLAG_FILE to distinguish which reboot we are right now. - If SWITCH_BOOT_FLAG_FILE presented, we know that we are second reboot, - Otherwise, we know that we are at first reboot after switching boot. + 1. atomically replace tryboot.txt with tryboot.txt_ + 2. atomically replace config.txt with config.txt_ Returns: A bool indicates whether the switch boot succeeded or not. Note that no exception will be raised if finalizing failed. """ logger.info("finalizing switch boot...") - try: - _flag_file = self.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - if _flag_file.is_file(): - # we are just after second reboot, after firmware_update, - # finalize the switch boot and then return True - logger.info("[rpi-boot]: just after 2nd reboot, finish up OTA") - - # cleanup bak files generated by flash-kernel script as - # we actually don't use those files - for _bak_file in self.system_boot_path.glob("**/*.bak"): - _bak_file.unlink(missing_ok=True) - _flag_file.unlink(missing_ok=True) - os.sync() - return True - - else: - # we are just after first reboot after ota update applied, - # finalize the switch boot, install the new firmware and then reboot again - logger.info( - "[rpi-boot]: just after 1st reboot, update firmware and finalizing boot files" - ) + current_slot, standby_slot = self.active_slot, self.standby_slot + config_txt_current = get_sysboot_files_fpath(CONFIG_TXT, current_slot) + config_txt_standby = get_sysboot_files_fpath(CONFIG_TXT, standby_slot) - replace_atomic(self.config_txt_active_slot, self.config_txt) - replace_atomic(self.config_txt_standby_slot, self.tryboot_txt) - logger.info( - "finalizing boot configuration," - f"replace {self.config_txt=} with {self.config_txt_active_slot=}, " - f"replace {self.tryboot_txt=} with {self.config_txt_standby_slot=}" - ) - self._update_firmware() - # set the flag file - write_str_to_file_sync(_flag_file, "") - # reboot to the same slot to apply the new firmware + try: + replace_atomic(config_txt_current, self.system_boot_mp / CONFIG_TXT) + replace_atomic(config_txt_standby, self.system_boot_mp / TRYBOOT_TXT) + logger.info( + "finalizing boot configuration," + f"replace {CONFIG_TXT} with {config_txt_current=}, " + f"replace {TRYBOOT_TXT} with {config_txt_standby=}" + ) - logger.info("reboot to apply new firmware...") - CMDHelperFuncs.reboot() # this function will make otaclient exit immediately + # on success switching boot, cleanup the bak files created by flash-kernel + for _bak_file in self.system_boot_mp.glob("**/*.bak"): + _bak_file.unlink(missing_ok=True) + return True except Exception as e: _err_msg = f"failed to finalize boot switching: {e!r}" logger.error(_err_msg) @@ -347,14 +398,14 @@ def finalize_switching_boot(self) -> bool: def prepare_tryboot_txt(self): """Copy the standby slot's config.txt as tryboot.txt.""" - logger.debug("prepare tryboot.txt...") try: - replace_atomic(self.config_txt_standby_slot, self.tryboot_txt) - logger.info( - f"replace {self.tryboot_txt=} with {self.config_txt_standby_slot=}" + replace_atomic( + get_sysboot_files_fpath(CONFIG_TXT, self.standby_slot), + self.system_boot_mp / TRYBOOT_TXT, ) + logger.info(f"set {TRYBOOT_TXT} as {self.standby_slot}'s one") except Exception as e: - _err_msg = f"failed to prepare tryboot.txt for {self._standby_slot}" + _err_msg = f"failed to prepare tryboot.txt for {self.standby_slot}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) from e @@ -362,6 +413,7 @@ def reboot_tryboot(self): """Reboot with tryboot flag.""" logger.info(f"tryboot reboot to standby slot({self.standby_slot})...") try: + # NOTE: "0 tryboot" is a single param. CMDHelperFuncs.reboot(args=["0 tryboot"]) except Exception as e: _err_msg = "failed to reboot" @@ -393,69 +445,12 @@ def __init__(self) -> None: / Path(cfg.OTA_STATUS_DIR).relative_to("/"), finalize_switching_boot=self._rpiboot_control.finalize_switching_boot, ) - - # 20230613: remove any leftover flag file if ota_status is not UPDATING/ROLLBACKING - if self._ota_status_control.booted_ota_status not in ( - api_types.StatusOta.UPDATING, - api_types.StatusOta.ROLLBACKING, - ): - _flag_file = ( - self._rpiboot_control.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - ) - _flag_file.unlink(missing_ok=True) - - logger.debug("rpi_boot initialization finished") + logger.info("rpi_boot starting finished") except Exception as e: _err_msg = f"failed to start rpi boot controller: {e!r}" logger.error(_err_msg) raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e - def _copy_kernel_for_standby_slot(self): - """Copy the kernel and initrd_img files from current slot /boot - to system-boot for standby slot. - - This method will checkout the vmlinuz- and initrd.img- - under /boot, and copy them to /boot/firmware(system-boot partition) under the name - vmlinuz_ and initrd.img_. - """ - logger.debug( - "prepare standby slot's kernel/initrd.img to system-boot partition..." - ) - try: - # search for kernel - _kernel_pa, _kernel_ver = ( - re.compile(rf"{cfg.VMLINUZ}-(?P.*)"), - None, - ) - # NOTE: if there is multiple kernel, pick the first one we encounted - # NOTE 2: according to ota-image specification, it should only be one - # version of kernel and initrd.img - for _candidate in self._mp_control.standby_boot_dir.glob( - f"{cfg.VMLINUZ}-*" - ): - if _ma := _kernel_pa.match(_candidate.name): - _kernel_ver = _ma.group("kernel_ver") - break - - if _kernel_ver is not None: - _kernel, _initrd_img = ( - self._mp_control.standby_boot_dir / f"{cfg.VMLINUZ}-{_kernel_ver}", - self._mp_control.standby_boot_dir - / f"{cfg.INITRD_IMG}-{_kernel_ver}", - ) - _kernel_sysboot, _initrd_img_sysboot = ( - self._rpiboot_control.vmlinuz_standby_slot, - self._rpiboot_control.initrd_img_standby_slot, - ) - replace_atomic(_kernel, _kernel_sysboot) - replace_atomic(_initrd_img, _initrd_img_sysboot) - else: - raise ValueError("failed to kernel in /boot folder at standby slot") - except Exception as e: - _err_msg = "failed to copy kernel/initrd_img for standby slot" - logger.error(_err_msg) - raise _RPIBootControllerError(f"{e!r}") - def _write_standby_fstab(self): """Override the standby's fstab file. @@ -502,12 +497,6 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool) ### update standby slot's ota_status files ### self._ota_status_control.pre_update_standby(version=version) - - # 20230613: remove any leftover flag file if presented - _flag_file = ( - self._rpiboot_control.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - ) - _flag_file.unlink(missing_ok=True) except Exception as e: _err_msg = f"failed on pre_update: {e!r}" logger.error(_err_msg) @@ -544,9 +533,12 @@ def post_rollback(self): def post_update(self) -> Generator[None, None, None]: try: logger.info("rpi_boot: post-update setup...") - self._copy_kernel_for_standby_slot() self._mp_control.preserve_ota_folder_to_standby() self._write_standby_fstab() + self._rpiboot_control.update_firmware( + target_slot=self._rpiboot_control.standby_slot, + target_slot_mp=self._mp_control.standby_slot_mount_point, + ) self._rpiboot_control.prepare_tryboot_txt() self._mp_control.umount_all(ignore_error=True) yield # hand over control back to otaclient diff --git a/src/otaclient/app/boot_control/configs.py b/src/otaclient/app/boot_control/configs.py index 4369990bd..53a6e7c6c 100644 --- a/src/otaclient/app/boot_control/configs.py +++ b/src/otaclient/app/boot_control/configs.py @@ -67,21 +67,9 @@ class RPIBootControlConfig(BaseConfig): RPI_MODEL_FILE = "/proc/device-tree/model" RPI_MODEL_HINT = "Raspberry Pi 4 Model B" - # slot configuration - SLOT_A_FSLABEL = "slot_a" - SLOT_B_FSLABEL = "slot_b" - SYSTEM_BOOT_FSLABEL = "system-boot" - # boot folders SYSTEM_BOOT_MOUNT_POINT = "/boot/firmware" OTA_STATUS_DIR = "/boot/ota-status" - - # boot related files - CONFIG_TXT = "config.txt" # primary boot cfg - TRYBOOT_TXT = "tryboot.txt" # tryboot boot cfg - VMLINUZ = "vmlinuz" - INITRD_IMG = "initrd.img" - CMDLINE_TXT = "cmdline.txt" SWITCH_BOOT_FLAG_FILE = "._ota_switch_boot_finalized" diff --git a/src/otaclient_common/common.py b/src/otaclient_common/common.py index 10433bc2f..49faf21c8 100644 --- a/src/otaclient_common/common.py +++ b/src/otaclient_common/common.py @@ -22,7 +22,6 @@ import logging import os -import shlex import shutil import subprocess import time @@ -33,6 +32,8 @@ import requests +from otaclient_common.linux import subprocess_run_wrapper + logger = logging.getLogger(__name__) @@ -101,39 +102,6 @@ def write_str_to_file_sync(path: Union[Path, str], input: str): os.fsync(f.fileno()) -def subprocess_run_wrapper( - cmd: str | list[str], - *, - check: bool, - check_output: bool, - timeout: Optional[float] = None, -) -> subprocess.CompletedProcess[bytes]: - """A wrapper for subprocess.run method. - - NOTE: this is for the requirement of customized subprocess call - in the future, like chroot or nsenter before execution. - - Args: - cmd (str | list[str]): command to be executed. - check (bool): if True, raise CalledProcessError on non 0 return code. - check_output (bool): if True, the UTF-8 decoded stdout will be returned. - timeout (Optional[float], optional): timeout for execution. Defaults to None. - - Returns: - subprocess.CompletedProcess[bytes]: the result of the execution. - """ - if isinstance(cmd, str): - cmd = shlex.split(cmd) - - return subprocess.run( - cmd, - check=check, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE if check_output else None, - timeout=timeout, - ) - - def subprocess_check_output( cmd: str | list[str], *, diff --git a/src/otaclient_common/linux.py b/src/otaclient_common/linux.py index 4c94596d9..0cc34df61 100644 --- a/src/otaclient_common/linux.py +++ b/src/otaclient_common/linux.py @@ -15,8 +15,14 @@ from __future__ import annotations +import os +import shlex +import subprocess from pathlib import Path from subprocess import check_call +from typing import Any, Callable, Optional + +from otaclient_common.typing import StrOrPath # # ------ swapfile handling ------ # @@ -150,3 +156,53 @@ def map_gid_by_grpnam(*, src_db: ParsedGroup, dst_db: ParsedGroup, gid: int) -> return dst_db._by_name[src_db._by_gid[gid]] except KeyError: raise ValueError(f"failed to find mapping for {gid}") + + +# +# ------ subprocess call ------ # +# + + +def subprocess_run_wrapper( + cmd: str | list[str], + *, + check: bool, + check_output: bool, + chroot: Optional[StrOrPath] = None, + env: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, +) -> subprocess.CompletedProcess[bytes]: + """A wrapper for subprocess.run method. + + NOTE: this is for the requirement of customized subprocess call + in the future, like chroot or nsenter before execution. + + Args: + cmd (str | list[str]): command to be executed. + check (bool): if True, raise CalledProcessError on non 0 return code. + check_output (bool): if True, the UTF-8 decoded stdout will be returned. + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + subprocess.CompletedProcess[bytes]: the result of the execution. + """ + if isinstance(cmd, str): + cmd = shlex.split(cmd) + + preexec_fn: Optional[Callable[..., Any]] = None + if chroot: + + def _chroot(): + os.chroot(chroot) + + preexec_fn = _chroot + + return subprocess.run( + cmd, + check=check, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE if check_output else None, + timeout=timeout, + preexec_fn=preexec_fn, + env=env, + ) diff --git a/tests/test_otaclient/test_boot_control/test_rpi_boot.py b/tests/test_otaclient/test_boot_control/test_rpi_boot.py index f4b5b943e..b8e99d9dc 100644 --- a/tests/test_otaclient/test_boot_control/test_rpi_boot.py +++ b/tests/test_otaclient/test_boot_control/test_rpi_boot.py @@ -8,7 +8,8 @@ import pytest import pytest_mock -from otaclient.app.boot_control._rpi_boot import _FSTAB_TEMPLATE_STR +from otaclient.app.boot_control import _rpi_boot +from otaclient.app.boot_control._common import CMDHelperFuncs, SlotMountHelper from otaclient.app.boot_control.configs import rpi_boot_cfg from otaclient_api.v2 import types as api_types from tests.conftest import TestConfiguration as cfg @@ -16,68 +17,72 @@ logger = logging.getLogger(__name__) +# slot config +SLOT_A = "slot_a" +SLOT_B = "slot_b" +SLOT_A_DEV = "/dev/sda2" +SLOT_B_DEV = "/dev/sda3" +SEP_CHAR = "_" -class _RPIBootTestCfg: - # slot config - SLOT_A = rpi_boot_cfg.SLOT_A_FSLABEL - SLOT_B = rpi_boot_cfg.SLOT_B_FSLABEL - SLOT_A_DEV = "slot_a_dev" - SLOT_B_DEV = "slot_b_dev" - SEP_CHAR = "_" +# dummy boot files content +CONFIG_TXT_SLOT_A = "config_txt_slot_a" +CONFIG_TXT_SLOT_B = "config_txt_slot_b" +CMDLINE_TXT_SLOT_A = "cmdline_txt_slot_a" +CMDLINE_TXT_SLOT_B = "cmdline_txt_slot_b" - # dummy boot files content - CONFIG_TXT_SLOT_A = "config_txt_slot_a" - CONFIG_TXT_SLOT_B = "config_txt_slot_b" - CMDLINE_TXT_SLOT_A = "cmdline_txt_slot_a" - CMDLINE_TXT_SLOT_B = "cmdline_txt_slot_b" +# module path +RPI_BOOT_MODULE_PATH = "otaclient.app.boot_control._rpi_boot" +rpi_boot__RPIBootControl_MODULE = f"{RPI_BOOT_MODULE_PATH}._RPIBootControl" +rpi_boot_RPIBoot_CMDHelperFuncs_MODULE = f"{RPI_BOOT_MODULE_PATH}.CMDHelperFuncs" +boot_control_common_CMDHelperFuncs_MODULE = ( + f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs" +) - # module path - rpi_boot__RPIBootControl_MODULE = f"{cfg.RPI_BOOT_MODULE_PATH}._RPIBootControl" - rpi_boot_RPIBoot_CMDHelperFuncs_MODULE = ( - f"{cfg.RPI_BOOT_MODULE_PATH}.CMDHelperFuncs" - ) - boot_control_common_CMDHelperFuncs_MODULE = ( - f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs" - ) - - # image version - VERSION = "rpi_boot_test" +# image version +VERSION = "rpi_boot_test" class RPIBootABPartitionFSM: def __init__(self) -> None: - self._active_slot = _RPIBootTestCfg.SLOT_A - self._standby_slot = _RPIBootTestCfg.SLOT_B - self._active_slot_dev = _RPIBootTestCfg.SLOT_A_DEV - self._standby_slot_dev = _RPIBootTestCfg.SLOT_B_DEV + self.active_slot = SLOT_A + self.standby_slot = SLOT_B + self.active_slot_dev = SLOT_A_DEV + self.standby_slot_dev = SLOT_B_DEV + self.parent_dev = "/dev/sda" self.is_switched_boot = False def reboot_tryboot(self): - logger.info(f"tryboot to {self._standby_slot=}") + logger.info(f"tryboot to {self.standby_slot=}") self.is_switched_boot = True - self._active_slot, self._standby_slot = self._standby_slot, self._active_slot - self._active_slot_dev, self._standby_slot_dev = ( - self._standby_slot_dev, - self._active_slot_dev, + self.active_slot, self.standby_slot = self.standby_slot, self.active_slot + self.active_slot_dev, self.standby_slot_dev = ( + self.standby_slot_dev, + self.active_slot_dev, ) - def get_active_slot(self) -> str: - return self._active_slot - - def get_standby_slot(self) -> str: - return self._standby_slot - - def get_standby_slot_dev(self) -> str: - return self._standby_slot_dev - - def get_active_slot_dev(self) -> str: - return self._active_slot_dev + def get_current_rootfs_dev(self): + return self.active_slot_dev + def get_parent_dev(self, *args, **kwags): + return self.parent_dev -class _RebootEXP(BaseException): - """NOTE: use BaseException to escape normal Exception catch.""" + def get_device_tree(self, parent_dev): + assert parent_dev == self.parent_dev + # NOTE: we allow extra partitions after partition ID 3 + return [ + self.parent_dev, + "/dev/sda1", + SLOT_A_DEV, + SLOT_B_DEV, + "/dev/sda5", + ] - ... + def get_attrs_by_dev(self, _, dev, *args, **kwargs): + if dev == self.active_slot_dev: + return self.active_slot + elif dev == self.standby_slot_dev: + return self.standby_slot + raise ValueError class TestRPIBootControl: @@ -87,6 +92,9 @@ class TestRPIBootControl: @pytest.fixture def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): + self.model_file = tmp_path / "model" + self.model_file.write_text(rpi_boot_cfg.RPI_MODEL_HINT) + self.slot_a_mp = Path(ab_slots.slot_a) self.slot_b_mp = Path(ab_slots.slot_b) @@ -96,11 +104,11 @@ def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): ).relative_to("/") self.slot_a_ota_status_dir.mkdir(parents=True, exist_ok=True) slot_a_ota_status = self.slot_a_ota_status_dir / "status" - slot_a_ota_status.write_text("SUCCESS") + slot_a_ota_status.write_text(api_types.StatusOta.SUCCESS.name) slot_a_version = self.slot_a_ota_status_dir / "version" slot_a_version.write_text(cfg.CURRENT_VERSION) slot_a_slot_in_use = self.slot_a_ota_status_dir / "slot_in_use" - slot_a_slot_in_use.write_text(rpi_boot_cfg.SLOT_A_FSLABEL) + slot_a_slot_in_use.write_text(SLOT_A) # setup ota dir for slot_a slot_a_ota_dir = self.slot_a_mp / "boot" / "ota" slot_a_ota_dir.mkdir(parents=True, exist_ok=True) @@ -117,110 +125,145 @@ def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): self.system_boot = tmp_path / "system-boot" self.system_boot.mkdir(parents=True, exist_ok=True) # NOTE: primary config.txt is for slot_a at the beginning - (self.system_boot / f"{rpi_boot_cfg.CONFIG_TXT}").write_text( - _RPIBootTestCfg.CONFIG_TXT_SLOT_A - ) + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}").write_text(CONFIG_TXT_SLOT_A) # NOTE: rpi_boot controller now doesn't check the content of boot files, but only ensure the existence - ( - self.system_boot - / f"{rpi_boot_cfg.CONFIG_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text(_RPIBootTestCfg.CONFIG_TXT_SLOT_A) - ( - self.system_boot - / f"{rpi_boot_cfg.CONFIG_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" - ).write_text(_RPIBootTestCfg.CONFIG_TXT_SLOT_B) - ( - self.system_boot - / f"{rpi_boot_cfg.CMDLINE_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text(_RPIBootTestCfg.CMDLINE_TXT_SLOT_A) - ( - self.system_boot - / f"{rpi_boot_cfg.CMDLINE_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" - ).write_text(_RPIBootTestCfg.CMDLINE_TXT_SLOT_B) - ( - self.system_boot - / f"{rpi_boot_cfg.VMLINUZ}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text("slot_a_vmlinux") - ( - self.system_boot - / f"{rpi_boot_cfg.INITRD_IMG}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text("slot_a_initrdimg") + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}{SEP_CHAR}{SLOT_A}").write_text( + CONFIG_TXT_SLOT_A + ) + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}{SEP_CHAR}{SLOT_B}").write_text( + CONFIG_TXT_SLOT_B + ) + (self.system_boot / f"{_rpi_boot.CMDLINE_TXT}{SEP_CHAR}{SLOT_A}").write_text( + CMDLINE_TXT_SLOT_A + ) + (self.system_boot / f"{_rpi_boot.CMDLINE_TXT}{SEP_CHAR}{SLOT_B}").write_text( + CMDLINE_TXT_SLOT_B + ) + (self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_A}").write_text( + "slot_a_vmlinux" + ) + (self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_A}").write_text( + "slot_a_initrdimg" + ) self.vmlinuz_slot_b = ( - self.system_boot - / f"{rpi_boot_cfg.VMLINUZ}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" + self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_B}" ) self.initrd_img_slot_b = ( - self.system_boot - / f"{rpi_boot_cfg.INITRD_IMG}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" + self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_B}" ) @pytest.fixture(autouse=True) def mock_setup(self, mocker: pytest_mock.MockerFixture, rpi_boot_ab_slot): - from otaclient.app.boot_control._common import CMDHelperFuncs - from otaclient.app.boot_control._rpi_boot import _RPIBootControl - # start the test FSM - self._fsm = RPIBootABPartitionFSM() - - # mocking _RPIBootControl - _RPIBootControl.standby_slot = mocker.PropertyMock(wraps=self._fsm.get_standby_slot) # type: ignore - _RPIBootControl.active_slot = mocker.PropertyMock(wraps=self._fsm.get_active_slot) # type: ignore - _RPIBootControl.active_slot_dev = mocker.PropertyMock(wraps=self._fsm.get_active_slot_dev) # type: ignore - _RPIBootControl.standby_slot_dev = mocker.PropertyMock(wraps=self._fsm.get_standby_slot_dev) # type: ignore - _RPIBootControl._init_slots_info = mocker.Mock() - _RPIBootControl.reboot_tryboot = mocker.Mock( - side_effect=self._fsm.reboot_tryboot - ) - _RPIBootControl._update_firmware = mocker.Mock() - - # patch boot_control module - self._mocked__rpiboot_control = _RPIBootControl - mocker.patch(_RPIBootTestCfg.rpi_boot__RPIBootControl_MODULE, _RPIBootControl) + self.fsm = fsm = RPIBootABPartitionFSM() - # patch CMDHelperFuncs + # ------ patch CMDHelperFuncs ------ # # NOTE: also remember to patch CMDHelperFuncs in common - self._CMDHelper_mock = typing.cast( + self.CMDHelper_mock = CMDHelper_mock = typing.cast( CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) ) - self._CMDHelper_mock.is_target_mounted = mocker.Mock(return_value=True) - # NOTE: rpi_boot only call CMDHelperFuncs.reboot once in finalize_switch_boot method - self._CMDHelper_mock.reboot = mocker.Mock(side_effect=_RebootEXP("reboot")) + # NOTE: this is for system-boot mount check in _RPIBootControl; + CMDHelper_mock.is_target_mounted = mocker.Mock(return_value=True) + CMDHelper_mock.get_current_rootfs_dev = mocker.Mock( + wraps=fsm.get_current_rootfs_dev + ) + CMDHelper_mock.get_parent_dev = mocker.Mock(wraps=fsm.get_parent_dev) + CMDHelper_mock.get_device_tree = mocker.Mock(wraps=fsm.get_device_tree) + CMDHelper_mock.get_attrs_by_dev = mocker.Mock(wraps=fsm.get_attrs_by_dev) + + mocker.patch(rpi_boot_RPIBoot_CMDHelperFuncs_MODULE, self.CMDHelper_mock) + mocker.patch( + boot_control_common_CMDHelperFuncs_MODULE, + self.CMDHelper_mock, + ) + + # ------ patch _RPIBootControl ------ # + def _update_firmware(*, target_slot, **kwargs): + """Move the kernel and initrd images into /boot/firmware folder. + + This is done by update_firmware method. This simulates the update_firmware to slot_b. + """ + assert target_slot == SLOT_B + + _vmlinuz = self.slot_a_mp / "boot" / _rpi_boot.VMLINUZ + shutil.copy( + os.path.realpath(_vmlinuz), + self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_B}", + ) + _initrd_img = self.slot_a_mp / "boot" / _rpi_boot.INITRD_IMG + shutil.copy( + os.path.realpath(_initrd_img), + self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_B}", + ) + + self.update_firmware_mock = update_firmware_mock = mocker.MagicMock( + wraps=_update_firmware + ) + mocker.patch( + f"{rpi_boot__RPIBootControl_MODULE}.update_firmware", update_firmware_mock + ) + self.reboot_tryboot_mock = reboot_tryboot_mock = mocker.MagicMock( + side_effect=fsm.reboot_tryboot + ) mocker.patch( - _RPIBootTestCfg.rpi_boot_RPIBoot_CMDHelperFuncs_MODULE, self._CMDHelper_mock + f"{rpi_boot__RPIBootControl_MODULE}.reboot_tryboot", reboot_tryboot_mock + ) + + # ------ patch slot mount helper ------ # + self.mp_control_mock = mp_control_mock = typing.cast( + SlotMountHelper, mocker.MagicMock() ) + + def _get_active_slot_mount_point(*args, **kwargs): + if fsm.active_slot == SLOT_A: + return self.slot_a_mp + elif fsm.active_slot == SLOT_B: + return self.slot_b_mp + + def _get_standby_slot_mount_point(*args, **kwargs): + if fsm.standby_slot == SLOT_A: + return self.slot_a_mp + elif fsm.standby_slot == SLOT_B: + return self.slot_b_mp + + type(mp_control_mock).active_slot_mount_point = mocker.PropertyMock( # type: ignore + wraps=_get_active_slot_mount_point + ) + type(mp_control_mock).standby_slot_mount_point = mocker.PropertyMock( # type: ignore + wraps=_get_standby_slot_mount_point + ) + mocker.patch( - _RPIBootTestCfg.boot_control_common_CMDHelperFuncs_MODULE, - self._CMDHelper_mock, + f"{RPI_BOOT_MODULE_PATH}.SlotMountHelper", + mocker.MagicMock(return_value=mp_control_mock), ) def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): from otaclient.app.boot_control._rpi_boot import RPIBootController # ------ patch rpi_boot_cfg for boot_controller_inst1.stage 1~3 ------# - _rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" + rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" mocker.patch( - f"{_rpi_boot_cfg_path}.SYSTEM_BOOT_MOUNT_POINT", str(self.system_boot) + f"{rpi_boot_cfg_path}.SYSTEM_BOOT_MOUNT_POINT", str(self.system_boot) ) - mocker.patch(f"{_rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_a_mp)) - mocker.patch(f"{_rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_b_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_a_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_b_mp)) mocker.patch( - f"{_rpi_boot_cfg_path}.ACTIVE_ROOT_MOUNT_POINT", str(self.slot_a_mp) + f"{rpi_boot_cfg_path}.ACTIVE_ROOT_MOUNT_POINT", str(self.slot_a_mp) ) + mocker.patch(f"{rpi_boot_cfg_path}.RPI_MODEL_FILE", str(self.model_file)) # ------ boot_controller_inst1.stage1: init ------ # rpi_boot_controller1 = RPIBootController() # ------ boot_controller_inst1.stage2: pre_update ------ # - # --- execution --- # - self.version = _RPIBootTestCfg.VERSION rpi_boot_controller1.pre_update( - version=self.version, + version=VERSION, standby_as_ref=False, erase_standby=False, ) - # --- assertions --- # - # 1. make sure the ota-status is updated properly - # 2. make sure the mount points are prepared + + # --- assertion --- # assert ( self.slot_a_ota_status_dir / "status" ).read_text() == api_types.StatusOta.FAILURE.name @@ -230,14 +273,14 @@ def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): assert ( (self.slot_a_ota_status_dir / "slot_in_use").read_text() == (self.slot_b_ota_status_dir / "slot_in_use").read_text() - == _RPIBootTestCfg.SLOT_B - ) - self._CMDHelper_mock.mount_rw.assert_called_once_with( - target=self._fsm._standby_slot_dev, mount_point=self.slot_b_mp + == SLOT_B ) - self._CMDHelper_mock.mount_ro.assert_called_once_with( - target=self._fsm._active_slot_dev, mount_point=self.slot_a_mp + self.mp_control_mock.prepare_standby_dev.assert_called_once_with( + erase_standby=mocker.ANY, + fslabel=self.fsm.standby_slot, ) + self.mp_control_mock.mount_standby.assert_called_once() + self.mp_control_mock.mount_active.assert_called_once() # ------ mocked in_update ------ # # this should be done by create_standby module, so we do it manually here instead @@ -252,82 +295,40 @@ def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): shutil.copy(os.path.realpath(_initrd_img), self.slot_b_boot_dir) # ------ boot_controller_inst1.stage3: post_update, reboot switch boot ------ # - # --- execution --- # _post_updater = rpi_boot_controller1.post_update() next(_post_updater) next(_post_updater, None) # actual reboot here - # --- assertions: --- # - # 1. make sure that retry boot is called - # 2. make sure that fstab file is updated for slot_b - # 3. assert kernel and initrd.img are copied to system-boot - # 4. make sure tryboot.txt is presented and correct - # 5. make sure config.txt is untouched - self._mocked__rpiboot_control.reboot_tryboot.assert_called_once() - assert self._fsm.is_switched_boot + + # --- assertion --- # + self.reboot_tryboot_mock.assert_called_once() + self.update_firmware_mock.assert_called_once() + assert self.fsm.is_switched_boot assert ( self.slot_b_mp / Path(rpi_boot_cfg.FSTAB_FPATH).relative_to("/") - ).read_text() == Template(_FSTAB_TEMPLATE_STR).substitute( - rootfs_fslabel=_RPIBootTestCfg.SLOT_B + ).read_text() == Template(_rpi_boot._FSTAB_TEMPLATE_STR).substitute( + rootfs_fslabel=SLOT_B ) assert self.initrd_img_slot_b.is_file() assert self.vmlinuz_slot_b.is_file() - assert ( - self.system_boot / "tryboot.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_B - assert ( - self.system_boot / "config.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_A + assert (self.system_boot / "tryboot.txt").read_text() == CONFIG_TXT_SLOT_B + assert (self.system_boot / "config.txt").read_text() == CONFIG_TXT_SLOT_A + # NOTE: backward compatibility with old double stage update strategy + assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() # ------ boot_controller_inst2: first reboot ------ # - # patch rpi_boot_cfg for boot_controller_inst2 - _rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" - mocker.patch(f"{_rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_b_mp)) - mocker.patch(f"{_rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_a_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_b_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_a_mp)) - # ------ boot_controller_inst2.stage1: first reboot finalizing switch boot and update firmware ------ # + # ------ boot_controller_inst2.stage1: finalize switchboot ------ # logger.info("1st reboot: finalize switch boot and update firmware....") - # --- execution --- # - # NOTE: raise a _RebootEXP to simulate reboot and interrupt the otaclient - with pytest.raises(_RebootEXP): - RPIBootController() # NOTE: init only - # --- assertions: --- # - # 1. assert that otaclient reboots the device - # 2. assert firmware update is called - # 3. assert reboot is called - # 4. assert switch boot finalized - # 5. assert slot_in_use is slot_b - # 6. make sure the SWITCH_BOOT_FLAG_FILE file is created - # 7. make sure ota_status is still UPDATING - self._mocked__rpiboot_control._update_firmware.assert_called_once() - self._CMDHelper_mock.reboot.assert_called_once() - assert ( - self.system_boot / "config.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_B + RPIBootController() + + # --- assertion --- # + assert (self.system_boot / "config.txt").read_text() == CONFIG_TXT_SLOT_B assert ( self.slot_b_ota_status_dir / rpi_boot_cfg.SLOT_IN_USE_FNAME ).read_text() == "slot_b" - assert (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() - assert ( - self.slot_b_ota_status_dir / rpi_boot_cfg.OTA_STATUS_FNAME - ).read_text() == api_types.StatusOta.UPDATING.name - - # ------ boot_controller_inst3.stage1: second reboot, apply updated firmware and finish up ota update ------ # - logger.info("2nd reboot: finish up ota update....") - # --- execution --- # - rpi_boot_controller4_2 = RPIBootController() - # --- assertions: --- # - # 1. make sure ota_status is SUCCESS - # 2. make sure the flag file is cleared - # 3. make sure the config.txt is still for slot_b - assert ( - rpi_boot_controller4_2.get_booted_ota_status() - == api_types.StatusOta.SUCCESS - ) + assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() assert ( self.slot_b_ota_status_dir / rpi_boot_cfg.OTA_STATUS_FNAME ).read_text() == api_types.StatusOta.SUCCESS.name - assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() - assert ( - rpi_boot_controller4_2._ota_status_control._load_current_slot_in_use() - == "slot_b" - )