diff --git a/src/otaclient/app/boot_control/_common.py b/src/otaclient/app/boot_control/_common.py index 2f935e5a4..52b9ca2f2 100644 --- a/src/otaclient/app/boot_control/_common.py +++ b/src/otaclient/app/boot_control/_common.py @@ -32,6 +32,7 @@ subprocess_check_output, write_str_to_file_sync, ) +from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) @@ -194,6 +195,30 @@ def get_parent_dev(cls, child_device: str, *, raise_exception: bool = True) -> s cmd = ["lsblk", "-idpno", "PKNAME", child_device] return subprocess_check_output(cmd, raise_exception=raise_exception) + @classmethod + def get_device_tree( + cls, parent_dev: str, *, raise_exception: bool = True + ) -> list[str]: + """Get the device tree of a parent device. + + For example, for sda with 3 partitions, we will get: + ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + + This function is implemented by calling: + lsblk -lnpo NAME + + Args: + parent_dev (str): The parent device to be checked. + raise_exception (bool, optional): raise exception on subprocess call failed. + Defaults to True. + + Returns: + str: _description_ + """ + cmd = ["lsblk", "-lnpo", "NAME", parent_dev] + raw_res = subprocess_check_output(cmd, raise_exception=raise_exception) + return raw_res.splitlines() + @classmethod def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = True): """Set to ext4 formatted . @@ -210,6 +235,36 @@ def set_ext4_fslabel(cls, dev: str, fslabel: str, *, raise_exception: bool = Tru cmd = ["e2label", dev, fslabel] subprocess_call(cmd, raise_exception=raise_exception) + @classmethod + def mount( + cls, + target: StrOrPath, + mount_point: StrOrPath, + *, + options: list[str] | None = None, + params: list[str] | None = None, + raise_exception: bool = True, + ) -> None: + """Thin wrapper to call mount using subprocess. + + This will call the following: + mount [-o ,[[,...]] [ [[...]]] + + Args: + target (StrOrPath): The target device to mount. + mount_point (StrOrPath): The mount point to mount to. + options (list[str] | None, optional): A list of options, append after -o. Defaults to None. + params (list[str] | None, optional): A list of params. Defaults to None. + raise_exception (bool, optional): Whether to raise exception on failed call. Defaults to True. + """ + cmd = ["mount"] + if options: + cmd.extend(["-o", ",".join(options)]) + if params: + cmd.extend(params) + cmd = [*cmd, str(target), str(mount_point)] + subprocess_call(cmd, raise_exception=raise_exception) + @classmethod def mount_rw( cls, target: str, mount_point: Path | str, *, raise_exception: bool = True @@ -724,7 +779,7 @@ def prepare_standby_dev( # TODO: in the future if in-place update mode is implemented, do a # fschck over the standby slot file system. if fslabel: - CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel=fslabel) + CMDHelperFuncs.set_ext4_fslabel(self.standby_slot_dev, fslabel=fslabel) def umount_all(self, *, ignore_error: bool = True): logger.debug("unmount standby slot and active slot mount point...") diff --git a/src/otaclient/app/boot_control/_rpi_boot.py b/src/otaclient/app/boot_control/_rpi_boot.py index 330d7022b..19b980b46 100644 --- a/src/otaclient/app/boot_control/_rpi_boot.py +++ b/src/otaclient/app/boot_control/_rpi_boot.py @@ -16,12 +16,15 @@ from __future__ import annotations +import contextlib import logging import os -import re +import subprocess from pathlib import Path from string import Template -from typing import Generator +from typing import Any, Generator, Literal + +from typing_extensions import Self import otaclient.app.errors as ota_errors from otaclient.app.boot_control._common import ( @@ -33,296 +36,361 @@ from otaclient.app.boot_control.configs import rpi_boot_cfg as cfg from otaclient.app.boot_control.protocol import BootControllerProtocol from otaclient_api.v2 import types as api_types -from otaclient_common.common import ( - replace_atomic, - subprocess_call, - subprocess_check_output, -) +from otaclient_common.common import replace_atomic +from otaclient_common.linux import subprocess_run_wrapper +from otaclient_common.typing import StrOrPath logger = logging.getLogger(__name__) + +# ------ types ------ # +class SlotID(str): + """slot_id for A/B slots.""" + + VALID_SLOTS = ["slot_a", "slot_b"] + + def __new__(cls, _in: str | Self) -> Self: + if isinstance(_in, cls): + return _in + if _in in cls.VALID_SLOTS: + return str.__new__(cls, _in) + raise ValueError(f"{_in=} is not valid slot num, should be {cls.VALID_SLOTS=}") + + +class _RPIBootControllerError(Exception): + """rpi_boot module internal used exception.""" + + +# ------ consts ------ # +CONFIG_TXT = "config.txt" # primary boot cfg +TRYBOOT_TXT = "tryboot.txt" # tryboot boot cfg +VMLINUZ = "vmlinuz" +INITRD_IMG = "initrd.img" +CMDLINE_TXT = "cmdline.txt" + +SYSTEM_BOOT_FSLABEL = "system-boot" +SLOT_A = SlotID("slot_a") +SLOT_B = SlotID("slot_b") +AB_FLIPS = {SLOT_A: SLOT_B, SLOT_B: SLOT_A} +SEP_CHAR = "_" +"""separator between boot files name and slot suffix.""" + _FSTAB_TEMPLATE_STR = ( "LABEL=${rootfs_fslabel}\t/\text4\tdiscard,x-systemd.growfs\t0\t1\n" "LABEL=system-boot\t/boot/firmware\tvfat\tdefaults\t0\t1\n" ) -class _RPIBootControllerError(Exception): - """rpi_boot module internal used exception.""" +# ------ helper functions ------ # +BOOTFILES = Literal["vmlinuz", "initrd.img", "config.txt", "tryboot.txt", "cmdline.txt"] + + +def get_sysboot_files_fpath(boot_fname: BOOTFILES, slot: SlotID) -> Path: + """Get the boot files fpath for specific slot from /boot/firmware. + + For example, for vmlinuz for slot_a, we get /boot/firmware/vmlinuz_slot_a + """ + fname = f"{boot_fname}{SEP_CHAR}{slot}" + return Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / fname class _RPIBootControl: """Boot control helper for rpi4 support. - Expected partition layout: - /dev/sda: - - sda1: fat32, fslabel=systemb-boot - - sda2: ext4, fslabel=slot_a - - sda3: ext4, fslabel=slot_b - slot is the fslabel for each AB rootfs. - - This class provides the following features: - 1. AB partition detection, - 2. boot files checking, - 3. switch boot(tryboot.txt setup and tryboot reboot), - 4. finalize switching boot. + Supported partition layout: + /dev/sd: + - sd1: fat32, fslabel=systemb-boot + - sd2: ext4, fslabel=slot_a + - sd3: ext4, fslabel=slot_b + slot_id is also the fslabel for each AB rootfs. + NOTE that we allow extra partitions with ID after 3. Boot files for each slot have the following naming format: - _ - i.e., config.txt for slot_a will be config.txt_slot_a + + For example, config.txt for slot_a will be config.txt_slot_a """ - SLOT_A = cfg.SLOT_A_FSLABEL - SLOT_B = cfg.SLOT_B_FSLABEL - AB_FLIPS = { - SLOT_A: SLOT_B, - SLOT_B: SLOT_A, - } - SEP_CHAR = "_" - def __init__(self) -> None: - self.system_boot_path = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) - if not CMDHelperFuncs.is_target_mounted( - self.system_boot_path, raise_exception=False - ): - _err_msg = "system-boot is not presented or not mounted!" + self.system_boot_mp = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) + self.system_boot_mp.mkdir(exist_ok=True) + + # sanity check, ensure we are running at raspberry pi device + model_fpath = Path(cfg.RPI_MODEL_FILE) + err_not_rpi_device = f"{cfg.RPI_MODEL_FILE} doesn't exist! Are we running at raspberry pi device?" + if not model_fpath.is_file(): + logger.error(err_not_rpi_device) + raise _RPIBootControllerError(err_not_rpi_device) + + model_info = model_fpath.read_text() + if model_info.find("Pi") == -1: + raise _RPIBootControllerError(err_not_rpi_device) + logger.info(f"{model_info=}") + + try: + # ------ detect active slot ------ # + active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() + assert active_slot_dev + self.active_slot_dev = active_slot_dev + except Exception as e: + _err_msg = f"failed to detect current rootfs device: {e!r}" logger.error(_err_msg) - raise ValueError(_err_msg) - self._init_slots_info() - self._init_boot_files() + raise _RPIBootControllerError(_err_msg) from e - def _init_slots_info(self): - """Get current/standby slots info.""" - logger.debug("checking and initializing slots info...") try: - # detect active slot - _active_slot_dev = CMDHelperFuncs.get_current_rootfs_dev() - assert _active_slot_dev - self._active_slot_dev = _active_slot_dev + # detect the parent device of boot device + # i.e., for /dev/sda2 here we get /dev/sda + parent_dev = CMDHelperFuncs.get_parent_dev(str(self.active_slot_dev)) - _active_slot = CMDHelperFuncs.get_attrs_by_dev( - "LABEL", str(self._active_slot_dev) - ) - assert _active_slot - self._active_slot = _active_slot - - # detect standby slot - # NOTE: using the similar logic like grub, detect the silibing dev - # of the active slot as standby slot - _parent = CMDHelperFuncs.get_parent_dev(str(self._active_slot_dev)) - assert _parent - - # list children device file from parent device - # exclude parent dev(always in the front) - # expected raw result from lsblk: - # NAME="/dev/sdx" - # NAME="/dev/sdx1" # system-boot - # NAME="/dev/sdx2" # slot_a - # NAME="/dev/sdx3" # slot_b - _check_dev_family_cmd = ["lsblk", "-Ppo", "NAME", _parent] - _raw_child_partitions = subprocess_check_output( - _check_dev_family_cmd, raise_exception=True - ) + # get device tree, for /dev/sda device, we will get: + # ["/dev/sda", "/dev/sda1", "/dev/sda2", "/dev/sda3"] + device_tree = CMDHelperFuncs.get_device_tree(parent_dev) + logger.info(device_tree) - try: - # NOTE: exclude the first 2 lines(parent and system-boot) - _child_partitions = [ - raw.split("=")[-1].strip('"') - for raw in _raw_child_partitions.splitlines()[2:] - ] - if ( - len(_child_partitions) != 2 - or self._active_slot_dev not in _child_partitions - ): - raise ValueError - _child_partitions.remove(self._active_slot_dev) - except Exception: - raise ValueError( - f"unexpected partition layout: {_raw_child_partitions}" - ) from None - # it is OK if standby_slot dev doesn't have fslabel or fslabel != standby_slot_id - # we will always set the fslabel - self._standby_slot = self.AB_FLIPS[self._active_slot] - self._standby_slot_dev = _child_partitions[0] - logger.info( - f"rpi_boot: active_slot: {self._active_slot}({self._active_slot_dev}), " - f"standby_slot: {self._standby_slot}({self._standby_slot_dev})" - ) + # NOTE that we allow extra partitions presented after sd3. + assert len(device_tree) >= 4, "need at least 3 partitions" except Exception as e: - _err_msg = f"failed to detect AB partition: {e!r}" + _err_msg = f"failed to detect partition layout: {e!r}" logger.error(_err_msg) - raise _RPIBootControllerError(_err_msg) from e + raise _RPIBootControllerError(_err_msg) + + # check system-boot partition mount + system_boot_partition = device_tree[1] + if not CMDHelperFuncs.is_target_mounted( + self.system_boot_mp, raise_exception=False + ): + _err_msg = f"system-boot is not mounted at {self.system_boot_mp}, try to mount it..." + logger.warning(_err_msg) + + try: + CMDHelperFuncs.mount( + system_boot_partition, + self.system_boot_mp, + options=["defaults"], + ) + except subprocess.CalledProcessError as e: + _err_msg = ( + f"failed to mount system-boot partition: {e!r}, {e.stderr.decode()}" + ) + logger.error(_err_msg) + raise _RPIBootControllerError(_err_msg) from e + + # check slots + # sd2 and sd3 + rootfs_partitions = device_tree[2:4] + + # get the active slot ID by its position in the disk + try: + idx = rootfs_partitions.index(active_slot_dev) + except ValueError: + raise _RPIBootControllerError( + f"active slot dev not found: {active_slot_dev=}, {rootfs_partitions=}" + ) + + if idx == 0: # slot_a + self.active_slot = SLOT_A + self.standby_slot = SLOT_B + self.standby_slot_dev = rootfs_partitions[1] + elif idx == 1: # slot_b + self.active_slot = SLOT_B + self.standby_slot = SLOT_A + self.standby_slot_dev = rootfs_partitions[0] + + logger.info( + f"rpi_boot: active_slot: {self.active_slot}({self.active_slot_dev}), " + f"standby_slot: {self.standby_slot}({self.standby_slot_dev})" + ) + + # ------ continue rpi_boot starts up ------ # + self._check_boot_files() + self._check_active_slot_id() + + # NOTE(20240604): for backward compatibility, always remove flag file + flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE + flag_file.unlink(missing_ok=True) + + def _check_active_slot_id(self): + """Check whether the active slot fslabel is matching the slot id. + + If mismatched, try to correct the problem. + """ + fslabel = self.active_slot + actual_fslabel = CMDHelperFuncs.get_attrs_by_dev( + "LABEL", self.active_slot_dev, raise_exception=False + ) + if actual_fslabel == fslabel: + return + + logger.warning( + ( + f"current active slot is {fslabel}, but its fslabel is {actual_fslabel}, " + f"try to correct the fslabel with slot id {fslabel}..." + ) + ) + try: + CMDHelperFuncs.set_ext4_fslabel(self.active_slot_dev, fslabel) + os.sync() + except subprocess.CalledProcessError as e: + logger.error( + f"failed to correct the fslabel mismatched: {e!r}, {e.stderr.decode()}" + ) + logger.error("this might cause problem on future OTA update!") - def _init_boot_files(self): + def _check_boot_files(self): """Check the availability of boot files. The following boot files will be checked: 1. config.txt_ (required) 2. cmdline.txt_ (required) - 3. vmlinuz_ - 4. initrd.img_ If any of the required files are missing, BootControlInitError will be raised. In such case, a reinstall/setup of AB partition boot files is required. """ logger.debug("checking boot files...") - # boot file - self.config_txt = self.system_boot_path / cfg.CONFIG_TXT - self.tryboot_txt = self.system_boot_path / cfg.TRYBOOT_TXT + active_slot, standby_slot = self.active_slot, self.standby_slot - # active slot - self.config_txt_active_slot = ( - self.system_boot_path / f"{cfg.CONFIG_TXT}{self.SEP_CHAR}{self.active_slot}" - ) - if not self.config_txt_active_slot.is_file(): - _err_msg = f"missing {self.config_txt_active_slot=}" + # ------ check active slot boot files ------ # + config_txt_active_slot = get_sysboot_files_fpath(CONFIG_TXT, active_slot) + if not config_txt_active_slot.is_file(): + _err_msg = f"missing {config_txt_active_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.cmdline_txt_active_slot = ( - self.system_boot_path - / f"{cfg.CMDLINE_TXT}{self.SEP_CHAR}{self.active_slot}" - ) - if not self.cmdline_txt_active_slot.is_file(): - _err_msg = f"missing {self.cmdline_txt_active_slot=}" + + cmdline_txt_active_slot = get_sysboot_files_fpath(CMDLINE_TXT, active_slot) + if not cmdline_txt_active_slot.is_file(): + _err_msg = f"missing {cmdline_txt_active_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.vmlinuz_active_slot = ( - self.system_boot_path / f"{cfg.VMLINUZ}{self.SEP_CHAR}{self.active_slot}" - ) - self.initrd_img_active_slot = ( - self.system_boot_path / f"{cfg.INITRD_IMG}{self.SEP_CHAR}{self.active_slot}" - ) - # standby slot - self.config_txt_standby_slot = ( - self.system_boot_path - / f"{cfg.CONFIG_TXT}{self.SEP_CHAR}{self.standby_slot}" - ) - if not self.config_txt_standby_slot.is_file(): - _err_msg = f"missing {self.config_txt_standby_slot=}" + + # ------ check standby slot boot files ------ # + config_txt_standby_slot = get_sysboot_files_fpath(CONFIG_TXT, standby_slot) + if not config_txt_standby_slot.is_file(): + _err_msg = f"missing {config_txt_standby_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.cmdline_txt_standby_slot = ( - self.system_boot_path - / f"{cfg.CMDLINE_TXT}{self.SEP_CHAR}{self.standby_slot}" - ) - if not self.cmdline_txt_standby_slot.is_file(): - _err_msg = f"missing {self.cmdline_txt_standby_slot=}" + + cmdline_txt_standby_slot = get_sysboot_files_fpath(CMDLINE_TXT, standby_slot) + if not cmdline_txt_standby_slot.is_file(): + _err_msg = f"missing {cmdline_txt_standby_slot=}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) - self.vmlinuz_standby_slot = ( - self.system_boot_path / f"{cfg.VMLINUZ}{self.SEP_CHAR}{self.standby_slot}" - ) - self.initrd_img_standby_slot = ( - self.system_boot_path - / f"{cfg.INITRD_IMG}{self.SEP_CHAR}{self.standby_slot}" + + @staticmethod + @contextlib.contextmanager + def _prepare_flash_kernel(target_slot_mp: StrOrPath) -> Generator[None, Any, None]: + """Do a bind mount of /boot/firmware, /proc and /sys to the standby slot, + preparing for calling flash-kernel with chroot. + + flash-kernel requires at least these mounts to work properly. + """ + target_slot_mp = Path(target_slot_mp) + mounts: dict[str, str] = {} + + # we need to mount /proc, /sys and /boot/firmware to make flash-kernel works + system_boot_mp = target_slot_mp / Path(cfg.SYSTEM_BOOT_MOUNT_POINT).relative_to( + "/" ) + mounts[str(system_boot_mp)] = cfg.SYSTEM_BOOT_MOUNT_POINT - def _update_firmware(self): + proc_mp = target_slot_mp / "proc" + mounts[str(proc_mp)] = "/proc" + + sys_mp = target_slot_mp / "sys" + mounts[str(sys_mp)] = "/sys" + + try: + for _mp, _src in mounts.items(): + CMDHelperFuncs.mount( + _src, + _mp, + options=["bind"], + params=["--make-unbindable"], + ) + yield + # NOTE: passthrough the mount failure to caller + finally: + for _mp in mounts: + CMDHelperFuncs.umount(_mp, raise_exception=False) + + def update_firmware(self, *, target_slot: SlotID, target_slot_mp: StrOrPath): """Call flash-kernel to install new dtb files, boot firmwares and kernel, initrd.img - from current rootfs to system-boot partition. + from target slot. + + The following things will be done: + 1. bind mount the /boot/firmware and /proc into the target slot. + 2. chroot into the target slot's rootfs, execute flash-kernel """ - logger.info("update firmware with flash-kernel...") + logger.info(f"try to flash-kernel from {target_slot}...") try: - subprocess_call("flash-kernel", raise_exception=True) - os.sync() - except Exception as e: - _err_msg = f"flash-kernel failed: {e!r}" + with self._prepare_flash_kernel(target_slot_mp): + subprocess_run_wrapper( + ["/usr/sbin/flash-kernel"], + check=True, + check_output=True, + chroot=target_slot_mp, + # must set this env variable to make flash-kernel work under chroot + env={"FK_FORCE": "yes"}, + ) + os.sync() + except subprocess.CalledProcessError as e: + _err_msg = f"flash-kernel failed: {e!r}\nstderr: {e.stderr.decode()}\nstdout: {e.stdout.decode()}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) try: - # check if the vmlinuz and initrd.img presented in /boot/firmware(system-boot), - # if so, it means that flash-kernel works and copies the kernel, inird.img from /boot, - # then we rename vmlinuz and initrd.img to vmlinuz_ and initrd.img_ - if (_vmlinuz := Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.VMLINUZ).is_file(): - os.replace(_vmlinuz, self.vmlinuz_active_slot) - if ( - _initrd_img := Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.INITRD_IMG - ).is_file(): - os.replace(_initrd_img, self.initrd_img_active_slot) + # flash-kernel will install the kernel and initrd.img files from /boot to /boot/firmware + if (vmlinuz := self.system_boot_mp / VMLINUZ).is_file(): + os.replace( + vmlinuz, + get_sysboot_files_fpath(VMLINUZ, target_slot), + ) + + if (initrd_img := self.system_boot_mp / INITRD_IMG).is_file(): + os.replace( + initrd_img, + get_sysboot_files_fpath(INITRD_IMG, target_slot), + ) + + # NOTE(20240603): for backward compatibility(downgrade), still create the flag file. + # The present of flag files means the firmware is updated. + flag_file = Path(cfg.SYSTEM_BOOT_MOUNT_POINT) / cfg.SWITCH_BOOT_FLAG_FILE + flag_file.write_text("") os.sync() except Exception as e: - _err_msg = ( - f"apply new kernel,initrd.img for {self.active_slot} failed: {e!r}" - ) + _err_msg = f"failed to apply new kernel,initrd.img for {target_slot}: {e!r}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) # exposed API methods/properties - @property - def active_slot(self) -> str: - return self._active_slot - - @property - def standby_slot(self) -> str: - return self._standby_slot - - @property - def standby_slot_dev(self) -> str: - return self._standby_slot_dev - - @property - def active_slot_dev(self) -> str: - return self._active_slot_dev def finalize_switching_boot(self) -> bool: """Finalize switching boot by swapping config.txt and tryboot.txt if we should. Finalize switch boot: - 1. atomically replace tryboot.txt with tryboot.txt_standby_slot - 2. atomically replace config.txt with config.txt_active_slot - - Two-stage reboot: - In the first reboot after ota update applied, finalize_switching_boot will try - to update the firmware by calling external flash-kernel system script, finalize - the switch boot and then reboot again to apply the new firmware. - In the second reboot, finalize_switching_boot will do nothing but just return True. - - NOTE: we use a SWITCH_BOOT_FLAG_FILE to distinguish which reboot we are right now. - If SWITCH_BOOT_FLAG_FILE presented, we know that we are second reboot, - Otherwise, we know that we are at first reboot after switching boot. + 1. atomically replace tryboot.txt with tryboot.txt_ + 2. atomically replace config.txt with config.txt_ Returns: A bool indicates whether the switch boot succeeded or not. Note that no exception will be raised if finalizing failed. """ logger.info("finalizing switch boot...") - try: - _flag_file = self.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - if _flag_file.is_file(): - # we are just after second reboot, after firmware_update, - # finalize the switch boot and then return True - logger.info("[rpi-boot]: just after 2nd reboot, finish up OTA") - - # cleanup bak files generated by flash-kernel script as - # we actually don't use those files - for _bak_file in self.system_boot_path.glob("**/*.bak"): - _bak_file.unlink(missing_ok=True) - _flag_file.unlink(missing_ok=True) - os.sync() - return True + current_slot, standby_slot = self.active_slot, self.standby_slot + config_txt_current = get_sysboot_files_fpath(CONFIG_TXT, current_slot) + config_txt_standby = get_sysboot_files_fpath(CONFIG_TXT, standby_slot) - else: - # we are just after first reboot after ota update applied, - # finalize the switch boot, install the new firmware and then reboot again - logger.info( - "[rpi-boot]: just after 1st reboot, update firmware and finalizing boot files" - ) - - replace_atomic(self.config_txt_active_slot, self.config_txt) - replace_atomic(self.config_txt_standby_slot, self.tryboot_txt) - logger.info( - "finalizing boot configuration," - f"replace {self.config_txt=} with {self.config_txt_active_slot=}, " - f"replace {self.tryboot_txt=} with {self.config_txt_standby_slot=}" - ) - self._update_firmware() - # set the flag file - write_str_to_file_sync(_flag_file, "") - # reboot to the same slot to apply the new firmware + try: + replace_atomic(config_txt_current, self.system_boot_mp / CONFIG_TXT) + replace_atomic(config_txt_standby, self.system_boot_mp / TRYBOOT_TXT) + logger.info( + "finalizing boot configuration," + f"replace {CONFIG_TXT} with {config_txt_current=}, " + f"replace {TRYBOOT_TXT} with {config_txt_standby=}" + ) - logger.info("reboot to apply new firmware...") - CMDHelperFuncs.reboot() # this function will make otaclient exit immediately + # on success switching boot, cleanup the bak files created by flash-kernel + for _bak_file in self.system_boot_mp.glob("**/*.bak"): + _bak_file.unlink(missing_ok=True) + return True except Exception as e: _err_msg = f"failed to finalize boot switching: {e!r}" logger.error(_err_msg) @@ -330,14 +398,14 @@ def finalize_switching_boot(self) -> bool: def prepare_tryboot_txt(self): """Copy the standby slot's config.txt as tryboot.txt.""" - logger.debug("prepare tryboot.txt...") try: - replace_atomic(self.config_txt_standby_slot, self.tryboot_txt) - logger.info( - f"replace {self.tryboot_txt=} with {self.config_txt_standby_slot=}" + replace_atomic( + get_sysboot_files_fpath(CONFIG_TXT, self.standby_slot), + self.system_boot_mp / TRYBOOT_TXT, ) + logger.info(f"set {TRYBOOT_TXT} as {self.standby_slot}'s one") except Exception as e: - _err_msg = f"failed to prepare tryboot.txt for {self._standby_slot}" + _err_msg = f"failed to prepare tryboot.txt for {self.standby_slot}" logger.error(_err_msg) raise _RPIBootControllerError(_err_msg) from e @@ -345,6 +413,7 @@ def reboot_tryboot(self): """Reboot with tryboot flag.""" logger.info(f"tryboot reboot to standby slot({self.standby_slot})...") try: + # NOTE: "0 tryboot" is a single param. CMDHelperFuncs.reboot(args=["0 tryboot"]) except Exception as e: _err_msg = "failed to reboot" @@ -376,69 +445,12 @@ def __init__(self) -> None: / Path(cfg.OTA_STATUS_DIR).relative_to("/"), finalize_switching_boot=self._rpiboot_control.finalize_switching_boot, ) - - # 20230613: remove any leftover flag file if ota_status is not UPDATING/ROLLBACKING - if self._ota_status_control.booted_ota_status not in ( - api_types.StatusOta.UPDATING, - api_types.StatusOta.ROLLBACKING, - ): - _flag_file = ( - self._rpiboot_control.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - ) - _flag_file.unlink(missing_ok=True) - - logger.debug("rpi_boot initialization finished") + logger.info("rpi_boot starting finished") except Exception as e: _err_msg = f"failed to start rpi boot controller: {e!r}" logger.error(_err_msg) raise ota_errors.BootControlStartupFailed(_err_msg, module=__name__) from e - def _copy_kernel_for_standby_slot(self): - """Copy the kernel and initrd_img files from current slot /boot - to system-boot for standby slot. - - This method will checkout the vmlinuz- and initrd.img- - under /boot, and copy them to /boot/firmware(system-boot partition) under the name - vmlinuz_ and initrd.img_. - """ - logger.debug( - "prepare standby slot's kernel/initrd.img to system-boot partition..." - ) - try: - # search for kernel - _kernel_pa, _kernel_ver = ( - re.compile(rf"{cfg.VMLINUZ}-(?P.*)"), - None, - ) - # NOTE: if there is multiple kernel, pick the first one we encounted - # NOTE 2: according to ota-image specification, it should only be one - # version of kernel and initrd.img - for _candidate in self._mp_control.standby_boot_dir.glob( - f"{cfg.VMLINUZ}-*" - ): - if _ma := _kernel_pa.match(_candidate.name): - _kernel_ver = _ma.group("kernel_ver") - break - - if _kernel_ver is not None: - _kernel, _initrd_img = ( - self._mp_control.standby_boot_dir / f"{cfg.VMLINUZ}-{_kernel_ver}", - self._mp_control.standby_boot_dir - / f"{cfg.INITRD_IMG}-{_kernel_ver}", - ) - _kernel_sysboot, _initrd_img_sysboot = ( - self._rpiboot_control.vmlinuz_standby_slot, - self._rpiboot_control.initrd_img_standby_slot, - ) - replace_atomic(_kernel, _kernel_sysboot) - replace_atomic(_initrd_img, _initrd_img_sysboot) - else: - raise ValueError("failed to kernel in /boot folder at standby slot") - except Exception as e: - _err_msg = "failed to copy kernel/initrd_img for standby slot" - logger.error(_err_msg) - raise _RPIBootControllerError(f"{e!r}") - def _write_standby_fstab(self): """Override the standby's fstab file. @@ -485,12 +497,6 @@ def pre_update(self, version: str, *, standby_as_ref: bool, erase_standby: bool) ### update standby slot's ota_status files ### self._ota_status_control.pre_update_standby(version=version) - - # 20230613: remove any leftover flag file if presented - _flag_file = ( - self._rpiboot_control.system_boot_path / cfg.SWITCH_BOOT_FLAG_FILE - ) - _flag_file.unlink(missing_ok=True) except Exception as e: _err_msg = f"failed on pre_update: {e!r}" logger.error(_err_msg) @@ -527,9 +533,12 @@ def post_rollback(self): def post_update(self) -> Generator[None, None, None]: try: logger.info("rpi_boot: post-update setup...") - self._copy_kernel_for_standby_slot() self._mp_control.preserve_ota_folder_to_standby() self._write_standby_fstab() + self._rpiboot_control.update_firmware( + target_slot=self._rpiboot_control.standby_slot, + target_slot_mp=self._mp_control.standby_slot_mount_point, + ) self._rpiboot_control.prepare_tryboot_txt() self._mp_control.umount_all(ignore_error=True) yield # hand over control back to otaclient diff --git a/src/otaclient/app/boot_control/configs.py b/src/otaclient/app/boot_control/configs.py index 4369990bd..53a6e7c6c 100644 --- a/src/otaclient/app/boot_control/configs.py +++ b/src/otaclient/app/boot_control/configs.py @@ -67,21 +67,9 @@ class RPIBootControlConfig(BaseConfig): RPI_MODEL_FILE = "/proc/device-tree/model" RPI_MODEL_HINT = "Raspberry Pi 4 Model B" - # slot configuration - SLOT_A_FSLABEL = "slot_a" - SLOT_B_FSLABEL = "slot_b" - SYSTEM_BOOT_FSLABEL = "system-boot" - # boot folders SYSTEM_BOOT_MOUNT_POINT = "/boot/firmware" OTA_STATUS_DIR = "/boot/ota-status" - - # boot related files - CONFIG_TXT = "config.txt" # primary boot cfg - TRYBOOT_TXT = "tryboot.txt" # tryboot boot cfg - VMLINUZ = "vmlinuz" - INITRD_IMG = "initrd.img" - CMDLINE_TXT = "cmdline.txt" SWITCH_BOOT_FLAG_FILE = "._ota_switch_boot_finalized" diff --git a/src/otaclient_common/common.py b/src/otaclient_common/common.py index 10433bc2f..49faf21c8 100644 --- a/src/otaclient_common/common.py +++ b/src/otaclient_common/common.py @@ -22,7 +22,6 @@ import logging import os -import shlex import shutil import subprocess import time @@ -33,6 +32,8 @@ import requests +from otaclient_common.linux import subprocess_run_wrapper + logger = logging.getLogger(__name__) @@ -101,39 +102,6 @@ def write_str_to_file_sync(path: Union[Path, str], input: str): os.fsync(f.fileno()) -def subprocess_run_wrapper( - cmd: str | list[str], - *, - check: bool, - check_output: bool, - timeout: Optional[float] = None, -) -> subprocess.CompletedProcess[bytes]: - """A wrapper for subprocess.run method. - - NOTE: this is for the requirement of customized subprocess call - in the future, like chroot or nsenter before execution. - - Args: - cmd (str | list[str]): command to be executed. - check (bool): if True, raise CalledProcessError on non 0 return code. - check_output (bool): if True, the UTF-8 decoded stdout will be returned. - timeout (Optional[float], optional): timeout for execution. Defaults to None. - - Returns: - subprocess.CompletedProcess[bytes]: the result of the execution. - """ - if isinstance(cmd, str): - cmd = shlex.split(cmd) - - return subprocess.run( - cmd, - check=check, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE if check_output else None, - timeout=timeout, - ) - - def subprocess_check_output( cmd: str | list[str], *, diff --git a/src/otaclient_common/linux.py b/src/otaclient_common/linux.py index 4c94596d9..0cc34df61 100644 --- a/src/otaclient_common/linux.py +++ b/src/otaclient_common/linux.py @@ -15,8 +15,14 @@ from __future__ import annotations +import os +import shlex +import subprocess from pathlib import Path from subprocess import check_call +from typing import Any, Callable, Optional + +from otaclient_common.typing import StrOrPath # # ------ swapfile handling ------ # @@ -150,3 +156,53 @@ def map_gid_by_grpnam(*, src_db: ParsedGroup, dst_db: ParsedGroup, gid: int) -> return dst_db._by_name[src_db._by_gid[gid]] except KeyError: raise ValueError(f"failed to find mapping for {gid}") + + +# +# ------ subprocess call ------ # +# + + +def subprocess_run_wrapper( + cmd: str | list[str], + *, + check: bool, + check_output: bool, + chroot: Optional[StrOrPath] = None, + env: Optional[dict[str, str]] = None, + timeout: Optional[float] = None, +) -> subprocess.CompletedProcess[bytes]: + """A wrapper for subprocess.run method. + + NOTE: this is for the requirement of customized subprocess call + in the future, like chroot or nsenter before execution. + + Args: + cmd (str | list[str]): command to be executed. + check (bool): if True, raise CalledProcessError on non 0 return code. + check_output (bool): if True, the UTF-8 decoded stdout will be returned. + timeout (Optional[float], optional): timeout for execution. Defaults to None. + + Returns: + subprocess.CompletedProcess[bytes]: the result of the execution. + """ + if isinstance(cmd, str): + cmd = shlex.split(cmd) + + preexec_fn: Optional[Callable[..., Any]] = None + if chroot: + + def _chroot(): + os.chroot(chroot) + + preexec_fn = _chroot + + return subprocess.run( + cmd, + check=check, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE if check_output else None, + timeout=timeout, + preexec_fn=preexec_fn, + env=env, + ) diff --git a/tests/test_otaclient/test_boot_control/test_rpi_boot.py b/tests/test_otaclient/test_boot_control/test_rpi_boot.py index f4b5b943e..b8e99d9dc 100644 --- a/tests/test_otaclient/test_boot_control/test_rpi_boot.py +++ b/tests/test_otaclient/test_boot_control/test_rpi_boot.py @@ -8,7 +8,8 @@ import pytest import pytest_mock -from otaclient.app.boot_control._rpi_boot import _FSTAB_TEMPLATE_STR +from otaclient.app.boot_control import _rpi_boot +from otaclient.app.boot_control._common import CMDHelperFuncs, SlotMountHelper from otaclient.app.boot_control.configs import rpi_boot_cfg from otaclient_api.v2 import types as api_types from tests.conftest import TestConfiguration as cfg @@ -16,68 +17,72 @@ logger = logging.getLogger(__name__) +# slot config +SLOT_A = "slot_a" +SLOT_B = "slot_b" +SLOT_A_DEV = "/dev/sda2" +SLOT_B_DEV = "/dev/sda3" +SEP_CHAR = "_" -class _RPIBootTestCfg: - # slot config - SLOT_A = rpi_boot_cfg.SLOT_A_FSLABEL - SLOT_B = rpi_boot_cfg.SLOT_B_FSLABEL - SLOT_A_DEV = "slot_a_dev" - SLOT_B_DEV = "slot_b_dev" - SEP_CHAR = "_" +# dummy boot files content +CONFIG_TXT_SLOT_A = "config_txt_slot_a" +CONFIG_TXT_SLOT_B = "config_txt_slot_b" +CMDLINE_TXT_SLOT_A = "cmdline_txt_slot_a" +CMDLINE_TXT_SLOT_B = "cmdline_txt_slot_b" - # dummy boot files content - CONFIG_TXT_SLOT_A = "config_txt_slot_a" - CONFIG_TXT_SLOT_B = "config_txt_slot_b" - CMDLINE_TXT_SLOT_A = "cmdline_txt_slot_a" - CMDLINE_TXT_SLOT_B = "cmdline_txt_slot_b" +# module path +RPI_BOOT_MODULE_PATH = "otaclient.app.boot_control._rpi_boot" +rpi_boot__RPIBootControl_MODULE = f"{RPI_BOOT_MODULE_PATH}._RPIBootControl" +rpi_boot_RPIBoot_CMDHelperFuncs_MODULE = f"{RPI_BOOT_MODULE_PATH}.CMDHelperFuncs" +boot_control_common_CMDHelperFuncs_MODULE = ( + f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs" +) - # module path - rpi_boot__RPIBootControl_MODULE = f"{cfg.RPI_BOOT_MODULE_PATH}._RPIBootControl" - rpi_boot_RPIBoot_CMDHelperFuncs_MODULE = ( - f"{cfg.RPI_BOOT_MODULE_PATH}.CMDHelperFuncs" - ) - boot_control_common_CMDHelperFuncs_MODULE = ( - f"{cfg.BOOT_CONTROL_COMMON_MODULE_PATH}.CMDHelperFuncs" - ) - - # image version - VERSION = "rpi_boot_test" +# image version +VERSION = "rpi_boot_test" class RPIBootABPartitionFSM: def __init__(self) -> None: - self._active_slot = _RPIBootTestCfg.SLOT_A - self._standby_slot = _RPIBootTestCfg.SLOT_B - self._active_slot_dev = _RPIBootTestCfg.SLOT_A_DEV - self._standby_slot_dev = _RPIBootTestCfg.SLOT_B_DEV + self.active_slot = SLOT_A + self.standby_slot = SLOT_B + self.active_slot_dev = SLOT_A_DEV + self.standby_slot_dev = SLOT_B_DEV + self.parent_dev = "/dev/sda" self.is_switched_boot = False def reboot_tryboot(self): - logger.info(f"tryboot to {self._standby_slot=}") + logger.info(f"tryboot to {self.standby_slot=}") self.is_switched_boot = True - self._active_slot, self._standby_slot = self._standby_slot, self._active_slot - self._active_slot_dev, self._standby_slot_dev = ( - self._standby_slot_dev, - self._active_slot_dev, + self.active_slot, self.standby_slot = self.standby_slot, self.active_slot + self.active_slot_dev, self.standby_slot_dev = ( + self.standby_slot_dev, + self.active_slot_dev, ) - def get_active_slot(self) -> str: - return self._active_slot - - def get_standby_slot(self) -> str: - return self._standby_slot - - def get_standby_slot_dev(self) -> str: - return self._standby_slot_dev - - def get_active_slot_dev(self) -> str: - return self._active_slot_dev + def get_current_rootfs_dev(self): + return self.active_slot_dev + def get_parent_dev(self, *args, **kwags): + return self.parent_dev -class _RebootEXP(BaseException): - """NOTE: use BaseException to escape normal Exception catch.""" + def get_device_tree(self, parent_dev): + assert parent_dev == self.parent_dev + # NOTE: we allow extra partitions after partition ID 3 + return [ + self.parent_dev, + "/dev/sda1", + SLOT_A_DEV, + SLOT_B_DEV, + "/dev/sda5", + ] - ... + def get_attrs_by_dev(self, _, dev, *args, **kwargs): + if dev == self.active_slot_dev: + return self.active_slot + elif dev == self.standby_slot_dev: + return self.standby_slot + raise ValueError class TestRPIBootControl: @@ -87,6 +92,9 @@ class TestRPIBootControl: @pytest.fixture def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): + self.model_file = tmp_path / "model" + self.model_file.write_text(rpi_boot_cfg.RPI_MODEL_HINT) + self.slot_a_mp = Path(ab_slots.slot_a) self.slot_b_mp = Path(ab_slots.slot_b) @@ -96,11 +104,11 @@ def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): ).relative_to("/") self.slot_a_ota_status_dir.mkdir(parents=True, exist_ok=True) slot_a_ota_status = self.slot_a_ota_status_dir / "status" - slot_a_ota_status.write_text("SUCCESS") + slot_a_ota_status.write_text(api_types.StatusOta.SUCCESS.name) slot_a_version = self.slot_a_ota_status_dir / "version" slot_a_version.write_text(cfg.CURRENT_VERSION) slot_a_slot_in_use = self.slot_a_ota_status_dir / "slot_in_use" - slot_a_slot_in_use.write_text(rpi_boot_cfg.SLOT_A_FSLABEL) + slot_a_slot_in_use.write_text(SLOT_A) # setup ota dir for slot_a slot_a_ota_dir = self.slot_a_mp / "boot" / "ota" slot_a_ota_dir.mkdir(parents=True, exist_ok=True) @@ -117,110 +125,145 @@ def rpi_boot_ab_slot(self, tmp_path: Path, ab_slots: SlotMeta): self.system_boot = tmp_path / "system-boot" self.system_boot.mkdir(parents=True, exist_ok=True) # NOTE: primary config.txt is for slot_a at the beginning - (self.system_boot / f"{rpi_boot_cfg.CONFIG_TXT}").write_text( - _RPIBootTestCfg.CONFIG_TXT_SLOT_A - ) + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}").write_text(CONFIG_TXT_SLOT_A) # NOTE: rpi_boot controller now doesn't check the content of boot files, but only ensure the existence - ( - self.system_boot - / f"{rpi_boot_cfg.CONFIG_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text(_RPIBootTestCfg.CONFIG_TXT_SLOT_A) - ( - self.system_boot - / f"{rpi_boot_cfg.CONFIG_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" - ).write_text(_RPIBootTestCfg.CONFIG_TXT_SLOT_B) - ( - self.system_boot - / f"{rpi_boot_cfg.CMDLINE_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text(_RPIBootTestCfg.CMDLINE_TXT_SLOT_A) - ( - self.system_boot - / f"{rpi_boot_cfg.CMDLINE_TXT}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" - ).write_text(_RPIBootTestCfg.CMDLINE_TXT_SLOT_B) - ( - self.system_boot - / f"{rpi_boot_cfg.VMLINUZ}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text("slot_a_vmlinux") - ( - self.system_boot - / f"{rpi_boot_cfg.INITRD_IMG}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_A}" - ).write_text("slot_a_initrdimg") + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}{SEP_CHAR}{SLOT_A}").write_text( + CONFIG_TXT_SLOT_A + ) + (self.system_boot / f"{_rpi_boot.CONFIG_TXT}{SEP_CHAR}{SLOT_B}").write_text( + CONFIG_TXT_SLOT_B + ) + (self.system_boot / f"{_rpi_boot.CMDLINE_TXT}{SEP_CHAR}{SLOT_A}").write_text( + CMDLINE_TXT_SLOT_A + ) + (self.system_boot / f"{_rpi_boot.CMDLINE_TXT}{SEP_CHAR}{SLOT_B}").write_text( + CMDLINE_TXT_SLOT_B + ) + (self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_A}").write_text( + "slot_a_vmlinux" + ) + (self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_A}").write_text( + "slot_a_initrdimg" + ) self.vmlinuz_slot_b = ( - self.system_boot - / f"{rpi_boot_cfg.VMLINUZ}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" + self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_B}" ) self.initrd_img_slot_b = ( - self.system_boot - / f"{rpi_boot_cfg.INITRD_IMG}{_RPIBootTestCfg.SEP_CHAR}{_RPIBootTestCfg.SLOT_B}" + self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_B}" ) @pytest.fixture(autouse=True) def mock_setup(self, mocker: pytest_mock.MockerFixture, rpi_boot_ab_slot): - from otaclient.app.boot_control._common import CMDHelperFuncs - from otaclient.app.boot_control._rpi_boot import _RPIBootControl - # start the test FSM - self._fsm = RPIBootABPartitionFSM() - - # mocking _RPIBootControl - _RPIBootControl.standby_slot = mocker.PropertyMock(wraps=self._fsm.get_standby_slot) # type: ignore - _RPIBootControl.active_slot = mocker.PropertyMock(wraps=self._fsm.get_active_slot) # type: ignore - _RPIBootControl.active_slot_dev = mocker.PropertyMock(wraps=self._fsm.get_active_slot_dev) # type: ignore - _RPIBootControl.standby_slot_dev = mocker.PropertyMock(wraps=self._fsm.get_standby_slot_dev) # type: ignore - _RPIBootControl._init_slots_info = mocker.Mock() - _RPIBootControl.reboot_tryboot = mocker.Mock( - side_effect=self._fsm.reboot_tryboot - ) - _RPIBootControl._update_firmware = mocker.Mock() - - # patch boot_control module - self._mocked__rpiboot_control = _RPIBootControl - mocker.patch(_RPIBootTestCfg.rpi_boot__RPIBootControl_MODULE, _RPIBootControl) + self.fsm = fsm = RPIBootABPartitionFSM() - # patch CMDHelperFuncs + # ------ patch CMDHelperFuncs ------ # # NOTE: also remember to patch CMDHelperFuncs in common - self._CMDHelper_mock = typing.cast( + self.CMDHelper_mock = CMDHelper_mock = typing.cast( CMDHelperFuncs, mocker.MagicMock(spec=CMDHelperFuncs) ) - self._CMDHelper_mock.is_target_mounted = mocker.Mock(return_value=True) - # NOTE: rpi_boot only call CMDHelperFuncs.reboot once in finalize_switch_boot method - self._CMDHelper_mock.reboot = mocker.Mock(side_effect=_RebootEXP("reboot")) + # NOTE: this is for system-boot mount check in _RPIBootControl; + CMDHelper_mock.is_target_mounted = mocker.Mock(return_value=True) + CMDHelper_mock.get_current_rootfs_dev = mocker.Mock( + wraps=fsm.get_current_rootfs_dev + ) + CMDHelper_mock.get_parent_dev = mocker.Mock(wraps=fsm.get_parent_dev) + CMDHelper_mock.get_device_tree = mocker.Mock(wraps=fsm.get_device_tree) + CMDHelper_mock.get_attrs_by_dev = mocker.Mock(wraps=fsm.get_attrs_by_dev) + + mocker.patch(rpi_boot_RPIBoot_CMDHelperFuncs_MODULE, self.CMDHelper_mock) + mocker.patch( + boot_control_common_CMDHelperFuncs_MODULE, + self.CMDHelper_mock, + ) + + # ------ patch _RPIBootControl ------ # + def _update_firmware(*, target_slot, **kwargs): + """Move the kernel and initrd images into /boot/firmware folder. + + This is done by update_firmware method. This simulates the update_firmware to slot_b. + """ + assert target_slot == SLOT_B + + _vmlinuz = self.slot_a_mp / "boot" / _rpi_boot.VMLINUZ + shutil.copy( + os.path.realpath(_vmlinuz), + self.system_boot / f"{_rpi_boot.VMLINUZ}{SEP_CHAR}{SLOT_B}", + ) + _initrd_img = self.slot_a_mp / "boot" / _rpi_boot.INITRD_IMG + shutil.copy( + os.path.realpath(_initrd_img), + self.system_boot / f"{_rpi_boot.INITRD_IMG}{SEP_CHAR}{SLOT_B}", + ) + + self.update_firmware_mock = update_firmware_mock = mocker.MagicMock( + wraps=_update_firmware + ) + mocker.patch( + f"{rpi_boot__RPIBootControl_MODULE}.update_firmware", update_firmware_mock + ) + self.reboot_tryboot_mock = reboot_tryboot_mock = mocker.MagicMock( + side_effect=fsm.reboot_tryboot + ) mocker.patch( - _RPIBootTestCfg.rpi_boot_RPIBoot_CMDHelperFuncs_MODULE, self._CMDHelper_mock + f"{rpi_boot__RPIBootControl_MODULE}.reboot_tryboot", reboot_tryboot_mock + ) + + # ------ patch slot mount helper ------ # + self.mp_control_mock = mp_control_mock = typing.cast( + SlotMountHelper, mocker.MagicMock() ) + + def _get_active_slot_mount_point(*args, **kwargs): + if fsm.active_slot == SLOT_A: + return self.slot_a_mp + elif fsm.active_slot == SLOT_B: + return self.slot_b_mp + + def _get_standby_slot_mount_point(*args, **kwargs): + if fsm.standby_slot == SLOT_A: + return self.slot_a_mp + elif fsm.standby_slot == SLOT_B: + return self.slot_b_mp + + type(mp_control_mock).active_slot_mount_point = mocker.PropertyMock( # type: ignore + wraps=_get_active_slot_mount_point + ) + type(mp_control_mock).standby_slot_mount_point = mocker.PropertyMock( # type: ignore + wraps=_get_standby_slot_mount_point + ) + mocker.patch( - _RPIBootTestCfg.boot_control_common_CMDHelperFuncs_MODULE, - self._CMDHelper_mock, + f"{RPI_BOOT_MODULE_PATH}.SlotMountHelper", + mocker.MagicMock(return_value=mp_control_mock), ) def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): from otaclient.app.boot_control._rpi_boot import RPIBootController # ------ patch rpi_boot_cfg for boot_controller_inst1.stage 1~3 ------# - _rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" + rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" mocker.patch( - f"{_rpi_boot_cfg_path}.SYSTEM_BOOT_MOUNT_POINT", str(self.system_boot) + f"{rpi_boot_cfg_path}.SYSTEM_BOOT_MOUNT_POINT", str(self.system_boot) ) - mocker.patch(f"{_rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_a_mp)) - mocker.patch(f"{_rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_b_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_a_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_b_mp)) mocker.patch( - f"{_rpi_boot_cfg_path}.ACTIVE_ROOT_MOUNT_POINT", str(self.slot_a_mp) + f"{rpi_boot_cfg_path}.ACTIVE_ROOT_MOUNT_POINT", str(self.slot_a_mp) ) + mocker.patch(f"{rpi_boot_cfg_path}.RPI_MODEL_FILE", str(self.model_file)) # ------ boot_controller_inst1.stage1: init ------ # rpi_boot_controller1 = RPIBootController() # ------ boot_controller_inst1.stage2: pre_update ------ # - # --- execution --- # - self.version = _RPIBootTestCfg.VERSION rpi_boot_controller1.pre_update( - version=self.version, + version=VERSION, standby_as_ref=False, erase_standby=False, ) - # --- assertions --- # - # 1. make sure the ota-status is updated properly - # 2. make sure the mount points are prepared + + # --- assertion --- # assert ( self.slot_a_ota_status_dir / "status" ).read_text() == api_types.StatusOta.FAILURE.name @@ -230,14 +273,14 @@ def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): assert ( (self.slot_a_ota_status_dir / "slot_in_use").read_text() == (self.slot_b_ota_status_dir / "slot_in_use").read_text() - == _RPIBootTestCfg.SLOT_B - ) - self._CMDHelper_mock.mount_rw.assert_called_once_with( - target=self._fsm._standby_slot_dev, mount_point=self.slot_b_mp + == SLOT_B ) - self._CMDHelper_mock.mount_ro.assert_called_once_with( - target=self._fsm._active_slot_dev, mount_point=self.slot_a_mp + self.mp_control_mock.prepare_standby_dev.assert_called_once_with( + erase_standby=mocker.ANY, + fslabel=self.fsm.standby_slot, ) + self.mp_control_mock.mount_standby.assert_called_once() + self.mp_control_mock.mount_active.assert_called_once() # ------ mocked in_update ------ # # this should be done by create_standby module, so we do it manually here instead @@ -252,82 +295,40 @@ def test_rpi_boot_normal_update(self, mocker: pytest_mock.MockerFixture): shutil.copy(os.path.realpath(_initrd_img), self.slot_b_boot_dir) # ------ boot_controller_inst1.stage3: post_update, reboot switch boot ------ # - # --- execution --- # _post_updater = rpi_boot_controller1.post_update() next(_post_updater) next(_post_updater, None) # actual reboot here - # --- assertions: --- # - # 1. make sure that retry boot is called - # 2. make sure that fstab file is updated for slot_b - # 3. assert kernel and initrd.img are copied to system-boot - # 4. make sure tryboot.txt is presented and correct - # 5. make sure config.txt is untouched - self._mocked__rpiboot_control.reboot_tryboot.assert_called_once() - assert self._fsm.is_switched_boot + + # --- assertion --- # + self.reboot_tryboot_mock.assert_called_once() + self.update_firmware_mock.assert_called_once() + assert self.fsm.is_switched_boot assert ( self.slot_b_mp / Path(rpi_boot_cfg.FSTAB_FPATH).relative_to("/") - ).read_text() == Template(_FSTAB_TEMPLATE_STR).substitute( - rootfs_fslabel=_RPIBootTestCfg.SLOT_B + ).read_text() == Template(_rpi_boot._FSTAB_TEMPLATE_STR).substitute( + rootfs_fslabel=SLOT_B ) assert self.initrd_img_slot_b.is_file() assert self.vmlinuz_slot_b.is_file() - assert ( - self.system_boot / "tryboot.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_B - assert ( - self.system_boot / "config.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_A + assert (self.system_boot / "tryboot.txt").read_text() == CONFIG_TXT_SLOT_B + assert (self.system_boot / "config.txt").read_text() == CONFIG_TXT_SLOT_A + # NOTE: backward compatibility with old double stage update strategy + assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() # ------ boot_controller_inst2: first reboot ------ # - # patch rpi_boot_cfg for boot_controller_inst2 - _rpi_boot_cfg_path = f"{cfg.BOOT_CONTROL_CONFIG_MODULE_PATH}.rpi_boot_cfg" - mocker.patch(f"{_rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_b_mp)) - mocker.patch(f"{_rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_a_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.ACTIVE_ROOTFS_PATH", str(self.slot_b_mp)) + mocker.patch(f"{rpi_boot_cfg_path}.MOUNT_POINT", str(self.slot_a_mp)) - # ------ boot_controller_inst2.stage1: first reboot finalizing switch boot and update firmware ------ # + # ------ boot_controller_inst2.stage1: finalize switchboot ------ # logger.info("1st reboot: finalize switch boot and update firmware....") - # --- execution --- # - # NOTE: raise a _RebootEXP to simulate reboot and interrupt the otaclient - with pytest.raises(_RebootEXP): - RPIBootController() # NOTE: init only - # --- assertions: --- # - # 1. assert that otaclient reboots the device - # 2. assert firmware update is called - # 3. assert reboot is called - # 4. assert switch boot finalized - # 5. assert slot_in_use is slot_b - # 6. make sure the SWITCH_BOOT_FLAG_FILE file is created - # 7. make sure ota_status is still UPDATING - self._mocked__rpiboot_control._update_firmware.assert_called_once() - self._CMDHelper_mock.reboot.assert_called_once() - assert ( - self.system_boot / "config.txt" - ).read_text() == _RPIBootTestCfg.CONFIG_TXT_SLOT_B + RPIBootController() + + # --- assertion --- # + assert (self.system_boot / "config.txt").read_text() == CONFIG_TXT_SLOT_B assert ( self.slot_b_ota_status_dir / rpi_boot_cfg.SLOT_IN_USE_FNAME ).read_text() == "slot_b" - assert (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() - assert ( - self.slot_b_ota_status_dir / rpi_boot_cfg.OTA_STATUS_FNAME - ).read_text() == api_types.StatusOta.UPDATING.name - - # ------ boot_controller_inst3.stage1: second reboot, apply updated firmware and finish up ota update ------ # - logger.info("2nd reboot: finish up ota update....") - # --- execution --- # - rpi_boot_controller4_2 = RPIBootController() - # --- assertions: --- # - # 1. make sure ota_status is SUCCESS - # 2. make sure the flag file is cleared - # 3. make sure the config.txt is still for slot_b - assert ( - rpi_boot_controller4_2.get_booted_ota_status() - == api_types.StatusOta.SUCCESS - ) + assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() assert ( self.slot_b_ota_status_dir / rpi_boot_cfg.OTA_STATUS_FNAME ).read_text() == api_types.StatusOta.SUCCESS.name - assert not (self.system_boot / rpi_boot_cfg.SWITCH_BOOT_FLAG_FILE).is_file() - assert ( - rpi_boot_controller4_2._ota_status_control._load_current_slot_in_use() - == "slot_b" - )